diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/build.gradle b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/build.gradle deleted file mode 100644 index 34c98479a7..0000000000 --- a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/build.gradle +++ /dev/null @@ -1,15 +0,0 @@ -plugins { - id "java" - id "org.springframework.boot.conventions" -} - -description = "Spring Boot Pulsar smoke test" - -dependencies { - implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar-reactive")) - testImplementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-test")) - testImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support")) - testImplementation("org.awaitility:awaitility") - testImplementation("org.testcontainers:junit-jupiter") - testImplementation("org.testcontainers:pulsar") -} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/java/smoketest/pulsar/reactive/SampleMessage.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/java/smoketest/pulsar/reactive/SampleMessage.java deleted file mode 100644 index 7fde2d1b97..0000000000 --- a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/java/smoketest/pulsar/reactive/SampleMessage.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2012-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package smoketest.pulsar.reactive; - -record SampleMessage(Integer id, String content) { -} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/java/smoketest/pulsar/reactive/SampleMessageConsumer.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/java/smoketest/pulsar/reactive/SampleMessageConsumer.java deleted file mode 100644 index 92ed71072e..0000000000 --- a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/java/smoketest/pulsar/reactive/SampleMessageConsumer.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2012-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package smoketest.pulsar.reactive; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import reactor.core.publisher.Mono; - -import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; -import org.springframework.stereotype.Component; - -@Component -class SampleMessageConsumer { - - private List consumed = new CopyOnWriteArrayList<>(); - - List getConsumed() { - return this.consumed; - } - - @ReactivePulsarListener(topics = SampleReactivePulsarApplication.TOPIC) - Mono consumeMessagesFromPulsarTopic(SampleMessage msg) { - System.out.println("**** CONSUME: " + msg); - this.consumed.add(msg); - return Mono.empty(); - } - -} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/resources/application.properties b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/resources/application.properties deleted file mode 100644 index cdb9707b22..0000000000 --- a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ -spring.pulsar.reactive.consumer.subscription-initial-position=earliest diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/test/java/smoketest/pulsar/reactive/SampleReactivePulsarApplicationTests.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/test/java/smoketest/pulsar/reactive/SampleReactivePulsarApplicationTests.java deleted file mode 100644 index 48643ab308..0000000000 --- a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/test/java/smoketest/pulsar/reactive/SampleReactivePulsarApplicationTests.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2012-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package smoketest.pulsar.reactive; - -import java.time.Duration; -import java.util.stream.IntStream; - -import org.awaitility.Awaitility; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.PulsarContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.testsupport.testcontainers.DockerImageNames; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; - -import static org.assertj.core.api.Assertions.assertThat; - -@SpringBootTest -@Testcontainers(disabledWithoutDocker = true) -class SampleReactivePulsarApplicationTests { - - private static final Integer[] EXPECTED_IDS = IntStream.range(0, 10).boxed().toArray(Integer[]::new); - - @Container - private static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer(DockerImageNames.pulsar()) - .withStartupAttempts(2) - .withStartupTimeout(Duration.ofMinutes(3)); - - @DynamicPropertySource - static void pulsarProperties(DynamicPropertyRegistry registry) { - registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerUrl); - registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl); - } - - @Test - void appProducesAndConsumesSampleMessages(@Autowired SampleMessageConsumer consumer) { - Awaitility.await() - .atMost(Duration.ofMinutes(3)) - .with() - .pollInterval(Duration.ofMillis(500)) - .untilAsserted(() -> hasExpectedIds(consumer)); - } - - private void hasExpectedIds(SampleMessageConsumer consumer) { - assertThat(consumer.getConsumed()).extracting(SampleMessage::id).containsExactly(EXPECTED_IDS); - } - -} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/build.gradle b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/build.gradle index e60e0ba606..6058d1127a 100644 --- a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/build.gradle +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/build.gradle @@ -7,6 +7,7 @@ description = "Spring Boot Pulsar smoke test" dependencies { implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar")) + implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar-reactive")) testImplementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-test")) testImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support")) testImplementation("org.awaitility:awaitility") diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ImperativeAppConfig.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ImperativeAppConfig.java new file mode 100644 index 0000000000..32f939bdb4 --- /dev/null +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ImperativeAppConfig.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package smoketest.pulsar; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.core.PulsarTopic; + +@Configuration(proxyBeanMethods = false) +@Profile("smoketest.pulsar.imperative") +class ImperativeAppConfig { + + private static final Log LOG = LogFactory.getLog(ImperativeAppConfig.class); + + private static final String TOPIC = "pulsar-smoke-test-topic"; + + @Bean + PulsarTopic pulsarTestTopic() { + return PulsarTopic.builder(TOPIC).numberOfPartitions(1).build(); + } + + @Bean + ApplicationRunner sendMessagesToPulsarTopic(PulsarTemplate template) { + return (args) -> { + for (int i = 0; i < 10; i++) { + template.send(TOPIC, new SampleMessage(i, "message:" + i)); + LOG.info("++++++PRODUCE IMPERATIVE:(" + i + ")------"); + } + }; + } + + @PulsarListener(topics = TOPIC) + void consumeMessagesFromPulsarTopic(SampleMessage msg) { + LOG.info("++++++CONSUME IMPERATIVE:(" + msg.id() + ")------"); + } + +} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/java/smoketest/pulsar/reactive/SampleReactivePulsarApplication.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ReactiveAppConfig.java similarity index 59% rename from spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/java/smoketest/pulsar/reactive/SampleReactivePulsarApplication.java rename to spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ReactiveAppConfig.java index 460bc4ba4c..0d2b303433 100644 --- a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar-reactive/src/main/java/smoketest/pulsar/reactive/SampleReactivePulsarApplication.java +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ReactiveAppConfig.java @@ -14,22 +14,29 @@ * limitations under the License. */ -package smoketest.pulsar.reactive; +package smoketest.pulsar; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.pulsar.reactive.client.api.MessageSpec; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.boot.ApplicationRunner; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; import org.springframework.pulsar.core.PulsarTopic; +import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; -@SpringBootApplication -public class SampleReactivePulsarApplication { +@Configuration(proxyBeanMethods = false) +@Profile("smoketest.pulsar.reactive") +class ReactiveAppConfig { - static final String TOPIC = "pulsar-reactive-smoke-test-topic"; + private static final Log LOG = LogFactory.getLog(ReactiveAppConfig.class); + + private static final String TOPIC = "pulsar-reactive-smoke-test-topic"; @Bean PulsarTopic pulsarTestTopic() { @@ -42,12 +49,15 @@ public class SampleReactivePulsarApplication { .map((i) -> new SampleMessage(i, "message:" + i)) .map(MessageSpec::of) .as((msgs) -> template.send(TOPIC, msgs)) - .doOnNext((sendResult) -> System.out.println("*** PRODUCE: " + sendResult.getMessageId())) + .doOnNext((sendResult) -> LOG + .info("++++++PRODUCE REACTIVE:(" + sendResult.getMessageSpec().getValue().id() + ")------")) .subscribe(); } - public static void main(String[] args) { - SpringApplication.run(SampleReactivePulsarApplication.class, args); + @ReactivePulsarListener(topics = TOPIC) + Mono consumeMessagesFromPulsarTopic(SampleMessage msg) { + LOG.info("++++++CONSUME REACTIVE:(" + msg.id() + ")------"); + return Mono.empty(); } } diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/SampleMessageConsumer.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/SampleMessageConsumer.java deleted file mode 100644 index 196dcbb5b4..0000000000 --- a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/SampleMessageConsumer.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2012-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package smoketest.pulsar; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.springframework.pulsar.annotation.PulsarListener; -import org.springframework.stereotype.Component; - -@Component -class SampleMessageConsumer { - - private List consumed = new CopyOnWriteArrayList<>(); - - List getConsumed() { - return this.consumed; - } - - @PulsarListener(topics = SamplePulsarApplication.TOPIC) - void consumeMessagesFromPulsarTopic(SampleMessage msg) { - System.out.println("**** CONSUME: " + msg); - this.consumed.add(msg); - } - -} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/SamplePulsarApplication.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/SamplePulsarApplication.java index adc801e3d7..560967bb2d 100644 --- a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/SamplePulsarApplication.java +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/SamplePulsarApplication.java @@ -16,35 +16,12 @@ package smoketest.pulsar; -import org.apache.pulsar.client.api.MessageId; - -import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; -import org.springframework.pulsar.core.PulsarTemplate; -import org.springframework.pulsar.core.PulsarTopic; @SpringBootApplication public class SamplePulsarApplication { - static final String TOPIC = "pulsar-smoke-test-topic"; - - @Bean - PulsarTopic pulsarTestTopic() { - return PulsarTopic.builder(TOPIC).numberOfPartitions(1).build(); - } - - @Bean - ApplicationRunner sendMessagesToPulsarTopic(PulsarTemplate template) { - return (args) -> { - for (int i = 0; i < 10; i++) { - MessageId msgId = template.send(TOPIC, new SampleMessage(i, "message:" + i)); - System.out.println("*** PRODUCE: " + msgId); - } - }; - } - public static void main(String[] args) { SpringApplication.run(SamplePulsarApplication.class, args); } diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/resources/application.properties b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/resources/application.properties index 25502d64c7..b1ae3ec6f4 100644 --- a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/resources/application.properties +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/main/resources/application.properties @@ -1 +1 @@ -spring.pulsar.consumer.subscription-initial-position=earliest +spring.pulsar.consumer.subscription.initial-position=earliest diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/test/java/smoketest/pulsar/SamplePulsarApplicationTests.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/test/java/smoketest/pulsar/SamplePulsarApplicationTests.java index 5877370ba7..e30cf1a640 100644 --- a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/test/java/smoketest/pulsar/SamplePulsarApplicationTests.java +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar/src/test/java/smoketest/pulsar/SamplePulsarApplicationTests.java @@ -17,30 +17,34 @@ package smoketest.pulsar; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.stream.IntStream; import org.awaitility.Awaitility; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.testcontainers.containers.PulsarContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; import org.springframework.boot.testsupport.testcontainers.DockerImageNames; +import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import static org.assertj.core.api.Assertions.assertThat; -@SpringBootTest @Testcontainers(disabledWithoutDocker = true) +@ExtendWith(OutputCaptureExtension.class) class SamplePulsarApplicationTests { - private static final Integer[] EXPECTED_IDS = IntStream.range(0, 10).boxed().toArray(Integer[]::new); - @Container - private static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer(DockerImageNames.pulsar()) + static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer(DockerImageNames.pulsar()) .withStartupAttempts(2) .withStartupTimeout(Duration.ofMinutes(3)); @@ -50,17 +54,40 @@ class SamplePulsarApplicationTests { registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl); } - @Test - void appProducesAndConsumesSampleMessages(@Autowired SampleMessageConsumer consumer) { - Awaitility.await() - .atMost(Duration.ofMinutes(3)) - .with() - .pollInterval(Duration.ofMillis(500)) - .untilAsserted(() -> hasExpectedIds(consumer)); + @Nested + @SpringBootTest + @ActiveProfiles("smoketest.pulsar.imperative") + class ImperativeApp { + + @Test + void appProducesAndConsumesMessages(CapturedOutput output) { + List expectedOutput = new ArrayList<>(); + IntStream.range(0, 10).forEachOrdered((i) -> { + expectedOutput.add("++++++PRODUCE IMPERATIVE:(" + i + ")------"); + expectedOutput.add("++++++CONSUME IMPERATIVE:(" + i + ")------"); + }); + Awaitility.waitAtMost(Duration.ofSeconds(30)) + .untilAsserted(() -> assertThat(output).contains(expectedOutput)); + } + } - private void hasExpectedIds(SampleMessageConsumer consumer) { - assertThat(consumer.getConsumed()).extracting(SampleMessage::id).containsExactly(EXPECTED_IDS); + @Nested + @SpringBootTest + @ActiveProfiles("smoketest.pulsar.reactive") + class ReactiveApp { + + @Test + void appProducesAndConsumesMessagesReactively(CapturedOutput output) { + List expectedOutput = new ArrayList<>(); + IntStream.range(0, 10).forEachOrdered((i) -> { + expectedOutput.add("++++++PRODUCE REACTIVE:(" + i + ")------"); + expectedOutput.add("++++++CONSUME REACTIVE:(" + i + ")------"); + }); + Awaitility.waitAtMost(Duration.ofSeconds(30)) + .untilAsserted(() -> assertThat(output).contains(expectedOutput)); + } + } } diff --git a/src/checkstyle/checkstyle-suppressions.xml b/src/checkstyle/checkstyle-suppressions.xml index 3f7d8b5e06..f224ac2560 100644 --- a/src/checkstyle/checkstyle-suppressions.xml +++ b/src/checkstyle/checkstyle-suppressions.xml @@ -32,6 +32,7 @@ +