Merge pull request #9055 from garyrussell:GH-9041

* pr/9055:
  Remove deprecated spring.rabbitmq.listener.* properties
  Polish "Support direct AMQP container"
  Support direct AMQP container
pull/7241/merge
Stephane Nicoll 8 years ago
commit 2245bced9f

@ -0,0 +1,122 @@
/*
* Copyright 2012-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.amqp;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry;
import org.springframework.util.Assert;
/**
* Configure {@link RabbitListenerContainerFactory} with sensible defaults.
*
* @param <T> the container factory type.
* @author Gary Russell
* @author Stephane Nicoll
* @since 2.0.0
*/
public abstract class AbstractRabbitListenerContainerFactoryConfigurer<
T extends AbstractRabbitListenerContainerFactory<?>> {
private MessageConverter messageConverter;
private MessageRecoverer messageRecoverer;
private RabbitProperties rabbitProperties;
/**
* Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box
* converter should be used.
* @param messageConverter the {@link MessageConverter}
*/
protected void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
/**
* Set the {@link MessageRecoverer} to use or {@code null} to rely on the default.
* @param messageRecoverer the {@link MessageRecoverer}
*/
protected void setMessageRecoverer(MessageRecoverer messageRecoverer) {
this.messageRecoverer = messageRecoverer;
}
/**
* Set the {@link RabbitProperties} to use.
* @param rabbitProperties the {@link RabbitProperties}
*/
protected void setRabbitProperties(RabbitProperties rabbitProperties) {
this.rabbitProperties = rabbitProperties;
}
protected final RabbitProperties getRabbitProperties() {
return this.rabbitProperties;
}
/**
* Configure the specified rabbit listener container factory. The factory can be
* further tuned and default settings can be overridden.
* @param factory the {@link AbstractRabbitListenerContainerFactory} instance to
* configure
* @param connectionFactory the {@link ConnectionFactory} to use
*/
public abstract void configure(T factory, ConnectionFactory connectionFactory);
protected void configure(T factory, ConnectionFactory connectionFactory,
RabbitProperties.AmqpContainer configuration) {
Assert.notNull(factory, "Factory must not be null");
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
Assert.notNull(configuration, "Configuration must not be null");
factory.setConnectionFactory(connectionFactory);
if (this.messageConverter != null) {
factory.setMessageConverter(this.messageConverter);
}
factory.setAutoStartup(configuration.isAutoStartup());
if (configuration.getAcknowledgeMode() != null) {
factory.setAcknowledgeMode(configuration.getAcknowledgeMode());
}
if (configuration.getPrefetch() != null) {
factory.setPrefetchCount(configuration.getPrefetch());
}
if (configuration.getDefaultRequeueRejected() != null) {
factory.setDefaultRequeueRejected(configuration.getDefaultRequeueRejected());
}
if (configuration.getIdleEventInterval() != null) {
factory.setIdleEventInterval(configuration.getIdleEventInterval());
}
ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
? RetryInterceptorBuilder.stateless()
: RetryInterceptorBuilder.stateful());
builder.maxAttempts(retryConfig.getMaxAttempts());
builder.backOffOptions(retryConfig.getInitialInterval(),
retryConfig.getMultiplier(), retryConfig.getMaxInterval());
MessageRecoverer recoverer = (this.messageRecoverer != null
? this.messageRecoverer : new RejectAndDontRequeueRecoverer());
builder.recoverer(recoverer);
factory.setAdviceChain(builder.build());
}
}
}

