Merge branch '1.5.x'

pull/6782/merge
Stephane Nicoll 8 years ago
commit e3688455fa

@ -486,6 +486,11 @@
<artifactId>spring-rabbit</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-spring-service-connector</artifactId>
@ -619,6 +624,18 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-junit-runners</artifactId>

@ -0,0 +1,71 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;
/**
* Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
*
* @author Gary Russell
* @since 1.5.0
*/
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private KafkaProperties properties;
/**
* Set the {@link KafkaProperties} to use.
* @param properties the properties
*/
void setKafkaProperties(KafkaProperties properties) {
this.properties = properties;
}
/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
* @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
* instance to configure
* @param consumerFactory the {@link ConsumerFactory} to use
*/
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
ConsumerFactory<Object, Object> consumerFactory) {
listenerContainerFactory.setConsumerFactory(consumerFactory);
Listener container = this.properties.getListener();
ContainerProperties containerProperties = listenerContainerFactory.getContainerProperties();
if (container.getAckMode() != null) {
containerProperties.setAckMode(container.getAckMode());
}
if (container.getAckCount() != null) {
containerProperties.setAckCount(container.getAckCount());
}
if (container.getAckTime() != null) {
containerProperties.setAckTime(container.getAckTime());
}
if (container.getPollTimeout() != null) {
containerProperties.setPollTimeout(container.getPollTimeout());
}
if (container.getConcurrency() != null) {
listenerContainerFactory.setConcurrency(container.getConcurrency());
}
}
}

@ -0,0 +1,70 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
/**
* Configuration for Kafka annotation-driven support.
*
* @author Gary Russell
* @since 1.5.0
*/
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
private final KafkaProperties properties;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean
public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties);
return configurer;
}
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableKafkaConfiguration {
}
}

@ -0,0 +1,84 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
/**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
*
* @author Gary Russell
* @since 1.5.0
*/
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
public KafkaAutoConfiguration(KafkaProperties properties) {
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(
ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener) {
KafkaTemplate<Object, Object> kafkaTemplate =
new KafkaTemplate<Object, Object>(kafkaProducerFactory);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
@Bean
@ConditionalOnMissingBean(ProducerListener.class)
public ProducerListener<Object, Object> kafkaProducerListener() {
return new LoggingProducerListener<Object, Object>();
}
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<Object, Object>(
this.properties.buildConsumerProperties());
}
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() {
return new DefaultKafkaProducerFactory<Object, Object>(
this.properties.buildProducerProperties());
}
}

