Migrate KafkaProperties to use PropertyMapper

pull/13076/merge
Phillip Webb 7 years ago
parent 469372c5ef
commit f9750248ef

@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.convert.DurationUnit; import org.springframework.boot.convert.DurationUnit;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
@@ -192,16 +193,6 @@ public class KafkaProperties {
return properties; return properties;
} }
// Resolve a Spring Resource to an absolute file-system path. Used for the
// SSL key/trust store location settings, since Kafka expects plain file
// paths rather than Spring resources.
private static String resourceToPath(Resource resource) {
// getFile() only succeeds for file-system resources; anything else (e.g. a
// resource nested inside a jar) throws IOException, which is surfaced as an
// IllegalStateException naming the offending resource.
try {
return resource.getFile().getAbsolutePath();
}
catch (IOException ex) {
throw new IllegalStateException(
"Resource '" + resource + "' must be on a file system", ex);
}
}
public static class Consumer { public static class Consumer {
private final Ssl ssl = new Ssl(); private final Ssl ssl = new Ssl();
@@ -382,55 +373,32 @@ public class KafkaProperties {
} }
public Map<String, Object> buildProperties() { public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<>(); Properties properties = new Properties();
if (this.autoCommitInterval != null) { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, map.from(this::getAutoCommitInterval).asInt(Duration::toMillis)
(int) this.autoCommitInterval.toMillis()); .to(properties.in(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
} map.from(this::getAutoOffsetReset)
if (this.autoOffsetReset != null) { .to(properties.in(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, map.from(this::getBootstrapServers)
this.autoOffsetReset); .to(properties.in(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
} map.from(this::getClientId)
if (this.bootstrapServers != null) { .to(properties.in(ConsumerConfig.CLIENT_ID_CONFIG));
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, map.from(this::getEnableAutoCommit)
this.bootstrapServers); .to(properties.in(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
} map.from(this::getFetchMaxWait).asInt(Duration::toMillis)
if (this.clientId != null) { .to(properties.in(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG));
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId); map.from(this::getFetchMinSize)
} .to(properties.in(ConsumerConfig.FETCH_MIN_BYTES_CONFIG));
if (this.enableAutoCommit != null) { map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG));
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, map.from(this::getHeartbeatInterval).asInt(Duration::toMillis)
this.enableAutoCommit); .to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
} map.from(this::getKeyDeserializer)
if (this.fetchMaxWait != null) { .to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, map.from(this::getValueDeserializer)
(int) this.fetchMaxWait.toMillis()); .to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
} map.from(this::getMaxPollRecords)
if (this.fetchMinSize != null) { .to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize); return properties.with(this.ssl, this.properties);
}
if (this.groupId != null) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
}
if (this.heartbeatInterval != null) {
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
(int) this.heartbeatInterval.toMillis());
}
if (this.keyDeserializer != null) {
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
this.keyDeserializer);
}
if (this.valueDeserializer != null) {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
this.valueDeserializer);
}
if (this.maxPollRecords != null) {
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
this.maxPollRecords);
}
properties.putAll(this.ssl.buildProperties());
properties.putAll(this.properties);
return properties;
} }
} }
@@ -586,41 +554,25 @@ public class KafkaProperties {
} }
public Map<String, Object> buildProperties() { public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<>(); Properties properties = new Properties();
if (this.acks != null) { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
properties.put(ProducerConfig.ACKS_CONFIG, this.acks); map.from(this::getAcks).to(properties.in(ProducerConfig.ACKS_CONFIG));
} map.from(this::getBatchSize)
if (this.batchSize != null) { .to(properties.in(ProducerConfig.BATCH_SIZE_CONFIG));
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize); map.from(this::getBootstrapServers)
} .to(properties.in(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
if (this.bootstrapServers != null) { map.from(this::getBufferMemory)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, .to(properties.in(ProducerConfig.BUFFER_MEMORY_CONFIG));
this.bootstrapServers); map.from(this::getClientId)
} .to(properties.in(ProducerConfig.CLIENT_ID_CONFIG));
if (this.bufferMemory != null) { map.from(this::getCompressionType)
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory); .to(properties.in(ProducerConfig.COMPRESSION_TYPE_CONFIG));
} map.from(this::getKeySerializer)
if (this.clientId != null) { .to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId); map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG));
} map.from(this::getValueSerializer)
if (this.compressionType != null) { .to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, return properties.with(this.ssl, this.properties);
this.compressionType);
}
if (this.keySerializer != null) {
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
this.keySerializer);
}
if (this.retries != null) {
properties.put(ProducerConfig.RETRIES_CONFIG, this.retries);
}
if (this.valueSerializer != null) {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
this.valueSerializer);
}
properties.putAll(this.ssl.buildProperties());
properties.putAll(this.properties);
return properties;
} }
} }
@@ -669,13 +621,11 @@ public class KafkaProperties {
} }
public Map<String, Object> buildProperties() { public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<>(); Properties properties = new Properties();
if (this.clientId != null) { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId); map.from(this::getClientId)
} .to(properties.in(ProducerConfig.CLIENT_ID_CONFIG));
properties.putAll(this.ssl.buildProperties()); return properties.with(this.ssl, this.properties);
properties.putAll(this.properties);
return properties;
} }
} }
@@ -969,40 +919,35 @@ public class KafkaProperties {
} }
public Map<String, Object> buildProperties() { public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<>(); Properties properties = new Properties();
if (this.getKeyPassword() != null) { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.getKeyPassword()); map.from(this::getKeyPassword)
} .to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
if (this.getKeystoreLocation() != null) { map.from(this::getKeystoreLocation).as(this::resourceToPath)
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, .to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
resourceToPath(this.getKeystoreLocation())); map.from(this::getKeystorePassword)
} .to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
if (this.getKeystorePassword() != null) { map.from(this::getKeyStoreType)
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, .to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
this.getKeystorePassword()); map.from(this::getTruststoreLocation).as(this::resourceToPath)
} .to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
if (this.getKeyStoreType() != null) { map.from(this::getTruststorePassword)
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, .to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
this.getKeyStoreType()); map.from(this::getTrustStoreType)
} .to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
if (this.getTruststoreLocation() != null) { map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG));
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, return properties;
resourceToPath(this.getTruststoreLocation()));
}
if (this.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.getTruststorePassword());
} }
if (this.getTrustStoreType() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, private String resourceToPath(Resource resource) {
this.getTrustStoreType()); try {
return resource.getFile().getAbsolutePath();
} }
if (this.getProtocol() != null) { catch (IOException ex) {
properties.put(SslConfigs.SSL_PROTOCOL_CONFIG, this.getProtocol()); throw new IllegalStateException(
"Resource '" + resource + "' must be on a file system", ex);
} }
return properties;
} }
} }
public static class Jaas { public static class Jaas {
@@ -1064,4 +1009,18 @@ public class KafkaProperties {
} }
/**
 * Specialized {@link HashMap} used to collect Kafka configuration values,
 * with convenience methods for use alongside a {@code PropertyMapper}.
 */
private static class Properties extends HashMap<String, Object> {

	/**
	 * Return a callback that stores a mapped value under the given Kafka
	 * configuration key, suitable for use with {@code PropertyMapper}'s
	 * {@code to(...)} terminal.
	 * @param key the Kafka configuration key
	 * @param <V> the mapped value type
	 * @return a consumer that puts the value into this map
	 */
	public <V> java.util.function.Consumer<V> in(String key) {
		return (mappedValue) -> this.put(key, mappedValue);
	}

	/**
	 * Merge the SSL-derived properties and any additional user-specified
	 * entries into this map, then return it for convenient chaining. Entries
	 * merged here are applied after the mapped values and can override them.
	 * @param ssl the SSL settings whose built properties should be merged
	 * @param additionalProperties extra entries to merge last
	 * @return this map
	 */
	public Properties with(Ssl ssl, Map<String, String> additionalProperties) {
		putAll(ssl.buildProperties());
		putAll(additionalProperties);
		return this;
	}

}
} }

Loading…
Cancel
Save