@ -0,0 +1,42 @@
/*
* Copyright 2012-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.amqp;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
/**
* Configure {@link DirectRabbitListenerContainerFactoryConfigurer} with sensible defaults.
*
* @author Gary Russell
* @author Stephane Nicoll
* @since 2.0
*/
public final class DirectRabbitListenerContainerFactoryConfigurer
extends AbstractRabbitListenerContainerFactoryConfigurer<DirectRabbitListenerContainerFactory> {
@Override
public void configure(DirectRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {
RabbitProperties.DirectContainer config = getRabbitProperties().getListener()
.getDirect();
configure(factory, connectionFactory, config);
if (config.getConsumersPerQueue() != null) {
factory.setConsumersPerQueue(config.getConsumersPerQueue());
}
}
}

@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.amqp; package org.springframework.boot.autoconfigure.amqp;
import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@ -25,6 +26,7 @@ import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -55,7 +57,7 @@ class RabbitAnnotationDrivenConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() { public SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer(); SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
configurer.setMessageConverter(this.messageConverter.getIfUnique()); configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
@ -63,9 +65,10 @@ class RabbitAnnotationDrivenConfiguration {
return configurer; return configurer;
} }
@Bean @Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) { ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
@ -73,6 +76,29 @@ class RabbitAnnotationDrivenConfiguration {
return factory; return factory;
} }
@Bean
@ConditionalOnMissingBean
public DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurer() {
DirectRabbitListenerContainerFactoryConfigurer configurer =
new DirectRabbitListenerContainerFactoryConfigurer();
configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
configurer.setRabbitProperties(this.properties);
return configurer;
}
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "direct")
public DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(
DirectRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@EnableRabbit @EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) @ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration { protected static class EnableRabbitConfiguration {

@ -22,7 +22,6 @@ import java.util.List;
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
import org.springframework.boot.context.properties.NestedConfigurationProperty; import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -465,112 +464,54 @@ public class RabbitProperties {
} }
public static class Listener { public enum ContainerType {
@NestedConfigurationProperty
private final AmqpContainer simple = new AmqpContainer();
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.auto-startup")
@Deprecated
public boolean isAutoStartup() {
return getSimple().isAutoStartup();
}
@Deprecated
public void setAutoStartup(boolean autoStartup) {
getSimple().setAutoStartup(autoStartup);
}
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.acknowledge-mode")
@Deprecated
public AcknowledgeMode getAcknowledgeMode() {
return getSimple().getAcknowledgeMode();
}
@Deprecated
public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) {
getSimple().setAcknowledgeMode(acknowledgeMode);
}
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.concurrency")
@Deprecated
public Integer getConcurrency() {
return getSimple().getConcurrency();
}
@Deprecated /**
public void setConcurrency(Integer concurrency) { * Legacy container where the RabbitMQ consumer dispatches messages to an
getSimple().setConcurrency(concurrency); * invoker thread.
} */
SIMPLE,
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.max-concurrency")
@Deprecated
public Integer getMaxConcurrency() {
return getSimple().getMaxConcurrency();
}
@Deprecated
public void setMaxConcurrency(Integer maxConcurrency) {
getSimple().setMaxConcurrency(maxConcurrency);
}
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.prefetch") /**
@Deprecated * Container where the listener is invoked directly on the RabbitMQ consumer
public Integer getPrefetch() { * thread.
return getSimple().getPrefetch(); */
} DIRECT
@Deprecated
public void setPrefetch(Integer prefetch) {
getSimple().setPrefetch(prefetch);
} }
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.transaction-size") public static class Listener {
@Deprecated
public Integer getTransactionSize() {
return getSimple().getTransactionSize();
}
@Deprecated /**
public void setTransactionSize(Integer transactionSize) { * Listener container type.
getSimple().setTransactionSize(transactionSize); */
} private ContainerType type = ContainerType.SIMPLE;
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.default-requeue-rejected") @NestedConfigurationProperty
@Deprecated private final SimpleContainer simple = new SimpleContainer();
public Boolean getDefaultRequeueRejected() {
return getSimple().getDefaultRequeueRejected();
}
@Deprecated @NestedConfigurationProperty
public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) { private final DirectContainer direct = new DirectContainer();
getSimple().setDefaultRequeueRejected(defaultRequeueRejected);
}
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.idle-event-interval") public ContainerType getType() {
@Deprecated return this.type;
public Long getIdleEventInterval() {
return getSimple().getIdleEventInterval();
} }
@Deprecated public void setType(ContainerType containerType) {
public void setIdleEventInterval(Long idleEventInterval) { this.type = containerType;
getSimple().setIdleEventInterval(idleEventInterval);
} }
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.retry") public SimpleContainer getSimple() {
@Deprecated return this.simple;
public ListenerRetry getRetry() {
return getSimple().getRetry();
} }
public AmqpContainer getSimple() { public DirectContainer getDirect() {
return this.simple; return this.direct;
} }
} }
public static class AmqpContainer { public static abstract class AmqpContainer {
/** /**
* Start the container automatically on startup. * Start the container automatically on startup.
@ -582,28 +523,12 @@ public class RabbitProperties {
*/ */
private AcknowledgeMode acknowledgeMode; private AcknowledgeMode acknowledgeMode;
/**
* Minimum number of consumers.
*/
private Integer concurrency;
/**
* Maximum number of consumers.
*/
private Integer maxConcurrency;
/** /**
* Number of messages to be handled in a single request. It should be greater than * Number of messages to be handled in a single request. It should be greater than
* or equal to the transaction size (if used). * or equal to the transaction size (if used).
*/ */
private Integer prefetch; private Integer prefetch;
/**
* Number of messages to be processed in a transaction. For best results it should
* be less than or equal to the prefetch count.
*/
private Integer transactionSize;
/** /**
* Whether rejected deliveries are requeued by default; default true. * Whether rejected deliveries are requeued by default; default true.
*/ */
@ -636,6 +561,58 @@ public class RabbitProperties {
this.acknowledgeMode = acknowledgeMode; this.acknowledgeMode = acknowledgeMode;
} }
public Integer getPrefetch() {
return this.prefetch;
}
public void setPrefetch(Integer prefetch) {
this.prefetch = prefetch;
}
public Boolean getDefaultRequeueRejected() {
return this.defaultRequeueRejected;
}
public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) {
this.defaultRequeueRejected = defaultRequeueRejected;
}
public Long getIdleEventInterval() {
return this.idleEventInterval;
}
public void setIdleEventInterval(Long idleEventInterval) {
this.idleEventInterval = idleEventInterval;
}
public ListenerRetry getRetry() {
return this.retry;
}
}
/**
* Configuration properties for {@code SimpleMessageListenerContainer}.
*/
public static class SimpleContainer extends AmqpContainer {
/**
* Minimum number of listener invoker threads.
*/
private Integer concurrency;
/**
* Maximum number of listener invoker threads.
*/
private Integer maxConcurrency;
/**
* Number of messages to be processed in a transaction; number of messages
* between acks. For best results it should
* be less than or equal to the prefetch count.
*/
private Integer transactionSize;
public Integer getConcurrency() { public Integer getConcurrency() {
return this.concurrency; return this.concurrency;
} }
@ -652,14 +629,6 @@ public class RabbitProperties {
this.maxConcurrency = maxConcurrency; this.maxConcurrency = maxConcurrency;
} }
public Integer getPrefetch() {
return this.prefetch;
}
public void setPrefetch(Integer prefetch) {
this.prefetch = prefetch;
}
public Integer getTransactionSize() { public Integer getTransactionSize() {
return this.transactionSize; return this.transactionSize;
} }
@ -668,24 +637,24 @@ public class RabbitProperties {
this.transactionSize = transactionSize; this.transactionSize = transactionSize;
} }
public Boolean getDefaultRequeueRejected() {
return this.defaultRequeueRejected;
} }
public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) { /**
this.defaultRequeueRejected = defaultRequeueRejected; * Configuration properties for {@code DirectMessageListenerContainer}.
} */
public static class DirectContainer extends AmqpContainer {
public Long getIdleEventInterval() { /**
return this.idleEventInterval; * Number of consumers per queue.
} */
private Integer consumersPerQueue;
public void setIdleEventInterval(Long idleEventInterval) { public Integer getConsumersPerQueue() {
this.idleEventInterval = idleEventInterval; return this.consumersPerQueue;
} }
public ListenerRetry getRetry() { public void setConsumersPerQueue(Integer consumersPerQueue) {
return this.retry; this.consumersPerQueue = consumersPerQueue;
} }
} }

@ -16,109 +16,33 @@
package org.springframework.boot.autoconfigure.amqp; package org.springframework.boot.autoconfigure.amqp;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry;
import org.springframework.util.Assert;
/** /**
* Configure {@link RabbitListenerContainerFactory} with sensible defaults. * Configure {@link SimpleRabbitListenerContainerFactoryConfigurer} with sensible defaults.
* *
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Gary Russell * @author Gary Russell
* @since 1.3.3 * @since 1.3.3
*/ */
public final class SimpleRabbitListenerContainerFactoryConfigurer { public final class SimpleRabbitListenerContainerFactoryConfigurer
extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {
private MessageConverter messageConverter; @Override
public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {
private MessageRecoverer messageRecoverer; RabbitProperties.SimpleContainer config = getRabbitProperties().getListener()
private RabbitProperties rabbitProperties;
/**
* Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box
* converter should be used.
* @param messageConverter the {@link MessageConverter}
*/
void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
/**
* Set the {@link MessageRecoverer} to use or {@code null} to rely on the default.
* @param messageRecoverer the {@link MessageRecoverer}
*/
void setMessageRecoverer(MessageRecoverer messageRecoverer) {
this.messageRecoverer = messageRecoverer;
}
/**
* Set the {@link RabbitProperties} to use.
* @param rabbitProperties the {@link RabbitProperties}
*/
void setRabbitProperties(RabbitProperties rabbitProperties) {
this.rabbitProperties = rabbitProperties;
}
/**
* Configure the specified rabbit listener container factory. The factory can be
* further tuned and default settings can be overridden.
* @param factory the {@link SimpleRabbitListenerContainerFactory} instance to
* configure
* @param connectionFactory the {@link ConnectionFactory} to use
*/
public void configure(SimpleRabbitListenerContainerFactory factory,
ConnectionFactory connectionFactory) {
Assert.notNull(factory, "Factory must not be null");
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
factory.setConnectionFactory(connectionFactory);
if (this.messageConverter != null) {
factory.setMessageConverter(this.messageConverter);
}
RabbitProperties.AmqpContainer config = this.rabbitProperties.getListener()
.getSimple(); .getSimple();
factory.setAutoStartup(config.isAutoStartup()); configure(factory, connectionFactory, config);
if (config.getAcknowledgeMode() != null) {
factory.setAcknowledgeMode(config.getAcknowledgeMode());
}
if (config.getConcurrency() != null) { if (config.getConcurrency() != null) {
factory.setConcurrentConsumers(config.getConcurrency()); factory.setConcurrentConsumers(config.getConcurrency());
} }
if (config.getMaxConcurrency() != null) { if (config.getMaxConcurrency() != null) {
factory.setMaxConcurrentConsumers(config.getMaxConcurrency()); factory.setMaxConcurrentConsumers(config.getMaxConcurrency());
} }
if (config.getPrefetch() != null) {
factory.setPrefetchCount(config.getPrefetch());
}
if (config.getTransactionSize() != null) { if (config.getTransactionSize() != null) {
factory.setTxSize(config.getTransactionSize()); factory.setTxSize(config.getTransactionSize());
} }
if (config.getDefaultRequeueRejected() != null) {
factory.setDefaultRequeueRejected(config.getDefaultRequeueRejected());
}
if (config.getIdleEventInterval() != null) {
factory.setIdleEventInterval(config.getIdleEventInterval());
}
ListenerRetry retryConfig = config.getRetry();
if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
? RetryInterceptorBuilder.stateless()
: RetryInterceptorBuilder.stateful());
builder.maxAttempts(retryConfig.getMaxAttempts());
builder.backOffOptions(retryConfig.getInitialInterval(),
retryConfig.getMultiplier(), retryConfig.getMaxInterval());
MessageRecoverer recoverer = (this.messageRecoverer != null
? this.messageRecoverer : new RejectAndDontRequeueRecoverer());
builder.recoverer(recoverer);
factory.setAdviceChain(builder.build());
}
} }
} }

