Add Spring Integration default poller auto-config

When polling consumers or source polling channel adapters are used in
Spring Integration applications, they require some polling policy to
be configured.

This comment auto-configures a PollerMetadata bean which customized
via newly added `spring.integration.poller.*` configuration
properties or overriden completely be user-defined bean.

See gh-27992
pull/28141/head
Artem Bilan 3 years ago committed by Phillip Webb
parent 5e426394db
commit b2d1423e34

@ -16,6 +16,8 @@
package org.springframework.boot.autoconfigure.integration; package org.springframework.boot.autoconfigure.integration;
import java.time.Duration;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.sql.DataSource; import javax.sql.DataSource;
@ -56,10 +58,14 @@ import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.ServerRSocketConnector; import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.ServerRSocketMessageHandler; import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway; import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
/** /**
@ -110,6 +116,29 @@ public class IntegrationAutoConfiguration {
@EnableIntegration @EnableIntegration
protected static class IntegrationConfiguration { protected static class IntegrationConfiguration {
@Bean(PollerMetadata.DEFAULT_POLLER)
@ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller(IntegrationProperties integrationProperties) {
IntegrationProperties.Poller poller = integrationProperties.getPoller();
int hasCron = poller.getCron() != null ? 1 : 0;
int hasFixedDelay = poller.getFixedDelay() != null ? 1 : 0;
int hasFixedRate = poller.getFixedRate() != null ? 1 : 0;
Assert.isTrue((hasCron + hasFixedDelay + hasFixedRate) <= 1,
"The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties.");
PollerMetadata pollerMetadata = new PollerMetadata();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll);
map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout);
map.from(poller::getCron).whenHasText().as(CronTrigger::new).to(pollerMetadata::setTrigger);
map.from((poller.getFixedDelay() != null) ? poller.getFixedDelay() : poller.getFixedRate())
.as(Duration::toMillis).as(PeriodicTrigger::new).as((trigger) -> {
map.from(poller::getInitialDelay).as(Duration::toMillis).to(trigger::setInitialDelay);
trigger.setFixedRate(poller.getFixedRate() != null);
return trigger;
}).to(pollerMetadata::setTrigger);
return pollerMetadata;
}
} }
/** /**

@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.integration; package org.springframework.boot.autoconfigure.integration;
import java.net.URI; import java.net.URI;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -44,6 +45,8 @@ public class IntegrationProperties {
private final RSocket rsocket = new RSocket(); private final RSocket rsocket = new RSocket();
private final Poller poller = new Poller();
public Channel getChannel() { public Channel getChannel() {
return this.channel; return this.channel;
} }
@ -64,6 +67,10 @@ public class IntegrationProperties {
return this.rsocket; return this.rsocket;
} }
public Poller getPoller() {
return this.poller;
}
public static class Channel { public static class Channel {
/** /**
@ -295,4 +302,88 @@ public class IntegrationProperties {
} }
public static class Poller {
/**
* Maximum of messages to poll per polling cycle.
*/
private int maxMessagesPerPoll = Integer.MIN_VALUE; // PollerMetadata.MAX_MESSAGES_UNBOUNDED
/**
* How long to wait for messages on poll.
*/
private Duration receiveTimeout = Duration.ofSeconds(1); // PollerMetadata.DEFAULT_RECEIVE_TIMEOUT
/**
* Polling delay period. Mutually explusive with 'cron' and 'fixedRate'.
*/
private Duration fixedDelay;
/**
* Polling rate period. Mutually explusive with 'fixedDelay' and 'cron'.
*/
private Duration fixedRate;
/**
* Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for
* 'cron'.
*/
private Duration initialDelay;
/**
* Cron expression for polling. Mutually explusive with 'fixedDelay' and
* 'fixedRate'.
*/
private String cron;
public int getMaxMessagesPerPoll() {
return this.maxMessagesPerPoll;
}
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
public Duration getReceiveTimeout() {
return this.receiveTimeout;
}
public void setReceiveTimeout(Duration receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
public Duration getFixedDelay() {
return this.fixedDelay;
}
public void setFixedDelay(Duration fixedDelay) {
this.fixedDelay = fixedDelay;
}
public Duration getFixedRate() {
return this.fixedRate;
}
public void setFixedRate(Duration fixedRate) {
this.fixedRate = fixedRate;
}
public Duration getInitialDelay() {
return this.initialDelay;
}
public void setInitialDelay(Duration initialDelay) {
this.initialDelay = initialDelay;
}
public String getCron() {
return this.cron;
}
public void setCron(String cron) {
this.cron = cron;
}
}
} }

