Add Apache Kafka support

See gh-6961
pull/7234/merge
Gary Russell 9 years ago committed by Stephane Nicoll
parent 9423b9831c
commit c4188c8e4a

@ -472,6 +472,11 @@
<artifactId>spring-rabbit</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-spring-service-connector</artifactId>
@ -605,6 +610,18 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-junit-runners</artifactId>

@ -0,0 +1,72 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;
/**
 * Default configurer for Kafka listener container factories.
 *
 * @author Gary Russell
 * @since 1.5
 */
public class ConcurrentKafkaListenerContainerFactoryConfigurer {

	private KafkaProperties properties;

	/**
	 * Set the {@link KafkaProperties} to use.
	 * @param properties the properties
	 */
	void setKafkaProperties(KafkaProperties properties) {
		this.properties = properties;
	}

	/**
	 * Configure the specified Kafka listener container factory. The factory can be
	 * further tuned and default settings can be overridden.
	 * @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
	 * instance to configure
	 * @param consumerFactory the {@link ConsumerFactory} to use
	 */
	public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
			ConsumerFactory<Object, Object> consumerFactory) {
		listenerContainerFactory.setConsumerFactory(consumerFactory);
		Listener listenerProperties = this.properties.getListener();
		ContainerProperties containerProperties = listenerContainerFactory.getContainerProperties();
		// Apply only the settings that were explicitly configured so that
		// spring-kafka defaults are preserved otherwise.
		if (listenerProperties.getAckMode() != null) {
			containerProperties.setAckMode(listenerProperties.getAckMode());
		}
		if (listenerProperties.getAckCount() != null) {
			containerProperties.setAckCount(listenerProperties.getAckCount());
		}
		if (listenerProperties.getAckTime() != null) {
			containerProperties.setAckTime(listenerProperties.getAckTime());
		}
		if (listenerProperties.getPollTimeout() != null) {
			containerProperties.setPollTimeout(listenerProperties.getPollTimeout());
		}
		if (listenerProperties.getConcurrency() != null) {
			listenerContainerFactory.setConcurrency(listenerProperties.getConcurrency());
		}
	}

}

@ -0,0 +1,71 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
/**
 * Adds {@link EnableKafka} if present on the classpath.
 *
 * @author Gary Russell
 * @since 1.5
 */
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

	private final KafkaProperties properties;

	KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
		this.properties = properties;
	}

	// Configurer that copies the boot listener properties onto container
	// factories; exposed so users can apply it to their own factory beans.
	@Bean
	@ConditionalOnMissingBean
	public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
		ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
		configurer.setKafkaProperties(this.properties);
		return configurer;
	}

	// Default factory backing @KafkaListener endpoints; backs off if the user
	// defines their own bean named "kafkaListenerContainerFactory".
	@Bean
	@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
	public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
			ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
			ConsumerFactory<Object, Object> kafkaConsumerFactory) {
		ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
				new ConcurrentKafkaListenerContainerFactory<Object, Object>();
		configurer.configure(factory, kafkaConsumerFactory);
		return factory;
	}

	// Applies @EnableKafka only when the listener annotation processor bean has
	// not already been registered (e.g. via an explicit @EnableKafka elsewhere).
	@EnableKafka
	@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	protected static class EnableKafkaConfiguration {

	}

}

@ -0,0 +1,85 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
/**
 * Auto-configuration for Spring for Apache Kafka.
 *
 * @author Gary Russell
 * @since 1.5
 */
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {

	private final KafkaProperties properties;

	public KafkaAutoConfiguration(KafkaProperties properties) {
		this.properties = properties;
	}

	// Default KafkaTemplate wired with the auto-configured producer factory and
	// producer listener; the default topic is taken from the bound
	// spring.kafka.template properties.
	@Bean
	@ConditionalOnMissingBean(KafkaTemplate.class)
	public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
			ProducerListener<Object, Object> kafkaProducerListener) {
		KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(kafkaProducerFactory);
		kafkaTemplate.setProducerListener(kafkaProducerListener);
		kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
		return kafkaTemplate;
	}

	// Logging listener used unless the user supplies their own ProducerListener.
	@Bean
	@ConditionalOnMissingBean(ProducerListener.class)
	public ProducerListener<Object, Object> kafkaProducerListener() {
		return new LoggingProducerListener<Object, Object>();
	}

	@Configuration
	protected static class ConnectionConfig {

		// Consumer factory built from the bound common + consumer properties.
		@Bean
		@ConditionalOnMissingBean(ConsumerFactory.class)
		public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
			return new DefaultKafkaConsumerFactory<Object, Object>(properties.buildConsumerProperties());
		}

		// Producer factory built from the bound common + producer properties.
		@Bean
		@ConditionalOnMissingBean(ProducerFactory.class)
		public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
			return new DefaultKafkaProducerFactory<Object, Object>(properties.buildProducerProperties());
		}

	}

}