@ -363,72 +363,6 @@
"description": "Create an AmqpAdmin bean.", "description": "Create an AmqpAdmin bean.",
"defaultValue": true "defaultValue": true
}, },
{
"name": "spring.rabbitmq.listener.retry.enabled",
"type": "java.lang.Boolean",
"description": "Whether or not publishing retries are enabled.",
"sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry",
"defaultValue": false,
"deprecated": true,
"deprecation": {
"replacement": "spring.rabbitmq.listener.simple.retry.enabled"
}
},
{
"name": "spring.rabbitmq.listener.retry.initial-interval",
"type": "java.lang.Long",
"description": "Interval between the first and second attempt to publish or deliver a message.",
"sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry",
"defaultValue": 1000,
"deprecated": true,
"deprecation": {
"replacement": "spring.rabbitmq.listener.simple.retry.initial-interval"
}
},
{
"name": "spring.rabbitmq.listener.retry.max-attempts",
"type": "java.lang.Integer",
"description": "Maximum number of attempts to publish or deliver a message.",
"sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry",
"defaultValue": 3,
"deprecated": true,
"deprecation": {
"replacement": "spring.rabbitmq.listener.simple.retry.max-attempts"
}
},
{
"name": "spring.rabbitmq.listener.retry.max-interval",
"type": "java.lang.Long",
"description": "Maximum interval between attempts.",
"sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry",
"defaultValue": 10000,
"deprecated": true,
"deprecation": {
"replacement": "spring.rabbitmq.listener.simple.retry.max-interval"
}
},
{
"name": "spring.rabbitmq.listener.retry.multiplier",
"type": "java.lang.Double",
"description": "A multiplier to apply to the previous retry interval.",
"sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry",
"defaultValue": 1,
"deprecated": true,
"deprecation": {
"replacement": "spring.rabbitmq.listener.simple.retry.multiplier"
}
},
{
"name": "spring.rabbitmq.listener.retry.stateless",
"type": "java.lang.Boolean",
"description": "Whether or not retries are stateless or stateful.",
"sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry",
"defaultValue": true,
"deprecated": true,
"deprecation": {
"replacement": "spring.rabbitmq.listener.simple.retry.stateless"
}
},
{ {
"name": "spring.session.hazelcast.flush-mode", "name": "spring.session.hazelcast.flush-mode",
"defaultValue": "on-save" "defaultValue": "on-save"

@ -30,6 +30,7 @@ import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
@ -292,28 +293,10 @@ public class RabbitAutoConfigurationTests {
assertThat(adviceChain).isNull(); assertThat(adviceChain).isNull();
} }
@Test
@Deprecated
public void testSimpleRabbitListenerContainerFactoryWithCustomDeprecatedSettings() {
testSimpleRabbitListenerContainerFactoryWithCustomSettings(
"spring.rabbitmq.listener.retry.enabled:true",
"spring.rabbitmq.listener.retry.maxAttempts:4",
"spring.rabbitmq.listener.retry.initialInterval:2000",
"spring.rabbitmq.listener.retry.multiplier:1.5",
"spring.rabbitmq.listener.retry.maxInterval:5000",
"spring.rabbitmq.listener.autoStartup:false",
"spring.rabbitmq.listener.acknowledgeMode:manual",
"spring.rabbitmq.listener.concurrency:5",
"spring.rabbitmq.listener.maxConcurrency:10",
"spring.rabbitmq.listener.prefetch:40",
"spring.rabbitmq.listener.defaultRequeueRejected:false",
"spring.rabbitmq.listener.idleEventInterval:5",
"spring.rabbitmq.listener.transactionSize:20");
}
@Test @Test
public void testSimpleRabbitListenerContainerFactoryWithCustomSettings() { public void testSimpleRabbitListenerContainerFactoryWithCustomSettings() {
testSimpleRabbitListenerContainerFactoryWithCustomSettings( load(new Class<?>[] { MessageConvertersConfiguration.class,
MessageRecoverersConfiguration.class },
"spring.rabbitmq.listener.simple.retry.enabled:true", "spring.rabbitmq.listener.simple.retry.enabled:true",
"spring.rabbitmq.listener.simple.retry.maxAttempts:4", "spring.rabbitmq.listener.simple.retry.maxAttempts:4",
"spring.rabbitmq.listener.simple.retry.initialInterval:2000", "spring.rabbitmq.listener.simple.retry.initialInterval:2000",
@ -327,26 +310,91 @@ public class RabbitAutoConfigurationTests {
"spring.rabbitmq.listener.simple.defaultRequeueRejected:false", "spring.rabbitmq.listener.simple.defaultRequeueRejected:false",
"spring.rabbitmq.listener.simple.idleEventInterval:5", "spring.rabbitmq.listener.simple.idleEventInterval:5",
"spring.rabbitmq.listener.simple.transactionSize:20"); "spring.rabbitmq.listener.simple.transactionSize:20");
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context
.getBean("rabbitListenerContainerFactory",
SimpleRabbitListenerContainerFactory.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory);
assertThat(dfa.getPropertyValue("concurrentConsumers")).isEqualTo(5);
assertThat(dfa.getPropertyValue("maxConcurrentConsumers")).isEqualTo(10);
assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20);
checkCommonProps(dfa);
} }
private void testSimpleRabbitListenerContainerFactoryWithCustomSettings(String... environment) { @Test
public void testDirectRabbitListenerContainerFactoryWithCustomSettings() {
load(new Class<?>[] { MessageConvertersConfiguration.class, load(new Class<?>[] { MessageConvertersConfiguration.class,
MessageRecoverersConfiguration.class }, environment); MessageRecoverersConfiguration.class },
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context "spring.rabbitmq.listener.type:direct",
"spring.rabbitmq.listener.direct.retry.enabled:true",
"spring.rabbitmq.listener.direct.retry.maxAttempts:4",
"spring.rabbitmq.listener.direct.retry.initialInterval:2000",
"spring.rabbitmq.listener.direct.retry.multiplier:1.5",
"spring.rabbitmq.listener.direct.retry.maxInterval:5000",
"spring.rabbitmq.listener.direct.autoStartup:false",
"spring.rabbitmq.listener.direct.acknowledgeMode:manual",
"spring.rabbitmq.listener.direct.consumers-per-queue:5",
"spring.rabbitmq.listener.direct.prefetch:40",
"spring.rabbitmq.listener.direct.defaultRequeueRejected:false",
"spring.rabbitmq.listener.direct.idleEventInterval:5");
DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context
.getBean("rabbitListenerContainerFactory", .getBean("rabbitListenerContainerFactory",
SimpleRabbitListenerContainerFactory.class); DirectRabbitListenerContainerFactory.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory); DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory);
assertThat(dfa.getPropertyValue("consumersPerQueue")).isEqualTo(5);
checkCommonProps(dfa); checkCommonProps(dfa);
} }
@Test
public void testRabbitListenerContainerFactoryConfigurersAreAvailable() {
load(TestConfiguration.class,
"spring.rabbitmq.listener.simple.concurrency:5",
"spring.rabbitmq.listener.simple.maxConcurrency:10",
"spring.rabbitmq.listener.simple.prefetch:40",
"spring.rabbitmq.listener.direct.consumers-per-queue:5",
"spring.rabbitmq.listener.direct.prefetch:40");
assertThat(this.context.getBeansOfType(
SimpleRabbitListenerContainerFactoryConfigurer.class)).hasSize(1);
assertThat(this.context.getBeansOfType(
DirectRabbitListenerContainerFactoryConfigurer.class)).hasSize(1);
}
@Test
public void testSimpleRabbitListenerContainerFactoryConfigurerUsesConfig() {
load(TestConfiguration.class,
"spring.rabbitmq.listener.type:direct", // listener type is irrelevant
"spring.rabbitmq.listener.simple.concurrency:5",
"spring.rabbitmq.listener.simple.maxConcurrency:10",
"spring.rabbitmq.listener.simple.prefetch:40");
SimpleRabbitListenerContainerFactoryConfigurer configurer = this.context
.getBean(SimpleRabbitListenerContainerFactoryConfigurer.class);
SimpleRabbitListenerContainerFactory factory =
mock(SimpleRabbitListenerContainerFactory.class);
configurer.configure(factory, mock(ConnectionFactory.class));
verify(factory).setConcurrentConsumers(5);
verify(factory).setMaxConcurrentConsumers(10);
verify(factory).setPrefetchCount(40);
}
@Test
public void testDirectRabbitListenerContainerFactoryConfigurerUsesConfig() {
load(TestConfiguration.class,
"spring.rabbitmq.listener.type:simple", // listener type is irrelevant
"spring.rabbitmq.listener.direct.consumers-per-queue:5",
"spring.rabbitmq.listener.direct.prefetch:40");
DirectRabbitListenerContainerFactoryConfigurer configurer = this.context
.getBean(DirectRabbitListenerContainerFactoryConfigurer.class);
DirectRabbitListenerContainerFactory factory =
mock(DirectRabbitListenerContainerFactory.class);
configurer.configure(factory, mock(ConnectionFactory.class));
verify(factory).setConsumersPerQueue(5);
verify(factory).setPrefetchCount(40);
}
private void checkCommonProps(DirectFieldAccessor dfa) { private void checkCommonProps(DirectFieldAccessor dfa) {
assertThat(dfa.getPropertyValue("autoStartup")).isEqualTo(Boolean.FALSE); assertThat(dfa.getPropertyValue("autoStartup")).isEqualTo(Boolean.FALSE);
assertThat(dfa.getPropertyValue("acknowledgeMode")) assertThat(dfa.getPropertyValue("acknowledgeMode"))
.isEqualTo(AcknowledgeMode.MANUAL); .isEqualTo(AcknowledgeMode.MANUAL);
assertThat(dfa.getPropertyValue("concurrentConsumers")).isEqualTo(5);
assertThat(dfa.getPropertyValue("maxConcurrentConsumers")).isEqualTo(10);
assertThat(dfa.getPropertyValue("prefetchCount")).isEqualTo(40); assertThat(dfa.getPropertyValue("prefetchCount")).isEqualTo(40);
assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20);
assertThat(dfa.getPropertyValue("messageConverter")) assertThat(dfa.getPropertyValue("messageConverter"))
.isSameAs(this.context.getBean("myMessageConverter")); .isSameAs(this.context.getBean("myMessageConverter"));
assertThat(dfa.getPropertyValue("defaultRequeueRejected")) assertThat(dfa.getPropertyValue("defaultRequeueRejected"))

@ -975,12 +975,13 @@ content into your application; rather pick only the properties that you need.
spring.rabbitmq.connection-timeout= # Connection timeout, in milliseconds; zero for infinite. spring.rabbitmq.connection-timeout= # Connection timeout, in milliseconds; zero for infinite.
spring.rabbitmq.dynamic=true # Create an AmqpAdmin bean. spring.rabbitmq.dynamic=true # Create an AmqpAdmin bean.
spring.rabbitmq.host=localhost # RabbitMQ host. spring.rabbitmq.host=localhost # RabbitMQ host.
spring.rabbitmq.listener.direct.consumers-per-queue= # Number of consumers per queue.
spring.rabbitmq.listener.simple.acknowledge-mode= # Acknowledge mode of container. spring.rabbitmq.listener.simple.acknowledge-mode= # Acknowledge mode of container.
spring.rabbitmq.listener.simple.auto-startup=true # Start the container automatically on startup. spring.rabbitmq.listener.simple.auto-startup=true # Start the container automatically on startup.
spring.rabbitmq.listener.simple.concurrency= # Minimum number of consumers. spring.rabbitmq.listener.simple.concurrency= # Minimum number of listener invoker threads.
spring.rabbitmq.listener.simple.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`. spring.rabbitmq.listener.simple.default-requeue-rejected= # Whether or not to requeue delivery failures.
spring.rabbitmq.listener.simple.idle-event-interval= # How often idle container events should be published in milliseconds. spring.rabbitmq.listener.simple.idle-event-interval= # How often idle container events should be published in milliseconds.
spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of consumers. spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of listener invoker.
spring.rabbitmq.listener.simple.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used). spring.rabbitmq.listener.simple.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used).
spring.rabbitmq.listener.simple.retry.enabled=false # Whether or not publishing retries are enabled. spring.rabbitmq.listener.simple.retry.enabled=false # Whether or not publishing retries are enabled.
spring.rabbitmq.listener.simple.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message. spring.rabbitmq.listener.simple.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message.
@ -988,7 +989,8 @@ content into your application; rather pick only the properties that you need.
spring.rabbitmq.listener.simple.retry.max-interval=10000 # Maximum interval between attempts. spring.rabbitmq.listener.simple.retry.max-interval=10000 # Maximum interval between attempts.
spring.rabbitmq.listener.simple.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval. spring.rabbitmq.listener.simple.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval.
spring.rabbitmq.listener.simple.retry.stateless=true # Whether or not retry is stateless or stateful. spring.rabbitmq.listener.simple.retry.stateless=true # Whether or not retry is stateless or stateful.
spring.rabbitmq.listener.simple.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count. spring.rabbitmq.listener.simple.transaction-size= # Number of messages to be processed in a transaction; number of messages between acks. For best results it should be less than or equal to the prefetch count.
spring.rabbitmq.listener.type=simple # Listener container type.
spring.rabbitmq.password= # Login to authenticate against the broker. spring.rabbitmq.password= # Login to authenticate against the broker.
spring.rabbitmq.port=5672 # RabbitMQ port. spring.rabbitmq.port=5672 # RabbitMQ port.
spring.rabbitmq.publisher-confirms=false # Enable publisher confirms. spring.rabbitmq.publisher-confirms=false # Enable publisher confirms.

@ -4653,9 +4653,10 @@ the broker connection is lost. Retries are disabled by default.
==== Receiving a message ==== Receiving a message
When the Rabbit infrastructure is present, any bean can be annotated with When the Rabbit infrastructure is present, any bean can be annotated with
`@RabbitListener` to create a listener endpoint. If no `RabbitListenerContainerFactory` `@RabbitListener` to create a listener endpoint. If no `RabbitListenerContainerFactory`
has been defined, a default one is configured automatically. If a `MessageConverter` or has been defined, a default `SimpleRabbitListenerContainerFactory` is configured
`MessageRecoverer` beans are defined, they are associated automatically to the default automatically and you can switch to a direct container using the
factory. `spring.rabbitmq.listener.type` property. If a `MessageConverter` or `MessageRecoverer`
beans are defined, they are associated automatically to the default factory.
The following component creates a listener endpoint on the `someQueue` queue: The following component creates a listener endpoint on the `someQueue` queue:
@ -4677,9 +4678,13 @@ for more details.
If you need to create more `RabbitListenerContainerFactory` instances or if you want to If you need to create more `RabbitListenerContainerFactory` instances or if you want to
override the default, Spring Boot provides a override the default, Spring Boot provides a
`SimpleRabbitListenerContainerFactoryConfigurer` that you can use to initialize a `SimpleRabbitListenerContainerFactoryConfigurer` and
`SimpleRabbitListenerContainerFactory` with the same settings as the one that is `DirectRabbitListenerContainerFactoryConfigurer` that you can use to initialize a
auto-configured. `SimpleRabbitListenerContainerFactory` and `DirectRabbitListenerContainerFactory` with the
same settings as the one used by the auto-configuration.
TIP: It doesn't matter which container type you've chosen, those two beans are exposed by
the auto-configuration.
For instance, the following exposes another factory that uses a specific For instance, the following exposes another factory that uses a specific
`MessageConverter`: `MessageConverter`:

Loading…
Cancel
Save