|
|
|
@ -79,38 +79,35 @@ public class KafkaAutoConfigurationTests {
|
|
|
|
|
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context
|
|
|
|
|
.getBean(DefaultKafkaConsumerFactory.class);
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
Map<String, Object> consumerProps = (Map<String, Object>) new DirectFieldAccessor(
|
|
|
|
|
Map<String, Object> configs = (Map<String, Object>) new DirectFieldAccessor(
|
|
|
|
|
consumerFactory).getPropertyValue("configs");
|
|
|
|
|
// common
|
|
|
|
|
assertThat(consumerProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
|
|
|
|
|
.isEqualTo(Collections.singletonList("foo:1234"));
|
|
|
|
|
assertThat(consumerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1");
|
|
|
|
|
assertThat((String) consumerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "ksLoc");
|
|
|
|
|
assertThat(consumerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
|
|
|
|
|
.isEqualTo("p2");
|
|
|
|
|
assertThat((String) consumerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "tsLoc");
|
|
|
|
|
assertThat(consumerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
|
|
|
|
|
.isEqualTo("p3");
|
|
|
|
|
// consumer
|
|
|
|
|
assertThat(consumerProps.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override
|
|
|
|
|
assertThat(consumerProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
|
|
|
|
|
.isEqualTo(Boolean.FALSE);
|
|
|
|
|
assertThat(consumerProps.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG))
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG))
|
|
|
|
|
.isEqualTo(123L);
|
|
|
|
|
assertThat(consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
|
|
|
|
|
.isEqualTo("earliest");
|
|
|
|
|
assertThat(consumerProps.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG))
|
|
|
|
|
.isEqualTo(456);
|
|
|
|
|
assertThat(consumerProps.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG))
|
|
|
|
|
.isEqualTo(789);
|
|
|
|
|
assertThat(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
|
|
|
|
|
assertThat(consumerProps.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG))
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456);
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789);
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG))
|
|
|
|
|
.isEqualTo(234);
|
|
|
|
|
assertThat(consumerProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
|
|
|
|
|
.isEqualTo(LongDeserializer.class);
|
|
|
|
|
assertThat(consumerProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
|
|
|
|
|
.isEqualTo(IntegerDeserializer.class);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -119,7 +116,6 @@ public class KafkaAutoConfigurationTests {
|
|
|
|
|
load("spring.kafka.clientId=cid", "spring.kafka.producer.acks=all",
|
|
|
|
|
"spring.kafka.producer.batch-size=20",
|
|
|
|
|
"spring.kafka.producer.bootstrap-servers=bar:1234", // test override
|
|
|
|
|
// common
|
|
|
|
|
"spring.kafka.producer.buffer-memory=12345",
|
|
|
|
|
"spring.kafka.producer.compression-type=gzip",
|
|
|
|
|
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
|
|
|
|
@ -133,39 +129,35 @@ public class KafkaAutoConfigurationTests {
|
|
|
|
|
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context
|
|
|
|
|
.getBean(DefaultKafkaProducerFactory.class);
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
Map<String, Object> producerProps = (Map<String, Object>) new DirectFieldAccessor(
|
|
|
|
|
Map<String, Object> configs = (Map<String, Object>) new DirectFieldAccessor(
|
|
|
|
|
producerFactory).getPropertyValue("configs");
|
|
|
|
|
// common
|
|
|
|
|
assertThat(producerProps.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
|
|
|
|
|
assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
|
|
|
|
|
// producer
|
|
|
|
|
assertThat(producerProps.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all");
|
|
|
|
|
assertThat(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20);
|
|
|
|
|
assertThat(producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
|
|
|
|
|
assertThat(configs.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all");
|
|
|
|
|
assertThat(configs.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20);
|
|
|
|
|
assertThat(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
|
|
|
|
|
.isEqualTo(Collections.singletonList("bar:1234")); // override
|
|
|
|
|
assertThat(producerProps.get(ProducerConfig.BUFFER_MEMORY_CONFIG))
|
|
|
|
|
.isEqualTo(12345L);
|
|
|
|
|
assertThat(producerProps.get(ProducerConfig.COMPRESSION_TYPE_CONFIG))
|
|
|
|
|
.isEqualTo("gzip");
|
|
|
|
|
assertThat(producerProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
|
|
|
|
|
assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L);
|
|
|
|
|
assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
|
|
|
|
|
assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
|
|
|
|
|
.isEqualTo(LongSerializer.class);
|
|
|
|
|
assertThat(producerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
|
|
|
|
|
assertThat((String) producerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "ksLocP");
|
|
|
|
|
assertThat(producerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
|
|
|
|
|
.isEqualTo("p5");
|
|
|
|
|
assertThat((String) producerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "tsLocP");
|
|
|
|
|
assertThat(producerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
|
|
|
|
|
.isEqualTo("p6");
|
|
|
|
|
assertThat(producerProps.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
|
|
|
|
|
assertThat(producerProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
|
|
|
|
|
assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
|
|
|
|
|
assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
|
|
|
|
|
.isEqualTo(IntegerSerializer.class);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void listenerProperties() {
|
|
|
|
|
load("spring.kafka.template.default-topic=testTopic",
|
|
|
|
|
|
|
|
|
|
"spring.kafka.listener.ack-mode=MANUAL",
|
|
|
|
|
"spring.kafka.listener.ack-count=123",
|
|
|
|
|
"spring.kafka.listener.ack-time=456",
|
|
|
|
|