@ -0,0 +1,723 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.io.Resource;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
/**
* Configuration properties for Spring for Apache Kafka.
* <p/>
* Users should refer to kafka documentation for complete descriptions of these
* properties.
*
* @author Gary Russell
* @author Stephane Nicoll
* @since 1.5.0
*/
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
private final Consumer consumer = new Consumer();
private final Producer producer = new Producer();
private final Listener listener = new Listener();
private final Template template = new Template();
private final Ssl ssl = new Ssl();
// Apache Kafka Common Properties
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
*/
private List<String> bootstrapServers = new ArrayList<String>(Collections.singletonList(
"localhost:9092"));
/**
* Id to pass to the server when making requests; used for server-side logging.
*/
private String clientId;
public Consumer getConsumer() {
return this.consumer;
}
public Producer getProducer() {
return this.producer;
}
public Listener getListener() {
return this.listener;
}
public Ssl getSsl() {
return this.ssl;
}
public Template getTemplate() {
return this.template;
}
public List<String> getBootstrapServers() {
return this.bootstrapServers;
}
public void setBootstrapServers(List<String> bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getClientId() {
return this.clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
private Map<String, Object> buildCommonProperties() {
Map<String, Object> properties = new HashMap<String, Object>();
if (this.bootstrapServers != null) {
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
}
if (this.clientId != null) {
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
return properties;
}
/**
* Create an initial map of consumer properties from the state of this instance.
* <p>This allows you to add additional properties, if necessary, and override the
* default kafkaConsumerFactory bean.
* @return the consumer properties initialized with the customizations defined on
* this instance
*/
public Map<String, Object> buildConsumerProperties() {
Map<String, Object> props = buildCommonProperties();
props.putAll(this.consumer.buildProperties());
return props;
}
/**
* Create an initial map of producer properties from the state of this instance.
* <p>This allows you to add additional properties, if necessary, and override the
* default kafkaProducerFactory bean.
* @return the producer properties initialized with the customizations defined on
* this instance
*/
public Map<String, Object> buildProducerProperties() {
Map<String, Object> props = buildCommonProperties();
props.putAll(this.producer.buildProperties());
return props;
}
private static String resourceToPath(Resource resource) {
try {
return resource.getFile().getAbsolutePath();
}
catch (IOException ex) {
throw new IllegalStateException(String.format(
"Resource '%s' must be on a file system", resource), ex);
}
}
public static class Consumer {
private final Ssl ssl = new Ssl();
/**
* Frequency in milliseconds that the consumer offsets are auto-committed to
* Kafka if 'enable.auto.commit' true.
*/
private Long autoCommitInterval;
/**
* What to do when there is no initial offset in Kafka or if the current offset
* does not exist any more on the server.
*/
private String autoOffsetReset;
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
*/
private List<String> bootstrapServers;
/**
* Id to pass to the server when making requests; used for server-side logging.
*/
private String clientId;
/**
* If true the consumer's offset will be periodically committed in the background.
*/
private Boolean enableAutoCommit;
/**
* Maximum amount of time in milliseconds the server will block before answering
* the fetch request if there isn't sufficient data to immediately satisfy the
* requirement given by "fetch.min.bytes".
*/
private Integer fetchMaxWait;
/**
* Minimum amount of data the server should return for a fetch request in bytes.
*/
private Integer fetchMinSize;
/**
* Unique string that identifies the consumer group this consumer belongs to.
*/
private String groupId;
/**
* Expected time in milliseconds between heartbeats to the consumer coordinator.
*/
private Integer heartbeatInterval;
/**
* Deserializer class for keys.
*/
private Class<?> keyDeserializer = StringDeserializer.class;
/**
* Deserializer class for values.
*/
private Class<?> valueDeserializer = StringDeserializer.class;
public Ssl getSsl() {
return this.ssl;
}
public Long getAutoCommitInterval() {
return this.autoCommitInterval;
}
public void setAutoCommitInterval(Long autoCommitInterval) {
this.autoCommitInterval = autoCommitInterval;
}
public String getAutoOffsetReset() {
return this.autoOffsetReset;
}
public void setAutoOffsetReset(String autoOffsetReset) {
this.autoOffsetReset = autoOffsetReset;
}
public List<String> getBootstrapServers() {
return this.bootstrapServers;
}
public void setBootstrapServers(List<String> bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getClientId() {
return this.clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public Boolean getEnableAutoCommit() {
return this.enableAutoCommit;
}
public void setEnableAutoCommit(Boolean enableAutoCommit) {
this.enableAutoCommit = enableAutoCommit;
}
public Integer getFetchMaxWait() {
return this.fetchMaxWait;
}
public void setFetchMaxWait(Integer fetchMaxWait) {
this.fetchMaxWait = fetchMaxWait;
}
public Integer getFetchMinSize() {
return this.fetchMinSize;
}
public void setFetchMinSize(Integer fetchMinSize) {
this.fetchMinSize = fetchMinSize;
}
public String getGroupId() {
return this.groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public Integer getHeartbeatInterval() {
return this.heartbeatInterval;
}
public void setHeartbeatInterval(Integer heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
}
public Class<?> getKeyDeserializer() {
return this.keyDeserializer;
}
public void setKeyDeserializer(Class<?> keyDeserializer) {
this.keyDeserializer = keyDeserializer;
}
public Class<?> getValueDeserializer() {
return this.valueDeserializer;
}
public void setValueDeserializer(Class<?> valueDeserializer) {
this.valueDeserializer = valueDeserializer;
}
public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<String, Object>();
if (this.autoCommitInterval != null) {
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.autoCommitInterval);
}
if (this.autoOffsetReset != null) {
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
}
if (this.bootstrapServers != null) {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
}
if (this.clientId != null) {
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId);
}
if (this.enableAutoCommit != null) {
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.enableAutoCommit);
}
if (this.fetchMaxWait != null) {
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, this.fetchMaxWait);
}
if (this.fetchMinSize != null) {
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize);
}
if (this.groupId != null) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
}
if (this.heartbeatInterval != null) {
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, this.heartbeatInterval);
}
if (this.keyDeserializer != null) {
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDeserializer);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword());
}
if (this.valueDeserializer != null) {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueDeserializer);
}
return properties;
}
}
public static class Producer {
private final Ssl ssl = new Ssl();
/**
* Number of acknowledgments the producer requires the leader to have received
* before considering a request complete.
*/
private String acks;
/**
* Number of records to batch before sending.
*/
private Integer batchSize;
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
*/
private List<String> bootstrapServers;
/**
* Total bytes of memory the producer can use to buffer records waiting to be
* sent to the server.
*/
private Long bufferMemory;
/**
* Id to pass to the server when making requests; used for server-side logging.
*/
private String clientId;
/**
* Compression type for all data generated by the producer.
*/
private String compressionType;
/**
* Serializer class for keys.
*/
private Class<?> keySerializer = StringSerializer.class;
/**
* Serializer class for values.
*/
private Class<?> valueSerializer = StringSerializer.class;
/**
* When greater than zero, enables retrying of failed sends.
*/
private Integer retries;
public Ssl getSsl() {
return this.ssl;
}
public String getAcks() {
return this.acks;
}
public void setAcks(String acks) {
this.acks = acks;
}
public Integer getBatchSize() {
return this.batchSize;
}
public void setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
}
public List<String> getBootstrapServers() {
return this.bootstrapServers;
}
public void setBootstrapServers(List<String> bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public Long getBufferMemory() {
return this.bufferMemory;
}
public void setBufferMemory(Long bufferMemory) {
this.bufferMemory = bufferMemory;
}
public String getClientId() {
return this.clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getCompressionType() {
return this.compressionType;
}
public void setCompressionType(String compressionType) {
this.compressionType = compressionType;
}
public Class<?> getKeySerializer() {
return this.keySerializer;
}
public void setKeySerializer(Class<?> keySerializer) {
this.keySerializer = keySerializer;
}
public Class<?> getValueSerializer() {
return this.valueSerializer;
}
public void setValueSerializer(Class<?> valueSerializer) {
this.valueSerializer = valueSerializer;
}
public Integer getRetries() {
return this.retries;
}
public void setRetries(Integer retries) {
this.retries = retries;
}
public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<String, Object>();
if (this.acks != null) {
properties.put(ProducerConfig.ACKS_CONFIG, this.acks);
}
if (this.batchSize != null) {
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize);
}
if (this.bootstrapServers != null) {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
}
if (this.bufferMemory != null) {
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory);
}
if (this.clientId != null) {
properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId);
}
if (this.compressionType != null) {
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.compressionType);
}
if (this.keySerializer != null) {
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.keySerializer);
}
if (this.retries != null) {
properties.put(ProducerConfig.RETRIES_CONFIG, this.retries);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword());
}
if (this.valueSerializer != null) {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer);
}
return properties;
}
}
public static class Template {
/**
* Default topic to which messages will be sent.
*/
private String defaultTopic;
public String getDefaultTopic() {
return this.defaultTopic;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
}
public static class Listener {
/**
* Listener AckMode; see the spring-kafka documentation.
*/
private AckMode ackMode;
/**
* Number of threads to run in the listener containers.
*/
private Integer concurrency;
/**
* Timeout in milliseconds to use when polling the consumer.
*/
private Long pollTimeout;
/**
* Number of records between offset commits when ackMode is "COUNT" or
* "COUNT_TIME".
*/
private Integer ackCount;
/**
* Time in milliseconds between offset commits when ackMode is "TIME" or
* "COUNT_TIME".
*/
private Long ackTime;
public AckMode getAckMode() {
return this.ackMode;
}
public void setAckMode(AckMode ackMode) {
this.ackMode = ackMode;
}
public Integer getConcurrency() {
return this.concurrency;
}
public void setConcurrency(Integer concurrency) {
this.concurrency = concurrency;
}
public Long getPollTimeout() {
return this.pollTimeout;
}
public void setPollTimeout(Long pollTimeout) {
this.pollTimeout = pollTimeout;
}
public Integer getAckCount() {
return this.ackCount;
}
public void setAckCount(Integer ackCount) {
this.ackCount = ackCount;
}
public Long getAckTime() {
return this.ackTime;
}
public void setAckTime(Long ackTime) {
this.ackTime = ackTime;
}
}
public static class Ssl {
/**
* Password of the private key in the key store file.
*/
private String keyPassword;
/**
* Location of the key store file.
*/
private Resource keystoreLocation;
/**
* Store password for the key store file.
*/
private String keystorePassword;
/**
* Location of the trust store file.
*/
private Resource truststoreLocation;
/**
* Store password for the trust store file.
*/
private String truststorePassword;
public String getKeyPassword() {
return this.keyPassword;
}
public void setKeyPassword(String keyPassword) {
this.keyPassword = keyPassword;
}
public Resource getKeystoreLocation() {
return this.keystoreLocation;
}
public void setKeystoreLocation(Resource keystoreLocation) {
this.keystoreLocation = keystoreLocation;
}
public String getKeystorePassword() {
return this.keystorePassword;
}
public void setKeystorePassword(String keystorePassword) {
this.keystorePassword = keystorePassword;
}
public Resource getTruststoreLocation() {
return this.truststoreLocation;
}
public void setTruststoreLocation(Resource truststoreLocation) {
this.truststoreLocation = truststoreLocation;
}
public String getTruststorePassword() {
return this.truststorePassword;
}
public void setTruststorePassword(String truststorePassword) {
this.truststorePassword = truststorePassword;
}
}
}

