Polish Kafka properties

Closes gh-7672
pull/7718/merge
Phillip Webb 8 years ago
parent bdda470305
commit 1f7b3cad45

@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.io.Resource;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.util.CollectionUtils;
/**
* Configuration properties for Spring for Apache Kafka.
@ -47,18 +48,6 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
private final Consumer consumer = new Consumer();
private final Producer producer = new Producer();
private final Listener listener = new Listener();
private final Template template = new Template();
private final Ssl ssl = new Ssl();
// Apache Kafka Common Properties
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
@ -76,25 +65,15 @@ public class KafkaProperties {
*/
private Map<String, String> properties = new HashMap<String, String>();
public Consumer getConsumer() {
return this.consumer;
}
private final Consumer consumer = new Consumer();
public Producer getProducer() {
return this.producer;
}
private final Producer producer = new Producer();
public Listener getListener() {
return this.listener;
}
private final Listener listener = new Listener();
public Ssl getSsl() {
return this.ssl;
}
private final Ssl ssl = new Ssl();
public Template getTemplate() {
return this.template;
}
private final Template template = new Template();
public List<String> getBootstrapServers() {
return this.bootstrapServers;
@ -120,6 +99,26 @@ public class KafkaProperties {
this.properties = properties;
}
public Consumer getConsumer() {
return this.consumer;
}
public Producer getProducer() {
return this.producer;
}
public Listener getListener() {
return this.listener;
}
public Ssl getSsl() {
return this.ssl;
}
public Template getTemplate() {
return this.template;
}
private Map<String, Object> buildCommonProperties() {
Map<String, Object> properties = new HashMap<String, Object>();
if (this.bootstrapServers != null) {
@ -148,7 +147,7 @@ public class KafkaProperties {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
if (this.properties != null && this.properties.size() > 0) {
if (!CollectionUtils.isEmpty(this.properties)) {
properties.putAll(this.properties);
}
return properties;
@ -163,9 +162,9 @@ public class KafkaProperties {
* instance
*/
public Map<String, Object> buildConsumerProperties() {
Map<String, Object> props = buildCommonProperties();
props.putAll(this.consumer.buildProperties());
return props;
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.consumer.buildProperties());
return properties;
}
/**
@ -177,9 +176,9 @@ public class KafkaProperties {
* instance
*/
public Map<String, Object> buildProducerProperties() {
Map<String, Object> props = buildCommonProperties();
props.putAll(this.producer.buildProperties());
return props;
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.producer.buildProperties());
return properties;
}
private static String resourceToPath(Resource resource) {
@ -425,7 +424,8 @@ public class KafkaProperties {
this.valueDeserializer);
}
if (this.maxPollRecords != null) {
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
this.maxPollRecords);
}
return properties;
}

@ -60,8 +60,7 @@ public class KafkaAutoConfigurationTests {
@Test
public void consumerProperties() {
load("spring.kafka.bootstrap-servers=foo:1234",
"spring.kafka.properties.foo=bar",
load("spring.kafka.bootstrap-servers=foo:1234", "spring.kafka.properties.foo=bar",
"spring.kafka.properties.baz=qux",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.ssl.key-password=p1",
@ -113,8 +112,7 @@ public class KafkaAutoConfigurationTests {
.isEqualTo(LongDeserializer.class);
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerDeserializer.class);
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG))
.isEqualTo(42);
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42);
assertThat(configs.get("foo")).isEqualTo("bar");
assertThat(configs.get("baz")).isEqualTo("qux");
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");

@ -4646,9 +4646,12 @@ Only a subset of the properties supported by Kafka are available via the `KafkaP
class. If you wish to configure the producer or consumer with additional properties that
are not directly supported, use the following:
`spring.kafka.properties.foo.bar=baz`
[source,properties,indent=0]
----
spring.kafka.properties.foo.bar=baz
----
This sets the common `foo.bar` kafka property to `baz`.
This sets the common `foo.bar` Kafka property to `baz`.
These properties will be shared by both the consumer and producer factory beans.
If you wish to customize these components with different properties, such as to use a
@ -4659,6 +4662,8 @@ different metrics reader for each, you can override the bean definitions, as fol
include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration]
----
[[boot-features-restclient]]
== Calling REST services
If you need to call remote REST services from your application, you can use Spring

@ -32,12 +32,11 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
/**
* Example custom kafka configuration beans used when the user wants to
* apply different common properties to the producer and consumer.
* Example custom kafka configuration beans used when the user wants to apply different
* common properties to the producer and consumer.
*
* @author Gary Russell
* @since 1.5
*
*/
public class KafkaSpecialProducerConsumerConfigExample {
@ -65,7 +64,8 @@ public class KafkaSpecialProducerConsumerConfigExample {
*/
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> consumererProperties = properties.buildConsumerProperties();
Map<String, Object> consumererProperties = properties
.buildConsumerProperties();
consumererProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
MyConsumerMetricsReporter.class);
return new DefaultKafkaConsumerFactory<Object, Object>(consumererProperties);

Loading…
Cancel
Save