From f9750248ef3e7733bdbf2cc4e32b3ebb18fa4904 Mon Sep 17 00:00:00 2001 From: Phillip Webb Date: Fri, 4 May 2018 19:41:46 -0700 Subject: [PATCH] Migrate KafkaProperties to use PropertyMapper --- .../autoconfigure/kafka/KafkaProperties.java | 223 +++++++----------- 1 file changed, 91 insertions(+), 132 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 1ee2a8133f..f5b71f5351 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.convert.DurationUnit; import org.springframework.core.io.Resource; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; @@ -192,16 +193,6 @@ public class KafkaProperties { return properties; } - private static String resourceToPath(Resource resource) { - try { - return resource.getFile().getAbsolutePath(); - } - catch (IOException ex) { - throw new IllegalStateException( - "Resource '" + resource + "' must be on a file system", ex); - } - } - public static class Consumer { private final Ssl ssl = new Ssl(); @@ -382,55 +373,32 @@ public class KafkaProperties { } public Map buildProperties() { - Map properties = new HashMap<>(); - if (this.autoCommitInterval != null) { - properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, - (int) this.autoCommitInterval.toMillis()); - } - if (this.autoOffsetReset != null) { - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - this.autoOffsetReset); - } - if (this.bootstrapServers != null) { - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.bootstrapServers); - } - if (this.clientId != null) { - properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId); - } - if (this.enableAutoCommit != null) { - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - this.enableAutoCommit); - } - if (this.fetchMaxWait != null) { - properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, - (int) this.fetchMaxWait.toMillis()); - } - if (this.fetchMinSize != null) { - properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize); - } - if (this.groupId != null) { - properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); - } - if (this.heartbeatInterval != null) { - properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, - (int) this.heartbeatInterval.toMillis()); - } - if (this.keyDeserializer != null) { - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - this.keyDeserializer); - } - if (this.valueDeserializer != null) { - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - this.valueDeserializer); - } - if (this.maxPollRecords != null) { - properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, - this.maxPollRecords); - } - properties.putAll(this.ssl.buildProperties()); - properties.putAll(this.properties); - return properties; + Properties properties = new Properties(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getAutoCommitInterval).asInt(Duration::toMillis) + .to(properties.in(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + map.from(this::getAutoOffsetReset) + .to(properties.in(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + map.from(this::getBootstrapServers) + .to(properties.in(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + map.from(this::getClientId) + .to(properties.in(ConsumerConfig.CLIENT_ID_CONFIG)); + map.from(this::getEnableAutoCommit) + .to(properties.in(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + map.from(this::getFetchMaxWait).asInt(Duration::toMillis) + .to(properties.in(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)); + map.from(this::getFetchMinSize) + .to(properties.in(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)); + map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG)); + map.from(this::getHeartbeatInterval).asInt(Duration::toMillis) + .to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)); + map.from(this::getKeyDeserializer) + .to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); + map.from(this::getValueDeserializer) + .to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); + map.from(this::getMaxPollRecords) + .to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); + return properties.with(this.ssl, this.properties); } } @@ -586,41 +554,25 @@ public class KafkaProperties { } public Map buildProperties() { - Map properties = new HashMap<>(); - if (this.acks != null) { - properties.put(ProducerConfig.ACKS_CONFIG, this.acks); - } - if (this.batchSize != null) { - properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize); - } - if (this.bootstrapServers != null) { - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.bootstrapServers); - } - if (this.bufferMemory != null) { - properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory); - } - if (this.clientId != null) { - properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId); - } - if (this.compressionType != null) { - properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, - this.compressionType); - } - if (this.keySerializer != null) { - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - this.keySerializer); - } - if (this.retries != null) { - properties.put(ProducerConfig.RETRIES_CONFIG, this.retries); - } - if (this.valueSerializer != null) { - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - this.valueSerializer); - } - properties.putAll(this.ssl.buildProperties()); - properties.putAll(this.properties); - return properties; + Properties properties = new Properties(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getAcks).to(properties.in(ProducerConfig.ACKS_CONFIG)); + map.from(this::getBatchSize) + .to(properties.in(ProducerConfig.BATCH_SIZE_CONFIG)); + map.from(this::getBootstrapServers) + .to(properties.in(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + map.from(this::getBufferMemory) + .to(properties.in(ProducerConfig.BUFFER_MEMORY_CONFIG)); + map.from(this::getClientId) + .to(properties.in(ProducerConfig.CLIENT_ID_CONFIG)); + map.from(this::getCompressionType) + .to(properties.in(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + map.from(this::getKeySerializer) + .to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); + map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG)); + map.from(this::getValueSerializer) + .to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); + return properties.with(this.ssl, this.properties); } } @@ -669,13 +621,11 @@ public class KafkaProperties { } public Map buildProperties() { - Map properties = new HashMap<>(); - if (this.clientId != null) { - properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId); - } - properties.putAll(this.ssl.buildProperties()); - properties.putAll(this.properties); - return properties; + Properties properties = new Properties(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getClientId) + .to(properties.in(ProducerConfig.CLIENT_ID_CONFIG)); + return properties.with(this.ssl, this.properties); } } @@ -969,40 +919,35 @@ public class KafkaProperties { } public Map buildProperties() { - Map properties = new HashMap<>(); - if (this.getKeyPassword() != null) { - properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.getKeyPassword()); - } - if (this.getKeystoreLocation() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, - resourceToPath(this.getKeystoreLocation())); - } - if (this.getKeystorePassword() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, - this.getKeystorePassword()); - } - if (this.getKeyStoreType() != null) { - properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, - this.getKeyStoreType()); - } - if (this.getTruststoreLocation() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, - resourceToPath(this.getTruststoreLocation())); - } - if (this.getTruststorePassword() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, - this.getTruststorePassword()); - } - if (this.getTrustStoreType() != null) { - properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, - this.getTrustStoreType()); - } - if (this.getProtocol() != null) { - properties.put(SslConfigs.SSL_PROTOCOL_CONFIG, this.getProtocol()); - } + Properties properties = new Properties(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getKeyPassword) + .to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG)); + map.from(this::getKeystoreLocation).as(this::resourceToPath) + .to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); + map.from(this::getKeystorePassword) + .to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)); + map.from(this::getKeyStoreType) + .to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)); + map.from(this::getTruststoreLocation).as(this::resourceToPath) + .to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + map.from(this::getTruststorePassword) + .to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + map.from(this::getTrustStoreType) + .to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); + map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG)); return properties; } + private String resourceToPath(Resource resource) { + try { + return resource.getFile().getAbsolutePath(); + } + catch (IOException ex) { + throw new IllegalStateException( + "Resource '" + resource + "' must be on a file system", ex); + } + } } public static class Jaas { @@ -1064,4 +1009,18 @@ public class KafkaProperties { } + private static class Properties extends HashMap { + + public java.util.function.Consumer in(String key) { + return (value) -> put(key, value); + } + + public Properties with(Ssl ssl, Map properties) { + putAll(ssl.buildProperties()); + putAll(properties); + return this; + } + + } + }