@ -0,0 +1,716 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.io.Resource;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
/**
* Spring for Apache Kafka Auto-configuration properties.
*
* Users should refer to kafka documentation for complete descriptions of these
* properties.
*
* @author Gary Russell
* @since 1.5
*/
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
private final Consumer consumer = new Consumer();
private final Producer producer = new Producer();
private final Listener listener = new Listener();
private final Template template = new Template();
private final Ssl ssl = new Ssl();
// Apache Kafka Common Properties
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
*/
private List<String> bootstrapServers = new ArrayList<String>(Collections.singletonList("localhost:9092"));
/**
* Id to pass to the server when making requests; used for server-side logging.
*/
private String clientId;
public Consumer getConsumer() {
return this.consumer;
}
public Producer getProducer() {
return this.producer;
}
public Listener getListener() {
return this.listener;
}
public Ssl getSsl() {
return this.ssl;
}
public Template getTemplate() {
return this.template;
}
public List<String> getBootstrapServers() {
return this.bootstrapServers;
}
public void setBootstrapServers(List<String> bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getClientId() {
return this.clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
/**
 * Build the properties shared by producers and consumers (bootstrap servers,
 * client id and SSL settings), including only values that were actually set.
 */
private Map<String, Object> buildCommonProperties() {
	Map<String, Object> common = new HashMap<String, Object>();
	if (this.bootstrapServers != null) {
		common.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
	}
	if (this.clientId != null) {
		common.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
	}
	Ssl sslProperties = this.ssl;
	if (sslProperties.getKeyPassword() != null) {
		common.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslProperties.getKeyPassword());
	}
	if (sslProperties.getKeystoreLocation() != null) {
		common.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(sslProperties.getKeystoreLocation()));
	}
	if (sslProperties.getKeystorePassword() != null) {
		common.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslProperties.getKeystorePassword());
	}
	if (sslProperties.getTruststoreLocation() != null) {
		common.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, resourceToPath(sslProperties.getTruststoreLocation()));
	}
	if (sslProperties.getTruststorePassword() != null) {
		common.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslProperties.getTruststorePassword());
	}
	return common;
}
/**
 * Create an initial map of consumer properties from the boot properties. This
 * allows you to add additional properties, if necessary, and override the
 * default kafkaConsumerFactory bean.
 * @return the consumer properties
 */
public Map<String, Object> buildConsumerProperties() {
	Map<String, Object> consumerProperties = buildCommonProperties();
	consumerProperties.putAll(this.consumer.buildProperties());
	return consumerProperties;
}
/**
 * Create an initial map of producer properties from the boot properties. This
 * allows you to add additional properties, if necessary, and override the
 * default kafkaProducerFactory bean.
 * @return the producer properties
 */
public Map<String, Object> buildProducerProperties() {
	Map<String, Object> producerProperties = buildCommonProperties();
	producerProperties.putAll(this.producer.buildProperties());
	return producerProperties;
}
/**
 * Resolve the given resource to an absolute file system path.
 * @param resource the resource to resolve
 * @return the absolute path of the underlying file
 * @throws IllegalStateException if the resource is not backed by a file system
 */
public static String resourceToPath(Resource resource) {
	try {
		return resource.getFile().getAbsolutePath();
	}
	catch (IOException e) {
		// Name the offending resource so misconfigured locations are easy to spot
		throw new IllegalStateException("Resource '" + resource + "' must be on a file system", e);
	}
}
/**
 * Consumer-specific properties. Values set here are merged on top of the
 * common properties when the consumer configuration map is built, so they
 * override the common {@code bootstrap-servers}, {@code client-id} and SSL
 * settings for consumers.
 */
public static class Consumer {

	private final Ssl ssl = new Ssl();

	/**
	 * Frequency in milliseconds that the consumer offsets are auto-committed to
	 * Kafka if 'enable.auto.commit' is true.
	 */
	private Long autoCommitIntervalMs;

	/**
	 * What to do when there is no initial offset in Kafka or if the current offset
	 * does not exist any more on the server.
	 */
	private String autoOffsetReset;

	/**
	 * Comma-delimited list of host:port pairs to use for establishing the initial
	 * connection to the Kafka cluster.
	 */
	private List<String> bootstrapServers;

	/**
	 * Id to pass to the server when making requests; used for server-side logging.
	 */
	private String clientId;

	/**
	 * If true the consumer's offset will be periodically committed in the background.
	 */
	private Boolean enableAutoCommit;

	/**
	 * Maximum amount of time the server will block before answering the fetch
	 * request if there isn't sufficient data to immediately satisfy the requirement
	 * given by fetch.min.bytes.
	 */
	private Integer fetchMaxWaitMs;

	/**
	 * Minimum amount of data the server should return for a fetch request.
	 */
	private Integer fetchMinBytes;

	/**
	 * Unique string that identifies the consumer group this consumer belongs to.
	 */
	private String groupId;

