diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java new file mode 100644 index 0000000000..cedc264253 --- /dev/null +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java @@ -0,0 +1,122 @@ +/* + * Copyright 2012-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.retry.MessageRecoverer; +import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry; +import org.springframework.util.Assert; + +/** + * Configure {@link RabbitListenerContainerFactory} with sensible defaults. + * + * @param the container factory type. + * @author Gary Russell + * @author Stephane Nicoll + * @since 2.0.0 + */ +public abstract class AbstractRabbitListenerContainerFactoryConfigurer< + T extends AbstractRabbitListenerContainerFactory> { + + private MessageConverter messageConverter; + + private MessageRecoverer messageRecoverer; + + private RabbitProperties rabbitProperties; + + /** + * Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box + * converter should be used. + * @param messageConverter the {@link MessageConverter} + */ + protected void setMessageConverter(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + /** + * Set the {@link MessageRecoverer} to use or {@code null} to rely on the default. + * @param messageRecoverer the {@link MessageRecoverer} + */ + protected void setMessageRecoverer(MessageRecoverer messageRecoverer) { + this.messageRecoverer = messageRecoverer; + } + + /** + * Set the {@link RabbitProperties} to use. + * @param rabbitProperties the {@link RabbitProperties} + */ + protected void setRabbitProperties(RabbitProperties rabbitProperties) { + this.rabbitProperties = rabbitProperties; + } + + protected final RabbitProperties getRabbitProperties() { + return this.rabbitProperties; + } + + /** + * Configure the specified rabbit listener container factory. The factory can be + * further tuned and default settings can be overridden. + * @param factory the {@link AbstractRabbitListenerContainerFactory} instance to + * configure + * @param connectionFactory the {@link ConnectionFactory} to use + */ + public abstract void configure(T factory, ConnectionFactory connectionFactory); + + + protected void configure(T factory, ConnectionFactory connectionFactory, + RabbitProperties.AmqpContainer configuration) { + Assert.notNull(factory, "Factory must not be null"); + Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); + Assert.notNull(configuration, "Configuration must not be null"); + factory.setConnectionFactory(connectionFactory); + if (this.messageConverter != null) { + factory.setMessageConverter(this.messageConverter); + } + factory.setAutoStartup(configuration.isAutoStartup()); + if (configuration.getAcknowledgeMode() != null) { + factory.setAcknowledgeMode(configuration.getAcknowledgeMode()); + } + if (configuration.getPrefetch() != null) { + factory.setPrefetchCount(configuration.getPrefetch()); + } + if (configuration.getDefaultRequeueRejected() != null) { + factory.setDefaultRequeueRejected(configuration.getDefaultRequeueRejected()); + } + if (configuration.getIdleEventInterval() != null) { + factory.setIdleEventInterval(configuration.getIdleEventInterval()); + } + ListenerRetry retryConfig = configuration.getRetry(); + if (retryConfig.isEnabled()) { + RetryInterceptorBuilder builder = (retryConfig.isStateless() + ? RetryInterceptorBuilder.stateless() + : RetryInterceptorBuilder.stateful()); + builder.maxAttempts(retryConfig.getMaxAttempts()); + builder.backOffOptions(retryConfig.getInitialInterval(), + retryConfig.getMultiplier(), retryConfig.getMaxInterval()); + MessageRecoverer recoverer = (this.messageRecoverer != null + ? this.messageRecoverer : new RejectAndDontRequeueRecoverer()); + builder.recoverer(recoverer); + factory.setAdviceChain(builder.build()); + } + } + +} diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java new file mode 100644 index 0000000000..07879468dc --- /dev/null +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java @@ -0,0 +1,42 @@ +/* + * Copyright 2012-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; + +/** + * Configure {@link DirectRabbitListenerContainerFactoryConfigurer} with sensible defaults. + * + * @author Gary Russell + * @author Stephane Nicoll + * @since 2.0 + */ +public final class DirectRabbitListenerContainerFactoryConfigurer + extends AbstractRabbitListenerContainerFactoryConfigurer { + + @Override + public void configure(DirectRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) { + RabbitProperties.DirectContainer config = getRabbitProperties().getListener() + .getDirect(); + configure(factory, connectionFactory, config); + if (config.getConsumersPerQueue() != null) { + factory.setConsumersPerQueue(config.getConsumersPerQueue()); + } + } + +} diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java index 18c107ca2d..7da43ccf28 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.amqp; import org.springframework.amqp.rabbit.annotation.EnableRabbit; +import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; @@ -25,6 +26,7 @@ import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -55,7 +57,7 @@ class RabbitAnnotationDrivenConfiguration { @Bean @ConditionalOnMissingBean - public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() { + public SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer() { SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer(); configurer.setMessageConverter(this.messageConverter.getIfUnique()); configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); @@ -63,9 +65,10 @@ class RabbitAnnotationDrivenConfiguration { return configurer; } - @Bean + @Bean(name = "rabbitListenerContainerFactory") @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") - public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( + @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true) + public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); @@ -73,6 +76,29 @@ class RabbitAnnotationDrivenConfiguration { return factory; } + + @Bean + @ConditionalOnMissingBean + public DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurer() { + DirectRabbitListenerContainerFactoryConfigurer configurer = + new DirectRabbitListenerContainerFactoryConfigurer(); + configurer.setMessageConverter(this.messageConverter.getIfUnique()); + configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); + configurer.setRabbitProperties(this.properties); + return configurer; + } + + @Bean(name = "rabbitListenerContainerFactory") + @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") + @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "direct") + public DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory( + DirectRabbitListenerContainerFactoryConfigurer configurer, + ConnectionFactory connectionFactory) { + DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + return factory; + } + @EnableRabbit @ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) protected static class EnableRabbitConfiguration { diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index f5b8f883cd..d2d0eb30db 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -22,7 +22,6 @@ import java.util.List; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.DeprecatedConfigurationProperty; import org.springframework.boot.context.properties.NestedConfigurationProperty; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -465,112 +464,54 @@ public class RabbitProperties { } - public static class Listener { - - @NestedConfigurationProperty - private final AmqpContainer simple = new AmqpContainer(); - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.auto-startup") - @Deprecated - public boolean isAutoStartup() { - return getSimple().isAutoStartup(); - } - - @Deprecated - public void setAutoStartup(boolean autoStartup) { - getSimple().setAutoStartup(autoStartup); - } - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.acknowledge-mode") - @Deprecated - public AcknowledgeMode getAcknowledgeMode() { - return getSimple().getAcknowledgeMode(); - } - - @Deprecated - public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) { - getSimple().setAcknowledgeMode(acknowledgeMode); - } - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.concurrency") - @Deprecated - public Integer getConcurrency() { - return getSimple().getConcurrency(); - } + public enum ContainerType { - @Deprecated - public void setConcurrency(Integer concurrency) { - getSimple().setConcurrency(concurrency); - } - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.max-concurrency") - @Deprecated - public Integer getMaxConcurrency() { - return getSimple().getMaxConcurrency(); - } - - @Deprecated - public void setMaxConcurrency(Integer maxConcurrency) { - getSimple().setMaxConcurrency(maxConcurrency); - } + /** + * Legacy container where the RabbitMQ consumer dispatches messages to an + * invoker thread. + */ + SIMPLE, - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.prefetch") - @Deprecated - public Integer getPrefetch() { - return getSimple().getPrefetch(); - } + /** + * Container where the listener is invoked directly on the RabbitMQ consumer + * thread. + */ + DIRECT - @Deprecated - public void setPrefetch(Integer prefetch) { - getSimple().setPrefetch(prefetch); - } + } - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.transaction-size") - @Deprecated - public Integer getTransactionSize() { - return getSimple().getTransactionSize(); - } + public static class Listener { - @Deprecated - public void setTransactionSize(Integer transactionSize) { - getSimple().setTransactionSize(transactionSize); - } + /** + * Listener container type. + */ + private ContainerType type = ContainerType.SIMPLE; - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.default-requeue-rejected") - @Deprecated - public Boolean getDefaultRequeueRejected() { - return getSimple().getDefaultRequeueRejected(); - } + @NestedConfigurationProperty + private final SimpleContainer simple = new SimpleContainer(); - @Deprecated - public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) { - getSimple().setDefaultRequeueRejected(defaultRequeueRejected); - } + @NestedConfigurationProperty + private final DirectContainer direct = new DirectContainer(); - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.idle-event-interval") - @Deprecated - public Long getIdleEventInterval() { - return getSimple().getIdleEventInterval(); + public ContainerType getType() { + return this.type; } - @Deprecated - public void setIdleEventInterval(Long idleEventInterval) { - getSimple().setIdleEventInterval(idleEventInterval); + public void setType(ContainerType containerType) { + this.type = containerType; } - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.retry") - @Deprecated - public ListenerRetry getRetry() { - return getSimple().getRetry(); + public SimpleContainer getSimple() { + return this.simple; } - public AmqpContainer getSimple() { - return this.simple; + public DirectContainer getDirect() { + return this.direct; } } - public static class AmqpContainer { + public static abstract class AmqpContainer { /** * Start the container automatically on startup. @@ -582,28 +523,12 @@ public class RabbitProperties { */ private AcknowledgeMode acknowledgeMode; - /** - * Minimum number of consumers. - */ - private Integer concurrency; - - /** - * Maximum number of consumers. - */ - private Integer maxConcurrency; - /** * Number of messages to be handled in a single request. It should be greater than * or equal to the transaction size (if used). */ private Integer prefetch; - /** - * Number of messages to be processed in a transaction. For best results it should - * be less than or equal to the prefetch count. - */ - private Integer transactionSize; - /** * Whether rejected deliveries are requeued by default; default true. */ @@ -636,6 +561,58 @@ public class RabbitProperties { this.acknowledgeMode = acknowledgeMode; } + public Integer getPrefetch() { + return this.prefetch; + } + + public void setPrefetch(Integer prefetch) { + this.prefetch = prefetch; + } + + public Boolean getDefaultRequeueRejected() { + return this.defaultRequeueRejected; + } + + public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) { + this.defaultRequeueRejected = defaultRequeueRejected; + } + + public Long getIdleEventInterval() { + return this.idleEventInterval; + } + + public void setIdleEventInterval(Long idleEventInterval) { + this.idleEventInterval = idleEventInterval; + } + + public ListenerRetry getRetry() { + return this.retry; + } + + } + + /** + * Configuration properties for {@code SimpleMessageListenerContainer}. + */ + public static class SimpleContainer extends AmqpContainer { + + /** + * Minimum number of listener invoker threads. + */ + private Integer concurrency; + + /** + * Maximum number of listener invoker threads. + */ + private Integer maxConcurrency; + + /** + * Number of messages to be processed in a transaction; number of messages + * between acks. For best results it should + * be less than or equal to the prefetch count. + */ + private Integer transactionSize; + public Integer getConcurrency() { return this.concurrency; } @@ -652,14 +629,6 @@ public class RabbitProperties { this.maxConcurrency = maxConcurrency; } - public Integer getPrefetch() { - return this.prefetch; - } - - public void setPrefetch(Integer prefetch) { - this.prefetch = prefetch; - } - public Integer getTransactionSize() { return this.transactionSize; } @@ -668,24 +637,24 @@ public class RabbitProperties { this.transactionSize = transactionSize; } - public Boolean getDefaultRequeueRejected() { - return this.defaultRequeueRejected; - } + } - public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) { - this.defaultRequeueRejected = defaultRequeueRejected; - } + /** + * Configuration properties for {@code DirectMessageListenerContainer}. + */ + public static class DirectContainer extends AmqpContainer { - public Long getIdleEventInterval() { - return this.idleEventInterval; - } + /** + * Number of consumers per queue. + */ + private Integer consumersPerQueue; - public void setIdleEventInterval(Long idleEventInterval) { - this.idleEventInterval = idleEventInterval; + public Integer getConsumersPerQueue() { + return this.consumersPerQueue; } - public ListenerRetry getRetry() { - return this.retry; + public void setConsumersPerQueue(Integer consumersPerQueue) { + this.consumersPerQueue = consumersPerQueue; } } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java index 52b6509039..a31db8b094 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java @@ -16,109 +16,33 @@ package org.springframework.boot.autoconfigure.amqp; -import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.retry.MessageRecoverer; -import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; -import org.springframework.amqp.support.converter.MessageConverter; -import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry; -import org.springframework.util.Assert; /** - * Configure {@link RabbitListenerContainerFactory} with sensible defaults. + * Configure {@link SimpleRabbitListenerContainerFactoryConfigurer} with sensible defaults. * * @author Stephane Nicoll * @author Gary Russell * @since 1.3.3 */ -public final class SimpleRabbitListenerContainerFactoryConfigurer { +public final class SimpleRabbitListenerContainerFactoryConfigurer + extends AbstractRabbitListenerContainerFactoryConfigurer { - private MessageConverter messageConverter; - - private MessageRecoverer messageRecoverer; - - private RabbitProperties rabbitProperties; - - /** - * Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box - * converter should be used. - * @param messageConverter the {@link MessageConverter} - */ - void setMessageConverter(MessageConverter messageConverter) { - this.messageConverter = messageConverter; - } - - /** - * Set the {@link MessageRecoverer} to use or {@code null} to rely on the default. - * @param messageRecoverer the {@link MessageRecoverer} - */ - void setMessageRecoverer(MessageRecoverer messageRecoverer) { - this.messageRecoverer = messageRecoverer; - } - - /** - * Set the {@link RabbitProperties} to use. - * @param rabbitProperties the {@link RabbitProperties} - */ - void setRabbitProperties(RabbitProperties rabbitProperties) { - this.rabbitProperties = rabbitProperties; - } - - /** - * Configure the specified rabbit listener container factory. The factory can be - * further tuned and default settings can be overridden. - * @param factory the {@link SimpleRabbitListenerContainerFactory} instance to - * configure - * @param connectionFactory the {@link ConnectionFactory} to use - */ - public void configure(SimpleRabbitListenerContainerFactory factory, - ConnectionFactory connectionFactory) { - Assert.notNull(factory, "Factory must not be null"); - Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); - factory.setConnectionFactory(connectionFactory); - if (this.messageConverter != null) { - factory.setMessageConverter(this.messageConverter); - } - RabbitProperties.AmqpContainer config = this.rabbitProperties.getListener() + @Override + public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) { + RabbitProperties.SimpleContainer config = getRabbitProperties().getListener() .getSimple(); - factory.setAutoStartup(config.isAutoStartup()); - if (config.getAcknowledgeMode() != null) { - factory.setAcknowledgeMode(config.getAcknowledgeMode()); - } + configure(factory, connectionFactory, config); if (config.getConcurrency() != null) { factory.setConcurrentConsumers(config.getConcurrency()); } if (config.getMaxConcurrency() != null) { factory.setMaxConcurrentConsumers(config.getMaxConcurrency()); } - if (config.getPrefetch() != null) { - factory.setPrefetchCount(config.getPrefetch()); - } if (config.getTransactionSize() != null) { factory.setTxSize(config.getTransactionSize()); } - if (config.getDefaultRequeueRejected() != null) { - factory.setDefaultRequeueRejected(config.getDefaultRequeueRejected()); - } - if (config.getIdleEventInterval() != null) { - factory.setIdleEventInterval(config.getIdleEventInterval()); - } - ListenerRetry retryConfig = config.getRetry(); - if (retryConfig.isEnabled()) { - RetryInterceptorBuilder builder = (retryConfig.isStateless() - ? RetryInterceptorBuilder.stateless() - : RetryInterceptorBuilder.stateful()); - builder.maxAttempts(retryConfig.getMaxAttempts()); - builder.backOffOptions(retryConfig.getInitialInterval(), - retryConfig.getMultiplier(), retryConfig.getMaxInterval()); - MessageRecoverer recoverer = (this.messageRecoverer != null - ? this.messageRecoverer : new RejectAndDontRequeueRecoverer()); - builder.recoverer(recoverer); - factory.setAdviceChain(builder.build()); - } - } } diff --git a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 447876ff98..9bb77b8370 100644 --- a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -363,72 +363,6 @@ "description": "Create an AmqpAdmin bean.", "defaultValue": true }, - { - "name": "spring.rabbitmq.listener.retry.enabled", - "type": "java.lang.Boolean", - "description": "Whether or not publishing retries are enabled.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": false, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.enabled" - } - }, - { - "name": "spring.rabbitmq.listener.retry.initial-interval", - "type": "java.lang.Long", - "description": "Interval between the first and second attempt to publish or deliver a message.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": 1000, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.initial-interval" - } - }, - { - "name": "spring.rabbitmq.listener.retry.max-attempts", - "type": "java.lang.Integer", - "description": "Maximum number of attempts to publish or deliver a message.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": 3, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.max-attempts" - } - }, - { - "name": "spring.rabbitmq.listener.retry.max-interval", - "type": "java.lang.Long", - "description": "Maximum interval between attempts.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": 10000, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.max-interval" - } - }, - { - "name": "spring.rabbitmq.listener.retry.multiplier", - "type": "java.lang.Double", - "description": "A multiplier to apply to the previous retry interval.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": 1, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.multiplier" - } - }, - { - "name": "spring.rabbitmq.listener.retry.stateless", - "type": "java.lang.Boolean", - "description": "Whether or not retries are stateless or stateful.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": true, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.stateless" - } - }, { "name": "spring.session.hazelcast.flush-mode", "defaultValue": "on-save" diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index d1a056f74d..98058a3ceb 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -30,6 +30,7 @@ import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.EnableRabbit; +import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; @@ -292,28 +293,10 @@ public class RabbitAutoConfigurationTests { assertThat(adviceChain).isNull(); } - @Test - @Deprecated - public void testSimpleRabbitListenerContainerFactoryWithCustomDeprecatedSettings() { - testSimpleRabbitListenerContainerFactoryWithCustomSettings( - "spring.rabbitmq.listener.retry.enabled:true", - "spring.rabbitmq.listener.retry.maxAttempts:4", - "spring.rabbitmq.listener.retry.initialInterval:2000", - "spring.rabbitmq.listener.retry.multiplier:1.5", - "spring.rabbitmq.listener.retry.maxInterval:5000", - "spring.rabbitmq.listener.autoStartup:false", - "spring.rabbitmq.listener.acknowledgeMode:manual", - "spring.rabbitmq.listener.concurrency:5", - "spring.rabbitmq.listener.maxConcurrency:10", - "spring.rabbitmq.listener.prefetch:40", - "spring.rabbitmq.listener.defaultRequeueRejected:false", - "spring.rabbitmq.listener.idleEventInterval:5", - "spring.rabbitmq.listener.transactionSize:20"); - } - @Test public void testSimpleRabbitListenerContainerFactoryWithCustomSettings() { - testSimpleRabbitListenerContainerFactoryWithCustomSettings( + load(new Class[] { MessageConvertersConfiguration.class, + MessageRecoverersConfiguration.class }, "spring.rabbitmq.listener.simple.retry.enabled:true", "spring.rabbitmq.listener.simple.retry.maxAttempts:4", "spring.rabbitmq.listener.simple.retry.initialInterval:2000", @@ -327,26 +310,91 @@ public class RabbitAutoConfigurationTests { "spring.rabbitmq.listener.simple.defaultRequeueRejected:false", "spring.rabbitmq.listener.simple.idleEventInterval:5", "spring.rabbitmq.listener.simple.transactionSize:20"); + SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context + .getBean("rabbitListenerContainerFactory", + SimpleRabbitListenerContainerFactory.class); + DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory); + assertThat(dfa.getPropertyValue("concurrentConsumers")).isEqualTo(5); + assertThat(dfa.getPropertyValue("maxConcurrentConsumers")).isEqualTo(10); + assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20); + checkCommonProps(dfa); } - private void testSimpleRabbitListenerContainerFactoryWithCustomSettings(String... environment) { + @Test + public void testDirectRabbitListenerContainerFactoryWithCustomSettings() { load(new Class[] { MessageConvertersConfiguration.class, - MessageRecoverersConfiguration.class }, environment); - SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context + MessageRecoverersConfiguration.class }, + "spring.rabbitmq.listener.type:direct", + "spring.rabbitmq.listener.direct.retry.enabled:true", + "spring.rabbitmq.listener.direct.retry.maxAttempts:4", + "spring.rabbitmq.listener.direct.retry.initialInterval:2000", + "spring.rabbitmq.listener.direct.retry.multiplier:1.5", + "spring.rabbitmq.listener.direct.retry.maxInterval:5000", + "spring.rabbitmq.listener.direct.autoStartup:false", + "spring.rabbitmq.listener.direct.acknowledgeMode:manual", + "spring.rabbitmq.listener.direct.consumers-per-queue:5", + "spring.rabbitmq.listener.direct.prefetch:40", + "spring.rabbitmq.listener.direct.defaultRequeueRejected:false", + "spring.rabbitmq.listener.direct.idleEventInterval:5"); + DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context .getBean("rabbitListenerContainerFactory", - SimpleRabbitListenerContainerFactory.class); + DirectRabbitListenerContainerFactory.class); DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory); + assertThat(dfa.getPropertyValue("consumersPerQueue")).isEqualTo(5); checkCommonProps(dfa); } + @Test + public void testRabbitListenerContainerFactoryConfigurersAreAvailable() { + load(TestConfiguration.class, + "spring.rabbitmq.listener.simple.concurrency:5", + "spring.rabbitmq.listener.simple.maxConcurrency:10", + "spring.rabbitmq.listener.simple.prefetch:40", + "spring.rabbitmq.listener.direct.consumers-per-queue:5", + "spring.rabbitmq.listener.direct.prefetch:40"); + assertThat(this.context.getBeansOfType( + SimpleRabbitListenerContainerFactoryConfigurer.class)).hasSize(1); + assertThat(this.context.getBeansOfType( + DirectRabbitListenerContainerFactoryConfigurer.class)).hasSize(1); + } + + @Test + public void testSimpleRabbitListenerContainerFactoryConfigurerUsesConfig() { + load(TestConfiguration.class, + "spring.rabbitmq.listener.type:direct", // listener type is irrelevant + "spring.rabbitmq.listener.simple.concurrency:5", + "spring.rabbitmq.listener.simple.maxConcurrency:10", + "spring.rabbitmq.listener.simple.prefetch:40"); + SimpleRabbitListenerContainerFactoryConfigurer configurer = this.context + .getBean(SimpleRabbitListenerContainerFactoryConfigurer.class); + SimpleRabbitListenerContainerFactory factory = + mock(SimpleRabbitListenerContainerFactory.class); + configurer.configure(factory, mock(ConnectionFactory.class)); + verify(factory).setConcurrentConsumers(5); + verify(factory).setMaxConcurrentConsumers(10); + verify(factory).setPrefetchCount(40); + } + + @Test + public void testDirectRabbitListenerContainerFactoryConfigurerUsesConfig() { + load(TestConfiguration.class, + "spring.rabbitmq.listener.type:simple", // listener type is irrelevant + "spring.rabbitmq.listener.direct.consumers-per-queue:5", + "spring.rabbitmq.listener.direct.prefetch:40"); + DirectRabbitListenerContainerFactoryConfigurer configurer = this.context + .getBean(DirectRabbitListenerContainerFactoryConfigurer.class); + DirectRabbitListenerContainerFactory factory = + mock(DirectRabbitListenerContainerFactory.class); + configurer.configure(factory, mock(ConnectionFactory.class)); + verify(factory).setConsumersPerQueue(5); + verify(factory).setPrefetchCount(40); + } + private void checkCommonProps(DirectFieldAccessor dfa) { assertThat(dfa.getPropertyValue("autoStartup")).isEqualTo(Boolean.FALSE); assertThat(dfa.getPropertyValue("acknowledgeMode")) .isEqualTo(AcknowledgeMode.MANUAL); - assertThat(dfa.getPropertyValue("concurrentConsumers")).isEqualTo(5); - assertThat(dfa.getPropertyValue("maxConcurrentConsumers")).isEqualTo(10); assertThat(dfa.getPropertyValue("prefetchCount")).isEqualTo(40); - assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20); assertThat(dfa.getPropertyValue("messageConverter")) .isSameAs(this.context.getBean("myMessageConverter")); assertThat(dfa.getPropertyValue("defaultRequeueRejected")) diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index f6166bbc17..8ceeb8818d 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -975,12 +975,13 @@ content into your application; rather pick only the properties that you need. spring.rabbitmq.connection-timeout= # Connection timeout, in milliseconds; zero for infinite. spring.rabbitmq.dynamic=true # Create an AmqpAdmin bean. spring.rabbitmq.host=localhost # RabbitMQ host. + spring.rabbitmq.listener.direct.consumers-per-queue= # Number of consumers per queue. spring.rabbitmq.listener.simple.acknowledge-mode= # Acknowledge mode of container. spring.rabbitmq.listener.simple.auto-startup=true # Start the container automatically on startup. - spring.rabbitmq.listener.simple.concurrency= # Minimum number of consumers. - spring.rabbitmq.listener.simple.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`. + spring.rabbitmq.listener.simple.concurrency= # Minimum number of listener invoker threads. + spring.rabbitmq.listener.simple.default-requeue-rejected= # Whether or not to requeue delivery failures. spring.rabbitmq.listener.simple.idle-event-interval= # How often idle container events should be published in milliseconds. - spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of consumers. + spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of listener invoker. spring.rabbitmq.listener.simple.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used). spring.rabbitmq.listener.simple.retry.enabled=false # Whether or not publishing retries are enabled. spring.rabbitmq.listener.simple.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message. @@ -988,7 +989,8 @@ content into your application; rather pick only the properties that you need. spring.rabbitmq.listener.simple.retry.max-interval=10000 # Maximum interval between attempts. spring.rabbitmq.listener.simple.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval. spring.rabbitmq.listener.simple.retry.stateless=true # Whether or not retry is stateless or stateful. - spring.rabbitmq.listener.simple.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count. + spring.rabbitmq.listener.simple.transaction-size= # Number of messages to be processed in a transaction; number of messages between acks. For best results it should be less than or equal to the prefetch count. + spring.rabbitmq.listener.type=simple # Listener container type. spring.rabbitmq.password= # Login to authenticate against the broker. spring.rabbitmq.port=5672 # RabbitMQ port. spring.rabbitmq.publisher-confirms=false # Enable publisher confirms. diff --git a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index 6ea3bce6da..0b2e81ad0a 100644 --- a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -4653,9 +4653,10 @@ the broker connection is lost. Retries are disabled by default. ==== Receiving a message When the Rabbit infrastructure is present, any bean can be annotated with `@RabbitListener` to create a listener endpoint. If no `RabbitListenerContainerFactory` -has been defined, a default one is configured automatically. If a `MessageConverter` or -`MessageRecoverer` beans are defined, they are associated automatically to the default -factory. +has been defined, a default `SimpleRabbitListenerContainerFactory` is configured +automatically and you can switch to a direct container using the +`spring.rabbitmq.listener.type` property. If a `MessageConverter` or `MessageRecoverer` +beans are defined, they are associated automatically to the default factory. The following component creates a listener endpoint on the `someQueue` queue: @@ -4677,9 +4678,13 @@ for more details. If you need to create more `RabbitListenerContainerFactory` instances or if you want to override the default, Spring Boot provides a -`SimpleRabbitListenerContainerFactoryConfigurer` that you can use to initialize a -`SimpleRabbitListenerContainerFactory` with the same settings as the one that is -auto-configured. +`SimpleRabbitListenerContainerFactoryConfigurer` and +`DirectRabbitListenerContainerFactoryConfigurer` that you can use to initialize a +`SimpleRabbitListenerContainerFactory` and `DirectRabbitListenerContainerFactory` with the +same settings as the one used by the auto-configuration. + +TIP: It doesn't matter which container type you've chosen, those two beans are exposed by +the auto-configuration. For instance, the following exposes another factory that uses a specific `MessageConverter`: