From b2d1423e34906ac4a319e68b272b8b770224b174 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 14 Sep 2021 12:30:16 -0400 Subject: [PATCH] Add Spring Integration default poller auto-config When polling consumers or source polling channel adapters are used in Spring Integration applications, they require some polling policy to be configured. This comment auto-configures a PollerMetadata bean which customized via newly added `spring.integration.poller.*` configuration properties or overriden completely be user-defined bean. See gh-27992 --- .../IntegrationAutoConfiguration.java | 29 ++++++ .../integration/IntegrationProperties.java | 91 +++++++++++++++++++ .../IntegrationAutoConfigurationTests.java | 80 ++++++++++++++++ .../messaging/spring-integration.adoc | 1 + 4 files changed, 201 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java index 71dfd778e1..6c26a6351f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java @@ -16,6 +16,8 @@ package org.springframework.boot.autoconfigure.integration; +import java.time.Duration; + import javax.management.MBeanServer; import javax.sql.DataSource; @@ -56,10 +58,14 @@ import org.springframework.integration.rsocket.IntegrationRSocketEndpoint; import org.springframework.integration.rsocket.ServerRSocketConnector; import org.springframework.integration.rsocket.ServerRSocketMessageHandler; import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway; +import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.CronTrigger; +import org.springframework.scheduling.support.PeriodicTrigger; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** @@ -110,6 +116,29 @@ public class IntegrationAutoConfiguration { @EnableIntegration protected static class IntegrationConfiguration { + @Bean(PollerMetadata.DEFAULT_POLLER) + @ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER) + public PollerMetadata defaultPoller(IntegrationProperties integrationProperties) { + IntegrationProperties.Poller poller = integrationProperties.getPoller(); + int hasCron = poller.getCron() != null ? 1 : 0; + int hasFixedDelay = poller.getFixedDelay() != null ? 1 : 0; + int hasFixedRate = poller.getFixedRate() != null ? 1 : 0; + Assert.isTrue((hasCron + hasFixedDelay + hasFixedRate) <= 1, + "The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties."); + PollerMetadata pollerMetadata = new PollerMetadata(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll); + map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout); + map.from(poller::getCron).whenHasText().as(CronTrigger::new).to(pollerMetadata::setTrigger); + map.from((poller.getFixedDelay() != null) ? poller.getFixedDelay() : poller.getFixedRate()) + .as(Duration::toMillis).as(PeriodicTrigger::new).as((trigger) -> { + map.from(poller::getInitialDelay).as(Duration::toMillis).to(trigger::setInitialDelay); + trigger.setFixedRate(poller.getFixedRate() != null); + return trigger; + }).to(pollerMetadata::setTrigger); + return pollerMetadata; + } + } /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java index ed4109a197..b22c930aae 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.integration; import java.net.URI; +import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -44,6 +45,8 @@ public class IntegrationProperties { private final RSocket rsocket = new RSocket(); + private final Poller poller = new Poller(); + public Channel getChannel() { return this.channel; } @@ -64,6 +67,10 @@ public class IntegrationProperties { return this.rsocket; } + public Poller getPoller() { + return this.poller; + } + public static class Channel { /** @@ -295,4 +302,88 @@ public class IntegrationProperties { } + public static class Poller { + + /** + * Maximum of messages to poll per polling cycle. + */ + private int maxMessagesPerPoll = Integer.MIN_VALUE; // PollerMetadata.MAX_MESSAGES_UNBOUNDED + + /** + * How long to wait for messages on poll. + */ + private Duration receiveTimeout = Duration.ofSeconds(1); // PollerMetadata.DEFAULT_RECEIVE_TIMEOUT + + /** + * Polling delay period. Mutually explusive with 'cron' and 'fixedRate'. + */ + private Duration fixedDelay; + + /** + * Polling rate period. Mutually explusive with 'fixedDelay' and 'cron'. + */ + private Duration fixedRate; + + /** + * Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for + * 'cron'. + */ + private Duration initialDelay; + + /** + * Cron expression for polling. Mutually explusive with 'fixedDelay' and + * 'fixedRate'. + */ + private String cron; + + public int getMaxMessagesPerPoll() { + return this.maxMessagesPerPoll; + } + + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } + + public Duration getReceiveTimeout() { + return this.receiveTimeout; + } + + public void setReceiveTimeout(Duration receiveTimeout) { + this.receiveTimeout = receiveTimeout; + } + + public Duration getFixedDelay() { + return this.fixedDelay; + } + + public void setFixedDelay(Duration fixedDelay) { + this.fixedDelay = fixedDelay; + } + + public Duration getFixedRate() { + return this.fixedRate; + } + + public void setFixedRate(Duration fixedRate) { + this.fixedRate = fixedRate; + } + + public Duration getInitialDelay() { + return this.initialDelay; + } + + public void setInitialDelay(Duration initialDelay) { + this.initialDelay = initialDelay; + } + + public String getCron() { + return this.cron; + } + + public void setCron(String cron) { + this.cron = cron; + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java index ab1f994e36..6ae281eadc 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java @@ -16,12 +16,17 @@ package org.springframework.boot.autoconfigure.integration; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + import javax.management.MBeanServer; import javax.sql.DataSource; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; import org.springframework.beans.DirectFieldAccessor; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -47,6 +52,8 @@ import org.springframework.context.annotation.Primary; import org.springframework.core.io.ResourceLoader; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.IntegrationManagementConfigurer; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.endpoint.MessageProcessorMessageSource; @@ -55,13 +62,17 @@ import org.springframework.integration.rsocket.ClientRSocketConnector; import org.springframework.integration.rsocket.IntegrationRSocketEndpoint; import org.springframework.integration.rsocket.ServerRSocketConnector; import org.springframework.integration.rsocket.ServerRSocketMessageHandler; +import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.support.channel.HeaderChannelRegistry; import org.springframework.jdbc.BadSqlGrammarException; import org.springframework.jdbc.core.JdbcOperations; import org.springframework.jmx.export.MBeanExporter; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; +import org.springframework.messaging.support.GenericMessage; import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.support.CronTrigger; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -390,6 +401,54 @@ class IntegrationAutoConfigurationTests { .hasBean("customInitializer")); } + @Test + void defaultPoller() { + this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> { + assertThat(context).hasSingleBean(PollerMetadata.class).getBean(PollerMetadata.DEFAULT_POLLER) + .hasFieldOrPropertyWithValue("maxMessagesPerPoll", (long) PollerMetadata.MAX_MESSAGES_UNBOUNDED) + .hasFieldOrPropertyWithValue("receiveTimeout", PollerMetadata.DEFAULT_RECEIVE_TIMEOUT) + .hasFieldOrPropertyWithValue("trigger", null); + + GenericMessage testMessage = new GenericMessage<>("test"); + context.getBean("testChannel", QueueChannel.class).send(testMessage); + @SuppressWarnings("unchecked") + BlockingQueue> sink = context.getBean("sink", BlockingQueue.class); + assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage); + }); + } + + @Test + void customPollerProperties() { + this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class) + .withPropertyValues("spring.integration.poller.cron=* * * ? * *", + "spring.integration.poller.max-messages-per-poll=1", + "spring.integration.poller.receive-timeout=10s") + .run((context) -> { + assertThat(context).hasSingleBean(PollerMetadata.class) + .getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class) + .hasFieldOrPropertyWithValue("maxMessagesPerPoll", 1L) + .hasFieldOrPropertyWithValue("receiveTimeout", 10000L) + .extracting(PollerMetadata::getTrigger).isInstanceOf(CronTrigger.class) + .hasFieldOrPropertyWithValue("expression", "* * * ? * *"); + + GenericMessage testMessage = new GenericMessage<>("test"); + context.getBean("testChannel", QueueChannel.class).send(testMessage); + @SuppressWarnings("unchecked") + BlockingQueue> sink = context.getBean("sink", BlockingQueue.class); + assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage); + }); + } + + @Test + void triggerPropertiesAreMutuallyExclusive() { + this.contextRunner + .withPropertyValues("spring.integration.poller.cron=* * * ? * *", + "spring.integration.poller.fixed-delay=1s") + .run((context) -> assertThat(context).hasFailed().getFailure() + .hasRootCauseExactlyInstanceOf(IllegalArgumentException.class).hasMessageContaining( + "The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties.")); + } + @Configuration(proxyBeanMethods = false) static class CustomMBeanExporter { @@ -478,4 +537,25 @@ class IntegrationAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class PollingConsumerConfiguration { + + @Bean + QueueChannel testChannel() { + return new QueueChannel(); + } + + @Bean + BlockingQueue> sink() { + return new LinkedBlockingQueue<>(); + } + + @ServiceActivator(inputChannel = "testChannel") + @Bean + MessageHandler handler(BlockingQueue> sink) { + return sink::add; + } + + } + } diff --git a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc index 607b653bda..2a7731db9a 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc @@ -5,6 +5,7 @@ Spring Integration provides abstractions over messaging and also other transport If Spring Integration is available on your classpath, it is initialized through the `@EnableIntegration` annotation. Spring Integration polling logic relies <>. +The default `PollerMetadata` (poll unbounded number of messages every second) can be customized with `spring.integration.poller.*` configuration properties. Spring Boot also configures some features that are triggered by the presence of additional Spring Integration modules. If `spring-integration-jmx` is also on the classpath, message processing statistics are published over JMX.