	/**
	 * Expected time between heartbeats to the consumer coordinator.
	 */
	private Integer heartbeatIntervalMs;

	/**
	 * Deserializer class for key that implements the
	 * org.apache.kafka.common.serialization.Deserializer interface.
	 */
	private Class<?> keyDeserializer = StringDeserializer.class;

	/**
	 * Deserializer class for value that implements the
	 * org.apache.kafka.common.serialization.Deserializer interface.
	 */
	private Class<?> valueDeserializer = StringDeserializer.class;

	public Ssl getSsl() {
		return this.ssl;
	}

	public Long getAutoCommitIntervalMs() {
		return this.autoCommitIntervalMs;
	}

	public void setAutoCommitIntervalMs(Long autoCommitIntervalMs) {
		this.autoCommitIntervalMs = autoCommitIntervalMs;
	}

	public String getAutoOffsetReset() {
		return this.autoOffsetReset;
	}

	public void setAutoOffsetReset(String autoOffsetReset) {
		this.autoOffsetReset = autoOffsetReset;
	}

	public List<String> getBootstrapServers() {
		return this.bootstrapServers;
	}

	public void setBootstrapServers(List<String> bootstrapServers) {
		this.bootstrapServers = bootstrapServers;
	}

	public String getClientId() {
		return this.clientId;
	}

	public void setClientId(String clientId) {
		this.clientId = clientId;
	}

	public Boolean getEnableAutoCommit() {
		return this.enableAutoCommit;
	}

	public void setEnableAutoCommit(Boolean enableAutoCommit) {
		this.enableAutoCommit = enableAutoCommit;
	}

	public Integer getFetchMaxWaitMs() {
		return this.fetchMaxWaitMs;
	}

	public void setFetchMaxWaitMs(Integer fetchMaxWaitMs) {
		this.fetchMaxWaitMs = fetchMaxWaitMs;
	}

	public Integer getFetchMinBytes() {
		return this.fetchMinBytes;
	}

	public void setFetchMinBytes(Integer fetchMinBytes) {
		this.fetchMinBytes = fetchMinBytes;
	}

	public String getGroupId() {
		return this.groupId;
	}

	public void setGroupId(String groupId) {
		this.groupId = groupId;
	}

	public Integer getHeartbeatIntervalMs() {
		return this.heartbeatIntervalMs;
	}

	public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
		this.heartbeatIntervalMs = heartbeatIntervalMs;
	}

	public Class<?> getKeyDeserializer() {
		return this.keyDeserializer;
	}

	public void setKeyDeserializer(Class<?> keyDeserializer) {
		this.keyDeserializer = keyDeserializer;
	}

	public Class<?> getValueDeserializer() {
		return this.valueDeserializer;
	}

	public void setValueDeserializer(Class<?> valueDeserializer) {
		this.valueDeserializer = valueDeserializer;
	}

	/**
	 * Build the consumer-only configuration map, including only values that
	 * were set (the key/value deserializers default to StringDeserializer).
	 */
	public Map<String, Object> buildProperties() {
		Map<String, Object> properties = new HashMap<String, Object>();
		if (this.autoCommitIntervalMs != null) {
			properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.autoCommitIntervalMs);
		}
		if (this.autoOffsetReset != null) {
			properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
		}
		if (this.bootstrapServers != null) {
			properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
		}
		if (this.clientId != null) {
			properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId);
		}
		if (this.enableAutoCommit != null) {
			properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.enableAutoCommit);
		}
		if (this.fetchMaxWaitMs != null) {
			properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, this.fetchMaxWaitMs);
		}
		if (this.fetchMinBytes != null) {
			properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinBytes);
		}
		if (this.groupId != null) {
			properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
		}
		if (this.heartbeatIntervalMs != null) {
			properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, this.heartbeatIntervalMs);
		}
		if (this.keyDeserializer != null) {
			properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDeserializer);
		}
		if (this.ssl.getKeyPassword() != null) {
			properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
		}
		if (this.ssl.getKeystoreLocation() != null) {
			properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation()));
		}
		if (this.ssl.getKeystorePassword() != null) {
			properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword());
		}
		if (this.ssl.getTruststoreLocation() != null) {
			properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
					resourceToPath(this.ssl.getTruststoreLocation()));
		}
		if (this.ssl.getTruststorePassword() != null) {
			properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword());
		}
		if (this.valueDeserializer != null) {
			properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueDeserializer);
		}
		return properties;
	}

}
/**
 * Producer-specific properties. Values set here are merged on top of the
 * common properties when the producer configuration map is built, so they
 * override the common {@code bootstrap-servers}, {@code client-id} and SSL
 * settings for producers.
 */
public static class Producer {

	private final Ssl ssl = new Ssl();

	/**
	 * Number of acknowledgments the producer requires the leader to have
	 * received before considering a request complete.
	 */
	private String acks;

