Handle Kafka servers as strings and rely on Kafka's parsing

Closes gh-34770
pull/35165/head
Andy Wilkinson 2 years ago
parent dc4efaf276
commit b1d0433d74

@ -18,7 +18,6 @@ package org.springframework.boot.autoconfigure.kafka;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
@ -32,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
@ -190,8 +188,7 @@ public class KafkaAutoConfiguration {
private void applyKafkaConnectionDetailsForConsumer(Map<String, Object> properties, private void applyKafkaConnectionDetailsForConsumer(Map<String, Object> properties,
KafkaConnectionDetails connectionDetails) { KafkaConnectionDetails connectionDetails) {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getConsumerBootstrapServers());
nodesToStringList(connectionDetails.getConsumerBootstrapNodes()));
if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) { if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
} }
@ -199,8 +196,7 @@ public class KafkaAutoConfiguration {
private void applyKafkaConnectionDetailsForProducer(Map<String, Object> properties, private void applyKafkaConnectionDetailsForProducer(Map<String, Object> properties,
KafkaConnectionDetails connectionDetails) { KafkaConnectionDetails connectionDetails) {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getProducerBootstrapServers());
nodesToStringList(connectionDetails.getProducerBootstrapNodes()));
if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) { if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
} }
@ -208,17 +204,12 @@ public class KafkaAutoConfiguration {
private void applyKafkaConnectionDetailsForAdmin(Map<String, Object> properties, private void applyKafkaConnectionDetailsForAdmin(Map<String, Object> properties,
KafkaConnectionDetails connectionDetails) { KafkaConnectionDetails connectionDetails) {
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getAdminBootstrapNodes());
nodesToStringList(connectionDetails.getAdminBootstrapNodes()));
if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) { if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
} }
} }
private List<String> nodesToStringList(List<Node> nodes) {
return nodes.stream().map((node) -> node.host() + ":" + node.port()).toList();
}
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) { private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0; long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0;
if (delay > 0) { if (delay > 0) {

@ -34,48 +34,38 @@ public interface KafkaConnectionDetails extends ConnectionDetails {
* Returns the list of bootstrap nodes. * Returns the list of bootstrap nodes.
* @return the list of bootstrap nodes * @return the list of bootstrap nodes
*/ */
List<Node> getBootstrapNodes(); List<String> getBootstrapServers();
/** /**
* Returns the list of bootstrap nodes used for consumers. * Returns the list of bootstrap nodes used for consumers.
* @return the list of bootstrap nodes used for consumers * @return the list of bootstrap nodes used for consumers
*/ */
default List<Node> getConsumerBootstrapNodes() { default List<String> getConsumerBootstrapServers() {
return getBootstrapNodes(); return getBootstrapServers();
} }
/** /**
* Returns the list of bootstrap nodes used for producers. * Returns the list of bootstrap nodes used for producers.
* @return the list of bootstrap nodes used for producers * @return the list of bootstrap nodes used for producers
*/ */
default List<Node> getProducerBootstrapNodes() { default List<String> getProducerBootstrapServers() {
return getBootstrapNodes(); return getBootstrapServers();
} }
/** /**
* Returns the list of bootstrap nodes used for the admin. * Returns the list of bootstrap nodes used for the admin.
* @return the list of bootstrap nodes used for the admin * @return the list of bootstrap nodes used for the admin
*/ */
default List<Node> getAdminBootstrapNodes() { default List<String> getAdminBootstrapNodes() {
return getBootstrapNodes(); return getBootstrapServers();
} }
/** /**
* Returns the list of bootstrap nodes used for Kafka Streams. * Returns the list of bootstrap nodes used for Kafka Streams.
* @return the list of bootstrap nodes used for Kafka Streams * @return the list of bootstrap nodes used for Kafka Streams
*/ */
default List<Node> getStreamsBootstrapNodes() { default List<String> getStreamsBootstrapServers() {
return getBootstrapNodes(); return getBootstrapServers();
}
/**
* A Kafka node.
*
* @param host the hostname
* @param port the port
*/
record Node(String host, int port) {
} }
} }

@ -16,7 +16,6 @@
package org.springframework.boot.autoconfigure.kafka; package org.springframework.boot.autoconfigure.kafka;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
@ -30,7 +29,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node;
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException; import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -87,17 +85,12 @@ class KafkaStreamsAnnotationDrivenConfiguration {
private void applyKafkaConnectionDetailsForStreams(Map<String, Object> properties, private void applyKafkaConnectionDetailsForStreams(Map<String, Object> properties,
KafkaConnectionDetails connectionDetails) { KafkaConnectionDetails connectionDetails) {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getStreamsBootstrapServers());
nodesToStringList(connectionDetails.getStreamsBootstrapNodes()));
if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) { if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
} }
} }
private List<String> nodesToStringList(List<Node> nodes) {
return nodes.stream().map((node) -> node.host() + ":" + node.port()).toList();
}
// Separate class required to avoid BeanCurrentlyInCreationException // Separate class required to avoid BeanCurrentlyInCreationException
static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean { static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {

@ -27,8 +27,6 @@ import java.util.List;
*/ */
class PropertiesKafkaConnectionDetails implements KafkaConnectionDetails { class PropertiesKafkaConnectionDetails implements KafkaConnectionDetails {
private final int DEFAULT_PORT = 9092;
private final KafkaProperties properties; private final KafkaProperties properties;
PropertiesKafkaConnectionDetails(KafkaProperties properties) { PropertiesKafkaConnectionDetails(KafkaProperties properties) {
@ -36,40 +34,27 @@ class PropertiesKafkaConnectionDetails implements KafkaConnectionDetails {
} }
@Override @Override
public List<Node> getBootstrapNodes() { public List<String> getBootstrapServers() {
return asNodes(this.properties.getBootstrapServers()); return this.properties.getBootstrapServers();
} }
@Override @Override
public List<Node> getConsumerBootstrapNodes() { public List<String> getConsumerBootstrapServers() {
return bootstrapNodes(this.properties.getConsumer().getBootstrapServers()); return getServers(this.properties.getConsumer().getBootstrapServers());
} }
@Override @Override
public List<Node> getProducerBootstrapNodes() { public List<String> getProducerBootstrapServers() {
return bootstrapNodes(this.properties.getProducer().getBootstrapServers()); return getServers(this.properties.getProducer().getBootstrapServers());
} }
@Override @Override
public List<Node> getStreamsBootstrapNodes() { public List<String> getStreamsBootstrapServers() {
return bootstrapNodes(this.properties.getStreams().getBootstrapServers()); return getServers(this.properties.getStreams().getBootstrapServers());
}
private List<Node> bootstrapNodes(List<String> bootstrapServers) {
return (bootstrapServers != null) ? asNodes(bootstrapServers) : getBootstrapNodes();
} }
private List<Node> asNodes(List<String> bootstrapServers) { private List<String> getServers(List<String> servers) {
return bootstrapServers.stream().map(this::asNode).toList(); return (servers != null) ? servers : getBootstrapServers();
}
private Node asNode(String bootstrapNode) {
int separatorIndex = bootstrapNode.indexOf(':');
if (separatorIndex == -1) {
return new Node(bootstrapNode, this.DEFAULT_PORT);
}
return new Node(bootstrapNode.substring(0, separatorIndex),
Integer.parseInt(bootstrapNode.substring(separatorIndex + 1)));
} }
} }

@ -839,8 +839,8 @@ class KafkaAutoConfigurationTests {
return new KafkaConnectionDetails() { return new KafkaConnectionDetails() {
@Override @Override
public List<Node> getBootstrapNodes() { public List<String> getBootstrapServers() {
return List.of(new Node("kafka.example.com", 12345)); return List.of("kafka.example.com:12345");
} }
}; };

@ -16,7 +16,6 @@
package org.springframework.boot.testcontainers.service.connection.kafka; package org.springframework.boot.testcontainers.service.connection.kafka;
import java.net.URI;
import java.util.List; import java.util.List;
import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.KafkaContainer;
@ -53,9 +52,8 @@ class KafkaContainerConnectionDetailsFactory
} }
@Override @Override
public List<Node> getBootstrapNodes() { public List<String> getBootstrapServers() {
URI uri = URI.create(getContainer().getBootstrapServers()); return List.of(getContainer().getBootstrapServers());
return List.of(new Node(uri.getHost(), uri.getPort()));
} }
} }

@ -16,7 +16,6 @@
package org.springframework.boot.testcontainers.service.connection.redpanda; package org.springframework.boot.testcontainers.service.connection.redpanda;
import java.net.URI;
import java.util.List; import java.util.List;
import org.testcontainers.redpanda.RedpandaContainer; import org.testcontainers.redpanda.RedpandaContainer;
@ -52,9 +51,8 @@ class RedpandaContainerConnectionDetailsFactory
} }
@Override @Override
public List<Node> getBootstrapNodes() { public List<String> getBootstrapServers() {
URI uri = URI.create(getContainer().getBootstrapServers()); return List.of(getContainer().getBootstrapServers());
return List.of(new Node(uri.getHost(), uri.getPort()));
} }
} }

Loading…
Cancel
Save