Add config prop for JMS listener's sessionTransacted flag

This commit introduces `spring.jms.listener.session-transacted` property
in order to enable explicit configuration of `sessionTransacted` on the
`DefaultMessageListenerContainer`.

Prior to this commit, `sessionTransacted` would be configured implicitly
based on presence of `JtaTransactionManager`.

See gh-37473
pull/37607/head
Vedran Pavic 1 year ago committed by Andy Wilkinson
parent b7facec4a1
commit 79e2cb3ec1

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2021 the original author or authors. * Copyright 2012-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -32,6 +32,7 @@ import org.springframework.util.Assert;
* *
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Eddú Meléndez * @author Eddú Meléndez
* @author Vedran Pavic
* @since 1.3.3 * @since 1.3.3
*/ */
public final class DefaultJmsListenerContainerFactoryConfigurer { public final class DefaultJmsListenerContainerFactoryConfigurer {
@ -101,12 +102,16 @@ public final class DefaultJmsListenerContainerFactoryConfigurer {
Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
factory.setConnectionFactory(connectionFactory); factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(this.jmsProperties.isPubSubDomain()); factory.setPubSubDomain(this.jmsProperties.isPubSubDomain());
JmsProperties.Listener listener = this.jmsProperties.getListener();
if (this.transactionManager != null) { if (this.transactionManager != null) {
factory.setTransactionManager(this.transactionManager); factory.setTransactionManager(this.transactionManager);
} }
else { else if (listener.getSessionTransacted() == null) {
factory.setSessionTransacted(true); factory.setSessionTransacted(true);
} }
if (listener.getSessionTransacted() != null) {
factory.setSessionTransacted(listener.getSessionTransacted());
}
if (this.destinationResolver != null) { if (this.destinationResolver != null) {
factory.setDestinationResolver(this.destinationResolver); factory.setDestinationResolver(this.destinationResolver);
} }
@ -116,7 +121,6 @@ public final class DefaultJmsListenerContainerFactoryConfigurer {
if (this.exceptionListener != null) { if (this.exceptionListener != null) {
factory.setExceptionListener(this.exceptionListener); factory.setExceptionListener(this.exceptionListener);
} }
JmsProperties.Listener listener = this.jmsProperties.getListener();
factory.setAutoStartup(listener.isAutoStartup()); factory.setAutoStartup(listener.isAutoStartup());
factory.setSessionAcknowledgeMode(listener.getSession().getAcknowledgeMode().getMode()); factory.setSessionAcknowledgeMode(listener.getSession().getAcknowledgeMode().getMode());
String concurrency = listener.formatConcurrency(); String concurrency = listener.formatConcurrency();

@ -141,6 +141,11 @@ public class JmsProperties {
*/ */
private boolean autoStartup = true; private boolean autoStartup = true;
/**
* Whether the container should use transacted JMS sessions.
*/
private Boolean sessionTransacted;
/** /**
* Minimum number of concurrent consumers. When max-concurrency is not specified * Minimum number of concurrent consumers. When max-concurrency is not specified
* the minimum will also be used as the maximum. * the minimum will also be used as the maximum.
@ -180,6 +185,14 @@ public class JmsProperties {
this.session.setAcknowledgeMode(acknowledgeMode); this.session.setAcknowledgeMode(acknowledgeMode);
} }
public Boolean getSessionTransacted() {
return this.sessionTransacted;
}
public void setSessionTransacted(Boolean sessionTransacted) {
this.sessionTransacted = sessionTransacted;
}
@DeprecatedConfigurationProperty(replacement = "spring.jms.listener.min-concurrency", since = "3.2.0") @DeprecatedConfigurationProperty(replacement = "spring.jms.listener.min-concurrency", since = "3.2.0")
@Deprecated(since = "3.2.0", forRemoval = true) @Deprecated(since = "3.2.0", forRemoval = true)
public Integer getConcurrency() { public Integer getConcurrency() {

@ -57,6 +57,7 @@ import static org.mockito.Mockito.mock;
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Aurélien Leboulanger * @author Aurélien Leboulanger
* @author Eddú Meléndez * @author Eddú Meléndez
* @author Vedran Pavic
*/ */
class JmsAutoConfigurationTests { class JmsAutoConfigurationTests {
@ -143,8 +144,9 @@ class JmsAutoConfigurationTests {
void testJmsListenerContainerFactoryWithCustomSettings() { void testJmsListenerContainerFactoryWithCustomSettings() {
this.contextRunner.withUserConfiguration(EnableJmsConfiguration.class) this.contextRunner.withUserConfiguration(EnableJmsConfiguration.class)
.withPropertyValues("spring.jms.listener.autoStartup=false", .withPropertyValues("spring.jms.listener.autoStartup=false",
"spring.jms.listener.session.acknowledgeMode=client", "spring.jms.listener.minConcurrency=2", "spring.jms.listener.session.acknowledgeMode=client", "spring.jms.listener.sessionTransacted=false",
"spring.jms.listener.receiveTimeout=2s", "spring.jms.listener.maxConcurrency=10") "spring.jms.listener.minConcurrency=2", "spring.jms.listener.receiveTimeout=2s",
"spring.jms.listener.maxConcurrency=10")
.run(this::testJmsListenerContainerFactoryWithCustomSettings); .run(this::testJmsListenerContainerFactoryWithCustomSettings);
} }
@ -152,6 +154,7 @@ class JmsAutoConfigurationTests {
DefaultMessageListenerContainer container = getContainer(loaded, "jmsListenerContainerFactory"); DefaultMessageListenerContainer container = getContainer(loaded, "jmsListenerContainerFactory");
assertThat(container.isAutoStartup()).isFalse(); assertThat(container.isAutoStartup()).isFalse();
assertThat(container.getSessionAcknowledgeMode()).isEqualTo(Session.CLIENT_ACKNOWLEDGE); assertThat(container.getSessionAcknowledgeMode()).isEqualTo(Session.CLIENT_ACKNOWLEDGE);
assertThat(container.isSessionTransacted()).isFalse();
assertThat(container.getConcurrentConsumers()).isEqualTo(2); assertThat(container.getConcurrentConsumers()).isEqualTo(2);
assertThat(container.getMaxConcurrentConsumers()).isEqualTo(10); assertThat(container.getMaxConcurrentConsumers()).isEqualTo(10);
assertThat(container).hasFieldOrPropertyWithValue("receiveTimeout", 2000L); assertThat(container).hasFieldOrPropertyWithValue("receiveTimeout", 2000L);
@ -179,6 +182,18 @@ class JmsAutoConfigurationTests {
}); });
} }
@Test
void testDefaultContainerFactoryWithJtaTransactionManagerAndSessionTransactedEnabled() {
this.contextRunner.withUserConfiguration(TestConfiguration7.class, EnableJmsConfiguration.class)
.withPropertyValues("spring.jms.listener.sessionTransacted=true")
.run((context) -> {
DefaultMessageListenerContainer container = getContainer(context, "jmsListenerContainerFactory");
assertThat(container.isSessionTransacted()).isTrue();
assertThat(container).hasFieldOrPropertyWithValue("transactionManager",
context.getBean(JtaTransactionManager.class));
});
}
@Test @Test
void testDefaultContainerFactoryNonJtaTransactionManager() { void testDefaultContainerFactoryNonJtaTransactionManager() {
this.contextRunner.withUserConfiguration(TestConfiguration8.class, EnableJmsConfiguration.class) this.contextRunner.withUserConfiguration(TestConfiguration8.class, EnableJmsConfiguration.class)
@ -198,6 +213,17 @@ class JmsAutoConfigurationTests {
}); });
} }
@Test
void testDefaultContainerFactoryNoTransactionManagerAndSessionTransactedDisabled() {
this.contextRunner.withUserConfiguration(EnableJmsConfiguration.class)
.withPropertyValues("spring.jms.listener.sessionTransacted=false")
.run((context) -> {
DefaultMessageListenerContainer container = getContainer(context, "jmsListenerContainerFactory");
assertThat(container.isSessionTransacted()).isFalse();
assertThat(container).hasFieldOrPropertyWithValue("transactionManager", null);
});
}
@Test @Test
void testDefaultContainerFactoryWithMessageConverters() { void testDefaultContainerFactoryWithMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConvertersConfiguration.class, EnableJmsConfiguration.class) this.contextRunner.withUserConfiguration(MessageConvertersConfiguration.class, EnableJmsConfiguration.class)

Loading…
Cancel
Save