From c4cfc4dd0c773ed12c58d074a55917dfc3443703 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 10 May 2017 13:18:59 -0400 Subject: [PATCH] Add Kafka Kerberos Configuration Properties See gh-9151 --- .../kafka/KafkaAutoConfiguration.java | 21 ++++++ .../autoconfigure/kafka/KafkaProperties.java | 66 +++++++++++++++++++ .../kafka/KafkaAutoConfigurationTests.java | 17 ++++- .../appendix-application-properties.adoc | 4 ++ 4 files changed, 107 insertions(+), 1 deletion(-) diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index be2481c38b..c99188757e 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -16,9 +16,13 @@ package org.springframework.boot.autoconfigure.kafka; +import java.io.IOException; + import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -28,6 +32,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; @@ -81,4 +86,20 @@ public class KafkaAutoConfiguration { this.properties.buildProducerProperties()); } + @Bean + @ConditionalOnProperty(name = "spring.kafka.jaas.enabled") + @ConditionalOnMissingBean(KafkaJaasLoginModuleInitializer.class) + public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { + KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer(); + Jaas jaasProperties = this.properties.getJaas(); + if (jaasProperties.getControlFlag() != null) { + jaas.setControlFlag(jaasProperties.getControlFlag()); + } + if (jaasProperties.getLoginModule() != null) { + jaas.setLoginModule(jaasProperties.getLoginModule()); + } + jaas.setOptions(jaasProperties.getOptions()); + return jaas; + } + } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 239d11344e..aac811ab5c 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.core.io.Resource; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; +import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.util.CollectionUtils; /** @@ -74,6 +75,8 @@ public class KafkaProperties { private final Ssl ssl = new Ssl(); + private final Jaas jaas = new Jaas(); + private final Template template = new Template(); public List getBootstrapServers() { @@ -116,6 +119,10 @@ public class KafkaProperties { return this.ssl; } + public Jaas getJaas() { + return this.jaas; + } + public Template getTemplate() { return this.template; } @@ -776,4 +783,63 @@ public class KafkaProperties { } + public static class Jaas { + + /** + * Enable JAAS configuration. + */ + private boolean enabled; + + /** + * Login module. + */ + private String loginModule; + + /** + * AppConfigurationEntry.LoginModuleControlFlag value. + */ + private KafkaJaasLoginModuleInitializer.ControlFlag controlFlag = + KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED; + + /** + * Map of JAAS options, e.g. 'spring.kafka.jaas.options.useKeyTab=true'. + */ + private final Map options = new HashMap<>(); + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getLoginModule() { + return this.loginModule; + } + + public void setLoginModule(String loginModule) { + this.loginModule = loginModule; + } + + public KafkaJaasLoginModuleInitializer.ControlFlag getControlFlag() { + return this.controlFlag; + } + + public void setControlFlag(KafkaJaasLoginModuleInitializer.ControlFlag controlFlag) { + this.controlFlag = controlFlag; + } + + public Map getOptions() { + return this.options; + } + + public void setOptions(Map options) { + if (options != null) { + this.options.putAll(options); + } + } + + } + } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 5dbb753df6..3fbb699406 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -20,6 +20,8 @@ import java.io.File; import java.util.Collections; import java.util.Map; +import javax.security.auth.login.AppConfigurationEntry; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; @@ -38,6 +40,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; +import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import static org.assertj.core.api.Assertions.assertThat; @@ -160,6 +163,7 @@ public class KafkaAutoConfigurationTests { assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) .isEqualTo(IntegerSerializer.class); + assertThat(this.context.containsBean("kafkaJaasInitializer")).isFalse(); } @Test @@ -169,7 +173,11 @@ public class KafkaAutoConfigurationTests { "spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", - "spring.kafka.listener.poll-timeout=2000"); + "spring.kafka.listener.poll-timeout=2000", + "spring.kafka.jaas.enabled=true", + "spring.kafka.jaas.login-module=foo", + "spring.kafka.jaas.control-flag=REQUISITE", + "spring.kafka.jaas.options.useKeyTab=true"); DefaultKafkaProducerFactory producerFactory = this.context .getBean(DefaultKafkaProducerFactory.class); DefaultKafkaConsumerFactory consumerFactory = this.context @@ -189,6 +197,13 @@ public class KafkaAutoConfigurationTests { assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) .isEqualTo(2000L); + assertThat(this.context.containsBean("kafkaJaasInitializer")).isTrue(); + KafkaJaasLoginModuleInitializer jaas = this.context.getBean(KafkaJaasLoginModuleInitializer.class); + dfa = new DirectFieldAccessor(jaas); + assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); + assertThat(dfa.getPropertyValue("controlFlag")) + .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); + assertThat(((Map) dfa.getPropertyValue("options")).get("useKeyTab")).isEqualTo("true"); } private void load(String... environment) { diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index f5c55b3910..af61320f85 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -942,6 +942,10 @@ content into your application; rather pick only the properties that you need. spring.kafka.consumer.key-deserializer= # Deserializer class for keys. spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll(). spring.kafka.consumer.value-deserializer= # Deserializer class for values. + spring.kafka.jaas.control-flag=REQUIRED # AppConfigurationEntry.LoginModuleControlFlag value. + spring.kafka.jaas.enabled= # Enable JAAS configuration. + spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module. + spring.kafka.jaas.options= # Map of JAAS options, e.g. 'spring.kafka.jaas.options.useKeyTab=true'. spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation. spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME".