Combine Pulsar smoke tests

* Simplify produce/consume verify via OutputCapture

* Remove spring-boot-smoke-test-pulsar-reactive as
  no other smoke tests split them out

See gh-37196
pull/37197/head
Chris Bono 1 year ago committed by Phillip Webb
parent 2ebcdb059a
commit eacf92b1b2

@ -1,15 +0,0 @@
plugins {
id "java"
id "org.springframework.boot.conventions"
}
description = "Spring Boot Pulsar smoke test"
dependencies {
implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar-reactive"))
testImplementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-test"))
testImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support"))
testImplementation("org.awaitility:awaitility")
testImplementation("org.testcontainers:junit-jupiter")
testImplementation("org.testcontainers:pulsar")
}

@ -1,20 +0,0 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package smoketest.pulsar.reactive;
record SampleMessage(Integer id, String content) {
}

@ -1,43 +0,0 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package smoketest.pulsar.reactive;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;
@Component
class SampleMessageConsumer {
private List<SampleMessage> consumed = new CopyOnWriteArrayList<>();
List<SampleMessage> getConsumed() {
return this.consumed;
}
@ReactivePulsarListener(topics = SampleReactivePulsarApplication.TOPIC)
Mono<Void> consumeMessagesFromPulsarTopic(SampleMessage msg) {
System.out.println("**** CONSUME: " + msg);
this.consumed.add(msg);
return Mono.empty();
}
}

@ -1,66 +0,0 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package smoketest.pulsar.reactive;
import java.time.Duration;
import java.util.stream.IntStream;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.testsupport.testcontainers.DockerImageNames;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest
@Testcontainers(disabledWithoutDocker = true)
class SampleReactivePulsarApplicationTests {
private static final Integer[] EXPECTED_IDS = IntStream.range(0, 10).boxed().toArray(Integer[]::new);
@Container
private static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer(DockerImageNames.pulsar())
.withStartupAttempts(2)
.withStartupTimeout(Duration.ofMinutes(3));
@DynamicPropertySource
static void pulsarProperties(DynamicPropertyRegistry registry) {
registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerUrl);
registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl);
}
@Test
void appProducesAndConsumesSampleMessages(@Autowired SampleMessageConsumer consumer) {
Awaitility.await()
.atMost(Duration.ofMinutes(3))
.with()
.pollInterval(Duration.ofMillis(500))
.untilAsserted(() -> hasExpectedIds(consumer));
}
private void hasExpectedIds(SampleMessageConsumer consumer) {
assertThat(consumer.getConsumed()).extracting(SampleMessage::id).containsExactly(EXPECTED_IDS);
}
}