@ -16,12 +16,17 @@
package org.springframework.boot.autoconfigure.integration; package org.springframework.boot.autoconfigure.integration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.sql.DataSource; import javax.sql.DataSource;
import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.AutoConfigurations;
@ -47,6 +52,8 @@ import org.springframework.context.annotation.Primary;
import org.springframework.core.io.ResourceLoader; import org.springframework.core.io.ResourceLoader;
import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.IntegrationManagementConfigurer; import org.springframework.integration.config.IntegrationManagementConfigurer;
import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.endpoint.MessageProcessorMessageSource; import org.springframework.integration.endpoint.MessageProcessorMessageSource;
@ -55,13 +62,17 @@ import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint; import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.ServerRSocketConnector; import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.ServerRSocketMessageHandler; import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.support.channel.HeaderChannelRegistry; import org.springframework.integration.support.channel.HeaderChannelRegistry;
import org.springframework.jdbc.BadSqlGrammarException; import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.jdbc.core.JdbcOperations; import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jmx.export.MBeanExporter; import org.springframework.jmx.export.MBeanExporter;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@ -390,6 +401,54 @@ class IntegrationAutoConfigurationTests {
.hasBean("customInitializer")); .hasBean("customInitializer"));
} }
@Test
void defaultPoller() {
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> {
assertThat(context).hasSingleBean(PollerMetadata.class).getBean(PollerMetadata.DEFAULT_POLLER)
.hasFieldOrPropertyWithValue("maxMessagesPerPoll", (long) PollerMetadata.MAX_MESSAGES_UNBOUNDED)
.hasFieldOrPropertyWithValue("receiveTimeout", PollerMetadata.DEFAULT_RECEIVE_TIMEOUT)
.hasFieldOrPropertyWithValue("trigger", null);
GenericMessage<String> testMessage = new GenericMessage<>("test");
context.getBean("testChannel", QueueChannel.class).send(testMessage);
@SuppressWarnings("unchecked")
BlockingQueue<Message<?>> sink = context.getBean("sink", BlockingQueue.class);
assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage);
});
}
@Test
void customPollerProperties() {
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class)
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
"spring.integration.poller.max-messages-per-poll=1",
"spring.integration.poller.receive-timeout=10s")
.run((context) -> {
assertThat(context).hasSingleBean(PollerMetadata.class)
.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class)
.hasFieldOrPropertyWithValue("maxMessagesPerPoll", 1L)
.hasFieldOrPropertyWithValue("receiveTimeout", 10000L)
.extracting(PollerMetadata::getTrigger).isInstanceOf(CronTrigger.class)
.hasFieldOrPropertyWithValue("expression", "* * * ? * *");
GenericMessage<String> testMessage = new GenericMessage<>("test");
context.getBean("testChannel", QueueChannel.class).send(testMessage);
@SuppressWarnings("unchecked")
BlockingQueue<Message<?>> sink = context.getBean("sink", BlockingQueue.class);
assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage);
});
}
@Test
void triggerPropertiesAreMutuallyExclusive() {
this.contextRunner
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
"spring.integration.poller.fixed-delay=1s")
.run((context) -> assertThat(context).hasFailed().getFailure()
.hasRootCauseExactlyInstanceOf(IllegalArgumentException.class).hasMessageContaining(
"The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties."));
}
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
static class CustomMBeanExporter { static class CustomMBeanExporter {
@ -478,4 +537,25 @@ class IntegrationAutoConfigurationTests {
} }
@Configuration(proxyBeanMethods = false)
static class PollingConsumerConfiguration {
@Bean
QueueChannel testChannel() {
return new QueueChannel();
}
@Bean
BlockingQueue<Message<?>> sink() {
return new LinkedBlockingQueue<>();
}
@ServiceActivator(inputChannel = "testChannel")
@Bean
MessageHandler handler(BlockingQueue<Message<?>> sink) {
return sink::add;
}
}
} }

@ -5,6 +5,7 @@ Spring Integration provides abstractions over messaging and also other transport
If Spring Integration is available on your classpath, it is initialized through the `@EnableIntegration` annotation. If Spring Integration is available on your classpath, it is initialized through the `@EnableIntegration` annotation.
Spring Integration polling logic relies <<features#features.task-execution-and-scheduling,on the auto-configured `TaskScheduler`>>. Spring Integration polling logic relies <<features#features.task-execution-and-scheduling,on the auto-configured `TaskScheduler`>>.
The default `PollerMetadata` (poll unbounded number of messages every second) can be customized with `spring.integration.poller.*` configuration properties.
Spring Boot also configures some features that are triggered by the presence of additional Spring Integration modules. Spring Boot also configures some features that are triggered by the presence of additional Spring Integration modules.
If `spring-integration-jmx` is also on the classpath, message processing statistics are published over JMX. If `spring-integration-jmx` is also on the classpath, message processing statistics are published over JMX.

Loading…
Cancel
Save