Polish "Combine Pulsar smoke tests"

See gh-37196
pull/37197/head
Phillip Webb 1 year ago
parent eacf92b1b2
commit 9497f3d91c

@ -31,7 +31,7 @@ import org.springframework.pulsar.core.PulsarTopic;
@Profile("smoketest.pulsar.imperative")
class ImperativeAppConfig {
private static final Log LOG = LogFactory.getLog(ImperativeAppConfig.class);
private static final Log logger = LogFactory.getLog(ImperativeAppConfig.class);
private static final String TOPIC = "pulsar-smoke-test-topic";
@ -45,14 +45,14 @@ class ImperativeAppConfig {
return (args) -> {
for (int i = 0; i < 10; i++) {
template.send(TOPIC, new SampleMessage(i, "message:" + i));
LOG.info("++++++PRODUCE IMPERATIVE:(" + i + ")------");
logger.info("++++++PRODUCE IMPERATIVE:(" + i + ")------");
}
};
}
@PulsarListener(topics = TOPIC)
void consumeMessagesFromPulsarTopic(SampleMessage msg) {
LOG.info("++++++CONSUME IMPERATIVE:(" + msg.id() + ")------");
logger.info("++++++CONSUME IMPERATIVE:(" + msg.id() + ")------");
}
}

@ -34,7 +34,7 @@ import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
@Profile("smoketest.pulsar.reactive")
class ReactiveAppConfig {
private static final Log LOG = LogFactory.getLog(ReactiveAppConfig.class);
private static final Log logger = LogFactory.getLog(ReactiveAppConfig.class);
private static final String TOPIC = "pulsar-reactive-smoke-test-topic";
@ -49,14 +49,14 @@ class ReactiveAppConfig {
.map((i) -> new SampleMessage(i, "message:" + i))
.map(MessageSpec::of)
.as((msgs) -> template.send(TOPIC, msgs))
.doOnNext((sendResult) -> LOG
.doOnNext((sendResult) -> logger
.info("++++++PRODUCE REACTIVE:(" + sendResult.getMessageSpec().getValue().id() + ")------"))
.subscribe();
}
@ReactivePulsarListener(topics = TOPIC)
Mono<Void> consumeMessagesFromPulsarTopic(SampleMessage msg) {
LOG.info("++++++CONSUME REACTIVE:(" + msg.id() + ")------");
logger.info("++++++CONSUME REACTIVE:(" + msg.id() + ")------");
return Mono.empty();
}

@ -44,27 +44,29 @@ import static org.assertj.core.api.Assertions.assertThat;
class SamplePulsarApplicationTests {
@Container
static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer(DockerImageNames.pulsar())
.withStartupAttempts(2)
static final PulsarContainer container = new PulsarContainer(DockerImageNames.pulsar()).withStartupAttempts(2)
.withStartupTimeout(Duration.ofMinutes(3));
@DynamicPropertySource
static void pulsarProperties(DynamicPropertyRegistry registry) {
registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerUrl);
registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl);
registry.add("spring.pulsar.client.service-url", container::getPulsarBrokerUrl);
registry.add("spring.pulsar.admin.service-url", container::getHttpServiceUrl);
}
@Nested
@SpringBootTest
@ActiveProfiles("smoketest.pulsar.imperative")
class ImperativeApp {
abstract class PulsarApplication {
private final String type;
PulsarApplication(String type) {
this.type = type;
}
@Test
void appProducesAndConsumesMessages(CapturedOutput output) {
List<String> expectedOutput = new ArrayList<>();
IntStream.range(0, 10).forEachOrdered((i) -> {
expectedOutput.add("++++++PRODUCE IMPERATIVE:(" + i + ")------");
expectedOutput.add("++++++CONSUME IMPERATIVE:(" + i + ")------");
expectedOutput.add("++++++PRODUCE %s:(%s)------".formatted(this.type, i));
expectedOutput.add("++++++CONSUME %s:(%s)------".formatted(this.type, i));
});
Awaitility.waitAtMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(output).contains(expectedOutput));
@ -72,20 +74,24 @@ class SamplePulsarApplicationTests {
}
@Nested
@SpringBootTest
@ActiveProfiles("smoketest.pulsar.imperative")
class ImperativePulsarApplication extends PulsarApplication {
ImperativePulsarApplication() {
super("IMPERATIVE");
}
}
@Nested
@SpringBootTest
@ActiveProfiles("smoketest.pulsar.reactive")
class ReactiveApp {
class ReactivePulsarApplication extends PulsarApplication {
@Test
void appProducesAndConsumesMessagesReactively(CapturedOutput output) {
List<String> expectedOutput = new ArrayList<>();
IntStream.range(0, 10).forEachOrdered((i) -> {
expectedOutput.add("++++++PRODUCE REACTIVE:(" + i + ")------");
expectedOutput.add("++++++CONSUME REACTIVE:(" + i + ")------");
});
Awaitility.waitAtMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(output).contains(expectedOutput));
ReactivePulsarApplication() {
super("REACTIVE");
}
}

Loading…
Cancel
Save