bootstrapServers = new ArrayList<>(
Collections.singletonList("localhost:9092"));
@@ -79,6 +79,8 @@ public class KafkaProperties {
private final Admin admin = new Admin();
+ private final Streams streams = new Streams();
+
private final Listener listener = new Listener();
private final Ssl ssl = new Ssl();
@@ -123,6 +125,10 @@ public class KafkaProperties {
return this.admin;
}
+ public Streams getStreams() {
+ return this.streams;
+ }
+
public Ssl getSsl() {
return this.ssl;
}
@@ -193,6 +199,19 @@ public class KafkaProperties {
return properties;
}
+ /**
+ * Create an initial map of streams properties from the state of this instance.
+ *
+ * This allows you to add additional properties, if necessary.
+ * @return the streams properties initialized with the customizations defined on this
+ * instance
+ */
+ public Map buildStreamsProperties() {
+ Map properties = buildCommonProperties();
+ properties.putAll(this.streams.buildProperties());
+ return properties;
+ }
+
public static class Consumer {
private final Ssl ssl = new Ssl();
@@ -211,7 +230,7 @@ public class KafkaProperties {
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
- * connection to the Kafka cluster.
+ * connection to the Kafka cluster. Overrides the global property, for consumers.
*/
private List bootstrapServers;
@@ -421,7 +440,7 @@ public class KafkaProperties {
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
- * connection to the Kafka cluster.
+ * connection to the Kafka cluster. Overrides the global property, for producers.
*/
private List bootstrapServers;
@@ -631,6 +650,136 @@ public class KafkaProperties {
}
+ /**
+ * High (and some medium) priority Streams properties and a general properties bucket.
+ */
+ public static class Streams {
+
+ private final Ssl ssl = new Ssl();
+
+ /**
+ * Kafka streams application.id property; default spring.application.name.
+ */
+ private String applicationId;
+
+ /**
+ * Whether or not to auto-start the streams factory bean.
+ */
+ private boolean autoStartup = true;
+
+ /**
+ * Comma-delimited list of host:port pairs to use for establishing the initial
+ * connection to the Kafka cluster. Overrides the global property, for streams.
+ */
+ private List bootstrapServers;
+
+ /**
+ * Maximum number of memory bytes to be used for buffering across all threads.
+ */
+ private Integer cacheMaxBytesBuffering;
+
+ /**
+ * ID to pass to the server when making requests. Used for server-side logging.
+ */
+ private String clientId;
+
+ /**
+ * The replication factor for change log topics and repartition topics created by
+ * the stream processing application.
+ */
+ private Integer replicationFactor;
+
+ /**
+ * Directory location for the state store.
+ */
+ private String stateDir;
+
+ /**
+ * Additional Kafka properties used to configure the streams.
+ */
+ private final Map properties = new HashMap<>();
+
+ public Ssl getSsl() {
+ return this.ssl;
+ }
+
+ public String getApplicationId() {
+ return this.applicationId;
+ }
+
+ public void setApplicationId(String applicationId) {
+ this.applicationId = applicationId;
+ }
+
+ public boolean isAutoStartup() {
+ return this.autoStartup;
+ }
+
+ public void setAutoStartup(boolean autoStartup) {
+ this.autoStartup = autoStartup;
+ }
+
+ public List getBootstrapServers() {
+ return this.bootstrapServers;
+ }
+
+ public void setBootstrapServers(List bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public Integer getCacheMaxBytesBuffering() {
+ return this.cacheMaxBytesBuffering;
+ }
+
+ public void setCacheMaxBytesBuffering(Integer cacheMaxBytesBuffering) {
+ this.cacheMaxBytesBuffering = cacheMaxBytesBuffering;
+ }
+
+ public String getClientId() {
+ return this.clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public Integer getReplicationFactor() {
+ return this.replicationFactor;
+ }
+
+ public void setReplicationFactor(Integer replicationFactor) {
+ this.replicationFactor = replicationFactor;
+ }
+
+ public String getStateDir() {
+ return this.stateDir;
+ }
+
+ public void setStateDir(String stateDir) {
+ this.stateDir = stateDir;
+ }
+
+ public Map getProperties() {
+ return this.properties;
+ }
+
+ public Map buildProperties() {
+ Properties properties = new Properties();
+ PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
+ map.from(this::getApplicationId).to(properties.in("application.id"));
+ map.from(this::getBootstrapServers)
+ .to(properties.in(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+ map.from(this::getCacheMaxBytesBuffering)
+ .to(properties.in("cache.max.bytes.buffering"));
+ map.from(this::getClientId)
+ .to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG));
+ map.from(this::getReplicationFactor).to(properties.in("replication.factor"));
+ map.from(this::getStateDir).to(properties.in("state.dir"));
+ return properties.with(this.ssl, this.properties);
+ }
+
+ }
+
public static class Template {
/**
@@ -1011,6 +1160,7 @@ public class KafkaProperties {
}
+ @SuppressWarnings("serial")
private static class Properties extends HashMap {
public java.util.function.Consumer in(String key) {
diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java
new file mode 100644
index 0000000000..7e48a87cee
--- /dev/null
+++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2012-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.boot.autoconfigure.kafka;
+
+import java.util.Map;
+
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.env.Environment;
+import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
+import org.springframework.kafka.config.KafkaStreamsConfiguration;
+import org.springframework.kafka.core.StreamsBuilderFactoryBean;
+
+/**
+ * Configuration for Kafka Streams annotation-driven support.
+ *
+ * @author Gary Russell
+ * @author Stephane Nicoll
+ */
+@Configuration
+@ConditionalOnClass(StreamsBuilder.class)
+@ConditionalOnBean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
+class KafkaStreamsAnnotationDrivenConfiguration {
+
+ private final KafkaProperties properties;
+
+ KafkaStreamsAnnotationDrivenConfiguration(KafkaProperties properties) {
+ this.properties = properties;
+ }
+
+ @ConditionalOnMissingBean
+ @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
+ public KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment) {
+ Map streamsProperties = this.properties.buildStreamsProperties();
+ if (this.properties.getStreams().getApplicationId() == null) {
+ String applicationName = environment.getProperty("spring.application.name");
+ if (applicationName != null) {
+ streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG,
+ applicationName);
+ }
+ else {
+ throw new InvalidConfigurationPropertyValueException(
+ "spring.kafka.streams.application-id", null,
+ "This property is mandatory and fallback 'spring.application.name' is not set either.");
+ }
+ }
+ return new KafkaStreamsConfiguration(streamsProperties);
+ }
+
+ @Bean
+ public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
+ StreamsBuilderFactoryBean factoryBean) {
+ return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
+ }
+
+ // Separate class required to avoid BeanCurrentlyInCreationException
+ static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
+
+ private final KafkaProperties properties;
+
+ private final StreamsBuilderFactoryBean factoryBean;
+
+ KafkaStreamsFactoryBeanConfigurer(KafkaProperties properties,
+ StreamsBuilderFactoryBean factoryBean) {
+ this.properties = properties;
+ this.factoryBean = factoryBean;
+ }
+
+ @Override
+ public void afterPropertiesSet() {
+ this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
+ }
+
+ }
+
+}
diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java
index 7a6e773c16..8dad270aec 100644
--- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java
+++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java
@@ -28,9 +28,12 @@ import org.junit.Test;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.handler.annotation.Header;
@@ -41,6 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* Integration tests for {@link KafkaAutoConfiguration}.
*
* @author Gary Russell
+ * @author Stephane Nicoll
*/
public class KafkaAutoConfigurationIntegrationTests {
@@ -83,6 +87,14 @@ public class KafkaAutoConfigurationIntegrationTests {
producer.close();
}
+ @Test
+ public void testStreams() {
+ load(KafkaStreamsConfig.class, "spring.application.name:my-app",
+ "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString());
+ assertThat(this.context.getBean(StreamsBuilderFactoryBean.class).isAutoStartup())
+ .isTrue();
+ }
+
private void load(Class> config, String... environment) {
this.context = doLoad(new Class>[] { config }, environment);
}
@@ -101,7 +113,8 @@ public class KafkaAutoConfigurationIntegrationTests {
return embeddedKafka.getEmbeddedKafka().getBrokersAsString();
}
- public static class KafkaConfig {
+ @Configuration
+ static class KafkaConfig {
@Bean
public Listener listener() {
@@ -115,6 +128,12 @@ public class KafkaAutoConfigurationIntegrationTests {
}
+ @Configuration
+ @EnableKafkaStreams
+ static class KafkaStreamsConfig {
+
+ }
+
public static class Listener {
private final CountDownLatch latch = new CountDownLatch(1);
diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java
index 3e91f1d12c..816559e886 100644
--- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java
+++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java
@@ -18,7 +18,9 @@ package org.springframework.boot.autoconfigure.kafka;
import java.io.File;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import javax.security.auth.login.AppConfigurationEntry;
@@ -30,6 +32,8 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
@@ -37,8 +41,11 @@ import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafkaStreams;
+import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
@@ -273,6 +280,128 @@ public class KafkaAutoConfigurationTests {
});
}
+ @Test
+ public void streamsProperties() {
+ this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
+ .withPropertyValues("spring.kafka.client-id=cid",
+ "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
+ "spring.application.name=appName",
+ "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
+ "spring.kafka.streams.auto-startup=false",
+ "spring.kafka.streams.cache-max-bytes-buffering=42",
+ "spring.kafka.streams.client-id=override",
+ "spring.kafka.streams.properties.fiz.buz=fix.fox",
+ "spring.kafka.streams.replication-factor=2",
+ "spring.kafka.streams.state-dir=/tmp/state",
+ "spring.kafka.streams.ssl.key-password=p7",
+ "spring.kafka.streams.ssl.key-store-location=classpath:ksLocP",
+ "spring.kafka.streams.ssl.key-store-password=p8",
+ "spring.kafka.streams.ssl.key-store-type=PKCS12",
+ "spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP",
+ "spring.kafka.streams.ssl.trust-store-password=p9",
+ "spring.kafka.streams.ssl.trust-store-type=PKCS12",
+ "spring.kafka.streams.ssl.protocol=TLSv1.2")
+ .run((context) -> {
+ Properties configs = context.getBean(
+ KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
+ KafkaStreamsConfiguration.class).asProperties();
+ assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ .isEqualTo("localhost:9092, localhost:9093");
+ assertThat(
+ configs.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG))
+ .isEqualTo("42");
+ assertThat(configs.get(StreamsConfig.CLIENT_ID_CONFIG))
+ .isEqualTo("override");
+ assertThat(configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG))
+ .isEqualTo("2");
+ assertThat(configs.get(StreamsConfig.STATE_DIR_CONFIG))
+ .isEqualTo("/tmp/state");
+ assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
+ .isEqualTo("p7");
+ assertThat(
+ (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ .endsWith(File.separator + "ksLocP");
+ assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
+ .isEqualTo("p8");
+ assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG))
+ .isEqualTo("PKCS12");
+ assertThat((String) configs
+ .get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
+ .endsWith(File.separator + "tsLocP");
+ assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
+ .isEqualTo("p9");
+ assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG))
+ .isEqualTo("PKCS12");
+ assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG))
+ .isEqualTo("TLSv1.2");
+ assertThat(
+ context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
+ .isEmpty();
+ assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
+ assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
+ assertThat(context.getBean(
+ KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME))
+ .isNotNull();
+ });
+ }
+
+ @Test
+ public void streamsApplicationIdUsesMainApplicationNameByDefault() {
+ this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
+ .withPropertyValues("spring.application.name=my-test-app",
+ "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
+ "spring.kafka.streams.auto-startup=false")
+ .run((context) -> {
+ Properties configs = context.getBean(
+ KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
+ KafkaStreamsConfiguration.class).asProperties();
+ assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ .isEqualTo("localhost:9092, localhost:9093");
+ assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG))
+ .isEqualTo("my-test-app");
+ });
+ }
+
+ @Test
+ public void streamsWithCustomKafkaConfiguration() {
+ this.contextRunner
+ .withUserConfiguration(EnableKafkaStreamsConfiguration.class,
+ TestKafkaStreamsConfiguration.class)
+ .withPropertyValues("spring.application.name=my-test-app",
+ "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
+ "spring.kafka.streams.auto-startup=false")
+ .run((context) -> {
+ Properties configs = context.getBean(
+ KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
+ KafkaStreamsConfiguration.class).asProperties();
+ assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ .isEqualTo("localhost:9094, localhost:9095");
+ assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG))
+ .isEqualTo("test-id");
+ });
+ }
+
+ @Test
+ public void streamsApplicationIdIsMandatory() {
+ this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
+ .run((context) -> {
+ assertThat(context).hasFailed();
+ assertThat(context).getFailure()
+ .hasMessageContaining("spring.kafka.streams.application-id")
+ .hasMessageContaining(
+ "This property is mandatory and fallback 'spring.application.name' is not set either.");
+
+ });
+ }
+
+ @Test
+ public void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() {
+ this.contextRunner.run((context) -> {
+ assertThat(context).hasNotFailed();
+ assertThat(context).doesNotHaveBean(StreamsBuilder.class);
+ });
+ }
+
@SuppressWarnings("unchecked")
@Test
public void listenerProperties() {
@@ -404,4 +533,25 @@ public class KafkaAutoConfigurationTests {
}
+ @Configuration
+ @EnableKafkaStreams
+ protected static class EnableKafkaStreamsConfiguration {
+
+ }
+
+ @Configuration
+ protected static class TestKafkaStreamsConfiguration {
+
+ @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
+ public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
+ Map streamsProperties = new HashMap<>();
+ streamsProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ "localhost:9094, localhost:9095");
+ streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-id");
+
+ return new KafkaStreamsConfiguration(streamsProperties);
+ }
+
+ }
+
}
diff --git a/spring-boot-project/spring-boot-docs/pom.xml b/spring-boot-project/spring-boot-docs/pom.xml
index 96ea4a3ff5..6bbe0cd4ef 100644
--- a/spring-boot-project/spring-boot-docs/pom.xml
+++ b/spring-boot-project/spring-boot-docs/pom.xml
@@ -397,6 +397,11 @@
commons-dbcp2
true