diff --git a/spring-boot-project/spring-boot-autoconfigure/build.gradle b/spring-boot-project/spring-boot-autoconfigure/build.gradle
index a4717f29c5..5c19a1adb4 100644
--- a/spring-boot-project/spring-boot-autoconfigure/build.gradle
+++ b/spring-boot-project/spring-boot-autoconfigure/build.gradle
@@ -142,6 +142,7 @@ dependencies {
optional("org.springframework.session:spring-session-hazelcast")
optional("org.springframework.session:spring-session-jdbc")
optional("org.springframework.amqp:spring-rabbit")
+ optional("org.springframework.amqp:spring-rabbit-stream")
optional("org.springframework.kafka:spring-kafka")
optional("org.springframework.ws:spring-ws-core")
optional("org.thymeleaf:thymeleaf")
diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java
index 16d46758a7..e1f68c7f3b 100644
--- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java
+++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java
@@ -88,7 +88,7 @@ import org.springframework.core.io.ResourceLoader;
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
-@Import(RabbitAnnotationDrivenConfiguration.class)
+@Import({ RabbitAnnotationDrivenConfiguration.class, RabbitStreamConfiguration.class })
public class RabbitAutoConfiguration {
@Configuration(proxyBeanMethods = false)
diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java
index 0a1f3ee57d..80d140f60c 100644
--- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java
+++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java
@@ -51,6 +51,8 @@ public class RabbitProperties {
private static final int DEFAULT_PORT_SECURE = 5671;
+ private static final int DEFAULT_STREAM_PORT = 5552;
+
/**
* RabbitMQ host. Ignored if an address is set.
*/
@@ -137,6 +139,8 @@ public class RabbitProperties {
private final Template template = new Template();
+ private final Stream stream = new Stream();
+
private List
parsedAddresses;
public String getHost() {
@@ -361,6 +365,10 @@ public class RabbitProperties {
return this.template;
}
+ public Stream getStream() {
+ return this.stream;
+ }
+
public class Ssl {
private static final String SUN_X509 = "SunX509";
@@ -629,7 +637,12 @@ public class RabbitProperties {
* Container where the listener is invoked directly on the RabbitMQ consumer
* thread.
*/
- DIRECT
+ DIRECT,
+
+ /**
+ * Container that uses the RabbitMQ Stream Client.
+ */
+ STREAM
}
@@ -644,6 +657,8 @@ public class RabbitProperties {
private final DirectContainer direct = new DirectContainer();
+ private final StreamContainer stream = new StreamContainer();
+
public ContainerType getType() {
return this.type;
}
@@ -660,15 +675,31 @@ public class RabbitProperties {
return this.direct;
}
+ public StreamContainer getStream() {
+ return this.stream;
+ }
+
}
- public abstract static class AmqpContainer {
+ public abstract static class BaseContainer {
/**
* Whether to start the container automatically on startup.
*/
private boolean autoStartup = true;
+ public boolean isAutoStartup() {
+ return this.autoStartup;
+ }
+
+ public void setAutoStartup(boolean autoStartup) {
+ this.autoStartup = autoStartup;
+ }
+
+ }
+
+ public abstract static class AmqpContainer extends BaseContainer {
+
/**
* Acknowledge mode of container.
*/
@@ -701,14 +732,6 @@ public class RabbitProperties {
*/
private final ListenerRetry retry = new ListenerRetry();
- public boolean isAutoStartup() {
- return this.autoStartup;
- }
-
- public void setAutoStartup(boolean autoStartup) {
- this.autoStartup = autoStartup;
- }
-
public AcknowledgeMode getAcknowledgeMode() {
return this.acknowledgeMode;
}
@@ -871,6 +894,24 @@ public class RabbitProperties {
}
+ public static class StreamContainer extends BaseContainer {
+
+ /**
+ * Whether the container will support listeners that consume native stream
+ * messages instead of Spring AMQP messages.
+ */
+ boolean nativeListener;
+
+ public boolean isNativeListener() {
+ return this.nativeListener;
+ }
+
+ public void setNativeListener(boolean nativeListener) {
+ this.nativeListener = nativeListener;
+ }
+
+ }
+
public static class Template {
private final Retry retry = new Retry();
@@ -1128,4 +1169,62 @@ public class RabbitProperties {
}
+ public static final class Stream {
+
+ /**
+ * Host of a RabbitMQ instance with the Stream plugin enabled.
+ */
+ private String host = "localhost";
+
+ /**
+ * Stream port of a RabbitMQ instance with the Stream plugin enabled.
+ */
+ private int port = DEFAULT_STREAM_PORT;
+
+ /**
+ * Login user to authenticate to the broker. When not set,
+ * spring.rabbitmq.username is used.
+ */
+ private String username;
+
+ /**
+ * Login password to authenticate to the broker. When not set
+ * spring.rabbitmq.password is used.
+ */
+ private String password;
+
+ public String getHost() {
+ return this.host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getUsername() {
+ return this.username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return this.password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ }
+
}
diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java
new file mode 100644
index 0000000000..6626e130c6
--- /dev/null
+++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2021-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.boot.autoconfigure.amqp;
+
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import com.rabbitmq.stream.Environment;
+import com.rabbitmq.stream.EnvironmentBuilder;
+
+import org.springframework.amqp.rabbit.config.ContainerCustomizer;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.PropertyMapper;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
+import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
+import org.springframework.rabbit.stream.listener.StreamListenerContainer;
+
+/**
+ * Configuration for Spring RabbitMQ Stream plugin support.
+ *
+ * @author Gary Russell
+ */
+@Configuration(proxyBeanMethods = false)
+@ConditionalOnClass(StreamRabbitListenerContainerFactory.class)
+@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "stream")
+class RabbitStreamConfiguration {
+
+ @Bean(name = "rabbitListenerContainerFactory")
+ @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
+ StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(Environment rabbitStreamEnvironment,
+ RabbitProperties properties, ObjectProvider consumerCustomizer,
+ ObjectProvider> containerCustomizer) {
+ StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(
+ rabbitStreamEnvironment);
+ factory.setNativeListener(properties.getListener().getStream().isNativeListener());
+ consumerCustomizer.ifUnique(factory::setConsumerCustomizer);
+ containerCustomizer.ifUnique(factory::setContainerCustomizer);
+ return factory;
+ }
+
+ @Bean(name = "rabbitStreamEnvironment")
+ @ConditionalOnMissingBean(name = "rabbitStreamEnvironment")
+ Environment rabbitStreamEnvironment(RabbitProperties properties) {
+ return configure(Environment.builder(), properties).build();
+ }
+
+ static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) {
+ builder.lazyInitialization(true);
+ RabbitProperties.Stream stream = properties.getStream();
+ PropertyMapper mapper = PropertyMapper.get();
+ mapper.from(stream.getHost()).to(builder::host);
+ mapper.from(stream.getPort()).to(builder::port);
+ mapper.from(stream.getUsername()).as(withFallback(properties::getUsername)).whenNonNull().to(builder::username);
+ mapper.from(stream.getPassword()).as(withFallback(properties::getPassword)).whenNonNull().to(builder::password);
+ return builder;
+ }
+
+ private static Function withFallback(Supplier fallback) {
+ return (value) -> (value != null) ? value : fallback.get();
+ }
+
+}
diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java
new file mode 100644
index 0000000000..f002d1c09b
--- /dev/null
+++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2012-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.boot.autoconfigure.amqp;
+
+import com.rabbitmq.stream.Environment;
+import com.rabbitmq.stream.EnvironmentBuilder;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.config.ContainerCustomizer;
+import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
+import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
+import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
+import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
+import org.springframework.rabbit.stream.listener.StreamListenerContainer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+/**
+ * Tests for {@link RabbitStreamConfiguration}.
+ *
+ * @author Gary Russell
+ * @author Andy Wilkinson
+ */
+class RabbitStreamConfigurationTests {
+
+ private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
+ .withConfiguration(AutoConfigurations.of(RabbitAutoConfiguration.class));
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void whenListenerTypeIsStreamThenStreamListenerContainerAndEnvironmentAreAutoConfigured() {
+ this.contextRunner.withUserConfiguration(TestConfiguration.class)
+ .withPropertyValues("spring.rabbitmq.listener.type:stream").run((context) -> {
+ RabbitListenerEndpointRegistry registry = context.getBean(RabbitListenerEndpointRegistry.class);
+ MessageListenerContainer listenerContainer = registry.getListenerContainer("test");
+ assertThat(listenerContainer).isInstanceOf(StreamListenerContainer.class);
+ assertThat(listenerContainer).extracting("consumerCustomizer").isNotNull();
+ assertThat(context.getBean(StreamRabbitListenerContainerFactory.class))
+ .extracting("nativeListener", InstanceOfAssertFactories.BOOLEAN).isFalse();
+ verify(context.getBean(ContainerCustomizer.class)).configure(listenerContainer);
+ assertThat(context).hasSingleBean(Environment.class);
+ });
+ }
+
+ @Test
+ void whenNativeListenerIsEnabledThenContainerFactoryIsConfiguredToUseNativeListeners() {
+ this.contextRunner
+ .withPropertyValues("spring.rabbitmq.listener.type:stream",
+ "spring.rabbitmq.listener.stream.native-listener:true")
+ .run((context) -> assertThat(context.getBean(StreamRabbitListenerContainerFactory.class))
+ .extracting("nativeListener", InstanceOfAssertFactories.BOOLEAN).isTrue());
+ }
+
+ @Test
+ void whenCustomEnvironmenIsDefinedThenAutoConfiguredEnvironmentBacksOff() {
+ this.contextRunner.withUserConfiguration(CustomEnvironmentConfiguration.class).run((context) -> {
+ assertThat(context).hasSingleBean(Environment.class);
+ assertThat(context.getBean(Environment.class))
+ .isSameAs(context.getBean(CustomEnvironmentConfiguration.class).environment);
+ });
+ }
+
+ @Test
+ void whenCustomMessageListenerContainerIsDefinedThenAutoConfiguredContainerBacksOff() {
+ this.contextRunner.withUserConfiguration(CustomMessageListenerContainerFactoryConfiguration.class)
+ .run((context) -> {
+ assertThat(context).hasSingleBean(RabbitListenerContainerFactory.class);
+ assertThat(context.getBean(RabbitListenerContainerFactory.class)).isSameAs(context.getBean(
+ CustomMessageListenerContainerFactoryConfiguration.class).listenerContainerFactory);
+ });
+ }
+
+ @Test
+ void environmentUsesPropertyDefaultsByDefault() {
+ EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
+ RabbitProperties properties = new RabbitProperties();
+ RabbitStreamConfiguration.configure(builder, properties);
+ verify(builder).port(5552);
+ verify(builder).host("localhost");
+ verify(builder).lazyInitialization(true);
+ verify(builder).username("guest");
+ verify(builder).password("guest");
+ verifyNoMoreInteractions(builder);
+ }
+
+ @Test
+ void whenStreamPortIsSetThenEnvironmentUsesCustomPort() {
+ EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
+ RabbitProperties properties = new RabbitProperties();
+ properties.getStream().setPort(5553);
+ RabbitStreamConfiguration.configure(builder, properties);
+ verify(builder).port(5553);
+ }
+
+ @Test
+ void whenStreamHostIsSetThenEnvironmentUsesCustomHost() {
+ EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
+ RabbitProperties properties = new RabbitProperties();
+ properties.getStream().setHost("stream.rabbit.example.com");
+ RabbitStreamConfiguration.configure(builder, properties);
+ verify(builder).host("stream.rabbit.example.com");
+ }
+
+ @Test
+ void whenStreamCredentialsAreNotSetThenEnvironmentUsesRabbitCredentials() {
+ EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
+ RabbitProperties properties = new RabbitProperties();
+ properties.setUsername("alice");
+ properties.setPassword("secret");
+ RabbitStreamConfiguration.configure(builder, properties);
+ verify(builder).username("alice");
+ verify(builder).password("secret");
+ }
+
+ @Test
+ void whenStreamCredentialsAreSetThenEnvironmentUsesStreamCredentials() {
+ EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
+ RabbitProperties properties = new RabbitProperties();
+ properties.setUsername("alice");
+ properties.setPassword("secret");
+ properties.getStream().setUsername("bob");
+ properties.getStream().setPassword("confidential");
+ RabbitStreamConfiguration.configure(builder, properties);
+ verify(builder).username("bob");
+ verify(builder).password("confidential");
+ }
+
+ @Configuration(proxyBeanMethods = false)
+ static class TestConfiguration {
+
+ @RabbitListener(id = "test", queues = "stream", autoStartup = "false")
+ void listen(String in) {
+ }
+
+ @Bean
+ ConsumerCustomizer consumerCustomizer() {
+ return mock(ConsumerCustomizer.class);
+ }
+
+ @Bean
+ @SuppressWarnings("unchecked")
+ ContainerCustomizer containerCustomizer() {
+ return mock(ContainerCustomizer.class);
+ }
+
+ }
+
+ @Configuration(proxyBeanMethods = false)
+ static class CustomEnvironmentConfiguration {
+
+ private final Environment environment = Environment.builder().lazyInitialization(true).build();
+
+ @Bean
+ Environment rabbitStreamEnvironment() {
+ return this.environment;
+ }
+
+ }
+
+ @Configuration(proxyBeanMethods = false)
+ static class CustomMessageListenerContainerFactoryConfiguration {
+
+ @SuppressWarnings("rawtypes")
+ private final RabbitListenerContainerFactory listenerContainerFactory = mock(
+ RabbitListenerContainerFactory.class);
+
+ @Bean
+ @SuppressWarnings("unchecked")
+ RabbitListenerContainerFactory rabbitListenerContainerFactory() {
+ return this.listenerContainerFactory;
+ }
+
+ }
+
+}
diff --git a/spring-boot-project/spring-boot-dependencies/build.gradle b/spring-boot-project/spring-boot-dependencies/build.gradle
index 98032a3db5..abd99fd91f 100644
--- a/spring-boot-project/spring-boot-dependencies/build.gradle
+++ b/spring-boot-project/spring-boot-dependencies/build.gradle
@@ -1416,6 +1416,13 @@ bom {
]
}
}
+ library("Rabbit Stream Client", "0.3.0") {
+ group("com.rabbitmq") {
+ modules = [
+ "stream-client"
+ ]
+ }
+ }
library("Reactive Streams", "1.0.3") {
group("org.reactivestreams") {
modules = [
@@ -1644,6 +1651,7 @@ bom {
modules = [
"spring-amqp",
"spring-rabbit",
+ "spring-rabbit-stream",
"spring-rabbit-junit",
"spring-rabbit-test"
]