From bdda4703050b3d8dbac2312a44174e5e5c55716b Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 16 Dec 2016 12:23:06 -0500 Subject: [PATCH 1/2] Support arbitrary Kafka properties Add support for arbitrary Kafka properties via `spring.kafka.properties.*` and also a `spring.kafka.max.poll.records` property. See gh-7672 --- .../autoconfigure/kafka/KafkaProperties.java | 32 +++++ .../kafka/KafkaAutoConfigurationTests.java | 9 ++ .../appendix-application-properties.adoc | 2 + .../main/asciidoc/spring-boot-features.adoc | 22 +-- ...aSpecialProducerConsumerConfigExample.java | 125 ++++++++++++++++++ 5 files changed, 179 insertions(+), 11 deletions(-) create mode 100644 spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index c6ed49f490..c7e9fd4529 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -71,6 +71,11 @@ public class KafkaProperties { */ private String clientId; + /** + * Additional properties used to configure the client. + */ + private Map properties = new HashMap(); + public Consumer getConsumer() { return this.consumer; } @@ -107,6 +112,14 @@ public class KafkaProperties { this.clientId = clientId; } + public Map getProperties() { + return this.properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + private Map buildCommonProperties() { Map properties = new HashMap(); if (this.bootstrapServers != null) { @@ -135,6 +148,9 @@ public class KafkaProperties { properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword()); } + if (this.properties != null && this.properties.size() > 0) { + properties.putAll(this.properties); + } return properties; } @@ -240,6 +256,11 @@ public class KafkaProperties { */ private Class valueDeserializer = StringDeserializer.class; + /** + * Maximum number of records returned in a single call to poll(). + */ + private Integer maxPollRecords; + public Ssl getSsl() { return this.ssl; } @@ -332,6 +353,14 @@ public class KafkaProperties { this.valueDeserializer = valueDeserializer; } + public Integer getMaxPollRecords() { + return this.maxPollRecords; + } + + public void setMaxPollRecords(Integer maxPollRecords) { + this.maxPollRecords = maxPollRecords; + } + public Map buildProperties() { Map properties = new HashMap(); if (this.autoCommitInterval != null) { @@ -395,6 +424,9 @@ public class KafkaProperties { properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueDeserializer); } + if (this.maxPollRecords != null) { + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords); + } return properties; } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index db3a8d8bae..63b92b8a5d 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -61,12 +61,16 @@ public class KafkaAutoConfigurationTests { @Test public void consumerProperties() { load("spring.kafka.bootstrap-servers=foo:1234", + "spring.kafka.properties.foo=bar", + "spring.kafka.properties.baz=qux", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.ssl.key-password=p1", "spring.kafka.ssl.keystore-location=classpath:ksLoc", "spring.kafka.ssl.keystore-password=p2", "spring.kafka.ssl.truststore-location=classpath:tsLoc", "spring.kafka.ssl.truststore-password=p3", "spring.kafka.consumer.auto-commit-interval=123", + "spring.kafka.consumer.max-poll-records=42", "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.client-id=ccid", // test override common "spring.kafka.consumer.enable-auto-commit=false", @@ -109,6 +113,11 @@ public class KafkaAutoConfigurationTests { .isEqualTo(LongDeserializer.class); assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) .isEqualTo(IntegerDeserializer.class); + assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)) + .isEqualTo(42); + assertThat(configs.get("foo")).isEqualTo("bar"); + assertThat(configs.get("baz")).isEqualTo("qux"); + assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); } @Test diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index 196d51c2f4..47008f6b67 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -878,6 +878,7 @@ content into your application; rather pick only the properties that you need. spring.kafka.consumer.group-id= # Unique string that identifies the consumer group this consumer belongs to. spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator. spring.kafka.consumer.key-deserializer= # Deserializer class for keys. + spring.kafka.consumer.max-poll-messages= # Maximum number of records returned in a single call to poll(). spring.kafka.consumer.value-deserializer= # Deserializer class for values. spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation. @@ -893,6 +894,7 @@ content into your application; rather pick only the properties that you need. spring.kafka.producer.key-serializer= # Serializer class for keys. spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends. spring.kafka.producer.value-serializer= # Serializer class for values. + spring.kafka.properties.*= # Additional properties used to configure the client. spring.kafka.ssl.key-password= # Password of the private key in the key store file. spring.kafka.ssl.keystore-location= # Location of the key store file. spring.kafka.ssl.keystore-password= # Store password for the key store file. diff --git a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index 1ae03b95ab..3e9c4d5cfa 100644 --- a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -4643,22 +4643,22 @@ auto configuration supports all HIGH importance properties, some selected MEDIUM and any that do not have a default value. Only a subset of the properties supported by Kafka are available via the `KafkaProperties` -class. If you wish to configure the producer or consumer with additional properties, you -can override the producer factory and/or consumer factory bean, adding additional -properties, for example: +class. If you wish to configure the producer or consumer with additional properties that +are not directly supported, use the following: + +`spring.kafka.properties.foo.bar=baz` + +This sets the common `foo.bar` kafka property to `baz`. + +These properties will be shared by both the consumer and producer factory beans. +If you wish to customize these components with different properties, such as to use a +different metrics reader for each, you can override the bean definitions, as follows: [source,java,indent=0] ---- - @Bean - public ProducerFactory kafkaProducerFactory(KafkaProperties properties) { - Map producerProperties = properties.buildProducerProperties(); - producerProperties.put("some.property", "some.value"); - return new DefaultKafkaProducerFactory(producerProperties); - } +include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration] ---- - - [[boot-features-restclient]] == Calling REST services If you need to call remote REST services from your application, you can use Spring diff --git a/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java b/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java new file mode 100644 index 0000000000..43dfc60ce6 --- /dev/null +++ b/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java @@ -0,0 +1,125 @@ +/* + * Copyright 2016-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.kafka; + +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsReporter; + +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.ProducerFactory; + +/** + * Example custom kafka configuration beans used when the user wants to + * apply different common properties to the producer and consumer. + * + * @author Gary Russell + * @since 1.5 + * + */ +public class KafkaSpecialProducerConsumerConfigExample { + + // tag::configuration[] + @Configuration + public static class CustomKafkaBeans { + + /** + * Customized ProducerFactory bean. + * @param properties the kafka properties. + * @return the bean. + */ + @Bean + public ProducerFactory kafkaProducerFactory(KafkaProperties properties) { + Map producerProperties = properties.buildProducerProperties(); + producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, + MyProducerMetricsReporter.class); + return new DefaultKafkaProducerFactory(producerProperties); + } + + /** + * Customized ConsumerFactory bean. + * @param properties the kafka properties. + * @return the bean. + */ + @Bean + public ConsumerFactory kafkaConsumerFactory(KafkaProperties properties) { + Map consumererProperties = properties.buildConsumerProperties(); + consumererProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, + MyConsumerMetricsReporter.class); + return new DefaultKafkaConsumerFactory(consumererProperties); + } + + } + // end::configuration[] + + public static class MyConsumerMetricsReporter implements MetricsReporter { + + @Override + public void configure(Map configs) { + } + + @Override + public void init(List metrics) { + } + + @Override + public void metricChange(KafkaMetric metric) { + } + + @Override + public void metricRemoval(KafkaMetric metric) { + } + + @Override + public void close() { + } + + } + + public static class MyProducerMetricsReporter implements MetricsReporter { + + @Override + public void configure(Map configs) { + } + + @Override + public void init(List metrics) { + } + + @Override + public void metricChange(KafkaMetric metric) { + } + + @Override + public void metricRemoval(KafkaMetric metric) { + } + + @Override + public void close() { + } + + } + +} From 1f7b3cad4515c9836753afc16f4dcf1875784a9c Mon Sep 17 00:00:00 2001 From: Phillip Webb Date: Tue, 20 Dec 2016 18:14:14 -0800 Subject: [PATCH 2/2] Polish Kafka properties Closes gh-7672 --- .../autoconfigure/kafka/KafkaProperties.java | 70 +++++++++---------- .../kafka/KafkaAutoConfigurationTests.java | 6 +- .../main/asciidoc/spring-boot-features.adoc | 9 ++- ...aSpecialProducerConsumerConfigExample.java | 8 +-- 4 files changed, 48 insertions(+), 45 deletions(-) diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index c7e9fd4529..d4e0774400 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.core.io.Resource; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; +import org.springframework.util.CollectionUtils; /** * Configuration properties for Spring for Apache Kafka. @@ -47,18 +48,6 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo @ConfigurationProperties(prefix = "spring.kafka") public class KafkaProperties { - private final Consumer consumer = new Consumer(); - - private final Producer producer = new Producer(); - - private final Listener listener = new Listener(); - - private final Template template = new Template(); - - private final Ssl ssl = new Ssl(); - - // Apache Kafka Common Properties - /** * Comma-delimited list of host:port pairs to use for establishing the initial * connection to the Kafka cluster. @@ -76,25 +65,15 @@ public class KafkaProperties { */ private Map properties = new HashMap(); - public Consumer getConsumer() { - return this.consumer; - } + private final Consumer consumer = new Consumer(); - public Producer getProducer() { - return this.producer; - } + private final Producer producer = new Producer(); - public Listener getListener() { - return this.listener; - } + private final Listener listener = new Listener(); - public Ssl getSsl() { - return this.ssl; - } + private final Ssl ssl = new Ssl(); - public Template getTemplate() { - return this.template; - } + private final Template template = new Template(); public List getBootstrapServers() { return this.bootstrapServers; @@ -120,6 +99,26 @@ public class KafkaProperties { this.properties = properties; } + public Consumer getConsumer() { + return this.consumer; + } + + public Producer getProducer() { + return this.producer; + } + + public Listener getListener() { + return this.listener; + } + + public Ssl getSsl() { + return this.ssl; + } + + public Template getTemplate() { + return this.template; + } + private Map buildCommonProperties() { Map properties = new HashMap(); if (this.bootstrapServers != null) { @@ -148,7 +147,7 @@ public class KafkaProperties { properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword()); } - if (this.properties != null && this.properties.size() > 0) { + if (!CollectionUtils.isEmpty(this.properties)) { properties.putAll(this.properties); } return properties; @@ -163,9 +162,9 @@ public class KafkaProperties { * instance */ public Map buildConsumerProperties() { - Map props = buildCommonProperties(); - props.putAll(this.consumer.buildProperties()); - return props; + Map properties = buildCommonProperties(); + properties.putAll(this.consumer.buildProperties()); + return properties; } /** @@ -177,9 +176,9 @@ public class KafkaProperties { * instance */ public Map buildProducerProperties() { - Map props = buildCommonProperties(); - props.putAll(this.producer.buildProperties()); - return props; + Map properties = buildCommonProperties(); + properties.putAll(this.producer.buildProperties()); + return properties; } private static String resourceToPath(Resource resource) { @@ -425,7 +424,8 @@ public class KafkaProperties { this.valueDeserializer); } if (this.maxPollRecords != null) { - properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + this.maxPollRecords); } return properties; } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 63b92b8a5d..dc27e38da6 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -60,8 +60,7 @@ public class KafkaAutoConfigurationTests { @Test public void consumerProperties() { - load("spring.kafka.bootstrap-servers=foo:1234", - "spring.kafka.properties.foo=bar", + load("spring.kafka.bootstrap-servers=foo:1234", "spring.kafka.properties.foo=bar", "spring.kafka.properties.baz=qux", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.ssl.key-password=p1", @@ -113,8 +112,7 @@ public class KafkaAutoConfigurationTests { .isEqualTo(LongDeserializer.class); assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) .isEqualTo(IntegerDeserializer.class); - assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)) - .isEqualTo(42); + assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42); assertThat(configs.get("foo")).isEqualTo("bar"); assertThat(configs.get("baz")).isEqualTo("qux"); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); diff --git a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index 3e9c4d5cfa..a10482548e 100644 --- a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -4646,9 +4646,12 @@ Only a subset of the properties supported by Kafka are available via the `KafkaP class. If you wish to configure the producer or consumer with additional properties that are not directly supported, use the following: -`spring.kafka.properties.foo.bar=baz` +[source,properties,indent=0] +---- + spring.kafka.properties.foo.bar=baz +---- -This sets the common `foo.bar` kafka property to `baz`. +This sets the common `foo.bar` Kafka property to `baz`. These properties will be shared by both the consumer and producer factory beans. If you wish to customize these components with different properties, such as to use a @@ -4659,6 +4662,8 @@ different metrics reader for each, you can override the bean definitions, as fol include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration] ---- + + [[boot-features-restclient]] == Calling REST services If you need to call remote REST services from your application, you can use Spring diff --git a/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java b/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java index 43dfc60ce6..23a7aa139d 100644 --- a/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java +++ b/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java @@ -32,12 +32,11 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.ProducerFactory; /** - * Example custom kafka configuration beans used when the user wants to - * apply different common properties to the producer and consumer. + * Example custom kafka configuration beans used when the user wants to apply different + * common properties to the producer and consumer. * * @author Gary Russell * @since 1.5 - * */ public class KafkaSpecialProducerConsumerConfigExample { @@ -65,7 +64,8 @@ public class KafkaSpecialProducerConsumerConfigExample { */ @Bean public ConsumerFactory kafkaConsumerFactory(KafkaProperties properties) { - Map consumererProperties = properties.buildConsumerProperties(); + Map consumererProperties = properties + .buildConsumerProperties(); consumererProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MyConsumerMetricsReporter.class); return new DefaultKafkaConsumerFactory(consumererProperties);