Merge branch '1.5.x'

pull/6782/merge
Andy Wilkinson 8 years ago
commit 0b94d71671

@ -46,11 +46,13 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
* instance to configure * instance to configure
* @param consumerFactory the {@link ConsumerFactory} to use * @param consumerFactory the {@link ConsumerFactory} to use
*/ */
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory, public void configure(
ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
ConsumerFactory<Object, Object> consumerFactory) { ConsumerFactory<Object, Object> consumerFactory) {
listenerContainerFactory.setConsumerFactory(consumerFactory); listenerContainerFactory.setConsumerFactory(consumerFactory);
Listener container = this.properties.getListener(); Listener container = this.properties.getListener();
ContainerProperties containerProperties = listenerContainerFactory.getContainerProperties(); ContainerProperties containerProperties = listenerContainerFactory
.getContainerProperties();
if (container.getAckMode() != null) { if (container.getAckMode() != null) {
containerProperties.setAckMode(container.getAckMode()); containerProperties.setAckMode(container.getAckMode());
} }

@ -54,13 +54,11 @@ class KafkaAnnotationDrivenConfiguration {
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) { ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory); configurer.configure(factory, kafkaConsumerFactory);
return factory; return factory;
} }
@EnableKafka @EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableKafkaConfiguration { protected static class EnableKafkaConfiguration {

@ -54,8 +54,8 @@ public class KafkaAutoConfiguration {
public KafkaTemplate<?, ?> kafkaTemplate( public KafkaTemplate<?, ?> kafkaTemplate(
ProducerFactory<Object, Object> kafkaProducerFactory, ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener) { ProducerListener<Object, Object> kafkaProducerListener) {
KafkaTemplate<Object, Object> kafkaTemplate = KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(
new KafkaTemplate<Object, Object>(kafkaProducerFactory); kafkaProducerFactory);
kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate; return kafkaTemplate;

@ -36,8 +36,8 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo
/** /**
* Configuration properties for Spring for Apache Kafka. * Configuration properties for Spring for Apache Kafka.
* <p/> * <p>
* Users should refer to kafka documentation for complete descriptions of these * Users should refer to Kafka documentation for complete descriptions of these
* properties. * properties.
* *
* @author Gary Russell * @author Gary Russell
@ -63,8 +63,8 @@ public class KafkaProperties {
* Comma-delimited list of host:port pairs to use for establishing the initial * Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster. * connection to the Kafka cluster.
*/ */
private List<String> bootstrapServers = new ArrayList<String>(Collections.singletonList( private List<String> bootstrapServers = new ArrayList<String>(
"localhost:9092")); Collections.singletonList("localhost:9092"));
/** /**
* Id to pass to the server when making requests; used for server-side logging. * Id to pass to the server when making requests; used for server-side logging.
@ -110,7 +110,8 @@ public class KafkaProperties {
private Map<String, Object> buildCommonProperties() { private Map<String, Object> buildCommonProperties() {
Map<String, Object> properties = new HashMap<String, Object>(); Map<String, Object> properties = new HashMap<String, Object>();
if (this.bootstrapServers != null) { if (this.bootstrapServers != null) {
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
this.bootstrapServers);
} }
if (this.clientId != null) { if (this.clientId != null) {
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId); properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
@ -139,10 +140,11 @@ public class KafkaProperties {
/** /**
* Create an initial map of consumer properties from the state of this instance. * Create an initial map of consumer properties from the state of this instance.
* <p>This allows you to add additional properties, if necessary, and override the * <p>
* This allows you to add additional properties, if necessary, and override the
* default kafkaConsumerFactory bean. * default kafkaConsumerFactory bean.
* @return the consumer properties initialized with the customizations defined on * @return the consumer properties initialized with the customizations defined on this
* this instance * instance
*/ */
public Map<String, Object> buildConsumerProperties() { public Map<String, Object> buildConsumerProperties() {
Map<String, Object> props = buildCommonProperties(); Map<String, Object> props = buildCommonProperties();
@ -152,10 +154,11 @@ public class KafkaProperties {
/** /**
* Create an initial map of producer properties from the state of this instance. * Create an initial map of producer properties from the state of this instance.
* <p>This allows you to add additional properties, if necessary, and override the * <p>
* This allows you to add additional properties, if necessary, and override the
* default kafkaProducerFactory bean. * default kafkaProducerFactory bean.
* @return the producer properties initialized with the customizations defined on * @return the producer properties initialized with the customizations defined on this
* this instance * instance
*/ */
public Map<String, Object> buildProducerProperties() { public Map<String, Object> buildProducerProperties() {
Map<String, Object> props = buildCommonProperties(); Map<String, Object> props = buildCommonProperties();
@ -168,8 +171,9 @@ public class KafkaProperties {
return resource.getFile().getAbsolutePath(); return resource.getFile().getAbsolutePath();
} }
catch (IOException ex) { catch (IOException ex) {
throw new IllegalStateException(String.format( throw new IllegalStateException(
"Resource '%s' must be on a file system", resource), ex); String.format("Resource '%s' must be on a file system", resource),
ex);
} }
} }
@ -178,8 +182,8 @@ public class KafkaProperties {
private final Ssl ssl = new Ssl(); private final Ssl ssl = new Ssl();
/** /**
* Frequency in milliseconds that the consumer offsets are auto-committed to * Frequency in milliseconds that the consumer offsets are auto-committed to Kafka
* Kafka if 'enable.auto.commit' true. * if 'enable.auto.commit' true.
*/ */
private Long autoCommitInterval; private Long autoCommitInterval;
@ -332,22 +336,27 @@ public class KafkaProperties {
public Map<String, Object> buildProperties() { public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<String, Object>(); Map<String, Object> properties = new HashMap<String, Object>();
if (this.autoCommitInterval != null) { if (this.autoCommitInterval != null) {
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.autoCommitInterval); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
this.autoCommitInterval);
} }
if (this.autoOffsetReset != null) { if (this.autoOffsetReset != null) {
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
this.autoOffsetReset);
} }
if (this.bootstrapServers != null) { if (this.bootstrapServers != null) {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.bootstrapServers);
} }
if (this.clientId != null) { if (this.clientId != null) {
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId); properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId);
} }
if (this.enableAutoCommit != null) { if (this.enableAutoCommit != null) {
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.enableAutoCommit); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
this.enableAutoCommit);
} }
if (this.fetchMaxWait != null) { if (this.fetchMaxWait != null) {
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, this.fetchMaxWait); properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
this.fetchMaxWait);
} }
if (this.fetchMinSize != null) { if (this.fetchMinSize != null) {
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize); properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize);
@ -356,29 +365,36 @@ public class KafkaProperties {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
} }
if (this.heartbeatInterval != null) { if (this.heartbeatInterval != null) {
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, this.heartbeatInterval); properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
this.heartbeatInterval);
} }
if (this.keyDeserializer != null) { if (this.keyDeserializer != null) {
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDeserializer); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
this.keyDeserializer);
} }
if (this.ssl.getKeyPassword() != null) { if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword()); properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
this.ssl.getKeyPassword());
} }
if (this.ssl.getKeystoreLocation() != null) { if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation())); properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
} }
if (this.ssl.getKeystorePassword() != null) { if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword()); properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
} }
if (this.ssl.getTruststoreLocation() != null) { if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation())); resourceToPath(this.ssl.getTruststoreLocation()));
} }
if (this.ssl.getTruststorePassword() != null) { if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword()); properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
} }
if (this.valueDeserializer != null) { if (this.valueDeserializer != null) {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueDeserializer); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
this.valueDeserializer);
} }
return properties; return properties;
} }
@ -407,8 +423,8 @@ public class KafkaProperties {
private List<String> bootstrapServers; private List<String> bootstrapServers;
/** /**
* Total bytes of memory the producer can use to buffer records waiting to be * Total bytes of memory the producer can use to buffer records waiting to be sent
* sent to the server. * to the server.
*/ */
private Long bufferMemory; private Long bufferMemory;
@ -522,7 +538,8 @@ public class KafkaProperties {
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize);
} }
if (this.bootstrapServers != null) { if (this.bootstrapServers != null) {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.bootstrapServers);
} }
if (this.bufferMemory != null) { if (this.bufferMemory != null) {
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory);
@ -531,32 +548,39 @@ public class KafkaProperties {
properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId); properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId);
} }
if (this.compressionType != null) { if (this.compressionType != null) {
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.compressionType); properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
this.compressionType);
} }
if (this.keySerializer != null) { if (this.keySerializer != null) {
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.keySerializer); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
this.keySerializer);
} }
if (this.retries != null) { if (this.retries != null) {
properties.put(ProducerConfig.RETRIES_CONFIG, this.retries); properties.put(ProducerConfig.RETRIES_CONFIG, this.retries);
} }
if (this.ssl.getKeyPassword() != null) { if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword()); properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
this.ssl.getKeyPassword());
} }
if (this.ssl.getKeystoreLocation() != null) { if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation())); properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
} }
if (this.ssl.getKeystorePassword() != null) { if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword()); properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
} }
if (this.ssl.getTruststoreLocation() != null) { if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation())); resourceToPath(this.ssl.getTruststoreLocation()));
} }
if (this.ssl.getTruststorePassword() != null) { if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword()); properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
} }
if (this.valueSerializer != null) { if (this.valueSerializer != null) {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
this.valueSerializer);
} }
return properties; return properties;
} }