	/**
	 * Number of records to batch before sending.
	 */
	private Integer batchSize;

	/**
	 * Comma-delimited list of host:port pairs to use for establishing the initial
	 * connection to the Kafka cluster.
	 */
	private List<String> bootstrapServers;

	/**
	 * Total bytes of memory the producer can use to buffer records waiting to be
	 * sent to the server.
	 */
	private Long bufferMemory;

	/**
	 * Id to pass to the server when making requests; used for server-side logging.
	 */
	private String clientId;

	/**
	 * Compression type for all data generated by the producer.
	 */
	private String compressionType;

	/**
	 * Serializer class for key that implements the
	 * org.apache.kafka.common.serialization.Serializer interface.
	 */
	private Class<?> keySerializer = StringSerializer.class;

	/**
	 * When greater than zero, enables retrying of failed sends.
	 */
	private Integer retries;

	/**
	 * Serializer class for value that implements the
	 * org.apache.kafka.common.serialization.Serializer interface.
	 */
	private Class<?> valueSerializer = StringSerializer.class;

	public Ssl getSsl() {
		return this.ssl;
	}

	public String getAcks() {
		return this.acks;
	}

	public void setAcks(String acks) {
		this.acks = acks;
	}

	public Integer getBatchSize() {
		return this.batchSize;
	}

	public void setBatchSize(Integer batchSize) {
		this.batchSize = batchSize;
	}

	public List<String> getBootstrapServers() {
		return this.bootstrapServers;
	}

	public void setBootstrapServers(List<String> bootstrapServers) {
		this.bootstrapServers = bootstrapServers;
	}

	public Long getBufferMemory() {
		return this.bufferMemory;
	}

	public void setBufferMemory(Long bufferMemory) {
		this.bufferMemory = bufferMemory;
	}

	public String getClientId() {
		return this.clientId;
	}

	public void setClientId(String clientId) {
		this.clientId = clientId;
	}

	public String getCompressionType() {
		return this.compressionType;
	}

	public void setCompressionType(String compressionType) {
		this.compressionType = compressionType;
	}

	public Class<?> getKeySerializer() {
		return this.keySerializer;
	}

	public void setKeySerializer(Class<?> keySerializer) {
		this.keySerializer = keySerializer;
	}

	public Integer getRetries() {
		return this.retries;
	}

	public void setRetries(Integer retries) {
		this.retries = retries;
	}

	public Class<?> getValueSerializer() {
		return this.valueSerializer;
	}

	public void setValueSerializer(Class<?> valueSerializer) {
		this.valueSerializer = valueSerializer;
	}

	/**
	 * Build the producer-only configuration map, including only values that
	 * were set (the key/value serializers default to StringSerializer).
	 */
	public Map<String, Object> buildProperties() {
		Map<String, Object> properties = new HashMap<String, Object>();
		if (this.acks != null) {
			properties.put(ProducerConfig.ACKS_CONFIG, this.acks);
		}
		if (this.batchSize != null) {
			properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize);
		}
		if (this.bootstrapServers != null) {
			properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
		}
		if (this.bufferMemory != null) {
			properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory);
		}
		if (this.clientId != null) {
			properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId);
		}
		if (this.compressionType != null) {
			properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.compressionType);
		}
		if (this.keySerializer != null) {
			properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.keySerializer);
		}
		if (this.retries != null) {
			properties.put(ProducerConfig.RETRIES_CONFIG, this.retries);
		}
		if (this.ssl.getKeyPassword() != null) {
			properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
		}
		if (this.ssl.getKeystoreLocation() != null) {
			properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation()));
		}
		if (this.ssl.getKeystorePassword() != null) {
			properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword());
		}
		if (this.ssl.getTruststoreLocation() != null) {
			properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
					resourceToPath(this.ssl.getTruststoreLocation()));
		}
		if (this.ssl.getTruststorePassword() != null) {
			properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword());
		}
		if (this.valueSerializer != null) {
			properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer);
		}
		return properties;
	}

}
/**
 * Properties used to configure the auto-configured {@code KafkaTemplate}.
 */
public static class Template {

	/**
	 * Default topic to which messages will be sent.
	 */
	private String defaultTopic;

	public String getDefaultTopic() {
		return this.defaultTopic;
	}

	public void setDefaultTopic(String defaultTopic) {
		this.defaultTopic = defaultTopic;
	}

}
/**
 * Properties applied to listener containers created through the
 * auto-configured container factory configurer. Unset values keep the
 * spring-kafka defaults.
 */
public static class Listener {

	/**
	 * Listener AckMode; see the spring-kafka documentation.
	 */
	private AckMode ackMode;

	/**
	 * Number of threads to run in the listener containers.
	 */
	private Integer concurrency;

	/**
	 * Timeout in milliseconds to use when polling the consumer.
	 */
	private Long pollTimeout;

