diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 5a4e430753..7f12d98ee1 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -18,7 +18,6 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; import java.time.Duration; -import java.util.List; import java.util.Map; import org.apache.kafka.clients.CommonClientConfigs; @@ -32,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; -import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -190,8 +188,7 @@ public class KafkaAutoConfiguration { private void applyKafkaConnectionDetailsForConsumer(Map properties, KafkaConnectionDetails connectionDetails) { - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - nodesToStringList(connectionDetails.getConsumerBootstrapNodes())); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getConsumerBootstrapServers()); if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) { properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); } @@ -199,8 +196,7 @@ public class KafkaAutoConfiguration { private void applyKafkaConnectionDetailsForProducer(Map properties, KafkaConnectionDetails connectionDetails) { - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, - nodesToStringList(connectionDetails.getProducerBootstrapNodes())); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getProducerBootstrapServers()); if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) { properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); } @@ -208,17 +204,12 @@ public class KafkaAutoConfiguration { private void applyKafkaConnectionDetailsForAdmin(Map properties, KafkaConnectionDetails connectionDetails) { - properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, - nodesToStringList(connectionDetails.getAdminBootstrapNodes())); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getAdminBootstrapNodes()); if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) { properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); } } - private List nodesToStringList(List nodes) { - return nodes.stream().map((node) -> node.host() + ":" + node.port()).toList(); - } - private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) { long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0; if (delay > 0) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaConnectionDetails.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaConnectionDetails.java index 3e1421d445..cbcb535467 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaConnectionDetails.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaConnectionDetails.java @@ -34,48 +34,38 @@ public interface KafkaConnectionDetails extends ConnectionDetails { * Returns the list of bootstrap nodes. * @return the list of bootstrap nodes */ - List getBootstrapNodes(); + List getBootstrapServers(); /** * Returns the list of bootstrap nodes used for consumers. * @return the list of bootstrap nodes used for consumers */ - default List getConsumerBootstrapNodes() { - return getBootstrapNodes(); + default List getConsumerBootstrapServers() { + return getBootstrapServers(); } /** * Returns the list of bootstrap nodes used for producers. * @return the list of bootstrap nodes used for producers */ - default List getProducerBootstrapNodes() { - return getBootstrapNodes(); + default List getProducerBootstrapServers() { + return getBootstrapServers(); } /** * Returns the list of bootstrap nodes used for the admin. * @return the list of bootstrap nodes used for the admin */ - default List getAdminBootstrapNodes() { - return getBootstrapNodes(); + default List getAdminBootstrapNodes() { + return getBootstrapServers(); } /** * Returns the list of bootstrap nodes used for Kafka Streams. * @return the list of bootstrap nodes used for Kafka Streams */ - default List getStreamsBootstrapNodes() { - return getBootstrapNodes(); - } - - /** - * A Kafka node. - * - * @param host the hostname - * @param port the port - */ - record Node(String host, int port) { - + default List getStreamsBootstrapServers() { + return getBootstrapServers(); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index 5f425c6d1f..28e35d3699 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -16,7 +16,6 @@ package org.springframework.boot.autoconfigure.kafka; -import java.util.List; import java.util.Map; import org.apache.kafka.clients.CommonClientConfigs; @@ -30,7 +29,6 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node; import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -87,17 +85,12 @@ class KafkaStreamsAnnotationDrivenConfiguration { private void applyKafkaConnectionDetailsForStreams(Map properties, KafkaConnectionDetails connectionDetails) { - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - nodesToStringList(connectionDetails.getStreamsBootstrapNodes())); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getStreamsBootstrapServers()); if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) { properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); } } - private List nodesToStringList(List nodes) { - return nodes.stream().map((node) -> node.host() + ":" + node.port()).toList(); - } - // Separate class required to avoid BeanCurrentlyInCreationException static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/PropertiesKafkaConnectionDetails.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/PropertiesKafkaConnectionDetails.java index 83152cd917..26ea6ad67f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/PropertiesKafkaConnectionDetails.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/PropertiesKafkaConnectionDetails.java @@ -27,8 +27,6 @@ import java.util.List; */ class PropertiesKafkaConnectionDetails implements KafkaConnectionDetails { - private final int DEFAULT_PORT = 9092; - private final KafkaProperties properties; PropertiesKafkaConnectionDetails(KafkaProperties properties) { @@ -36,40 +34,27 @@ class PropertiesKafkaConnectionDetails implements KafkaConnectionDetails { } @Override - public List getBootstrapNodes() { - return asNodes(this.properties.getBootstrapServers()); + public List getBootstrapServers() { + return this.properties.getBootstrapServers(); } @Override - public List getConsumerBootstrapNodes() { - return bootstrapNodes(this.properties.getConsumer().getBootstrapServers()); + public List getConsumerBootstrapServers() { + return getServers(this.properties.getConsumer().getBootstrapServers()); } @Override - public List getProducerBootstrapNodes() { - return bootstrapNodes(this.properties.getProducer().getBootstrapServers()); + public List getProducerBootstrapServers() { + return getServers(this.properties.getProducer().getBootstrapServers()); } @Override - public List getStreamsBootstrapNodes() { - return bootstrapNodes(this.properties.getStreams().getBootstrapServers()); - } - - private List bootstrapNodes(List bootstrapServers) { - return (bootstrapServers != null) ? asNodes(bootstrapServers) : getBootstrapNodes(); - } - - private List asNodes(List bootstrapServers) { - return bootstrapServers.stream().map(this::asNode).toList(); + public List getStreamsBootstrapServers() { + return getServers(this.properties.getStreams().getBootstrapServers()); } - private Node asNode(String bootstrapNode) { - int separatorIndex = bootstrapNode.indexOf(':'); - if (separatorIndex == -1) { - return new Node(bootstrapNode, this.DEFAULT_PORT); - } - return new Node(bootstrapNode.substring(0, separatorIndex), - Integer.parseInt(bootstrapNode.substring(separatorIndex + 1))); + private List getServers(List servers) { + return (servers != null) ? servers : getBootstrapServers(); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 0d561322fe..f274a3b517 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -839,8 +839,8 @@ class KafkaAutoConfigurationTests { return new KafkaConnectionDetails() { @Override - public List getBootstrapNodes() { - return List.of(new Node("kafka.example.com", 12345)); + public List getBootstrapServers() { + return List.of("kafka.example.com:12345"); } }; diff --git a/spring-boot-project/spring-boot-testcontainers/src/main/java/org/springframework/boot/testcontainers/service/connection/kafka/KafkaContainerConnectionDetailsFactory.java b/spring-boot-project/spring-boot-testcontainers/src/main/java/org/springframework/boot/testcontainers/service/connection/kafka/KafkaContainerConnectionDetailsFactory.java index 9fd8c74526..1f7b7f89a2 100644 --- a/spring-boot-project/spring-boot-testcontainers/src/main/java/org/springframework/boot/testcontainers/service/connection/kafka/KafkaContainerConnectionDetailsFactory.java +++ b/spring-boot-project/spring-boot-testcontainers/src/main/java/org/springframework/boot/testcontainers/service/connection/kafka/KafkaContainerConnectionDetailsFactory.java @@ -16,7 +16,6 @@ package org.springframework.boot.testcontainers.service.connection.kafka; -import java.net.URI; import java.util.List; import org.testcontainers.containers.KafkaContainer; @@ -53,9 +52,8 @@ class KafkaContainerConnectionDetailsFactory } @Override - public List getBootstrapNodes() { - URI uri = URI.create(getContainer().getBootstrapServers()); - return List.of(new Node(uri.getHost(), uri.getPort())); + public List getBootstrapServers() { + return List.of(getContainer().getBootstrapServers()); } } diff --git a/spring-boot-project/spring-boot-testcontainers/src/main/java/org/springframework/boot/testcontainers/service/connection/redpanda/RedpandaContainerConnectionDetailsFactory.java b/spring-boot-project/spring-boot-testcontainers/src/main/java/org/springframework/boot/testcontainers/service/connection/redpanda/RedpandaContainerConnectionDetailsFactory.java index 846b235987..48ec53bec1 100644 --- a/spring-boot-project/spring-boot-testcontainers/src/main/java/org/springframework/boot/testcontainers/service/connection/redpanda/RedpandaContainerConnectionDetailsFactory.java +++ b/spring-boot-project/spring-boot-testcontainers/src/main/java/org/springframework/boot/testcontainers/service/connection/redpanda/RedpandaContainerConnectionDetailsFactory.java @@ -16,7 +16,6 @@ package org.springframework.boot.testcontainers.service.connection.redpanda; -import java.net.URI; import java.util.List; import org.testcontainers.redpanda.RedpandaContainer; @@ -52,9 +51,8 @@ class RedpandaContainerConnectionDetailsFactory } @Override - public List getBootstrapNodes() { - URI uri = URI.create(getContainer().getBootstrapServers()); - return List.of(new Node(uri.getHost(), uri.getPort())); + public List getBootstrapServers() { + return List.of(getContainer().getBootstrapServers()); } }