Merge branch '2.2.x'

Closes gh-20919
pull/20923/head
Stephane Nicoll 5 years ago
commit 73ca703007

@ -906,7 +906,7 @@ public class KafkaProperties {
* Whether the container should fail to start if at least one of the configured * Whether the container should fail to start if at least one of the configured
* topics are not present on the broker. * topics are not present on the broker.
*/ */
private boolean missingTopicsFatal = true; private boolean missingTopicsFatal = false;
public Type getType() { public Type getType() {
return this.type; return this.type;

@ -39,7 +39,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -370,7 +369,7 @@ class KafkaAutoConfigurationTests {
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
"spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.monitor-interval=45",
"spring.kafka.listener.log-container-config=true", "spring.kafka.listener.log-container-config=true",
"spring.kafka.listener.missing-topics-fatal=false", "spring.kafka.jaas.enabled=true", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo",
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true")
.run((context) -> { .run((context) -> {
@ -395,7 +394,7 @@ class KafkaAutoConfigurationTests {
assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L); assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L);
assertThat(containerProperties.getMonitorInterval()).isEqualTo(45); assertThat(containerProperties.getMonitorInterval()).isEqualTo(45);
assertThat(containerProperties.isLogContainerConfig()).isTrue(); assertThat(containerProperties.isLogContainerConfig()).isTrue();
assertThat(containerProperties.isMissingTopicsFatal()).isFalse(); assertThat(containerProperties.isMissingTopicsFatal()).isTrue();
assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3); assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3);
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue(); assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).hasSize(1); assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).hasSize(1);
@ -409,17 +408,6 @@ class KafkaAutoConfigurationTests {
}); });
} }
@Test
void listenerPropertiesMatchDefaults() {
this.contextRunner.run((context) -> {
Listener listenerProperties = new KafkaProperties().getListener();
AbstractKafkaListenerContainerFactory<?, ?, ?> kafkaListenerContainerFactory = (AbstractKafkaListenerContainerFactory<?, ?, ?>) context
.getBean(KafkaListenerContainerFactory.class);
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
assertThat(containerProperties.isMissingTopicsFatal()).isEqualTo(listenerProperties.isMissingTopicsFatal());
});
}
@Test @Test
void testKafkaTemplateRecordMessageConverters() { void testKafkaTemplateRecordMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class) this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)

@ -19,6 +19,8 @@ package org.springframework.boot.autoconfigure.kafka;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.listener.ContainerProperties;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -40,4 +42,11 @@ class KafkaPropertiesTests {
assertThat(original).hasSize(IsolationLevel.values().length); assertThat(original).hasSize(IsolationLevel.values().length);
} }
@Test
void listenerDefaultValuesAreConsistent() {
ContainerProperties container = new ContainerProperties("test");
Listener listenerProperties = new KafkaProperties().getListener();
assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal());
}
} }

Loading…
Cancel
Save