	/**
	 * Number of records between offset commits when ackMode is COUNT or COUNT_TIME.
	 */
	private Integer ackCount;

	/**
	 * Time in milliseconds between offset commits when ackMode is TIME or COUNT_TIME.
	 */
	private Long ackTime;

	public AckMode getAckMode() {
		return this.ackMode;
	}

	public void setAckMode(AckMode ackMode) {
		this.ackMode = ackMode;
	}

	public Integer getConcurrency() {
		return this.concurrency;
	}

	public void setConcurrency(Integer concurrency) {
		this.concurrency = concurrency;
	}

	public Long getPollTimeout() {
		return this.pollTimeout;
	}

	public void setPollTimeout(Long pollTimeout) {
		this.pollTimeout = pollTimeout;
	}

	public Integer getAckCount() {
		return this.ackCount;
	}

	public void setAckCount(Integer ackCount) {
		this.ackCount = ackCount;
	}

	public Long getAckTime() {
		return this.ackTime;
	}

	public void setAckTime(Long ackTime) {
		this.ackTime = ackTime;
	}

}
/**
 * SSL properties. Used at the common level and can also be set per consumer
 * or producer; store locations must resolve to file-system paths.
 */
public static class Ssl {

	/**
	 * Password of the private key in the key store file.
	 */
	private String keyPassword;

	/**
	 * Location of the key store file.
	 */
	private Resource keystoreLocation;

	/**
	 * Store password for the key store file.
	 */
	private String keystorePassword;

	/**
	 * Location of the trust store file.
	 */
	private Resource truststoreLocation;

	/**
	 * Store password for the trust store file.
	 */
	private String truststorePassword;

	public String getKeyPassword() {
		return this.keyPassword;
	}

	public void setKeyPassword(String keyPassword) {
		this.keyPassword = keyPassword;
	}

	public Resource getKeystoreLocation() {
		return this.keystoreLocation;
	}

	public void setKeystoreLocation(Resource keystoreLocation) {
		this.keystoreLocation = keystoreLocation;
	}

	public String getKeystorePassword() {
		return this.keystorePassword;
	}

	public void setKeystorePassword(String keystorePassword) {
		this.keystorePassword = keystorePassword;
	}

	public Resource getTruststoreLocation() {
		return this.truststoreLocation;
	}

	public void setTruststoreLocation(Resource truststoreLocation) {
		this.truststoreLocation = truststoreLocation;
	}

	public String getTruststorePassword() {
		return this.truststorePassword;
	}

	public void setTruststorePassword(String truststorePassword) {
		this.truststorePassword = truststorePassword;
	}

}
}

@ -0,0 +1,21 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Auto-configuration for Apache Kafka.
 *
 * @author Gary Russell
 */
package org.springframework.boot.autoconfigure.kafka;

@ -62,6 +62,7 @@ org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration,\
org.springframework.boot.autoconfigure.groovy.template.GroovyTemplateAutoConfiguration,\
org.springframework.boot.autoconfigure.jersey.JerseyAutoConfiguration,\
org.springframework.boot.autoconfigure.jooq.JooqAutoConfiguration,\
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,\
org.springframework.boot.autoconfigure.liquibase.LiquibaseAutoConfiguration,\
org.springframework.boot.autoconfigure.mail.MailSenderAutoConfiguration,\
org.springframework.boot.autoconfigure.mail.MailSenderValidatorAutoConfiguration,\

@ -0,0 +1,114 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.test.util.EnvironmentTestUtils;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.handler.annotation.Header;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Tests for Kafka Auto-configuration.
 *
 * @author Gary Russell
 * @since 1.5
 */
public class KafkaAutoConfigurationIntegrationTests {

	private static final String TEST_TOPIC = "testTopic";

	// Single-node embedded Kafka broker shared by all tests in this class.
	@ClassRule
	public static final KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, TEST_TOPIC);

	private AnnotationConfigApplicationContext context;

	@After
	public void close() {
		if (this.context != null) {
			this.context.close();
		}
	}

	// Sends a keyed record via the auto-configured KafkaTemplate and verifies
	// the @KafkaListener in Listener receives both key and payload.
	@Test
	public void testEndToEnd() throws Exception {
		load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + kafkaEmbedded.getBrokersAsString(),
				"spring.kafka.consumer.group-id=testGroup",
				"spring.kafka.consumer.auto-offset-reset=earliest");
		@SuppressWarnings("unchecked")
		KafkaTemplate<String, String> template = this.context.getBean(KafkaTemplate.class);
		template.send(TEST_TOPIC, "foo", "bar");
		Listener listener = this.context.getBean(Listener.class);
		assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(listener.key).isEqualTo("foo");
		assertThat(listener.received).isEqualTo("bar");
	}

	private void load(Class<?> config, String... environment) {
		this.context = doLoad(new Class<?>[] { config }, environment);
	}

	// Registers the given configuration classes plus KafkaAutoConfiguration,
	// applies the environment entries and refreshes the context.
	private AnnotationConfigApplicationContext doLoad(Class<?>[] configs,
			String... environment) {
		AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
		applicationContext.register(configs);
		applicationContext.register(KafkaAutoConfiguration.class);
		EnvironmentTestUtils.addEnvironment(applicationContext, environment);
		applicationContext.refresh();
		return applicationContext;
	}

	public static class KafkaConfig {

		@Bean
		public Listener listener() {
			return new Listener();
		}

	}

	public static class Listener {

		private final CountDownLatch latch = new CountDownLatch(1);

		private volatile String received;

		private volatile String key;

		// Captures the payload and message key of the first record received.
		@KafkaListener(topics = TEST_TOPIC)
		public void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
			this.received = foo;
			this.key = key;
			this.latch.countDown();
		}

	}

}

