|
|
@ -37,6 +37,8 @@ import static org.assertj.core.api.Assertions.assertThat;
|
|
|
|
@Testcontainers(disabledWithoutDocker = true)
|
|
|
|
@Testcontainers(disabledWithoutDocker = true)
|
|
|
|
class SampleReactivePulsarApplicationTests {
|
|
|
|
class SampleReactivePulsarApplicationTests {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static final Integer[] EXPECTED_IDS = IntStream.range(0, 10).boxed().toArray(Integer[]::new);
|
|
|
|
|
|
|
|
|
|
|
|
@Container
|
|
|
|
@Container
|
|
|
|
private static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer(DockerImageNames.pulsar())
|
|
|
|
private static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer(DockerImageNames.pulsar())
|
|
|
|
.withStartupAttempts(2)
|
|
|
|
.withStartupAttempts(2)
|
|
|
@ -50,11 +52,15 @@ class SampleReactivePulsarApplicationTests {
|
|
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
@Test
|
|
|
|
void appProducesAndConsumesSampleMessages(@Autowired SampleMessageConsumer consumer) {
|
|
|
|
void appProducesAndConsumesSampleMessages(@Autowired SampleMessageConsumer consumer) {
|
|
|
|
Integer[] expectedIds = IntStream.range(0, 10).boxed().toArray(Integer[]::new);
|
|
|
|
|
|
|
|
Awaitility.await()
|
|
|
|
Awaitility.await()
|
|
|
|
.atMost(Duration.ofSeconds(20))
|
|
|
|
.atMost(Duration.ofMinutes(3))
|
|
|
|
.untilAsserted(() -> assertThat(consumer.getConsumed()).extracting(SampleMessage::id)
|
|
|
|
.with()
|
|
|
|
.containsExactly(expectedIds));
|
|
|
|
.pollInterval(Duration.ofMillis(500))
|
|
|
|
|
|
|
|
.untilAsserted(() -> hasExpectedIds(consumer));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void hasExpectedIds(SampleMessageConsumer consumer) {
|
|
|
|
|
|
|
|
assertThat(consumer.getConsumed()).extracting(SampleMessage::id).containsExactly(EXPECTED_IDS);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|