@ -44,8 +44,8 @@ public class KafkaAutoConfigurationIntegrationTests {
private static final String TEST_TOPIC = "testTopic"; private static final String TEST_TOPIC = "testTopic";
@ClassRule @ClassRule
public static final KafkaEmbedded kafkaEmbedded = public static final KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true,
new KafkaEmbedded(1, true, TEST_TOPIC); TEST_TOPIC);
private AnnotationConfigApplicationContext context; private AnnotationConfigApplicationContext context;
@ -63,7 +63,8 @@ public class KafkaAutoConfigurationIntegrationTests {
"spring.kafka.consumer.group-id=testGroup", "spring.kafka.consumer.group-id=testGroup",
"spring.kafka.consumer.auto-offset-reset=earliest"); "spring.kafka.consumer.auto-offset-reset=earliest");
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
KafkaTemplate<String, String> template = this.context.getBean(KafkaTemplate.class); KafkaTemplate<String, String> template = this.context
.getBean(KafkaTemplate.class);
template.send(TEST_TOPIC, "foo", "bar"); template.send(TEST_TOPIC, "foo", "bar");
Listener listener = this.context.getBean(Listener.class); Listener listener = this.context.getBean(Listener.class);
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();

@ -41,13 +41,11 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
/** /**
* Tests for {@link KafkaAutoConfiguration}. * Tests for {@link KafkaAutoConfiguration}.
* *
* @author Gary Russell * @author Gary Russell
* @author Stephane Nicoll * @author Stephane Nicoll
* @since 1.5
*/ */
public class KafkaAutoConfigurationTests { public class KafkaAutoConfigurationTests {
@ -78,40 +76,50 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.consumer.heartbeat-interval=234", "spring.kafka.consumer.heartbeat-interval=234",
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer", "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer"); "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer");
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class); DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context
.getBean(DefaultKafkaConsumerFactory.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, Object> consumerProps = (Map<String, Object>) new DirectFieldAccessor(consumerFactory) Map<String, Object> consumerProps = (Map<String, Object>) new DirectFieldAccessor(
.getPropertyValue("configs"); consumerFactory).getPropertyValue("configs");
// common // common
assertThat(consumerProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) assertThat(consumerProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo(Collections.singletonList("foo:1234")); .isEqualTo(Collections.singletonList("foo:1234"));
assertThat(consumerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1"); assertThat(consumerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1");
assertThat((String) consumerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertThat((String) consumerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "ksLoc"); .endsWith(File.separator + "ksLoc");
assertThat(consumerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2"); assertThat(consumerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
.isEqualTo("p2");
assertThat((String) consumerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) assertThat((String) consumerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLoc"); .endsWith(File.separator + "tsLoc");
assertThat(consumerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p3"); assertThat(consumerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p3");
// consumer // consumer
assertThat(consumerProps.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override assertThat(consumerProps.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override
assertThat(consumerProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isEqualTo(Boolean.FALSE); assertThat(consumerProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
assertThat(consumerProps.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)).isEqualTo(123L); .isEqualTo(Boolean.FALSE);
assertThat(consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest"); assertThat(consumerProps.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG))
assertThat(consumerProps.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456); .isEqualTo(123L);
assertThat(consumerProps.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789); assertThat(consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
.isEqualTo("earliest");
assertThat(consumerProps.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG))
.isEqualTo(456);
assertThat(consumerProps.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG))
.isEqualTo(789);
assertThat(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar"); assertThat(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
assertThat(consumerProps.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234); assertThat(consumerProps.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG))
assertThat(consumerProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)).isEqualTo(LongDeserializer.class); .isEqualTo(234);
assertThat(consumerProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
.isEqualTo(LongDeserializer.class);
assertThat(consumerProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) assertThat(consumerProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerDeserializer.class); .isEqualTo(IntegerDeserializer.class);
} }
@Test @Test
public void producerProperties() { public void producerProperties() {
load("spring.kafka.clientId=cid", load("spring.kafka.clientId=cid", "spring.kafka.producer.acks=all",
"spring.kafka.producer.acks=all",
"spring.kafka.producer.batch-size=20", "spring.kafka.producer.batch-size=20",
"spring.kafka.producer.bootstrap-servers=bar:1234", // test override common "spring.kafka.producer.bootstrap-servers=bar:1234", // test override
// common
"spring.kafka.producer.buffer-memory=12345", "spring.kafka.producer.buffer-memory=12345",
"spring.kafka.producer.compression-type=gzip", "spring.kafka.producer.compression-type=gzip",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer", "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
@ -122,10 +130,11 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.producer.ssl.truststore-location=classpath:tsLocP", "spring.kafka.producer.ssl.truststore-location=classpath:tsLocP",
"spring.kafka.producer.ssl.truststore-password=p6", "spring.kafka.producer.ssl.truststore-password=p6",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer"); "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer");
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class); DefaultKafkaProducerFactory<?, ?> producerFactory = this.context
.getBean(DefaultKafkaProducerFactory.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, Object> producerProps = (Map<String, Object>) new DirectFieldAccessor(producerFactory) Map<String, Object> producerProps = (Map<String, Object>) new DirectFieldAccessor(
.getPropertyValue("configs"); producerFactory).getPropertyValue("configs");
// common // common
assertThat(producerProps.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); assertThat(producerProps.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
// producer // producer
@ -133,18 +142,24 @@ public class KafkaAutoConfigurationTests {
assertThat(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20); assertThat(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20);
assertThat(producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) assertThat(producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo(Collections.singletonList("bar:1234")); // override .isEqualTo(Collections.singletonList("bar:1234")); // override
assertThat(producerProps.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L); assertThat(producerProps.get(ProducerConfig.BUFFER_MEMORY_CONFIG))
assertThat(producerProps.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip"); .isEqualTo(12345L);
assertThat(producerProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class); assertThat(producerProps.get(ProducerConfig.COMPRESSION_TYPE_CONFIG))
.isEqualTo("gzip");
assertThat(producerProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
.isEqualTo(LongSerializer.class);
assertThat(producerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); assertThat(producerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
assertThat((String) producerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertThat((String) producerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "ksLocP"); .endsWith(File.separator + "ksLocP");
assertThat(producerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5"); assertThat(producerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
.isEqualTo("p5");
assertThat((String) producerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) assertThat((String) producerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLocP"); .endsWith(File.separator + "tsLocP");
assertThat(producerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p6"); assertThat(producerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p6");
assertThat(producerProps.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); assertThat(producerProps.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
assertThat(producerProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).isEqualTo(IntegerSerializer.class); assertThat(producerProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerSerializer.class);
} }
@Test @Test
@ -156,21 +171,25 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.listener.ack-time=456", "spring.kafka.listener.ack-time=456",
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.concurrency=3",
"spring.kafka.listener.poll-timeout=2000"); "spring.kafka.listener.poll-timeout=2000");
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class); DefaultKafkaProducerFactory<?, ?> producerFactory = this.context
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class); .getBean(DefaultKafkaProducerFactory.class);
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context
.getBean(DefaultKafkaConsumerFactory.class);
KafkaTemplate<?, ?> kafkaTemplate = this.context.getBean(KafkaTemplate.class); KafkaTemplate<?, ?> kafkaTemplate = this.context.getBean(KafkaTemplate.class);
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory = this.context KafkaListenerContainerFactory<?> kafkaListenerContainerFactory = this.context
.getBean(KafkaListenerContainerFactory.class); .getBean(KafkaListenerContainerFactory.class);
assertThat(new DirectFieldAccessor(kafkaTemplate).getPropertyValue("producerFactory")) assertThat(new DirectFieldAccessor(kafkaTemplate)
.isEqualTo(producerFactory); .getPropertyValue("producerFactory")).isEqualTo(producerFactory);
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic"); assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory); DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory);
assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory); assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory);
assertThat(dfa.getPropertyValue("containerProperties.ackMode")).isEqualTo(AckMode.MANUAL); assertThat(dfa.getPropertyValue("containerProperties.ackMode"))
.isEqualTo(AckMode.MANUAL);
assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123); assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123);
assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L); assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L);
assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")).isEqualTo(2000L); assertThat(dfa.getPropertyValue("containerProperties.pollTimeout"))
.isEqualTo(2000L);
} }
private void load(String... environment) { private void load(String... environment) {

Loading…
Cancel
Save