Complete SSL configuration for Kafka

See gh-13031
pull/13061/head
Toshiaki Maki 7 years ago committed by Stephane Nicoll
parent f5fabbc1b9
commit d208641c59

@ -143,25 +143,7 @@ public class KafkaProperties {
if (this.clientId != null) {
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
properties.putAll(this.ssl.buildProperties());
if (!CollectionUtils.isEmpty(this.properties)) {
properties.putAll(this.properties);
}
@ -438,26 +420,6 @@ public class KafkaProperties {
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
this.keyDeserializer);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
if (this.valueDeserializer != null) {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
this.valueDeserializer);
@ -466,6 +428,7 @@ public class KafkaProperties {
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
this.maxPollRecords);
}
properties.putAll(this.ssl.buildProperties());
properties.putAll(this.properties);
return properties;
}
@ -651,30 +614,11 @@ public class KafkaProperties {
if (this.retries != null) {
properties.put(ProducerConfig.RETRIES_CONFIG, this.retries);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
if (this.valueSerializer != null) {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
this.valueSerializer);
}
properties.putAll(this.ssl.buildProperties());
properties.putAll(this.properties);
return properties;
}
@ -729,26 +673,7 @@ public class KafkaProperties {
if (this.clientId != null) {
properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
properties.putAll(this.ssl.buildProperties());
properties.putAll(this.properties);
return properties;
}
@ -954,6 +879,11 @@ public class KafkaProperties {
*/
private String keystorePassword;
/**
* Type of the key store.
*/
private String keyStoreType;
/**
* Location of the trust store file.
*/
@ -964,6 +894,16 @@ public class KafkaProperties {
*/
private String truststorePassword;
/**
* Type of the trust store.
*/
private String trustStoreType;
/**
* SSL protocol to use.
*/
private String protocol;
public String getKeyPassword() {
return this.keyPassword;
}
@ -988,6 +928,14 @@ public class KafkaProperties {
this.keystorePassword = keystorePassword;
}
public String getKeyStoreType() {
return this.keyStoreType;
}
public void setKeyStoreType(String keyStoreType) {
this.keyStoreType = keyStoreType;
}
public Resource getTruststoreLocation() {
return this.truststoreLocation;
}
@ -1004,6 +952,56 @@ public class KafkaProperties {
this.truststorePassword = truststorePassword;
}
public String getTrustStoreType() {
return this.trustStoreType;
}
public void setTrustStoreType(String trustStoreType) {
this.trustStoreType = trustStoreType;
}
public String getProtocol() {
return this.protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<>();
if (this.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.getKeyPassword());
}
if (this.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.getKeystoreLocation()));
}
if (this.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.getKeystorePassword());
}
if (this.getKeyStoreType() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
this.getKeyStoreType());
}
if (this.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.getTruststoreLocation()));
}
if (this.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.getTruststorePassword());
}
if (this.getTrustStoreType() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
this.getTrustStoreType());
}
if (this.getProtocol() != null) {
properties.put(SslConfigs.SSL_PROTOCOL_CONFIG, this.getProtocol());
}
return properties;
}
}
public static class Jaas {

@ -77,8 +77,11 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.ssl.key-password=p1",
"spring.kafka.ssl.keystore-location=classpath:ksLoc",
"spring.kafka.ssl.keystore-password=p2",
"spring.kafka.ssl.keystore-type=PKCS12",
"spring.kafka.ssl.truststore-location=classpath:tsLoc",
"spring.kafka.ssl.truststore-password=p3",
"spring.kafka.ssl.truststore-type=PKCS12",
"spring.kafka.ssl.protocol=TLSv1.2",
"spring.kafka.consumer.auto-commit-interval=123",
"spring.kafka.consumer.max-poll-records=42",
"spring.kafka.consumer.auto-offset-reset=earliest",
@ -106,11 +109,17 @@ public class KafkaAutoConfigurationTests {
.endsWith(File.separator + "ksLoc");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
.isEqualTo("p2");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat((String) configs
.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLoc");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p3");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG))
.isEqualTo("TLSv1.2");
// consumer
assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG))
.isEqualTo("ccid"); // override
@ -159,8 +168,11 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.producer.ssl.key-password=p4",
"spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
"spring.kafka.producer.ssl.keystore-password=p5",
"spring.kafka.producer.ssl.keystore-type=PKCS12",
"spring.kafka.producer.ssl.truststore-location=classpath:tsLocP",
"spring.kafka.producer.ssl.truststore-password=p6",
"spring.kafka.producer.ssl.truststore-type=PKCS12",
"spring.kafka.producer.ssl.protocol=TLSv1.2",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer")
.run((context) -> {
DefaultKafkaProducerFactory<?, ?> producerFactory = context
@ -189,11 +201,17 @@ public class KafkaAutoConfigurationTests {
.endsWith(File.separator + "ksLocP");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
.isEqualTo("p5");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat((String) configs
.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLocP");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p6");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG))
.isEqualTo("TLSv1.2");
assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerSerializer.class);
@ -209,17 +227,18 @@ public class KafkaAutoConfigurationTests {
@Test
public void adminProperties() {
this.contextRunner
.withPropertyValues("spring.kafka.clientId=cid",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.admin.fail-fast=true",
"spring.kafka.admin.properties.fiz.buz=fix.fox",
"spring.kafka.admin.ssl.key-password=p4",
"spring.kafka.admin.ssl.keystore-location=classpath:ksLocP",
"spring.kafka.admin.ssl.keystore-password=p5",
"spring.kafka.admin.ssl.truststore-location=classpath:tsLocP",
"spring.kafka.admin.ssl.truststore-password=p6")
.run((context) -> {
this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.admin.fail-fast=true",
"spring.kafka.admin.properties.fiz.buz=fix.fox",
"spring.kafka.admin.ssl.key-password=p4",
"spring.kafka.admin.ssl.keystore-location=classpath:ksLocP",
"spring.kafka.admin.ssl.keystore-password=p5",
"spring.kafka.admin.ssl.keystore-type=PKCS12",
"spring.kafka.admin.ssl.truststore-location=classpath:tsLocP",
"spring.kafka.admin.ssl.truststore-password=p6",
"spring.kafka.admin.ssl.truststore-type=PKCS12",
"spring.kafka.admin.ssl.protocol=TLSv1.2").run((context) -> {
KafkaAdmin admin = context.getBean(KafkaAdmin.class);
Map<String, Object> configs = admin.getConfig();
// common
@ -233,11 +252,17 @@ public class KafkaAutoConfigurationTests {
.endsWith(File.separator + "ksLocP");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
.isEqualTo("p5");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat((String) configs
.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLocP");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p6");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG))
.isEqualTo("TLSv1.2");
assertThat(
context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty();

@ -997,8 +997,11 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.admin.ssl.keystore-location= # Location of the key store file.
spring.kafka.admin.ssl.keystore-password= # Store password for the key store file.
spring.kafka.admin.ssl.keystore-type= # Type of the key store.
spring.kafka.admin.ssl.truststore-location= # Location of the trust store file.
spring.kafka.admin.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.admin.ssl.truststore-type= # Type of the trust store.
spring.kafka.admin.ssl.protocol= # SSL protocol to use.
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.
@ -1016,8 +1019,11 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.consumer.ssl.keystore-location= # Location of the key store file.
spring.kafka.consumer.ssl.keystore-password= # Store password for the key store file.
spring.kafka.consumer.ssl.keystore-type= # Type of the key store.
spring.kafka.consumer.ssl.truststore-location= # Location of the trust store file.
spring.kafka.consumer.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.consumer.ssl.truststore-type= # Type of the trust store.
spring.kafka.consumer.ssl.protocol= # SSL protocol to use.
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.jaas.control-flag=required # Control flag for login configuration.
spring.kafka.jaas.enabled=false # Whether to enable JAAS configuration.
@ -1046,16 +1052,22 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.producer.ssl.keystore-location= # Location of the key store file.
spring.kafka.producer.ssl.keystore-password= # Store password for the key store file.
spring.kafka.producer.ssl.keystore-type= # Type of the key store.
spring.kafka.producer.ssl.truststore-location= # Location of the trust store file.
spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.producer.ssl.truststore-type= # Type of the trust store.
spring.kafka.producer.ssl.protocol= # SSL protocol to use.
spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.keystore-location= # Location of the key store file.
spring.kafka.ssl.keystore-password= # Store password for the key store file.
spring.kafka.ssl.keystore-type= # Type of the key store.
spring.kafka.ssl.truststore-location= # Location of the trust store file.
spring.kafka.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.ssl.truststore-type= # Type of the trust store.
spring.kafka.ssl.protocol= # SSL protocol to use.
spring.kafka.template.default-topic= # Default topic to which messages are sent.
# RABBIT ({sc-spring-boot-autoconfigure}/amqp/RabbitProperties.{sc-ext}[RabbitProperties])

Loading…
Cancel
Save