Merge pull request #23636 from adrien-ben

* pr/23636:
  Polish "Add configuration options for Kafka Stream's CleanupConfig"
  Add configuration options for Kafka Stream's CleanupConfig

Closes gh-23636
pull/23694/head
Stephane Nicoll 4 years ago
commit 37c4314eea

@ -685,6 +685,8 @@ public class KafkaProperties {
private final Security security = new Security(); private final Security security = new Security();
private final Cleanup cleanup = new Cleanup();
/** /**
* Kafka streams application.id property; default spring.application.name. * Kafka streams application.id property; default spring.application.name.
*/ */
@ -735,6 +737,10 @@ public class KafkaProperties {
return this.security; return this.security;
} }
public Cleanup getCleanup() {
return this.cleanup;
}
public String getApplicationId() { public String getApplicationId() {
return this.applicationId; return this.applicationId;
} }
@ -1234,6 +1240,36 @@ public class KafkaProperties {
} }
public static class Cleanup {
/**
* Cleanup the applications local state directory on startup.
*/
private boolean onStartup = false;
/**
* Cleanup the applications local state directory on shutdown.
*/
private boolean onShutdown = true;
public boolean isOnStartup() {
return this.onStartup;
}
public void setOnStartup(boolean onStartup) {
this.onStartup = onStartup;
}
public boolean isOnShutdown() {
return this.onShutdown;
}
public void setOnShutdown(boolean onShutdown) {
this.onShutdown = onShutdown;
}
}
public enum IsolationLevel { public enum IsolationLevel {
/** /**

@ -34,6 +34,7 @@ import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
/** /**
* Configuration for Kafka Streams annotation-driven support. * Configuration for Kafka Streams annotation-driven support.
@ -91,6 +92,9 @@ class KafkaStreamsAnnotationDrivenConfiguration {
@Override @Override
public void afterPropertiesSet() { public void afterPropertiesSet() {
this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup();
CleanupConfig cleanupConfig = new CleanupConfig(cleanup.isOnStartup(), cleanup.isOnShutdown());
this.factoryBean.setCleanupConfig(cleanupConfig);
} }
} }

@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.AutoConfigurations;
@ -50,6 +51,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@ -340,6 +342,26 @@ class KafkaAutoConfigurationTests {
}); });
} }
@Test
void streamsWithCleanupConfig() {
this.contextRunner
.withUserConfiguration(EnableKafkaStreamsConfiguration.class, TestKafkaStreamsConfiguration.class)
.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
"spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cleanup.on-startup=true",
"spring.kafka.streams.cleanup.on-shutdown=false")
.run((context) -> {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
.getBean(StreamsBuilderFactoryBean.class);
assertThat(streamsBuilderFactoryBean)
.extracting("cleanupConfig", InstanceOfAssertFactories.type(CleanupConfig.class))
.satisfies((cleanupConfig) -> {
assertThat(cleanupConfig.cleanupOnStart()).isTrue();
assertThat(cleanupConfig.cleanupOnStop()).isFalse();
});
});
}
@Test @Test
void streamsApplicationIdIsMandatory() { void streamsApplicationIdIsMandatory() {
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class).run((context) -> { this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class).run((context) -> {

@ -18,8 +18,10 @@ package org.springframework.boot.autoconfigure.kafka;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Cleanup;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -48,4 +50,12 @@ class KafkaPropertiesTests {
assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal()); assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal());
} }
@Test
void cleanupConfigDefaultValuesAreConsistent() {
CleanupConfig cleanupConfig = new CleanupConfig();
Cleanup cleanup = new KafkaProperties().getStreams().getCleanup();
assertThat(cleanup.isOnStartup()).isEqualTo(cleanupConfig.cleanupOnStart());
assertThat(cleanup.isOnShutdown()).isEqualTo(cleanupConfig.cleanupOnStop());
}
} }

Loading…
Cancel
Save