diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 7f9b95c6d4..19fc9d249d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,6 +91,8 @@ public class KafkaProperties { private final Template template = new Template(); + private final Security security = new Security(); + public List getBootstrapServers() { return this.bootstrapServers; } @@ -143,6 +145,10 @@ public class KafkaProperties { return this.template; } + public Security getSecurity() { + return this.security; + } + private Map buildCommonProperties() { Map properties = new HashMap<>(); if (this.bootstrapServers != null) { @@ -152,6 +158,7 @@ public class KafkaProperties { properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId); } properties.putAll(this.ssl.buildProperties()); + properties.putAll(this.security.buildProperties()); if (!CollectionUtils.isEmpty(this.properties)) { properties.putAll(this.properties); } @@ -217,6 +224,8 @@ public class KafkaProperties { private final Ssl ssl = new Ssl(); + private final Security security = new Security(); + /** * Frequency with which the consumer offsets are auto-committed to Kafka if * 'enable.auto.commit' is set to true. @@ -297,6 +306,10 @@ public class KafkaProperties { return this.ssl; } + public Security getSecurity() { + return this.security; + } + public Duration getAutoCommitInterval() { return this.autoCommitInterval; } @@ -426,7 +439,7 @@ public class KafkaProperties { map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); - return properties.with(this.ssl, this.properties); + return properties.with(this.ssl, this.security, this.properties); } } @@ -435,6 +448,8 @@ public class KafkaProperties { private final Ssl ssl = new Ssl(); + private final Security security = new Security(); + /** * Number of acknowledgments the producer requires the leader to have received * before considering a request complete. @@ -498,6 +513,10 @@ public class KafkaProperties { return this.ssl; } + public Security getSecurity() { + return this.security; + } + public String getAcks() { return this.acks; } @@ -595,7 +614,7 @@ public class KafkaProperties { map.from(this::getKeySerializer).to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG)); map.from(this::getValueSerializer).to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); - return properties.with(this.ssl, this.properties); + return properties.with(this.ssl, this.security, this.properties); } } @@ -604,6 +623,8 @@ public class KafkaProperties { private final Ssl ssl = new Ssl(); + private final Security security = new Security(); + /** * ID to pass to the server when making requests. Used for server-side logging. */ @@ -623,6 +644,10 @@ public class KafkaProperties { return this.ssl; } + public Security getSecurity() { + return this.security; + } + public String getClientId() { return this.clientId; } @@ -647,7 +672,7 @@ public class KafkaProperties { Properties properties = new Properties(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(this::getClientId).to(properties.in(ProducerConfig.CLIENT_ID_CONFIG)); - return properties.with(this.ssl, this.properties); + return properties.with(this.ssl, this.security, this.properties); } } @@ -659,6 +684,8 @@ public class KafkaProperties { private final Ssl ssl = new Ssl(); + private final Security security = new Security(); + /** * Kafka streams application.id property; default spring.application.name. */ @@ -705,6 +732,10 @@ public class KafkaProperties { return this.ssl; } + public Security getSecurity() { + return this.security; + } + public String getApplicationId() { return this.applicationId; } @@ -775,7 +806,7 @@ public class KafkaProperties { map.from(this::getClientId).to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG)); map.from(this::getReplicationFactor).to(properties.in("replication.factor")); map.from(this::getStateDir).to(properties.in("state.dir")); - return properties.with(this.ssl, this.properties); + return properties.with(this.ssl, this.security, this.properties); } } @@ -1167,6 +1198,30 @@ public class KafkaProperties { } + public static class Security { + + /** + * Security protocol used to communicate with brokers. + */ + private String protocol; + + public String getProtocol() { + return this.protocol; + } + + public void setProtocol(String protocol) { + this.protocol = protocol; + } + + public Map buildProperties() { + Properties properties = new Properties(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getProtocol).to(properties.in(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); + return properties; + } + + } + @SuppressWarnings("serial") private static class Properties extends HashMap { @@ -1174,8 +1229,9 @@ public class KafkaProperties { return (value) -> put(key, value); } - Properties with(Ssl ssl, Map properties) { + Properties with(Ssl ssl, Security security, Map properties) { putAll(ssl.buildProperties()); + putAll(security.buildProperties()); putAll(properties); return this; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index fe2da15973..887bb4aab3 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -25,6 +25,7 @@ import java.util.Properties; import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -106,6 +107,7 @@ class KafkaAutoConfigurationTests { "spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB", "spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234", "spring.kafka.consumer.isolation-level = read-committed", + "spring.kafka.consumer.security.protocol = SSL", "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer", "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer") .run((context) -> { @@ -137,6 +139,7 @@ class KafkaAutoConfigurationTests { assertThat(configs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)).isEqualTo("read_committed"); assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) .isEqualTo(LongDeserializer.class); + assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL"); assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) .isEqualTo(IntegerDeserializer.class); assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42); @@ -156,7 +159,7 @@ class KafkaAutoConfigurationTests { "spring.kafka.producer.buffer-memory=4KB", "spring.kafka.producer.compression-type=gzip", "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer", "spring.kafka.producer.retries=2", "spring.kafka.producer.properties.fiz.buz=fix.fox", - "spring.kafka.producer.ssl.key-password=p4", + "spring.kafka.producer.security.protocol=SSL", "spring.kafka.producer.ssl.key-password=p4", "spring.kafka.producer.ssl.key-store-location=classpath:ksLocP", "spring.kafka.producer.ssl.key-store-password=p5", "spring.kafka.producer.ssl.key-store-type=PKCS12", "spring.kafka.producer.ssl.trust-store-location=classpath:tsLocP", @@ -177,6 +180,7 @@ class KafkaAutoConfigurationTests { assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(4096L); assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip"); assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class); + assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL"); assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) .endsWith(File.separator + "ksLocP"); @@ -202,7 +206,7 @@ class KafkaAutoConfigurationTests { this.contextRunner .withPropertyValues("spring.kafka.clientId=cid", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.admin.fail-fast=true", "spring.kafka.admin.properties.fiz.buz=fix.fox", - "spring.kafka.admin.ssl.key-password=p4", + "spring.kafka.admin.security.protocol=SSL", "spring.kafka.admin.ssl.key-password=p4", "spring.kafka.admin.ssl.key-store-location=classpath:ksLocP", "spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12", "spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP", @@ -214,6 +218,7 @@ class KafkaAutoConfigurationTests { // common assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); // admin + assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL"); assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) .endsWith(File.separator + "ksLocP"); @@ -240,7 +245,7 @@ class KafkaAutoConfigurationTests { "spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cache-max-size-buffering=1KB", "spring.kafka.streams.client-id=override", "spring.kafka.streams.properties.fiz.buz=fix.fox", "spring.kafka.streams.replication-factor=2", "spring.kafka.streams.state-dir=/tmp/state", - "spring.kafka.streams.ssl.key-password=p7", + "spring.kafka.streams.security.protocol=SSL", "spring.kafka.streams.ssl.key-password=p7", "spring.kafka.streams.ssl.key-store-location=classpath:ksLocP", "spring.kafka.streams.ssl.key-store-password=p8", "spring.kafka.streams.ssl.key-store-type=PKCS12", "spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP", @@ -255,6 +260,7 @@ class KafkaAutoConfigurationTests { assertThat(configs.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)).isEqualTo(1024); assertThat(configs.get(StreamsConfig.CLIENT_ID_CONFIG)).isEqualTo("override"); assertThat(configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)).isEqualTo(2); + assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL"); assertThat(configs.get(StreamsConfig.STATE_DIR_CONFIG)).isEqualTo("/tmp/state"); assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p7"); assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) @@ -569,6 +575,20 @@ class KafkaAutoConfigurationTests { }); } + @Test + void specificSecurityProtocolOverridesCommonSecurityProtocol() { + this.contextRunner.withPropertyValues("spring.kafka.security.protocol=SSL", + "spring.kafka.admin.security.protocol=PLAINTEXT").run((context) -> { + DefaultKafkaProducerFactory producerFactory = context + .getBean(DefaultKafkaProducerFactory.class); + Map producerConfigs = producerFactory.getConfigurationProperties(); + assertThat(producerConfigs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL"); + KafkaAdmin admin = context.getBean(KafkaAdmin.class); + Map configs = admin.getConfig(); + assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("PLAINTEXT"); + }); + } + @Configuration(proxyBeanMethods = false) static class MessageConverterConfiguration {