@ -0,0 +1,178 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import java.io.File;
import java.util.Arrays;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.test.util.EnvironmentTestUtils;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Gary Russell
* @since 1.5
*/
public class KafkaPropertiesTests {

	private AnnotationConfigApplicationContext context;

	/**
	 * Build a context containing only {@link KafkaAutoConfiguration} with every
	 * supported {@code spring.kafka.*} property set to a distinctive value so
	 * that each mapping can be asserted individually.
	 */
	@Before
	public void load() {
		this.context = new AnnotationConfigApplicationContext();
		this.context.register(KafkaAutoConfiguration.class);
		EnvironmentTestUtils.addEnvironment(this.context,
				"spring.kafka.bootstrap-servers=foo:1234",
				"spring.kafka.clientId=cid",
				"spring.kafka.ssl.key-password=p1",
				"spring.kafka.ssl.keystore-location=classpath:ksLoc",
				"spring.kafka.ssl.keystore-password=p2",
				"spring.kafka.ssl.truststore-location=classpath:tsLoc",
				"spring.kafka.ssl.truststore-password=p3",
				"spring.kafka.consumer.auto-commit-interval-ms=123",
				"spring.kafka.consumer.auto-offset-reset=earliest",
				"spring.kafka.consumer.client-id=ccid", // test override common
				"spring.kafka.consumer.enable-auto-commit=false",
				"spring.kafka.consumer.fetch-max-wait-ms=456",
				"spring.kafka.consumer.fetch-min-bytes=789",
				"spring.kafka.consumer.group-id=bar",
				"spring.kafka.consumer.heartbeat-interval-ms=234",
				"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
				"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer",
				"spring.kafka.producer.acks=all",
				"spring.kafka.producer.batch-size=20",
				"spring.kafka.producer.bootstrap-servers=bar:1234", // test override common
				"spring.kafka.producer.buffer-memory=12345",
				"spring.kafka.producer.compression-type=gzip",
				"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
				"spring.kafka.producer.retries=2",
				"spring.kafka.producer.ssl.key-password=p4",
				"spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
				"spring.kafka.producer.ssl.keystore-password=p5",
				"spring.kafka.producer.ssl.truststore-location=classpath:tsLocP",
				"spring.kafka.producer.ssl.truststore-password=p6",
				"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer",
				"spring.kafka.template.default-topic=testTopic",
				"spring.kafka.listener.ack-mode=MANUAL",
				"spring.kafka.listener.ack-count=123",
				"spring.kafka.listener.ack-time=456",
				"spring.kafka.listener.concurrency=3",
				"spring.kafka.listener.poll-timeout=2000"
		);
		this.context.refresh();
	}

	/**
	 * Close the context created in {@link #load()} so the auto-configured
	 * beans (factories, listener container factory) do not leak across tests.
	 */
	@After
	public void close() {
		if (this.context != null) {
			this.context.close();
		}
	}