@ -7,6 +7,7 @@ description = "Spring Boot Pulsar smoke test"
dependencies {
implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar"))
implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar-reactive"))
testImplementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-test"))
testImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support"))
testImplementation("org.awaitility:awaitility")

@ -0,0 +1,58 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package smoketest.pulsar;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopic;
@Configuration(proxyBeanMethods = false)
@Profile("smoketest.pulsar.imperative")
class ImperativeAppConfig {
private static final Log LOG = LogFactory.getLog(ImperativeAppConfig.class);
private static final String TOPIC = "pulsar-smoke-test-topic";
@Bean
PulsarTopic pulsarTestTopic() {
return PulsarTopic.builder(TOPIC).numberOfPartitions(1).build();
}
@Bean
ApplicationRunner sendMessagesToPulsarTopic(PulsarTemplate<SampleMessage> template) {
return (args) -> {
for (int i = 0; i < 10; i++) {
template.send(TOPIC, new SampleMessage(i, "message:" + i));
LOG.info("++++++PRODUCE IMPERATIVE:(" + i + ")------");
}
};
}
@PulsarListener(topics = TOPIC)
void consumeMessagesFromPulsarTopic(SampleMessage msg) {
LOG.info("++++++CONSUME IMPERATIVE:(" + msg.id() + ")------");
}
}

@ -14,22 +14,29 @@
* limitations under the License.
*/
package smoketest.pulsar.reactive;
package smoketest.pulsar;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.pulsar.core.PulsarTopic;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
@SpringBootApplication
public class SampleReactivePulsarApplication {
@Configuration(proxyBeanMethods = false)
@Profile("smoketest.pulsar.reactive")
class ReactiveAppConfig {
static final String TOPIC = "pulsar-reactive-smoke-test-topic";
private static final Log LOG = LogFactory.getLog(ReactiveAppConfig.class);
private static final String TOPIC = "pulsar-reactive-smoke-test-topic";
@Bean
PulsarTopic pulsarTestTopic() {
@ -42,12 +49,15 @@ public class SampleReactivePulsarApplication {
.map((i) -> new SampleMessage(i, "message:" + i))
.map(MessageSpec::of)
.as((msgs) -> template.send(TOPIC, msgs))
.doOnNext((sendResult) -> System.out.println("*** PRODUCE: " + sendResult.getMessageId()))
.doOnNext((sendResult) -> LOG
.info("++++++PRODUCE REACTIVE:(" + sendResult.getMessageSpec().getValue().id() + ")------"))
.subscribe();
}
public static void main(String[] args) {
SpringApplication.run(SampleReactivePulsarApplication.class, args);
@ReactivePulsarListener(topics = TOPIC)
Mono<Void> consumeMessagesFromPulsarTopic(SampleMessage msg) {
LOG.info("++++++CONSUME REACTIVE:(" + msg.id() + ")------");
return Mono.empty();
}
}

@ -1,40 +0,0 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package smoketest.pulsar;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
class SampleMessageConsumer {
private List<SampleMessage> consumed = new CopyOnWriteArrayList<>();
List<SampleMessage> getConsumed() {
return this.consumed;
}
@PulsarListener(topics = SamplePulsarApplication.TOPIC)
void consumeMessagesFromPulsarTopic(SampleMessage msg) {
System.out.println("**** CONSUME: " + msg);
this.consumed.add(msg);
}
}

@ -16,35 +16,12 @@
package smoketest.pulsar;
import org.apache.pulsar.client.api.MessageId;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopic;
@SpringBootApplication
public class SamplePulsarApplication {
static final String TOPIC = "pulsar-smoke-test-topic";
@Bean
PulsarTopic pulsarTestTopic() {
return PulsarTopic.builder(TOPIC).numberOfPartitions(1).build();
}
@Bean
ApplicationRunner sendMessagesToPulsarTopic(PulsarTemplate<SampleMessage> template) {
return (args) -> {
for (int i = 0; i < 10; i++) {
MessageId msgId = template.send(TOPIC, new SampleMessage(i, "message:" + i));
System.out.println("*** PRODUCE: " + msgId);
}
};
}
public static void main(String[] args) {
SpringApplication.run(SamplePulsarApplication.class, args);
}

@ -1 +1 @@
spring.pulsar.consumer.subscription-initial-position=earliest
spring.pulsar.consumer.subscription.initial-position=earliest

@ -17,30 +17,34 @@
package smoketest.pulsar;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.boot.testsupport.testcontainers.DockerImageNames;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest
@Testcontainers(disabledWithoutDocker = true)
@ExtendWith(OutputCaptureExtension.class)
class SamplePulsarApplicationTests {
private static final Integer[] EXPECTED_IDS = IntStream.range(0, 10).boxed().toArray(Integer[]::new);
@Container
private static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer(DockerImageNames.pulsar())
static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer(DockerImageNames.pulsar())
.withStartupAttempts(2)
.withStartupTimeout(Duration.ofMinutes(3));
@ -50,17 +54,40 @@ class SamplePulsarApplicationTests {
registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl);
}
@Test
void appProducesAndConsumesSampleMessages(@Autowired SampleMessageConsumer consumer) {
Awaitility.await()
.atMost(Duration.ofMinutes(3))
.with()
.pollInterval(Duration.ofMillis(500))
.untilAsserted(() -> hasExpectedIds(consumer));
@Nested
@SpringBootTest
@ActiveProfiles("smoketest.pulsar.imperative")
class ImperativeApp {
@Test
void appProducesAndConsumesMessages(CapturedOutput output) {
List<String> expectedOutput = new ArrayList<>();
IntStream.range(0, 10).forEachOrdered((i) -> {
expectedOutput.add("++++++PRODUCE IMPERATIVE:(" + i + ")------");
expectedOutput.add("++++++CONSUME IMPERATIVE:(" + i + ")------");
});
Awaitility.waitAtMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(output).contains(expectedOutput));
}
}
private void hasExpectedIds(SampleMessageConsumer consumer) {
assertThat(consumer.getConsumed()).extracting(SampleMessage::id).containsExactly(EXPECTED_IDS);
@Nested
@SpringBootTest
@ActiveProfiles("smoketest.pulsar.reactive")
class ReactiveApp {
@Test
void appProducesAndConsumesMessagesReactively(CapturedOutput output) {
List<String> expectedOutput = new ArrayList<>();
IntStream.range(0, 10).forEachOrdered((i) -> {
expectedOutput.add("++++++PRODUCE REACTIVE:(" + i + ")------");
expectedOutput.add("++++++CONSUME REACTIVE:(" + i + ")------");
});
Awaitility.waitAtMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(output).contains(expectedOutput));
}
}
}

@ -32,6 +32,7 @@
<suppress files="[\\/]spring-boot-docs[\\/].*MyConfiguration__BeanDefinitions" checks="JavadocMethod|SpringHideUtilityClassConstructor" />
<suppress files="[\\/]spring-boot-smoke-tests[\\/]" checks="JavadocPackage|JavadocType" />
<suppress files="[\\/]spring-boot-smoke-tests[\\/]" checks="ImportControl" />
<suppress files="[\\/]spring-boot-smoke-tests[\\/].*SamplePulsarApplicationTests" checks="SpringHideUtilityClassConstructor" />
<suppress files="[\\/]spring-boot-smoke-tests[\\/]" id="mainCodeIllegalImportCheck" />
<suppress files="[\\/]spring-boot-deployment-tests[\\/]" checks="JavadocPackage|JavadocType" />
<suppress files="[\\/]spring-boot-integration-tests[\\/]" checks="JavadocType" />

Loading…
Cancel
Save