|
|
|
@ -121,38 +121,38 @@ class KafkaAutoConfigurationTests {
|
|
|
|
|
.getBean(DefaultKafkaConsumerFactory.class);
|
|
|
|
|
Map<String, Object> configs = consumerFactory.getConfigurationProperties();
|
|
|
|
|
// common
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
|
|
|
|
|
.isEqualTo(Collections.singletonList("foo:1234"));
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1");
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
|
|
|
|
Collections.singletonList("foo:1234"));
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "p1");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "ksLoc");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)).isEqualTo("PKCS12");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "p2");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "tsLoc");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p3");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)).isEqualTo("PKCS12");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG)).isEqualTo("TLSv1.2");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "p3");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
|
|
|
|
|
// consumer
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isEqualTo(Boolean.FALSE);
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)).isEqualTo(123);
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456);
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(1024);
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234);
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)).isEqualTo("read_committed");
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
|
|
|
|
|
.isEqualTo(LongDeserializer.class);
|
|
|
|
|
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
|
|
|
|
|
.isEqualTo(IntegerDeserializer.class);
|
|
|
|
|
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42);
|
|
|
|
|
assertThat(configs.get("foo")).isEqualTo("bar");
|
|
|
|
|
assertThat(configs.get("baz")).isEqualTo("qux");
|
|
|
|
|
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
|
|
|
|
|
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.CLIENT_ID_CONFIG, "ccid"); // override
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 123);
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 456);
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.GROUP_ID_CONFIG, "bar");
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 234);
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
|
|
|
|
|
LongDeserializer.class);
|
|
|
|
|
assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
|
|
|
|
|
IntegerDeserializer.class);
|
|
|
|
|
assertThat(configs).containsEntry(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 42);
|
|
|
|
|
assertThat(configs).containsEntry("foo", "bar");
|
|
|
|
|
assertThat(configs).containsEntry("baz", "qux");
|
|
|
|
|
assertThat(configs).containsEntry("foo.bar.baz", "qux.fiz.buz");
|
|
|
|
|
assertThat(configs).containsEntry("fiz.buz", "fix.fox");
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -177,33 +177,33 @@ class KafkaAutoConfigurationTests {
|
|
|
|
|
.getBean(DefaultKafkaProducerFactory.class);
|
|
|
|
|
Map<String, Object> configs = producerFactory.getConfigurationProperties();
|
|
|
|
|
// common
|
|
|
|
|
assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
|
|
|
|
|
assertThat(configs).containsEntry(ProducerConfig.CLIENT_ID_CONFIG, "cid");
|
|
|
|
|
// producer
|
|
|
|
|
assertThat(configs.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all");
|
|
|
|
|
assertThat(configs.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(2048);
|
|
|
|
|
assertThat(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
|
|
|
|
|
.isEqualTo(Collections.singletonList("bar:1234")); // override
|
|
|
|
|
assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(4096L);
|
|
|
|
|
assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
|
|
|
|
|
assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class);
|
|
|
|
|
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
|
|
|
|
|
assertThat(configs).containsEntry(ProducerConfig.ACKS_CONFIG, "all");
|
|
|
|
|
assertThat(configs).containsEntry(ProducerConfig.BATCH_SIZE_CONFIG, 2048);
|
|
|
|
|
assertThat(configs).containsEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
|
|
|
|
Collections.singletonList("bar:1234")); // override
|
|
|
|
|
assertThat(configs).containsEntry(ProducerConfig.BUFFER_MEMORY_CONFIG, 4096L);
|
|
|
|
|
assertThat(configs).containsEntry(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
|
|
|
|
|
assertThat(configs).containsEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
|
|
|
|
|
assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "p4");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "ksLocP");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)).isEqualTo("PKCS12");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "p5");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "tsLocP");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p6");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)).isEqualTo("PKCS12");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG)).isEqualTo("TLSv1.2");
|
|
|
|
|
assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
|
|
|
|
|
assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
|
|
|
|
|
.isEqualTo(IntegerSerializer.class);
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "p6");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
|
|
|
|
|
assertThat(configs).containsEntry(ProducerConfig.RETRIES_CONFIG, 2);
|
|
|
|
|
assertThat(configs).containsEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
|
|
|
|
|
IntegerSerializer.class);
|
|
|
|
|
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).isEmpty();
|
|
|
|
|
assertThat(context.getBeansOfType(KafkaTransactionManager.class)).isEmpty();
|
|
|
|
|
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
|
|
|
|
|
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
|
|
|
|
|
assertThat(configs).containsEntry("foo.bar.baz", "qux.fiz.buz");
|
|
|
|
|
assertThat(configs).containsEntry("fiz.buz", "fix.fox");
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -221,22 +221,22 @@ class KafkaAutoConfigurationTests {
|
|
|
|
|
KafkaAdmin admin = context.getBean(KafkaAdmin.class);
|
|
|
|
|
Map<String, Object> configs = admin.getConfigurationProperties();
|
|
|
|
|
// common
|
|
|
|
|
assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
|
|
|
|
|
assertThat(configs).containsEntry(AdminClientConfig.CLIENT_ID_CONFIG, "cid");
|
|
|
|
|
// admin
|
|
|
|
|
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
|
|
|
|
|
assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "p4");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "ksLocP");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)).isEqualTo("PKCS12");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "p5");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "tsLocP");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p6");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)).isEqualTo("PKCS12");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG)).isEqualTo("TLSv1.2");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "p6");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
|
|
|
|
|
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).isEmpty();
|
|
|
|
|
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
|
|
|
|
|
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
|
|
|
|
|
assertThat(configs).containsEntry("foo.bar.baz", "qux.fiz.buz");
|
|
|
|
|
assertThat(configs).containsEntry("fiz.buz", "fix.fox");
|
|
|
|
|
assertThat(admin).hasFieldOrPropertyWithValue("fatalIfBrokerNotAvailable", true);
|
|
|
|
|
assertThat(admin).hasFieldOrPropertyWithValue("modifyTopicConfigs", true);
|
|
|
|
|
});
|
|
|
|
@ -263,24 +263,24 @@ class KafkaAutoConfigurationTests {
|
|
|
|
|
.asProperties();
|
|
|
|
|
assertThat((List<String>) configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
|
|
|
|
|
.containsExactly("localhost:9092", "localhost:9093");
|
|
|
|
|
assertThat(configs.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)).isEqualTo(1024);
|
|
|
|
|
assertThat(configs.get(StreamsConfig.CLIENT_ID_CONFIG)).isEqualTo("override");
|
|
|
|
|
assertThat(configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)).isEqualTo(2);
|
|
|
|
|
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
|
|
|
|
|
assertThat(configs.get(StreamsConfig.STATE_DIR_CONFIG)).isEqualTo("/tmp/state");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p7");
|
|
|
|
|
assertThat(configs).containsEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024);
|
|
|
|
|
assertThat(configs).containsEntry(StreamsConfig.CLIENT_ID_CONFIG, "override");
|
|
|
|
|
assertThat(configs).containsEntry(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
|
|
|
|
|
assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
|
|
|
|
|
assertThat(configs).containsEntry(StreamsConfig.STATE_DIR_CONFIG, "/tmp/state");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "p7");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "ksLocP");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p8");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)).isEqualTo("PKCS12");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "p8");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
|
|
|
|
|
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
|
|
|
|
|
.endsWith(File.separator + "tsLocP");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p9");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)).isEqualTo("PKCS12");
|
|
|
|
|
assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG)).isEqualTo("TLSv1.2");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "p9");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
|
|
|
|
|
assertThat(configs).containsEntry(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
|
|
|
|
|
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).isEmpty();
|
|
|
|
|
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
|
|
|
|
|
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
|
|
|
|
|
assertThat(configs).containsEntry("foo.bar.baz", "qux.fiz.buz");
|
|
|
|
|
assertThat(configs).containsEntry("fiz.buz", "fix.fox");
|
|
|
|
|
assertThat(context.getBean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME))
|
|
|
|
|
.isNotNull();
|
|
|
|
|
});
|
|
|
|
@ -300,7 +300,7 @@ class KafkaAutoConfigurationTests {
|
|
|
|
|
.asProperties();
|
|
|
|
|
assertThat((List<String>) configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
|
|
|
|
|
.containsExactly("localhost:9092", "localhost:9093");
|
|
|
|
|
assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG)).isEqualTo("my-test-app");
|
|
|
|
|
assertThat(configs).containsEntry(StreamsConfig.APPLICATION_ID_CONFIG, "my-test-app");
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -316,9 +316,9 @@ class KafkaAutoConfigurationTests {
|
|
|
|
|
.getBean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
|
|
|
|
|
KafkaStreamsConfiguration.class)
|
|
|
|
|
.asProperties();
|
|
|
|
|
assertThat(configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
|
|
|
|
|
.isEqualTo("localhost:9094, localhost:9095");
|
|
|
|
|
assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG)).isEqualTo("test-id");
|
|
|
|
|
assertThat(configs).containsEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
|
|
|
|
|
"localhost:9094, localhost:9095");
|
|
|
|
|
assertThat(configs).containsEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-id");
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -479,7 +479,7 @@ class KafkaAutoConfigurationTests {
|
|
|
|
|
assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory);
|
|
|
|
|
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
|
|
|
|
|
assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL);
|
|
|
|
|
assertThat(containerProperties.isAsyncAcks()).isEqualTo(true);
|
|
|
|
|
assertThat(containerProperties.isAsyncAcks()).isTrue();
|
|
|
|
|
assertThat(containerProperties.getClientId()).isEqualTo("client");
|
|
|
|
|
assertThat(containerProperties.getAckCount()).isEqualTo(123);
|
|
|
|
|
assertThat(containerProperties.getAckTime()).isEqualTo(456L);
|
|
|
|
@ -672,10 +672,10 @@ class KafkaAutoConfigurationTests {
|
|
|
|
|
DefaultKafkaProducerFactory<?, ?> producerFactory = context
|
|
|
|
|
.getBean(DefaultKafkaProducerFactory.class);
|
|
|
|
|
Map<String, Object> producerConfigs = producerFactory.getConfigurationProperties();
|
|
|
|
|
assertThat(producerConfigs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
|
|
|
|
|
assertThat(producerConfigs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
|
|
|
|
|
KafkaAdmin admin = context.getBean(KafkaAdmin.class);
|
|
|
|
|
Map<String, Object> configs = admin.getConfigurationProperties();
|
|
|
|
|
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("PLAINTEXT");
|
|
|
|
|
assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|