@ -0,0 +1,20 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Auto-configuration for Apache Kafka.
*/
package org.springframework.boot.autoconfigure.kafka;

@ -409,6 +409,76 @@
}
]
},
{
"name": "spring.kafka.consumer.auto-offset-reset",
"values": [
{
"value": "earliest",
"description": "Automatically reset the offset to the earliest offset."
},
{
"value": "latest",
"description": "Automatically reset the offset to the latest offset."
},
{
"value": "none",
"description": "Throw exception to the consumer if no previous offset is found for the consumer's group."
},
{
"value": "exception",
"description": "Throw exception to the consumer."
}
],
"providers": [
{
"name": "any"
}
]
},
{
"name": "spring.kafka.consumer.key-deserializer",
"providers": [
{
"name": "handle-as",
"parameters": {
"target": "org.apache.kafka.common.serialization.Deserializer"
}
}
]
},
{
"name": "spring.kafka.consumer.value-deserializer",
"providers": [
{
"name": "handle-as",
"parameters": {
"target": "org.apache.kafka.common.serialization.Deserializer"
}
}
]
},
{
"name": "spring.kafka.producer.key-serializer",
"providers": [
{
"name": "handle-as",
"parameters": {
"target": "org.apache.kafka.common.serialization.Serializer"
}
}
]
},
{
"name": "spring.kafka.producer.value-serializer",
"providers": [
{
"name": "handle-as",
"parameters": {
"target": "org.apache.kafka.common.serialization.Serializer"
}
}
]
},
{
"name": "spring.http.converters.preferred-json-mapper",
"values": [

@ -62,6 +62,7 @@ org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration,\
org.springframework.boot.autoconfigure.groovy.template.GroovyTemplateAutoConfiguration,\
org.springframework.boot.autoconfigure.jersey.JerseyAutoConfiguration,\
org.springframework.boot.autoconfigure.jooq.JooqAutoConfiguration,\
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,\
org.springframework.boot.autoconfigure.liquibase.LiquibaseAutoConfiguration,\
org.springframework.boot.autoconfigure.mail.MailSenderAutoConfiguration,\
org.springframework.boot.autoconfigure.mail.MailSenderValidatorAutoConfiguration,\

@ -0,0 +1,115 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.test.util.EnvironmentTestUtils;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.handler.annotation.Header;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Integration tests for {@link KafkaAutoConfiguration}.
*
* @author Gary Russell
*/
public class KafkaAutoConfigurationIntegrationTests {
private static final String TEST_TOPIC = "testTopic";
@ClassRule
public static final KafkaEmbedded kafkaEmbedded =
new KafkaEmbedded(1, true, TEST_TOPIC);
private AnnotationConfigApplicationContext context;
@After
public void close() {
if (this.context != null) {
this.context.close();
}
}
@Test
public void testEndToEnd() throws Exception {
load(KafkaConfig.class,
"spring.kafka.bootstrap-servers:" + kafkaEmbedded.getBrokersAsString(),
"spring.kafka.consumer.group-id=testGroup",
"spring.kafka.consumer.auto-offset-reset=earliest");
@SuppressWarnings("unchecked")
KafkaTemplate<String, String> template = this.context.getBean(KafkaTemplate.class);
template.send(TEST_TOPIC, "foo", "bar");
Listener listener = this.context.getBean(Listener.class);
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listener.key).isEqualTo("foo");
assertThat(listener.received).isEqualTo("bar");
}
private void load(Class<?> config, String... environment) {
this.context = doLoad(new Class<?>[] { config }, environment);
}
private AnnotationConfigApplicationContext doLoad(Class<?>[] configs,
String... environment) {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
applicationContext.register(configs);
applicationContext.register(KafkaAutoConfiguration.class);
EnvironmentTestUtils.addEnvironment(applicationContext, environment);
applicationContext.refresh();
return applicationContext;
}
public static class KafkaConfig {
@Bean
public Listener listener() {
return new Listener();
}
}
public static class Listener {
private final CountDownLatch latch = new CountDownLatch(1);
private volatile String received;
private volatile String key;
@KafkaListener(topics = TEST_TOPIC)
public void listen(String foo,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
this.received = foo;
this.key = key;
this.latch.countDown();
}
}
}

@ -0,0 +1,184 @@
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import java.io.File;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.After;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.test.util.EnvironmentTestUtils;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link KafkaAutoConfiguration}.
*
* @author Gary Russell
* @author Stephane Nicoll
* @since 1.5
*/
public class KafkaAutoConfigurationTests {
private AnnotationConfigApplicationContext context;
@After
public void closeContext() {
if (this.context != null) {
this.context.close();
}
}
@Test
public void consumerProperties() {
load("spring.kafka.bootstrap-servers=foo:1234",
"spring.kafka.ssl.key-password=p1",
"spring.kafka.ssl.keystore-location=classpath:ksLoc",
"spring.kafka.ssl.keystore-password=p2",
"spring.kafka.ssl.truststore-location=classpath:tsLoc",
"spring.kafka.ssl.truststore-password=p3",
"spring.kafka.consumer.auto-commit-interval=123",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.client-id=ccid", // test override common
"spring.kafka.consumer.enable-auto-commit=false",
"spring.kafka.consumer.fetch-max-wait=456",
"spring.kafka.consumer.fetch-min-size=789",
"spring.kafka.consumer.group-id=bar",
"spring.kafka.consumer.heartbeat-interval=234",
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer");
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class);
@SuppressWarnings("unchecked")
Map<String, Object> consumerProps = (Map<String, Object>) new DirectFieldAccessor(consumerFactory)
.getPropertyValue("configs");
// common
assertThat(consumerProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo(Collections.singletonList("foo:1234"));
assertThat(consumerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1");
assertThat((String) consumerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "ksLoc");
assertThat(consumerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2");
assertThat((String) consumerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLoc");
assertThat(consumerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p3");
// consumer
assertThat(consumerProps.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override
assertThat(consumerProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isEqualTo(Boolean.FALSE);
assertThat(consumerProps.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)).isEqualTo(123L);
assertThat(consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
assertThat(consumerProps.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456);
assertThat(consumerProps.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789);
assertThat(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
assertThat(consumerProps.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234);
assertThat(consumerProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)).isEqualTo(LongDeserializer.class);
assertThat(consumerProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerDeserializer.class);
}
@Test
public void producerProperties() {
load("spring.kafka.clientId=cid",
"spring.kafka.producer.acks=all",
"spring.kafka.producer.batch-size=20",
"spring.kafka.producer.bootstrap-servers=bar:1234", // test override common
"spring.kafka.producer.buffer-memory=12345",
"spring.kafka.producer.compression-type=gzip",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
"spring.kafka.producer.retries=2",
"spring.kafka.producer.ssl.key-password=p4",
"spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
"spring.kafka.producer.ssl.keystore-password=p5",
"spring.kafka.producer.ssl.truststore-location=classpath:tsLocP",
"spring.kafka.producer.ssl.truststore-password=p6",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer");
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
@SuppressWarnings("unchecked")
Map<String, Object> producerProps = (Map<String, Object>) new DirectFieldAccessor(producerFactory)
.getPropertyValue("configs");
// common
assertThat(producerProps.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
// producer
assertThat(producerProps.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all");
assertThat(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20);
assertThat(producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo(Collections.singletonList("bar:1234")); // override
assertThat(producerProps.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L);
assertThat(producerProps.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
assertThat(producerProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class);
assertThat(producerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
assertThat((String) producerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "ksLocP");
assertThat(producerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
assertThat((String) producerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLocP");
assertThat(producerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p6");
assertThat(producerProps.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
assertThat(producerProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).isEqualTo(IntegerSerializer.class);
}
@Test
public void listenerProperties() {
load("spring.kafka.template.default-topic=testTopic",
"spring.kafka.listener.ack-mode=MANUAL",
"spring.kafka.listener.ack-count=123",
"spring.kafka.listener.ack-time=456",
"spring.kafka.listener.concurrency=3",
"spring.kafka.listener.poll-timeout=2000");
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class);
KafkaTemplate<?, ?> kafkaTemplate = this.context.getBean(KafkaTemplate.class);
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory = this.context
.getBean(KafkaListenerContainerFactory.class);
assertThat(new DirectFieldAccessor(kafkaTemplate).getPropertyValue("producerFactory"))
.isEqualTo(producerFactory);
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory);
assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory);
assertThat(dfa.getPropertyValue("containerProperties.ackMode")).isEqualTo(AckMode.MANUAL);
assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123);
assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L);
assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")).isEqualTo(2000L);
}
private void load(String... environment) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
ctx.register(KafkaAutoConfiguration.class);
EnvironmentTestUtils.addEnvironment(ctx, environment);
ctx.refresh();
this.context = ctx;
}
}

@ -152,6 +152,7 @@
<spring-data-releasetrain.version>Ingalls-BUILD-SNAPSHOT</spring-data-releasetrain.version>
<spring-hateoas.version>0.21.0.RELEASE</spring-hateoas.version>
<spring-integration.version>5.0.0.BUILD-SNAPSHOT</spring-integration.version>
<spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
<spring-loaded.version>1.2.6.RELEASE</spring-loaded.version>
<spring-mobile.version>1.1.5.RELEASE</spring-mobile.version>
<spring-plugin.version>1.2.0.RELEASE</spring-plugin.version>
@ -2033,6 +2034,11 @@
<artifactId>spring-hateoas</artifactId>
<version>${spring-hateoas.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-bom</artifactId>

@ -855,6 +855,41 @@ content into your application; rather pick only the properties that you need.
spring.jms.template.receive-timeout= # Timeout to use for receive calls in milliseconds.
spring.jms.template.time-to-live= # Time-to-live of a message when sending in milliseconds. Enable QoS when set.
# APACHE KAFKA ({sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[KafkaProperties])
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.client-id= # Id to pass to the server when making requests; used for server-side logging.
spring.kafka.consumer.auto-commit-interval= # Frequency in milliseconds that the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' true.
spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server.
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.consumer.client-id= # Id to pass to the server when making requests; used for server-side logging.
spring.kafka.consumer.enable-auto-commit= # If true the consumer's offset will be periodically committed in the background.
spring.kafka.consumer.fetch-max-wait= # Maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch.min.bytes".
spring.kafka.consumer.fetch-min-size= # Minimum amount of data the server should return for a fetch request in bytes.
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group this consumer belongs to.
spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator.
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME".
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
spring.kafka.listener.poll-timeout= # Timeout in milliseconds to use when polling the consumer.
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
spring.kafka.producer.batch-size= # Number of records to batch before sending.
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.producer.buffer-memory= # Total bytes of memory the producer can use to buffer records waiting to be sent to the server.
spring.kafka.producer.client-id= # Id to pass to the server when making requests; used for server-side logging.
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
spring.kafka.producer.key-serializer= # Serializer class for keys.
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.keystore-location= # Location of the key store file.
spring.kafka.ssl.keystore-password= # Store password for the key store file.
spring.kafka.ssl.truststore-location= # Location of the trust store file.
spring.kafka.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.template.default-topic= # Default topic to which messages will be sent.
# RABBIT ({sc-spring-boot-autoconfigure}/amqp/RabbitProperties.{sc-ext}[RabbitProperties])
spring.rabbitmq.addresses= # Comma-separated list of addresses to which the client should connect.
spring.rabbitmq.cache.channel.checkout-timeout= # Number of milliseconds to wait to obtain a channel if the cache size has been reached.

@ -4052,7 +4052,7 @@ receive messages asynchronously. Spring AMQP provides a similar feature set for
'`Advanced Message Queuing Protocol`' and Spring Boot also provides auto-configuration
options for `RabbitTemplate` and RabbitMQ. There is also support for STOMP messaging
natively in Spring WebSocket and Spring Boot has support for that through starters and a
small amount of auto-configuration.
small amount of auto-configuration. Spring Boot also has support for Apache Kafka.
@ -4424,6 +4424,108 @@ reached.
[[boot-features-kafka]]
=== Apache Kafka Support
http://kafka.apache.org/[Apache Kafa] is supported by providing auto-configuration of the
`spring-kafka` project.
Kafka configuration is controlled by external configuration properties in
`spring.kafka.*`. For example, you might declare the following section in
`application.properties`:
[source,properties,indent=0]
----
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
----
See {sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[`KafkaProperties`]
for more of the supported options.
=== Sending a Message
Spring's `KafkaTemplate` is auto-configured and you can autowire them directly in your own
beans:
[source,java,indent=0]
----
@Component
public class MyBean {
private final KafkaTemplate kafkaTemplate;
@Autowired
public MyBean(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
}
----
=== Receiving a Message
When the Apache Kafka infrastructure is present, any bean can be annotated with
`@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory`
has been defined, a default one is configured automatically with keys defined in
`spring.kafka.listener.*`.
The following component creates a listener endpoint on the `someTopic` topic:
[source,java,indent=0]
----
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
----
[[kafka-extra-props]]
=== Additional Kafka Properties
The properties supported by auto configuration are shown in
<<common-application-properties>>. Note that these properties (hyphenated or camelCase)
map directly to the Apache Kafka dotted properties for the most part, refer to the Apache
Kafka documentation for details.
The first few of these properties apply to both producers and consumers, but can be
specified at the producer or consumer level if you wish to use different values for each.
Apache Kafka designates properties with an importance: HIGH, MEDIUM and LOW. Spring Boot
auto configuration supports all HIGH importance properties, some selected MEDIUM and LOW,
and any that do not have a default value.
Only a subset of the properties supported by Kafka are available via the `KafkaProperties`
class. If you wish to configure the producer or consumer with additional properties, you
can override the producer factory and/or consumer factory bean, adding additional
properties, for example:
[source,java,indent=0]
----
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put("some.property", "some.value");
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
}
----
[[boot-features-restclient]]
== Calling REST services
If you need to call remote REST services from your application, you can use Spring
@ -4484,8 +4586,6 @@ Lastly, the most extreme (and rarely used) option is to create your own
`RestTemplateBuilder` bean. This will switch off the auto-configuration of a
`RestTemplateBuilder` and will prevent any `RestTemplateCustomizer` beans from being used.
[[boot-features-email]]
== Sending email
The Spring Framework provides an easy abstraction for sending email using the

Loading…
Cancel
Save