	@Test
	public void testConsumerProps() {
		DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class);
		@SuppressWarnings("unchecked")
		Map<String, Object> consumerProps = (Map<String, Object>) new DirectFieldAccessor(consumerFactory)
				.getPropertyValue("configs");
		// common
		assertThat(consumerProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
				.isEqualTo(Arrays.asList(new String[] { "foo:1234" }));
		assertThat(consumerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1");
		// classpath: locations are resolved to absolute file paths, so only
		// assert on the trailing file name
		assertThat((String) consumerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
				.endsWith(File.separator + "ksLoc");
		assertThat(consumerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2");
		assertThat((String) consumerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
				.endsWith(File.separator + "tsLoc");
		assertThat(consumerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p3");
		// consumer
		assertThat(consumerProps.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override
		assertThat(consumerProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isEqualTo(Boolean.FALSE);
		assertThat(consumerProps.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)).isEqualTo(123L);
		assertThat(consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
		assertThat(consumerProps.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456);
		assertThat(consumerProps.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789);
		assertThat(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
		assertThat(consumerProps.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234);
		assertThat(consumerProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)).isEqualTo(LongDeserializer.class);
		assertThat(consumerProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
				.isEqualTo(IntegerDeserializer.class);
	}

	@Test
	public void testProducerProps() {
		DefaultKafkaProducerFactory<?, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
		@SuppressWarnings("unchecked")
		Map<String, Object> producerProps = (Map<String, Object>) new DirectFieldAccessor(producerFactory)
				.getPropertyValue("configs");
		// common
		assertThat(producerProps.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
		// producer
		assertThat(producerProps.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all");
		assertThat(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20);
		assertThat(producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
				.isEqualTo(Arrays.asList(new String[] { "bar:1234" })); // override
		assertThat(producerProps.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L);
		assertThat(producerProps.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
		assertThat(producerProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class);
		assertThat(producerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
		assertThat((String) producerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
				.endsWith(File.separator + "ksLocP");
		assertThat(producerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
		assertThat((String) producerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
				.endsWith(File.separator + "tsLocP");
		assertThat(producerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p6");
		assertThat(producerProps.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
		assertThat(producerProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).isEqualTo(IntegerSerializer.class);
	}

	@Test
	public void testInjected() {
		DefaultKafkaProducerFactory<?, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
		DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class);
		KafkaTemplate<?, ?> kafkaTemplate = this.context.getBean(KafkaTemplate.class);
		KafkaListenerContainerFactory<?> kafkaListenerContainerFactory = this.context
				.getBean(KafkaListenerContainerFactory.class);
		// the template must be wired with the auto-configured producer factory
		assertThat(new DirectFieldAccessor(kafkaTemplate).getPropertyValue("producerFactory"))
				.isEqualTo(producerFactory);
		assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
		DirectFieldAccessor factoryAccessor = new DirectFieldAccessor(kafkaListenerContainerFactory);
		assertThat(factoryAccessor.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory);
		assertThat(factoryAccessor.getPropertyValue("containerProperties.ackMode")).isEqualTo(AckMode.MANUAL);
		assertThat(factoryAccessor.getPropertyValue("containerProperties.ackCount")).isEqualTo(123);
		assertThat(factoryAccessor.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L);
		assertThat(factoryAccessor.getPropertyValue("concurrency")).isEqualTo(3);
		assertThat(factoryAccessor.getPropertyValue("containerProperties.pollTimeout")).isEqualTo(2000L);
	}

}

@ -156,6 +156,7 @@
<spring-hateoas.version>0.21.0.RELEASE</spring-hateoas.version>
<spring-integration.version>4.3.5.RELEASE</spring-integration.version>
<spring-integration-java-dsl.version>1.2.0.RELEASE</spring-integration-java-dsl.version>
<spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
<spring-loaded.version>1.2.6.RELEASE</spring-loaded.version>
<spring-mobile.version>1.1.5.RELEASE</spring-mobile.version>
<spring-plugin.version>1.2.0.RELEASE</spring-plugin.version>
@ -2123,6 +2124,11 @@
<artifactId>spring-hateoas</artifactId>
<version>${spring-hateoas.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-bom</artifactId>

@ -856,6 +856,55 @@ content into your application; rather pick only the properties that you need.
spring.jms.template.receive-timeout= # Timeout to use for receive calls in milliseconds.
spring.jms.template.time-to-live= # Time-to-live of a message when sending in milliseconds. Enable QoS when set.
# APACHE KAFKA ({sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[KafkaProperties])
spring.kafka.bootstrap-servers=localhost:9092 # Comma-delimited list of host:port pairs.
spring.kafka.client-id= # Id to pass to the server when making requests; used for server-side logging.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.keystore-location= # Location (resource) of the key store file (e.g. file:my.ks).
spring.kafka.ssl.keystore-password= # Store password for the key store file.
spring.kafka.ssl.truststore-location= # Location (resource) of the trust store file (e.g. file:my.ts).
spring.kafka.ssl.truststore-password= # Store password for the trust store file.
# Consumer-specific properties:
spring.kafka.consumer.auto-commit-interval-ms= # Frequency in milliseconds that the consumer offsets are auto-committed.
spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server.
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs.
spring.kafka.consumer.client-id= # Id to pass to the server when making requests; used for server-side logging.
spring.kafka.consumer.enable-auto-commit= # If true the consumer's offset will be periodically committed in the background.
spring.kafka.consumer.fetch-max-wait-ms= # Maximum amount of time the server will block before answering the fetch request.
spring.kafka.consumer.fetch-min-bytes= # Minimum amount of data the server should return for a fetch request.
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group this consumer belongs to.
spring.kafka.consumer.heartbeat-interval-ms= # Expected time between heartbeats to the consumer coordinator.
spring.kafka.consumer.key-deserializer=StringDeserializer # Deserializer class for keys.
spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.consumer.ssl.keystore-location= # Location (resource) of the key store file (e.g. file:my.ks).
spring.kafka.consumer.ssl.keystore-password= # Store password for the key store file.
spring.kafka.consumer.ssl.truststore-location= # Location (resource) of the trust store file (e.g. file:my.ts).
spring.kafka.consumer.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.consumer.value-deserializer=StringDeserializer # Deserializer class for values.
# Listener properties - Refer to the Spring for Apache Kafka documentation
spring.kafka.listener.ack-mode=BATCH # AckMode - see the spring-kafka documentation.
spring.kafka.listener.ack-count= # Number of records between offset commits when ack-mode is COUNT or COUNT_TIME.
spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ack-mode is TIME or COUNT_TIME.
spring.kafka.listener.concurrency=1 # Number of threads to run in the listener container(s).
spring.kafka.listener.poll-timeout=1000 # Timeout in milliseconds to use when polling the consumer.
# Producer-specific properties:
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received.
spring.kafka.producer.batch-size= # Number of records to batch before sending.
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs.
spring.kafka.producer.buffer-memory= # Total bytes of memory the producer can use to buffer records waiting to be sent to the server.
spring.kafka.producer.client-id= # Id to pass to the server when making requests; used for server-side logging.
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
spring.kafka.producer.key-serializer=StringSerializer # Serializer class for keys.
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.producer.ssl.keystore-location= # Location (resource) of the key store file (e.g. file:my.ks).
spring.kafka.producer.ssl.keystore-password= # Store password for the key store file.
spring.kafka.producer.ssl.truststore-location= # Location (resource) of the trust store file (e.g. file:my.ts).
spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.producer.value-serializer=StringSerializer # Serializer class for values.
# template properties
spring.kafka.template.default-topic= # Default topic to which messages are sent.
# RABBIT ({sc-spring-boot-autoconfigure}/amqp/RabbitProperties.{sc-ext}[RabbitProperties])
spring.rabbitmq.addresses= # Comma-separated list of addresses to which the client should connect.
spring.rabbitmq.cache.channel.checkout-timeout= # Number of milliseconds to wait to obtain a channel if the cache size has been reached.

@ -4082,7 +4082,7 @@ receive messages asynchronously. Spring AMQP provides a similar feature set for
'`Advanced Message Queuing Protocol`' and Spring Boot also provides auto-configuration
options for `RabbitTemplate` and RabbitMQ. There is also support for STOMP messaging
natively in Spring WebSocket and Spring Boot has support for that through starters and a
small amount of auto-configuration.
small amount of auto-configuration. Spring Boot also has support for Apache Kafka.
@ -4452,7 +4452,84 @@ throw an `AmqpRejectAndDontRequeueException` to signal the message should be rej
This is the mechanism used when retries are enabled and the maximum delivery attempts are
reached.
[[boot-features-kafka]]
=== Apache Kafka Support
http://kafka.apache.org/[Apache Kafka] is supported by providing auto-configuration of the `spring-kafka` project.
Kafka configuration is controlled by external configuration properties in `spring.kafka.*`. For example, you might
declare the following section in `application.properties`:
[source,properties,indent=0]
----
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
----
See {sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[`KafkaProperties`]
for more of the supported options.
=== Sending a Message
Spring's `KafkaTemplate` is auto-configured and you can autowire it directly into your own beans:
[source,java,indent=0]
----
@Component
public class MyBean {
private final KafkaTemplate kafkaTemplate;
@Autowired
public MyBean(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
}
----
=== Receiving a Message
[source,java,indent=0]
----
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
----
[[kafka-extra-props]]
=== Additional Kafka Properties
The properties supported by auto configuration are shown in <<common-application-properties>>.
Note that these properties (hyphenated or camelCase) map directly to the Apache Kafka dotted properties, refer
to the Apache Kafka documentation for details.
The first few of these properties apply to both producers and consumers, but can be specified at the producer or
consumer level if you wish to use different values for each.
Apache Kafka designates properties with an importance: HIGH, MEDIUM and LOW.
Spring Boot auto configuration supports all HIGH importance properties, some selected MEDIUM and LOW,
and any that do not have a default value.
Only a subset of the properties supported by Kafka are available via the `KafkaProperties` class.
If you wish to configure the producer or consumer with additional properties, you can override the producer factory
and/or consumer factory bean, adding additional properties, for example:
[source,java,indent=0]
----
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put("some.property", "some.value");
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
}
----
[[boot-features-restclient]]
== Calling REST services
@ -4514,8 +4591,6 @@ Lastly, the most extreme (and rarely used) option is to create your own
`RestTemplateBuilder` bean. This will switch off the auto-configuration of a
`RestTemplateBuilder` and will prevent any `RestTemplateCustomizer` beans from being used.
[[boot-features-email]]
== Sending email
The Spring Framework provides an easy abstraction for sending email using the

Loading…
Cancel
Save