Update JMS auto-configuration to support XA
Update JMS auto-configuration for ActiveMQ and HornetQ to support XA transactions. See gh-947pull/1323/merge
parent
8219f2be4c
commit
da88bb4791
@ -0,0 +1,76 @@
|
||||
/*
|
||||
* Copyright 2012-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.jms.activemq;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Factory to create a {@link ActiveMQConnectionFactory} instance from properties defined
|
||||
* in {@link ActiveMQProperties}.
|
||||
*
|
||||
* @author Phillip Webb
|
||||
* @since 1.2.0
|
||||
*/
|
||||
class ActiveMQConnectionFactoryFactory {
|
||||
|
||||
private static final String DEFAULT_EMBEDDED_BROKER_URL = "vm://localhost?broker.persistent=false";
|
||||
|
||||
private static final String DEFAULT_NETWORK_BROKER_URL = "tcp://localhost:61616";
|
||||
|
||||
private final ActiveMQProperties properties;
|
||||
|
||||
public ActiveMQConnectionFactoryFactory(ActiveMQProperties properties) {
|
||||
Assert.notNull(properties, "Properties must not be null");
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public <T extends ActiveMQConnectionFactory> T createConnectionFactory(
|
||||
Class<T> factoryClass) {
|
||||
try {
|
||||
return doCreateConnectionFactory(factoryClass);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
throw new IllegalStateException("Unable to create "
|
||||
+ "ActiveMQConnectionFactory", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private <T extends ActiveMQConnectionFactory> T doCreateConnectionFactory(
|
||||
Class<T> factoryClass) throws Exception {
|
||||
String brokerUrl = determineBrokerUrl();
|
||||
String user = this.properties.getUser();
|
||||
String password = this.properties.getPassword();
|
||||
if (StringUtils.hasLength(user) && StringUtils.hasLength(password)) {
|
||||
return factoryClass.getConstructor(String.class, String.class, String.class)
|
||||
.newInstance(user, password, brokerUrl);
|
||||
}
|
||||
return factoryClass.getConstructor(String.class).newInstance(brokerUrl);
|
||||
}
|
||||
|
||||
String determineBrokerUrl() {
|
||||
if (this.properties.getBrokerUrl() != null) {
|
||||
return this.properties.getBrokerUrl();
|
||||
}
|
||||
if (this.properties.isInMemory()) {
|
||||
return DEFAULT_EMBEDDED_BROKER_URL;
|
||||
}
|
||||
return DEFAULT_NETWORK_BROKER_URL;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Copyright 2012-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.jms.activemq;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.transaction.TransactionManager;
|
||||
|
||||
import org.apache.activemq.ActiveMQXAConnectionFactory;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.jta.XAConnectionFactoryWrapper;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* Configuration for ActiveMQ XA {@link ConnectionFactory}.
|
||||
*
|
||||
* @author Phillip Webb
|
||||
* @since 1.2.0
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(TransactionManager.class)
|
||||
@ConditionalOnBean(XAConnectionFactoryWrapper.class)
|
||||
@ConditionalOnMissingBean(ConnectionFactory.class)
|
||||
class ActiveMQXAConnectionFactoryConfiguration {
|
||||
|
||||
@Bean
|
||||
public ConnectionFactory jmsConnectionFactory(ActiveMQProperties properties,
|
||||
XAConnectionFactoryWrapper wrapper) throws Exception {
|
||||
ActiveMQXAConnectionFactory connectionFactory = new ActiveMQConnectionFactoryFactory(
|
||||
properties).createConnectionFactory(ActiveMQXAConnectionFactory.class);
|
||||
return wrapper.wrapConnectionFactory(connectionFactory);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Copyright 2012-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.jms.hornetq;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.hornetq.jms.client.HornetQConnectionFactory;
|
||||
import org.hornetq.jms.server.embedded.EmbeddedJMS;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* Configuration for HornetQ {@link ConnectionFactory}.
|
||||
*
|
||||
* @author Phillip Webb
|
||||
* @since 1.2.0
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnMissingBean(ConnectionFactory.class)
|
||||
class HornetQConnectionFactoryConfiguration {
|
||||
|
||||
private static Log logger = LogFactory
|
||||
.getLog(HornetQEmbeddedServerConfiguration.class);
|
||||
|
||||
// Ensure JMS is setup before XA
|
||||
@Autowired(required = false)
|
||||
private EmbeddedJMS embeddedJMS;
|
||||
|
||||
@Bean
|
||||
public ConnectionFactory jmsConnectionFactory(HornetQProperties properties) {
|
||||
if (this.embeddedJMS != null && logger.isDebugEnabled()) {
|
||||
logger.debug("Using embdedded HornetQ broker");
|
||||
}
|
||||
return new HornetQConnectionFactoryFactory(properties)
|
||||
.createConnectionFactory(HornetQConnectionFactory.class);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,117 @@
|
||||
/*
|
||||
* Copyright 2012-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.jms.hornetq;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.hornetq.api.core.TransportConfiguration;
|
||||
import org.hornetq.api.core.client.HornetQClient;
|
||||
import org.hornetq.api.core.client.ServerLocator;
|
||||
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
|
||||
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
|
||||
import org.hornetq.core.remoting.impl.netty.TransportConstants;
|
||||
import org.hornetq.jms.client.HornetQConnectionFactory;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
* Factory to create a {@link HornetQConnectionFactory} instance from properties defined
|
||||
* in {@link HornetQProperties}.
|
||||
*
|
||||
* @author Phillip Webb
|
||||
* @author Stephane Nicoll
|
||||
* @since 1.2.0
|
||||
*/
|
||||
class HornetQConnectionFactoryFactory {
|
||||
|
||||
static final String EMBEDDED_JMS_CLASS = "org.hornetq.jms.server.embedded.EmbeddedJMS";
|
||||
|
||||
private final HornetQProperties properties;
|
||||
|
||||
public HornetQConnectionFactoryFactory(HornetQProperties properties) {
|
||||
Assert.notNull(properties, "Properties must not be null");
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public <T extends HornetQConnectionFactory> T createConnectionFactory(
|
||||
Class<T> factoryClass) {
|
||||
try {
|
||||
return doCreateConnectionFactory(factoryClass);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
throw new IllegalStateException("Unable to create "
|
||||
+ "HornetQConnectionFactory", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private <T extends HornetQConnectionFactory> T doCreateConnectionFactory(
|
||||
Class<T> factoryClass) throws Exception {
|
||||
HornetQMode mode = this.properties.getMode();
|
||||
if (mode == null) {
|
||||
mode = deduceMode();
|
||||
}
|
||||
if (mode == HornetQMode.EMBEDDED) {
|
||||
return createEmbeddedConnectionFactory(factoryClass);
|
||||
}
|
||||
return createNativeConnectionFactory(factoryClass);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deduce the {@link HornetQMode} to use if none has been set.
|
||||
*/
|
||||
private HornetQMode deduceMode() {
|
||||
if (this.properties.getEmbedded().isEnabled()
|
||||
&& ClassUtils.isPresent(EMBEDDED_JMS_CLASS, null)) {
|
||||
return HornetQMode.EMBEDDED;
|
||||
}
|
||||
return HornetQMode.NATIVE;
|
||||
}
|
||||
|
||||
private <T extends HornetQConnectionFactory> T createEmbeddedConnectionFactory(
|
||||
Class<T> factoryClass) throws Exception {
|
||||
try {
|
||||
TransportConfiguration transportConfiguration = new TransportConfiguration(
|
||||
InVMConnectorFactory.class.getName(), this.properties.getEmbedded()
|
||||
.generateTransportParameters());
|
||||
ServerLocator serviceLocator = HornetQClient
|
||||
.createServerLocatorWithoutHA(transportConfiguration);
|
||||
return factoryClass.getConstructor(ServerLocator.class).newInstance(
|
||||
serviceLocator);
|
||||
}
|
||||
catch (NoClassDefFoundError ex) {
|
||||
throw new IllegalStateException("Unable to create InVM "
|
||||
+ "HornetQ connection, ensure that hornet-jms-server.jar "
|
||||
+ "is in the classpath", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private <T extends HornetQConnectionFactory> T createNativeConnectionFactory(
|
||||
Class<T> factoryClass) throws Exception {
|
||||
Map<String, Object> params = new HashMap<String, Object>();
|
||||
params.put(TransportConstants.HOST_PROP_NAME, this.properties.getHost());
|
||||
params.put(TransportConstants.PORT_PROP_NAME, this.properties.getPort());
|
||||
TransportConfiguration transportConfiguration = new TransportConfiguration(
|
||||
NettyConnectorFactory.class.getName(), params);
|
||||
Constructor<T> constructor = factoryClass.getConstructor(boolean.class,
|
||||
TransportConfiguration[].class);
|
||||
return constructor.newInstance(false,
|
||||
new TransportConfiguration[] { transportConfiguration });
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,122 @@
|
||||
/*
|
||||
* Copyright 2012-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.jms.hornetq;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.hornetq.jms.server.config.JMSConfiguration;
|
||||
import org.hornetq.jms.server.config.JMSQueueConfiguration;
|
||||
import org.hornetq.jms.server.config.TopicConfiguration;
|
||||
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
|
||||
import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
|
||||
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
|
||||
import org.hornetq.jms.server.embedded.EmbeddedJMS;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
|
||||
|
||||
/**
|
||||
* Configuration used to create the embedded HornetQ server.
|
||||
*
|
||||
* @author Phillip Webb
|
||||
* @author Stephane Nicoll
|
||||
* @since 1.2.0
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(name = HornetQConnectionFactoryFactory.EMBEDDED_JMS_CLASS)
|
||||
@ConditionalOnProperty(prefix = "spring.hornetq.embedded", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
class HornetQEmbeddedServerConfiguration {
|
||||
|
||||
@Autowired
|
||||
private HornetQProperties properties;
|
||||
|
||||
@Autowired(required = false)
|
||||
private List<HornetQConfigurationCustomizer> configurationCustomizers;
|
||||
|
||||
@Autowired(required = false)
|
||||
private List<JMSQueueConfiguration> queuesConfiguration;
|
||||
|
||||
@Autowired(required = false)
|
||||
private List<TopicConfiguration> topicsConfiguration;
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public org.hornetq.core.config.Configuration hornetQConfiguration() {
|
||||
return new HornetQEmbeddedConfigurationFactory(this.properties)
|
||||
.createConfiguration();
|
||||
}
|
||||
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
@ConditionalOnMissingBean
|
||||
public EmbeddedJMS hornetQServer(org.hornetq.core.config.Configuration configuration,
|
||||
JMSConfiguration jmsConfiguration) {
|
||||
EmbeddedJMS server = new EmbeddedJMS();
|
||||
customize(configuration);
|
||||
server.setConfiguration(configuration);
|
||||
server.setJmsConfiguration(jmsConfiguration);
|
||||
server.setRegistry(new HornetQNoOpBindingRegistry());
|
||||
return server;
|
||||
}
|
||||
|
||||
private void customize(org.hornetq.core.config.Configuration configuration) {
|
||||
if (this.configurationCustomizers != null) {
|
||||
AnnotationAwareOrderComparator.sort(this.configurationCustomizers);
|
||||
for (HornetQConfigurationCustomizer customizer : this.configurationCustomizers) {
|
||||
customizer.customize(configuration);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public JMSConfiguration hornetQJmsConfiguration() {
|
||||
JMSConfiguration configuration = new JMSConfigurationImpl();
|
||||
addAll(configuration.getQueueConfigurations(), this.queuesConfiguration);
|
||||
addAll(configuration.getTopicConfigurations(), this.topicsConfiguration);
|
||||
addQueues(configuration, this.properties.getEmbedded().getQueues());
|
||||
addTopics(configuration, this.properties.getEmbedded().getTopics());
|
||||
return configuration;
|
||||
}
|
||||
|
||||
private <T> void addAll(List<T> list, Collection<? extends T> items) {
|
||||
if (items != null) {
|
||||
list.addAll(items);
|
||||
}
|
||||
}
|
||||
|
||||
private void addQueues(JMSConfiguration configuration, String[] queues) {
|
||||
boolean persistent = this.properties.getEmbedded().isPersistent();
|
||||
for (String queue : queues) {
|
||||
configuration.getQueueConfigurations().add(
|
||||
new JMSQueueConfigurationImpl(queue, null, persistent, "/queue/"
|
||||
+ queue));
|
||||
}
|
||||
}
|
||||
|
||||
private void addTopics(JMSConfiguration configuration, String[] topics) {
|
||||
for (String topic : topics) {
|
||||
configuration.getTopicConfigurations().add(
|
||||
new TopicConfigurationImpl(topic, "/topic/" + topic));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright 2012-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.jms.hornetq;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.transaction.TransactionManager;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.hornetq.jms.client.HornetQXAConnectionFactory;
|
||||
import org.hornetq.jms.server.embedded.EmbeddedJMS;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.jta.XAConnectionFactoryWrapper;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* Configuration for HornetQ XA {@link ConnectionFactory}.
|
||||
*
|
||||
* @author Phillip Webb
|
||||
* @since 1.2.0
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnMissingBean(ConnectionFactory.class)
|
||||
@ConditionalOnClass(TransactionManager.class)
|
||||
@ConditionalOnBean(XAConnectionFactoryWrapper.class)
|
||||
class HornetQXAConnectionFactoryConfiguration {
|
||||
|
||||
private static Log logger = LogFactory
|
||||
.getLog(HornetQEmbeddedServerConfiguration.class);
|
||||
|
||||
// Ensure JMS is setup before XA
|
||||
@Autowired(required = false)
|
||||
private EmbeddedJMS embeddedJMS;
|
||||
|
||||
@Bean
|
||||
public ConnectionFactory jmsConnectionFactory(HornetQProperties properties,
|
||||
XAConnectionFactoryWrapper wrapper) throws Exception {
|
||||
if (this.embeddedJMS != null && logger.isDebugEnabled()) {
|
||||
logger.debug("Using embdedded HornetQ broker with XA");
|
||||
}
|
||||
return wrapper.wrapConnectionFactory(new HornetQConnectionFactoryFactory(
|
||||
properties).createConnectionFactory(HornetQXAConnectionFactory.class));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue