From a7acbbd625cf0a2265ce716b29e5f285b86f1848 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 8 Aug 2018 14:42:15 -0400 Subject: [PATCH 1/2] Add Kafka Streams auto-configuration See gh-14021 --- .../spring-boot-autoconfigure/pom.xml | 5 + .../kafka/KafkaAutoConfiguration.java | 62 +++++++ .../autoconfigure/kafka/KafkaProperties.java | 156 +++++++++++++++++- .../kafka/KafkaAutoConfigurationTests.java | 66 ++++++++ .../appendix-application-properties.adoc | 21 ++- .../main/asciidoc/spring-boot-features.adoc | 49 +++++- 6 files changed, 349 insertions(+), 10 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/pom.xml b/spring-boot-project/spring-boot-autoconfigure/pom.xml index f9e08b16b7..da28b3c8b4 100755 --- a/spring-boot-project/spring-boot-autoconfigure/pom.xml +++ b/spring-boot-project/spring-boot-autoconfigure/pom.xml @@ -132,6 +132,11 @@ jest true + + org.apache.kafka + kafka-streams + true + org.flywaydb flyway-core diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 0bc7e88e2c..5b4d253e67 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -17,7 +17,11 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; +import java.util.Map; +import org.apache.kafka.streams.StreamsBuilder; + +import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -28,12 +32,17 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.core.env.Environment; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; +import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.StreamsBuilderFactoryBean; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; @@ -138,4 +147,57 @@ public class KafkaAutoConfiguration { return kafkaAdmin; } + @Configuration + @ConditionalOnClass(StreamsBuilder.class) + public static class KafkaStreamsAutoConfiguration { + + @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + public KafkaStreamsConfiguration defaultKafkaStreamsConfig( + KafkaProperties properties, Environment environment) { + + Map streamsProperties = properties.buildStreamsProperties(); + if (properties.getStreams().getApplicationId() == null) { + if (environment.getProperty("spring.application.id") != null) { + streamsProperties.put("application.id", + environment.getProperty("spring.application.name")); + } + } + return new KafkaStreamsConfiguration(streamsProperties); + } + + @Bean + public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer( + StreamsBuilderFactoryBean factoryBean, KafkaProperties properties) { + + return new KafkaStreamsFactoryBeanConfigurer(factoryBean, properties); + } + + @Configuration + @EnableKafkaStreams + public static class EnableKafkaStreamsAutoConfiguration { + + } + + static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean { + + private final StreamsBuilderFactoryBean factoryBean; + + private final KafkaProperties properties; + + KafkaStreamsFactoryBeanConfigurer(StreamsBuilderFactoryBean factoryBean, + KafkaProperties properties) { + this.factoryBean = factoryBean; + this.properties = properties; + } + + @Override + public void afterPropertiesSet() throws Exception { + this.factoryBean + .setAutoStartup(this.properties.getStreams().isAutoStartup()); + } + + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 5716bcffe8..e77cf142f6 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -57,7 +57,7 @@ public class KafkaProperties { /** * Comma-delimited list of host:port pairs to use for establishing the initial - * connection to the Kafka cluster. + * connection to the Kafka cluster. Applies to all components unless overridden. */ private List bootstrapServers = new ArrayList<>( Collections.singletonList("localhost:9092")); @@ -79,6 +79,8 @@ public class KafkaProperties { private final Admin admin = new Admin(); + private final Streams streams = new Streams(); + private final Listener listener = new Listener(); private final Ssl ssl = new Ssl(); @@ -123,6 +125,10 @@ public class KafkaProperties { return this.admin; } + public Streams getStreams() { + return this.streams; + } + public Ssl getSsl() { return this.ssl; } @@ -193,6 +199,19 @@ public class KafkaProperties { return properties; } + /** + * Create an initial map of streams properties from the state of this instance. + *

+ * This allows you to add additional properties, if necessary. + * @return the streams properties initialized with the customizations defined on this + * instance + */ + public Map buildStreamsProperties() { + Map properties = buildCommonProperties(); + properties.putAll(this.streams.buildProperties()); + return properties; + } + public static class Consumer { private final Ssl ssl = new Ssl(); @@ -211,7 +230,7 @@ public class KafkaProperties { /** * Comma-delimited list of host:port pairs to use for establishing the initial - * connection to the Kafka cluster. + * connection to the Kafka cluster. Overrides the global property, for consumers. */ private List bootstrapServers; @@ -421,7 +440,7 @@ public class KafkaProperties { /** * Comma-delimited list of host:port pairs to use for establishing the initial - * connection to the Kafka cluster. + * connection to the Kafka cluster. Overrides the global property, for producers. */ private List bootstrapServers; @@ -631,6 +650,136 @@ public class KafkaProperties { } + /** + * High (and some medium) priority Streams properties and a general properties bucket. + */ + public static class Streams { + + private final Ssl ssl = new Ssl(); + + /** + * Kafka streams application.id property; default spring.application.name. + */ + private String applicationId; + + /** + * Whether or not to auto-start the streams factory bean. + */ + private boolean autoStartup; + + /** + * Comma-delimited list of host:port pairs to use for establishing the initial + * connection to the Kafka cluster. Overrides the global property, for streams. + */ + private List bootstrapServers; + + /** + * Maximum number of memory bytes to be used for buffering across all threads. + */ + private Integer cacheMaxBytesBuffering; + + /** + * ID to pass to the server when making requests. Used for server-side logging. + */ + private String clientId; + + /** + * The replication factor for change log topics and repartition topics created by + * the stream processing application. + */ + private Integer replicationFactor; + + /** + * Directory location for the state store. + */ + private String stateDir; + + /** + * Additional Kafka properties used to configure the streams. + */ + private final Map properties = new HashMap<>(); + + public Ssl getSsl() { + return this.ssl; + } + + public String getApplicationId() { + return this.applicationId; + } + + public void setApplicationId(String applicationId) { + this.applicationId = applicationId; + } + + public boolean isAutoStartup() { + return this.autoStartup; + } + + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + + public List getBootstrapServers() { + return this.bootstrapServers; + } + + public void setBootstrapServers(List bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + public Integer getCacheMaxBytesBuffering() { + return this.cacheMaxBytesBuffering; + } + + public void setCacheMaxBytesBuffering(Integer cacheMaxBytesBuffering) { + this.cacheMaxBytesBuffering = cacheMaxBytesBuffering; + } + + public String getClientId() { + return this.clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public Integer getReplicationFactor() { + return this.replicationFactor; + } + + public void setReplicationFactor(Integer replicationFactor) { + this.replicationFactor = replicationFactor; + } + + public String getStateDir() { + return this.stateDir; + } + + public void setStateDir(String stateDir) { + this.stateDir = stateDir; + } + + public Map getProperties() { + return this.properties; + } + + public Map buildProperties() { + Properties properties = new Properties(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getApplicationId).to(properties.in("application.id")); + map.from(this::getBootstrapServers) + .to(properties.in(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + map.from(this::getCacheMaxBytesBuffering) + .to(properties.in("cache.max.bytes.buffering")); + map.from(this::getClientId) + .to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG)); + map.from(this::getReplicationFactor).to(properties.in("replication.factor")); + map.from(this::getStateDir).to(properties.in("state.dir")); + return properties.with(this.ssl, this.properties); + } + + } + public static class Template { /** @@ -1011,6 +1160,7 @@ public class KafkaProperties { } + @SuppressWarnings("serial") private static class Properties extends HashMap { public java.util.function.Consumer in(String key) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 3e91f1d12c..6608107544 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.File; import java.util.Collections; import java.util.Map; +import java.util.Properties; import javax.security.auth.login.AppConfigurationEntry; @@ -30,6 +31,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.streams.StreamsConfig; import org.junit.Test; import org.springframework.beans.DirectFieldAccessor; @@ -37,8 +39,10 @@ import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; @@ -273,6 +277,68 @@ public class KafkaAutoConfigurationTests { }); } + @Test + public void streamsProperties() { + this.contextRunner.withPropertyValues("spring.kafka.clientId=cid", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.application.name=appName", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", + "spring.kafka.streams.cache-max-bytes-buffering=42", + "spring.kafka.streams.client-id=override", + "spring.kafka.streams.properties.fiz.buz=fix.fox", + "spring.kafka.streams.replication-factor=2", + "spring.kafka.streams.state-dir=/tmp/state", + "spring.kafka.streams.ssl.key-password=p7", + "spring.kafka.streams.ssl.key-store-location=classpath:ksLocP", + "spring.kafka.streams.ssl.key-store-password=p8", + "spring.kafka.streams.ssl.key-store-type=PKCS12", + "spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP", + "spring.kafka.streams.ssl.trust-store-password=p9", + "spring.kafka.streams.ssl.trust-store-type=PKCS12", + "spring.kafka.streams.ssl.protocol=TLSv1.2").run((context) -> { + Properties configs = context.getBean( + KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME, + KafkaStreamsConfiguration.class).asProperties(); + assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .isEqualTo("localhost:9092, localhost:9093"); + assertThat( + configs.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)) + .isEqualTo("42"); + assertThat(configs.get(StreamsConfig.CLIENT_ID_CONFIG)) + .isEqualTo("override"); + assertThat(configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)) + .isEqualTo("2"); + assertThat(configs.get(StreamsConfig.STATE_DIR_CONFIG)) + .isEqualTo("/tmp/state"); + assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)) + .isEqualTo("p7"); + assertThat( + (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + .endsWith(File.separator + "ksLocP"); + assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) + .isEqualTo("p8"); + assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)) + .isEqualTo("PKCS12"); + assertThat((String) configs + .get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) + .endsWith(File.separator + "tsLocP"); + assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) + .isEqualTo("p9"); + assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)) + .isEqualTo("PKCS12"); + assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG)) + .isEqualTo("TLSv1.2"); + assertThat( + context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) + .isEmpty(); + assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); + assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); + assertThat(context.getBean( + KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)) + .isNotNull(); + }); + } + @SuppressWarnings("unchecked") @Test public void listenerProperties() { diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index 571a7e34fe..d8b264da78 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -1039,11 +1039,11 @@ content into your application. Rather, pick only the properties that you need. spring.kafka.admin.ssl.trust-store-location= # Location of the trust store file. spring.kafka.admin.ssl.trust-store-password= # Store password for the trust store file. spring.kafka.admin.ssl.trust-store-type= # Type of the trust store. - spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. + spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Applies to all components unless overridden. spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging. spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true. spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server. - spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. + spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for consumers. spring.kafka.consumer.client-id= # ID to pass to the server when making requests. Used for server-side logging. spring.kafka.consumer.enable-auto-commit= # Whether the consumer's offset is periodically committed in the background. spring.kafka.consumer.fetch-max-wait= # Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch.min.bytes". @@ -1079,7 +1079,7 @@ content into your application. Rather, pick only the properties that you need. spring.kafka.listener.type=single # Listener type. spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete. spring.kafka.producer.batch-size= # Default batch size in bytes. - spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. + spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for producers. spring.kafka.producer.buffer-memory= # Total bytes of memory the producer can use to buffer records waiting to be sent to the server. spring.kafka.producer.client-id= # ID to pass to the server when making requests. Used for server-side logging. spring.kafka.producer.compression-type= # Compression type for all data generated by the producer. @@ -1105,6 +1105,21 @@ content into your application. Rather, pick only the properties that you need. spring.kafka.ssl.trust-store-location= # Location of the trust store file. spring.kafka.ssl.trust-store-password= # Store password for the trust store file. spring.kafka.ssl.trust-store-type= # Type of the trust store. + spring.kafka.streams.auto-startup= # Whether or not to auto-start the streams factory bean. + spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for streams. + spring.kafka.streams.cache-max-bytes-buffering= # Maximum number of memory bytes to be used for buffering across all threads. + spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging. + spring.kafka.streams.properties.*= # Additional Kafka properties used to configure the streams. + spring.kafka.streams.replication-factor= # The replication factor for change log topics and repartition topics created by the stream processing application. + spring.kafka.streams.ssl.key-password= # Password of the private key in the key store file. + spring.kafka.streams.ssl.key-store-location= # Location of the key store file. + spring.kafka.streams.ssl.key-store-password= # Store password for the key store file. + spring.kafka.streams.ssl.key-store-type= # Type of the key store. + spring.kafka.streams.ssl.protocol= # SSL protocol to use. + spring.kafka.streams.ssl.trust-store-location= # Location of the trust store file. + spring.kafka.streams.ssl.trust-store-password= # Store password for the trust store file. + spring.kafka.streams.ssl.trust-store-type= # Type of the trust store. + spring.kafka.streams.state-dir= # Directory location for the state store. spring.kafka.template.default-topic= # Default topic to which messages are sent. # RABBIT ({sc-spring-boot-autoconfigure}/amqp/RabbitProperties.{sc-ext}[RabbitProperties]) diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index f45f4d509c..d3bfaeb479 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -5634,6 +5634,44 @@ The following component creates a listener endpoint on the `someTopic` topic: } ---- +[[boot-deatures-kafka-streams]] +==== Kafka Streams + +Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and +manage the lifecycle of its streams; the factory bean is created when +`@EnableKafkaStreams` is present on a `@Configuration` class. +The factory bean requires a `KafkaStreamsConfiguration` object for streams configuration. + +If Spring Boot detects the `kafka-streams` jar on the classpath, it will auto-configure +the `KafkaStreamsConfiguration` bean from the `KafkaProperties` object as well as enabling +the creation of the factory bean by spring-kafka. + +There are two required Kafka properties for streams (`bootstrap.servers` and +`application.id`); by default, the `application.id` is set to the `spring.application.name` +property, if present. +The `bootstrap.servers` can be set globally or specifically overridden just for streams. +Several other properties are specifically available as boot properties; other arbitrary +Kafka properties can be set using the `spring.kafka.streams.properties` property. +See <> for more information. + +To use the factory bean, simply wire its `StreamsBuilder` into your `@Bean` s. + +==== +[source, java] +---- +@Bean +public KStream kStream(StreamsBuilder streamsBuilder) { + KStream stream = streamsBuilder.stream("ks1In"); + stream.map((k, v) -> new KeyValue(k, v.toUpperCase())) + .to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>())); + return stream; +} +---- +==== + +By default, the factory bean `autoStartup` property is false; to automatically start the +streams managed by the `StreamsBuilder` object it creates, set property +`spting.kafka.streams.auto-startup=true`. [[boot-features-kafka-extra-props]] @@ -5643,13 +5681,14 @@ The properties supported by auto configuration are shown in (hyphenated or camelCase) map directly to the Apache Kafka dotted properties. Refer to the Apache Kafka documentation for details. -The first few of these properties apply to both producers and consumers but can be -specified at the producer or consumer level if you wish to use different values for each. +The first few of these properties apply to all components (producers, consumers, admins, +and streams) but can be +specified at the component level if you wish to use different values. Apache Kafka designates properties with an importance of HIGH, MEDIUM, or LOW. Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value. -Only a subset of the properties supported by Kafka are available through the +Only a subset of the properties supported by Kafka are available directly through the `KafkaProperties` class. If you wish to configure the producer or consumer with additional properties that are not directly supported, use the following properties: @@ -5659,11 +5698,13 @@ properties that are not directly supported, use the following properties: spring.kafka.admin.properties.prop.two=second spring.kafka.consumer.properties.prop.three=third spring.kafka.producer.properties.prop.four=fourth + spring.kafka.streams.properties.prop.five=fifth ---- This sets the common `prop.one` Kafka property to `first` (applies to producers, consumers and admins), the `prop.two` admin property to `second`, the `prop.three` -consumer property to `third` and the `prop.four` producer property to `fourth`. +consumer property to `third`, the `prop.four` producer property to `fourth` and the +`prop.five` streams property to `fifth`. You can also configure the Spring Kafka `JsonDeserializer` as follows: From 6d4bab911cdfb79171b5b71c7e7f2785a1654a50 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Thu, 16 Aug 2018 18:15:52 +0200 Subject: [PATCH 2/2] Polish "Add Kafka Streams auto-configuration" Closes gh-14021 --- .../kafka/KafkaAutoConfiguration.java | 65 +--------- .../autoconfigure/kafka/KafkaProperties.java | 2 +- ...aStreamsAnnotationDrivenConfiguration.java | 98 +++++++++++++++ ...afkaAutoConfigurationIntegrationTests.java | 21 +++- .../kafka/KafkaAutoConfigurationTests.java | 118 +++++++++++++++--- spring-boot-project/spring-boot-docs/pom.xml | 5 + .../appendix-application-properties.adoc | 3 +- .../main/asciidoc/spring-boot-features.adoc | 50 +++----- .../docs/kafka/KafkaStreamsBeanExample.java | 53 ++++++++ 9 files changed, 302 insertions(+), 113 deletions(-) create mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java create mode 100644 spring-boot-project/spring-boot-docs/src/main/java/org/springframework/boot/docs/kafka/KafkaStreamsBeanExample.java diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 5b4d253e67..7750b3fd5c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -17,11 +17,7 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; -import java.util.Map; -import org.apache.kafka.streams.StreamsBuilder; - -import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -32,17 +28,12 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import org.springframework.core.env.Environment; -import org.springframework.kafka.annotation.EnableKafkaStreams; -import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; -import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.core.StreamsBuilderFactoryBean; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; @@ -61,7 +52,8 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; @Configuration @ConditionalOnClass(KafkaTemplate.class) @EnableConfigurationProperties(KafkaProperties.class) -@Import(KafkaAnnotationDrivenConfiguration.class) +@Import({ KafkaAnnotationDrivenConfiguration.class, + KafkaStreamsAnnotationDrivenConfiguration.class }) public class KafkaAutoConfiguration { private final KafkaProperties properties; @@ -147,57 +139,4 @@ public class KafkaAutoConfiguration { return kafkaAdmin; } - @Configuration - @ConditionalOnClass(StreamsBuilder.class) - public static class KafkaStreamsAutoConfiguration { - - @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) - public KafkaStreamsConfiguration defaultKafkaStreamsConfig( - KafkaProperties properties, Environment environment) { - - Map streamsProperties = properties.buildStreamsProperties(); - if (properties.getStreams().getApplicationId() == null) { - if (environment.getProperty("spring.application.id") != null) { - streamsProperties.put("application.id", - environment.getProperty("spring.application.name")); - } - } - return new KafkaStreamsConfiguration(streamsProperties); - } - - @Bean - public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer( - StreamsBuilderFactoryBean factoryBean, KafkaProperties properties) { - - return new KafkaStreamsFactoryBeanConfigurer(factoryBean, properties); - } - - @Configuration - @EnableKafkaStreams - public static class EnableKafkaStreamsAutoConfiguration { - - } - - static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean { - - private final StreamsBuilderFactoryBean factoryBean; - - private final KafkaProperties properties; - - KafkaStreamsFactoryBeanConfigurer(StreamsBuilderFactoryBean factoryBean, - KafkaProperties properties) { - this.factoryBean = factoryBean; - this.properties = properties; - } - - @Override - public void afterPropertiesSet() throws Exception { - this.factoryBean - .setAutoStartup(this.properties.getStreams().isAutoStartup()); - } - - } - - } - } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index e77cf142f6..a02450cd62 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -665,7 +665,7 @@ public class KafkaProperties { /** * Whether or not to auto-start the streams factory bean. */ - private boolean autoStartup; + private boolean autoStartup = true; /** * Comma-delimited list of host:port pairs to use for establishing the initial diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java new file mode 100644 index 0000000000..7e48a87cee --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -0,0 +1,98 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import java.util.Map; + +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; +import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; +import org.springframework.kafka.config.KafkaStreamsConfiguration; +import org.springframework.kafka.core.StreamsBuilderFactoryBean; + +/** + * Configuration for Kafka Streams annotation-driven support. + * + * @author Gary Russell + * @author Stephane Nicoll + */ +@Configuration +@ConditionalOnClass(StreamsBuilder.class) +@ConditionalOnBean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) +class KafkaStreamsAnnotationDrivenConfiguration { + + private final KafkaProperties properties; + + KafkaStreamsAnnotationDrivenConfiguration(KafkaProperties properties) { + this.properties = properties; + } + + @ConditionalOnMissingBean + @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + public KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment) { + Map streamsProperties = this.properties.buildStreamsProperties(); + if (this.properties.getStreams().getApplicationId() == null) { + String applicationName = environment.getProperty("spring.application.name"); + if (applicationName != null) { + streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, + applicationName); + } + else { + throw new InvalidConfigurationPropertyValueException( + "spring.kafka.streams.application-id", null, + "This property is mandatory and fallback 'spring.application.name' is not set either."); + } + } + return new KafkaStreamsConfiguration(streamsProperties); + } + + @Bean + public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer( + StreamsBuilderFactoryBean factoryBean) { + return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean); + } + + // Separate class required to avoid BeanCurrentlyInCreationException + static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean { + + private final KafkaProperties properties; + + private final StreamsBuilderFactoryBean factoryBean; + + KafkaStreamsFactoryBeanConfigurer(KafkaProperties properties, + StreamsBuilderFactoryBean factoryBean) { + this.properties = properties; + this.factoryBean = factoryBean; + } + + @Override + public void afterPropertiesSet() { + this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); + } + + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java index 7a6e773c16..8dad270aec 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java @@ -28,9 +28,12 @@ import org.junit.Test; import org.springframework.boot.test.util.TestPropertyValues; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.StreamsBuilderFactoryBean; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.messaging.handler.annotation.Header; @@ -41,6 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat; * Integration tests for {@link KafkaAutoConfiguration}. * * @author Gary Russell + * @author Stephane Nicoll */ public class KafkaAutoConfigurationIntegrationTests { @@ -83,6 +87,14 @@ public class KafkaAutoConfigurationIntegrationTests { producer.close(); } + @Test + public void testStreams() { + load(KafkaStreamsConfig.class, "spring.application.name:my-app", + "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString()); + assertThat(this.context.getBean(StreamsBuilderFactoryBean.class).isAutoStartup()) + .isTrue(); + } + private void load(Class config, String... environment) { this.context = doLoad(new Class[] { config }, environment); } @@ -101,7 +113,8 @@ public class KafkaAutoConfigurationIntegrationTests { return embeddedKafka.getEmbeddedKafka().getBrokersAsString(); } - public static class KafkaConfig { + @Configuration + static class KafkaConfig { @Bean public Listener listener() { @@ -115,6 +128,12 @@ public class KafkaAutoConfigurationIntegrationTests { } + @Configuration + @EnableKafkaStreams + static class KafkaStreamsConfig { + + } + public static class Listener { private final CountDownLatch latch = new CountDownLatch(1); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 6608107544..816559e886 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.File; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -31,6 +32,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.junit.Test; @@ -39,6 +41,7 @@ import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; @@ -279,23 +282,26 @@ public class KafkaAutoConfigurationTests { @Test public void streamsProperties() { - this.contextRunner.withPropertyValues("spring.kafka.clientId=cid", - "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", - "spring.application.name=appName", - "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", - "spring.kafka.streams.cache-max-bytes-buffering=42", - "spring.kafka.streams.client-id=override", - "spring.kafka.streams.properties.fiz.buz=fix.fox", - "spring.kafka.streams.replication-factor=2", - "spring.kafka.streams.state-dir=/tmp/state", - "spring.kafka.streams.ssl.key-password=p7", - "spring.kafka.streams.ssl.key-store-location=classpath:ksLocP", - "spring.kafka.streams.ssl.key-store-password=p8", - "spring.kafka.streams.ssl.key-store-type=PKCS12", - "spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP", - "spring.kafka.streams.ssl.trust-store-password=p9", - "spring.kafka.streams.ssl.trust-store-type=PKCS12", - "spring.kafka.streams.ssl.protocol=TLSv1.2").run((context) -> { + this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class) + .withPropertyValues("spring.kafka.client-id=cid", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.application.name=appName", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", + "spring.kafka.streams.auto-startup=false", + "spring.kafka.streams.cache-max-bytes-buffering=42", + "spring.kafka.streams.client-id=override", + "spring.kafka.streams.properties.fiz.buz=fix.fox", + "spring.kafka.streams.replication-factor=2", + "spring.kafka.streams.state-dir=/tmp/state", + "spring.kafka.streams.ssl.key-password=p7", + "spring.kafka.streams.ssl.key-store-location=classpath:ksLocP", + "spring.kafka.streams.ssl.key-store-password=p8", + "spring.kafka.streams.ssl.key-store-type=PKCS12", + "spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP", + "spring.kafka.streams.ssl.trust-store-password=p9", + "spring.kafka.streams.ssl.trust-store-type=PKCS12", + "spring.kafka.streams.ssl.protocol=TLSv1.2") + .run((context) -> { Properties configs = context.getBean( KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME, KafkaStreamsConfiguration.class).asProperties(); @@ -339,6 +345,63 @@ public class KafkaAutoConfigurationTests { }); } + @Test + public void streamsApplicationIdUsesMainApplicationNameByDefault() { + this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.streams.auto-startup=false") + .run((context) -> { + Properties configs = context.getBean( + KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME, + KafkaStreamsConfiguration.class).asProperties(); + assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .isEqualTo("localhost:9092, localhost:9093"); + assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG)) + .isEqualTo("my-test-app"); + }); + } + + @Test + public void streamsWithCustomKafkaConfiguration() { + this.contextRunner + .withUserConfiguration(EnableKafkaStreamsConfiguration.class, + TestKafkaStreamsConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.streams.auto-startup=false") + .run((context) -> { + Properties configs = context.getBean( + KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME, + KafkaStreamsConfiguration.class).asProperties(); + assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .isEqualTo("localhost:9094, localhost:9095"); + assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG)) + .isEqualTo("test-id"); + }); + } + + @Test + public void streamsApplicationIdIsMandatory() { + this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class) + .run((context) -> { + assertThat(context).hasFailed(); + assertThat(context).getFailure() + .hasMessageContaining("spring.kafka.streams.application-id") + .hasMessageContaining( + "This property is mandatory and fallback 'spring.application.name' is not set either."); + + }); + } + + @Test + public void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() { + this.contextRunner.run((context) -> { + assertThat(context).hasNotFailed(); + assertThat(context).doesNotHaveBean(StreamsBuilder.class); + }); + } + @SuppressWarnings("unchecked") @Test public void listenerProperties() { @@ -470,4 +533,25 @@ public class KafkaAutoConfigurationTests { } + @Configuration + @EnableKafkaStreams + protected static class EnableKafkaStreamsConfiguration { + + } + + @Configuration + protected static class TestKafkaStreamsConfiguration { + + @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + public KafkaStreamsConfiguration kafkaStreamsConfiguration() { + Map streamsProperties = new HashMap<>(); + streamsProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + "localhost:9094, localhost:9095"); + streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-id"); + + return new KafkaStreamsConfiguration(streamsProperties); + } + + } + } diff --git a/spring-boot-project/spring-boot-docs/pom.xml b/spring-boot-project/spring-boot-docs/pom.xml index 96ea4a3ff5..6bbe0cd4ef 100644 --- a/spring-boot-project/spring-boot-docs/pom.xml +++ b/spring-boot-project/spring-boot-docs/pom.xml @@ -397,6 +397,11 @@ commons-dbcp2 true + + org.apache.kafka + kafka-streams + true + org.apache.logging.log4j log4j-api diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index d8b264da78..2ff4350624 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -1105,7 +1105,8 @@ content into your application. Rather, pick only the properties that you need. spring.kafka.ssl.trust-store-location= # Location of the trust store file. spring.kafka.ssl.trust-store-password= # Store password for the trust store file. spring.kafka.ssl.trust-store-type= # Type of the trust store. - spring.kafka.streams.auto-startup= # Whether or not to auto-start the streams factory bean. + spring.kafka.streams.application-id = # Kafka streams application.id property; default spring.application.name. + spring.kafka.streams.auto-startup=true # Whether or not to auto-start the streams factory bean. spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for streams. spring.kafka.streams.cache-max-bytes-buffering= # Maximum number of memory bytes to be used for buffering across all threads. spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging. diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index d3bfaeb479..cc83d9ae71 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -5634,44 +5634,34 @@ The following component creates a listener endpoint on the `someTopic` topic: } ---- -[[boot-deatures-kafka-streams]] +[[boot-features-kafka-streams]] ==== Kafka Streams - Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and -manage the lifecycle of its streams; the factory bean is created when -`@EnableKafkaStreams` is present on a `@Configuration` class. -The factory bean requires a `KafkaStreamsConfiguration` object for streams configuration. +manage the lifecycle of its streams. Spring Boot auto-configures the required +`KafkaStreamsConfiguration` bean as long as `kafka-streams` in on the classpath and kafka +streams is enabled via the @EnableKafkaStreams` annotation. -If Spring Boot detects the `kafka-streams` jar on the classpath, it will auto-configure -the `KafkaStreamsConfiguration` bean from the `KafkaProperties` object as well as enabling -the creation of the factory bean by spring-kafka. +Enabling Kafka Streams means that the application id and bootstrap servers must be set. +The former can be configured using `spring.kafka.streams.application-id`, defaulting to +`spring.application.name` if not set. The later can be set globally or +specifically overridden just for streams. -There are two required Kafka properties for streams (`bootstrap.servers` and -`application.id`); by default, the `application.id` is set to the `spring.application.name` -property, if present. -The `bootstrap.servers` can be set globally or specifically overridden just for streams. -Several other properties are specifically available as boot properties; other arbitrary -Kafka properties can be set using the `spring.kafka.streams.properties` property. -See <> for more information. +Several additional properties are available using dedicated properties; other arbitrary +Kafka properties can be set using the `spring.kafka.streams.properties` namespace. See +also <> for more information. -To use the factory bean, simply wire its `StreamsBuilder` into your `@Bean` s. +To use the factory bean, simply wire `StreamsBuilder` into your `@Bean` as shown in the +following example: -==== -[source, java] ----- -@Bean -public KStream kStream(StreamsBuilder streamsBuilder) { - KStream stream = streamsBuilder.stream("ks1In"); - stream.map((k, v) -> new KeyValue(k, v.toUpperCase())) - .to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>())); - return stream; -} +[source,java,indent=0] ---- -==== +include::{code-examples}/kafka/KafkaStreamsBeanExample.java[tag=configuration] +---- + +By default, the streams managed by the `StreamBuilder` object it creates are started +automatically. You can customize this behaviour using the +`spring.kafka.streams.auto-startup` property. -By default, the factory bean `autoStartup` property is false; to automatically start the -streams managed by the `StreamsBuilder` object it creates, set property -`spting.kafka.streams.auto-startup=true`. [[boot-features-kafka-extra-props]] diff --git a/spring-boot-project/spring-boot-docs/src/main/java/org/springframework/boot/docs/kafka/KafkaStreamsBeanExample.java b/spring-boot-project/spring-boot-docs/src/main/java/org/springframework/boot/docs/kafka/KafkaStreamsBeanExample.java new file mode 100644 index 0000000000..d9ddf5f195 --- /dev/null +++ b/spring-boot-project/spring-boot-docs/src/main/java/org/springframework/boot/docs/kafka/KafkaStreamsBeanExample.java @@ -0,0 +1,53 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.docs.kafka; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.support.serializer.JsonSerde; + +/** + * Example to show usage of {@link StreamsBuilder}. + * + * @author Stephane Nicoll + */ +public class KafkaStreamsBeanExample { + + // tag::configuration[] + @Configuration + @EnableKafkaStreams + static class KafkaStreamsExampleConfiguration { + + @Bean + public KStream kStream(StreamsBuilder streamsBuilder) { + KStream stream = streamsBuilder.stream("ks1In"); + stream.map((k, v) -> new KeyValue(k, v.toUpperCase())).to("ks1Out", + Produced.with(Serdes.Integer(), new JsonSerde<>())); + return stream; + } + + } + // end::configuration[] + +}