From d208641c59c599e24e2dc29f565f30d5d0ea34f3 Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Thu, 3 May 2018 02:08:44 +0900 Subject: [PATCH] Complete SSL configuration for Kafka See gh-13031 --- .../autoconfigure/kafka/KafkaProperties.java | 156 +++++++++--------- .../kafka/KafkaAutoConfigurationTests.java | 47 ++++-- .../appendix-application-properties.adoc | 12 ++ 3 files changed, 125 insertions(+), 90 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 4dce8a9526..5f45d4d154 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -143,25 +143,7 @@ public class KafkaProperties { if (this.clientId != null) { properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId); } - if (this.ssl.getKeyPassword() != null) { - properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword()); - } - if (this.ssl.getKeystoreLocation() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, - resourceToPath(this.ssl.getKeystoreLocation())); - } - if (this.ssl.getKeystorePassword() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, - this.ssl.getKeystorePassword()); - } - if (this.ssl.getTruststoreLocation() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, - resourceToPath(this.ssl.getTruststoreLocation())); - } - if (this.ssl.getTruststorePassword() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, - this.ssl.getTruststorePassword()); - } + properties.putAll(this.ssl.buildProperties()); if (!CollectionUtils.isEmpty(this.properties)) { properties.putAll(this.properties); } @@ -438,26 +420,6 @@ public class KafkaProperties { properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDeserializer); } - if (this.ssl.getKeyPassword() != null) { - properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, - this.ssl.getKeyPassword()); - } - if (this.ssl.getKeystoreLocation() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, - resourceToPath(this.ssl.getKeystoreLocation())); - } - if (this.ssl.getKeystorePassword() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, - this.ssl.getKeystorePassword()); - } - if (this.ssl.getTruststoreLocation() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, - resourceToPath(this.ssl.getTruststoreLocation())); - } - if (this.ssl.getTruststorePassword() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, - this.ssl.getTruststorePassword()); - } if (this.valueDeserializer != null) { properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueDeserializer); @@ -466,6 +428,7 @@ public class KafkaProperties { properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords); } + properties.putAll(this.ssl.buildProperties()); properties.putAll(this.properties); return properties; } @@ -651,30 +614,11 @@ public class KafkaProperties { if (this.retries != null) { properties.put(ProducerConfig.RETRIES_CONFIG, this.retries); } - if (this.ssl.getKeyPassword() != null) { - properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, - this.ssl.getKeyPassword()); - } - if (this.ssl.getKeystoreLocation() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, - resourceToPath(this.ssl.getKeystoreLocation())); - } - if (this.ssl.getKeystorePassword() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, - this.ssl.getKeystorePassword()); - } - if (this.ssl.getTruststoreLocation() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, - resourceToPath(this.ssl.getTruststoreLocation())); - } - if (this.ssl.getTruststorePassword() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, - this.ssl.getTruststorePassword()); - } if (this.valueSerializer != null) { properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer); } + properties.putAll(this.ssl.buildProperties()); properties.putAll(this.properties); return properties; } @@ -729,26 +673,7 @@ public class KafkaProperties { if (this.clientId != null) { properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId); } - if (this.ssl.getKeyPassword() != null) { - properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, - this.ssl.getKeyPassword()); - } - if (this.ssl.getKeystoreLocation() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, - resourceToPath(this.ssl.getKeystoreLocation())); - } - if (this.ssl.getKeystorePassword() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, - this.ssl.getKeystorePassword()); - } - if (this.ssl.getTruststoreLocation() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, - resourceToPath(this.ssl.getTruststoreLocation())); - } - if (this.ssl.getTruststorePassword() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, - this.ssl.getTruststorePassword()); - } + properties.putAll(this.ssl.buildProperties()); properties.putAll(this.properties); return properties; } @@ -954,6 +879,11 @@ public class KafkaProperties { */ private String keystorePassword; + /** + * Type of the key store. + */ + private String keyStoreType; + /** * Location of the trust store file. */ @@ -964,6 +894,16 @@ public class KafkaProperties { */ private String truststorePassword; + /** + * Type of the trust store. + */ + private String trustStoreType; + + /** + * SSL protocol to use. + */ + private String protocol; + public String getKeyPassword() { return this.keyPassword; } @@ -988,6 +928,14 @@ public class KafkaProperties { this.keystorePassword = keystorePassword; } + public String getKeyStoreType() { + return this.keyStoreType; + } + + public void setKeyStoreType(String keyStoreType) { + this.keyStoreType = keyStoreType; + } + public Resource getTruststoreLocation() { return this.truststoreLocation; } @@ -1004,6 +952,56 @@ public class KafkaProperties { this.truststorePassword = truststorePassword; } + public String getTrustStoreType() { + return this.trustStoreType; + } + + public void setTrustStoreType(String trustStoreType) { + this.trustStoreType = trustStoreType; + } + + public String getProtocol() { + return this.protocol; + } + + public void setProtocol(String protocol) { + this.protocol = protocol; + } + + public Map buildProperties() { + Map properties = new HashMap<>(); + if (this.getKeyPassword() != null) { + properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.getKeyPassword()); + } + if (this.getKeystoreLocation() != null) { + properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + resourceToPath(this.getKeystoreLocation())); + } + if (this.getKeystorePassword() != null) { + properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, + this.getKeystorePassword()); + } + if (this.getKeyStoreType() != null) { + properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, + this.getKeyStoreType()); + } + if (this.getTruststoreLocation() != null) { + properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, + resourceToPath(this.getTruststoreLocation())); + } + if (this.getTruststorePassword() != null) { + properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + this.getTruststorePassword()); + } + if (this.getTrustStoreType() != null) { + properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, + this.getTrustStoreType()); + } + if (this.getProtocol() != null) { + properties.put(SslConfigs.SSL_PROTOCOL_CONFIG, this.getProtocol()); + } + return properties; + } } public static class Jaas { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index e716e8ab70..3f061bedb0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -77,8 +77,11 @@ public class KafkaAutoConfigurationTests { "spring.kafka.ssl.key-password=p1", "spring.kafka.ssl.keystore-location=classpath:ksLoc", "spring.kafka.ssl.keystore-password=p2", + "spring.kafka.ssl.keystore-type=PKCS12", "spring.kafka.ssl.truststore-location=classpath:tsLoc", "spring.kafka.ssl.truststore-password=p3", + "spring.kafka.ssl.truststore-type=PKCS12", + "spring.kafka.ssl.protocol=TLSv1.2", "spring.kafka.consumer.auto-commit-interval=123", "spring.kafka.consumer.max-poll-records=42", "spring.kafka.consumer.auto-offset-reset=earliest", @@ -106,11 +109,17 @@ public class KafkaAutoConfigurationTests { .endsWith(File.separator + "ksLoc"); assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) .isEqualTo("p2"); + assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)) + .isEqualTo("PKCS12"); assertThat((String) configs .get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) .endsWith(File.separator + "tsLoc"); assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) .isEqualTo("p3"); + assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)) + .isEqualTo("PKCS12"); + assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG)) + .isEqualTo("TLSv1.2"); // consumer assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG)) .isEqualTo("ccid"); // override @@ -159,8 +168,11 @@ public class KafkaAutoConfigurationTests { "spring.kafka.producer.ssl.key-password=p4", "spring.kafka.producer.ssl.keystore-location=classpath:ksLocP", "spring.kafka.producer.ssl.keystore-password=p5", + "spring.kafka.producer.ssl.keystore-type=PKCS12", "spring.kafka.producer.ssl.truststore-location=classpath:tsLocP", "spring.kafka.producer.ssl.truststore-password=p6", + "spring.kafka.producer.ssl.truststore-type=PKCS12", + "spring.kafka.producer.ssl.protocol=TLSv1.2", "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer") .run((context) -> { DefaultKafkaProducerFactory producerFactory = context @@ -189,11 +201,17 @@ public class KafkaAutoConfigurationTests { .endsWith(File.separator + "ksLocP"); assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) .isEqualTo("p5"); + assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)) + .isEqualTo("PKCS12"); assertThat((String) configs .get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) .endsWith(File.separator + "tsLocP"); assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) .isEqualTo("p6"); + assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)) + .isEqualTo("PKCS12"); + assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG)) + .isEqualTo("TLSv1.2"); assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) .isEqualTo(IntegerSerializer.class); @@ -209,17 +227,18 @@ public class KafkaAutoConfigurationTests { @Test public void adminProperties() { - this.contextRunner - .withPropertyValues("spring.kafka.clientId=cid", - "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", - "spring.kafka.admin.fail-fast=true", - "spring.kafka.admin.properties.fiz.buz=fix.fox", - "spring.kafka.admin.ssl.key-password=p4", - "spring.kafka.admin.ssl.keystore-location=classpath:ksLocP", - "spring.kafka.admin.ssl.keystore-password=p5", - "spring.kafka.admin.ssl.truststore-location=classpath:tsLocP", - "spring.kafka.admin.ssl.truststore-password=p6") - .run((context) -> { + this.contextRunner.withPropertyValues("spring.kafka.clientId=cid", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", + "spring.kafka.admin.fail-fast=true", + "spring.kafka.admin.properties.fiz.buz=fix.fox", + "spring.kafka.admin.ssl.key-password=p4", + "spring.kafka.admin.ssl.keystore-location=classpath:ksLocP", + "spring.kafka.admin.ssl.keystore-password=p5", + "spring.kafka.admin.ssl.keystore-type=PKCS12", + "spring.kafka.admin.ssl.truststore-location=classpath:tsLocP", + "spring.kafka.admin.ssl.truststore-password=p6", + "spring.kafka.admin.ssl.truststore-type=PKCS12", + "spring.kafka.admin.ssl.protocol=TLSv1.2").run((context) -> { KafkaAdmin admin = context.getBean(KafkaAdmin.class); Map configs = admin.getConfig(); // common @@ -233,11 +252,17 @@ public class KafkaAutoConfigurationTests { .endsWith(File.separator + "ksLocP"); assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) .isEqualTo("p5"); + assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)) + .isEqualTo("PKCS12"); assertThat((String) configs .get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) .endsWith(File.separator + "tsLocP"); assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) .isEqualTo("p6"); + assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)) + .isEqualTo("PKCS12"); + assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG)) + .isEqualTo("TLSv1.2"); assertThat( context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) .isEmpty(); diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index 765cddb662..8d20b3fbb7 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -997,8 +997,11 @@ content into your application. Rather, pick only the properties that you need. spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file. spring.kafka.admin.ssl.keystore-location= # Location of the key store file. spring.kafka.admin.ssl.keystore-password= # Store password for the key store file. + spring.kafka.admin.ssl.keystore-type= # Type of the key store. spring.kafka.admin.ssl.truststore-location= # Location of the trust store file. spring.kafka.admin.ssl.truststore-password= # Store password for the trust store file. + spring.kafka.admin.ssl.truststore-type= # Type of the trust store. + spring.kafka.admin.ssl.protocol= # SSL protocol to use. spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging. spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true. @@ -1016,8 +1019,11 @@ content into your application. Rather, pick only the properties that you need. spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file. spring.kafka.consumer.ssl.keystore-location= # Location of the key store file. spring.kafka.consumer.ssl.keystore-password= # Store password for the key store file. + spring.kafka.consumer.ssl.keystore-type= # Type of the key store. spring.kafka.consumer.ssl.truststore-location= # Location of the trust store file. spring.kafka.consumer.ssl.truststore-password= # Store password for the trust store file. + spring.kafka.consumer.ssl.truststore-type= # Type of the trust store. + spring.kafka.consumer.ssl.protocol= # SSL protocol to use. spring.kafka.consumer.value-deserializer= # Deserializer class for values. spring.kafka.jaas.control-flag=required # Control flag for login configuration. spring.kafka.jaas.enabled=false # Whether to enable JAAS configuration. @@ -1046,16 +1052,22 @@ content into your application. Rather, pick only the properties that you need. spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file. spring.kafka.producer.ssl.keystore-location= # Location of the key store file. spring.kafka.producer.ssl.keystore-password= # Store password for the key store file. + spring.kafka.producer.ssl.keystore-type= # Type of the key store. spring.kafka.producer.ssl.truststore-location= # Location of the trust store file. spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file. + spring.kafka.producer.ssl.truststore-type= # Type of the trust store. + spring.kafka.producer.ssl.protocol= # SSL protocol to use. spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer. spring.kafka.producer.value-serializer= # Serializer class for values. spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client. spring.kafka.ssl.key-password= # Password of the private key in the key store file. spring.kafka.ssl.keystore-location= # Location of the key store file. spring.kafka.ssl.keystore-password= # Store password for the key store file. + spring.kafka.ssl.keystore-type= # Type of the key store. spring.kafka.ssl.truststore-location= # Location of the trust store file. spring.kafka.ssl.truststore-password= # Store password for the trust store file. + spring.kafka.ssl.truststore-type= # Type of the trust store. + spring.kafka.ssl.protocol= # SSL protocol to use. spring.kafka.template.default-topic= # Default topic to which messages are sent. # RABBIT ({sc-spring-boot-autoconfigure}/amqp/RabbitProperties.{sc-ext}[RabbitProperties])