Add Pulsar ConnectionDetails support

Add `ConnectionDetails` support for Apache Pulsar and provide adapters
for Docker Compose and Testcontainers.

See gh-37197
pull/37242/head
Chris Bono 2 years ago committed by Phillip Webb
parent db73e071cc
commit 089fef0392

@ -277,4 +277,5 @@ tasks.named("checkSpringConfigurationMetadata").configure {
test { test {
jvmArgs += "--add-opens=java.base/java.net=ALL-UNNAMED" jvmArgs += "--add-opens=java.base/java.net=ALL-UNNAMED"
jvmArgs += "--add-opens=java.base/sun.net=ALL-UNNAMED"
} }

@ -0,0 +1,42 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.pulsar;
/**
* Adapts {@link PulsarProperties} to {@link PulsarConnectionDetails}.
*
* @author Chris Bono
*/
class PropertiesPulsarConnectionDetails implements PulsarConnectionDetails {
private final PulsarProperties pulsarProperties;
PropertiesPulsarConnectionDetails(PulsarProperties pulsarProperties) {
this.pulsarProperties = pulsarProperties;
}
@Override
public String getPulsarBrokerUrl() {
return this.pulsarProperties.getClient().getServiceUrl();
}
@Override
public String getPulsarAdminUrl() {
return this.pulsarProperties.getAdmin().getServiceUrl();
}
}

@ -71,17 +71,31 @@ class PulsarConfiguration {
this.propertiesMapper = new PulsarPropertiesMapper(properties); this.propertiesMapper = new PulsarPropertiesMapper(properties);
} }
@Bean
@ConditionalOnMissingBean(PulsarConnectionDetails.class)
PropertiesPulsarConnectionDetails pulsarConnectionDetails() {
return new PropertiesPulsarConnectionDetails(this.properties);
}
@Bean @Bean
@ConditionalOnMissingBean(PulsarClientFactory.class) @ConditionalOnMissingBean(PulsarClientFactory.class)
DefaultPulsarClientFactory pulsarClientFactory(ObjectProvider<PulsarClientBuilderCustomizer> customizersProvider) { DefaultPulsarClientFactory pulsarClientFactory(PulsarConnectionDetails connectionDetails,
ObjectProvider<PulsarClientBuilderCustomizer> customizersProvider) {
List<PulsarClientBuilderCustomizer> allCustomizers = new ArrayList<>(); List<PulsarClientBuilderCustomizer> allCustomizers = new ArrayList<>();
allCustomizers.add(this.propertiesMapper::customizeClientBuilder); allCustomizers.add(this.propertiesMapper::customizeClientBuilder);
allCustomizers.add((clientBuilder) -> this.applyConnectionDetails(connectionDetails, clientBuilder));
allCustomizers.addAll(customizersProvider.orderedStream().toList()); allCustomizers.addAll(customizersProvider.orderedStream().toList());
DefaultPulsarClientFactory clientFactory = new DefaultPulsarClientFactory( DefaultPulsarClientFactory clientFactory = new DefaultPulsarClientFactory(
(clientBuilder) -> applyClientBuilderCustomizers(allCustomizers, clientBuilder)); (clientBuilder) -> applyClientBuilderCustomizers(allCustomizers, clientBuilder));
return clientFactory; return clientFactory;
} }
private void applyConnectionDetails(PulsarConnectionDetails connectionDetails, ClientBuilder clientBuilder) {
if (connectionDetails.getPulsarBrokerUrl() != null) {
clientBuilder.serviceUrl(connectionDetails.getPulsarBrokerUrl());
}
}
private void applyClientBuilderCustomizers(List<PulsarClientBuilderCustomizer> customizers, private void applyClientBuilderCustomizers(List<PulsarClientBuilderCustomizer> customizers,
ClientBuilder clientBuilder) { ClientBuilder clientBuilder) {
customizers.forEach((customizer) -> customizer.customize(clientBuilder)); customizers.forEach((customizer) -> customizer.customize(clientBuilder));
@ -95,14 +109,21 @@ class PulsarConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
PulsarAdministration pulsarAdministration( PulsarAdministration pulsarAdministration(PulsarConnectionDetails connectionDetails,
ObjectProvider<PulsarAdminBuilderCustomizer> pulsarAdminBuilderCustomizers) { ObjectProvider<PulsarAdminBuilderCustomizer> pulsarAdminBuilderCustomizers) {
List<PulsarAdminBuilderCustomizer> allCustomizers = new ArrayList<>(); List<PulsarAdminBuilderCustomizer> allCustomizers = new ArrayList<>();
allCustomizers.add(this.propertiesMapper::customizeAdminBuilder); allCustomizers.add(this.propertiesMapper::customizeAdminBuilder);
allCustomizers.add((adminBuilder) -> this.applyConnectionDetails(connectionDetails, adminBuilder));
allCustomizers.addAll(pulsarAdminBuilderCustomizers.orderedStream().toList()); allCustomizers.addAll(pulsarAdminBuilderCustomizers.orderedStream().toList());
return new PulsarAdministration((adminBuilder) -> applyAdminBuilderCustomizers(allCustomizers, adminBuilder)); return new PulsarAdministration((adminBuilder) -> applyAdminBuilderCustomizers(allCustomizers, adminBuilder));
} }
private void applyConnectionDetails(PulsarConnectionDetails connectionDetails, PulsarAdminBuilder adminBuilder) {
if (connectionDetails.getPulsarAdminUrl() != null) {
adminBuilder.serviceHttpUrl(connectionDetails.getPulsarAdminUrl());
}
}
private void applyAdminBuilderCustomizers(List<PulsarAdminBuilderCustomizer> customizers, private void applyAdminBuilderCustomizers(List<PulsarAdminBuilderCustomizer> customizers,
PulsarAdminBuilder adminBuilder) { PulsarAdminBuilder adminBuilder) {
customizers.forEach((customizer) -> customizer.customize(adminBuilder)); customizers.forEach((customizer) -> customizer.customize(adminBuilder));

@ -0,0 +1,41 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.pulsar;
import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails;
/**
* Details required to establish a connection to a Pulsar service.
*
* @author Chris Bono
* @since 3.2.0
*/
public interface PulsarConnectionDetails extends ConnectionDetails {
/**
* Returns the Pulsar service URL for the broker.
* @return the Pulsar service URL for the broker
*/
String getPulsarBrokerUrl();
/**
* Returns the Pulsar web URL for the admin endpoint.
* @return the Pulsar web URL for the admin endpoint
*/
String getPulsarAdminUrl();
}

@ -53,6 +53,7 @@ final class PulsarPropertiesMapper {
PulsarProperties.Client properties = this.properties.getClient(); PulsarProperties.Client properties = this.properties.getClient();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getServiceUrl).to(clientBuilder::serviceUrl); map.from(properties::getServiceUrl).to(clientBuilder::serviceUrl);
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout)); map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout)); map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout)); map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));

@ -0,0 +1,46 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.pulsar;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link PropertiesPulsarConnectionDetails}.
*
* @author Chris Bono
*/
class PropertiesPulsarConnectionDetailsTests {
@Test
void pulsarBrokerUrlIsObtainedFromPulsarProperties() {
var pulsarProps = new PulsarProperties();
pulsarProps.getClient().setServiceUrl("foo");
var connectionDetails = new PropertiesPulsarConnectionDetails(pulsarProps);
assertThat(connectionDetails.getPulsarBrokerUrl()).isEqualTo("foo");
}
@Test
void pulsarAdminUrlIsObtainedFromPulsarProperties() {
var pulsarProps = new PulsarProperties();
pulsarProps.getAdmin().setServiceUrl("foo");
var connectionDetails = new PropertiesPulsarConnectionDetails(pulsarProps);
assertThat(connectionDetails.getPulsarAdminUrl()).isEqualTo("foo");
}
}

@ -114,6 +114,7 @@ class PulsarAutoConfigurationTests {
@Test @Test
void autoConfiguresBeans() { void autoConfiguresBeans() {
this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarConfiguration.class) this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarConfiguration.class)
.hasSingleBean(PulsarConnectionDetails.class)
.hasSingleBean(DefaultPulsarClientFactory.class) .hasSingleBean(DefaultPulsarClientFactory.class)
.hasSingleBean(PulsarClient.class) .hasSingleBean(PulsarClient.class)
.hasSingleBean(PulsarAdministration.class) .hasSingleBean(PulsarAdministration.class)

@ -51,6 +51,7 @@ import org.springframework.pulsar.function.PulsarFunctionAdministration;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry; import static org.assertj.core.api.Assertions.entry;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
/** /**
@ -67,6 +68,15 @@ class PulsarConfigurationTests {
.withConfiguration(AutoConfigurations.of(PulsarConfiguration.class)) .withConfiguration(AutoConfigurations.of(PulsarConfiguration.class))
.withBean(PulsarClient.class, () -> mock(PulsarClient.class)); .withBean(PulsarClient.class, () -> mock(PulsarClient.class));
@Test
void whenHasUserDefinedConnectionDetailsBeanDoesNotAutoConfigureBean() {
PulsarConnectionDetails customConnectionDetails = mock(PulsarConnectionDetails.class);
this.contextRunner
.withBean("customPulsarConnectionDetails", PulsarConnectionDetails.class, () -> customConnectionDetails)
.run((context) -> assertThat(context).getBean(PulsarConnectionDetails.class)
.isSameAs(customConnectionDetails));
}
@Nested @Nested
class ClientTests { class ClientTests {
@ -86,17 +96,36 @@ class PulsarConfigurationTests {
.run((context) -> assertThat(context).getBean(PulsarClient.class).isSameAs(customClient)); .run((context) -> assertThat(context).getBean(PulsarClient.class).isSameAs(customClient));
} }
@Test
void whenConnectionDetailsAreNullTheyAreNotApplied() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarBrokerUrl()).willReturn(null);
PulsarConfigurationTests.this.contextRunner.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.client.service-url=fromPropsCustomizer")
.run((context) -> {
DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class);
Customizers<PulsarClientBuilderCustomizer, ClientBuilder> customizers = Customizers
.of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize);
assertThat(customizers.fromField(clientFactory, "customizer"))
.callsInOrder(ClientBuilder::serviceUrl, "fromPropsCustomizer");
});
}
@Test @Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarBrokerUrl()).willReturn("fromConnectionDetailsCustomizer");
PulsarConfigurationTests.this.contextRunner PulsarConfigurationTests.this.contextRunner
.withUserConfiguration(PulsarClientBuilderCustomizersConfig.class) .withUserConfiguration(PulsarClientBuilderCustomizersConfig.class)
.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.client.service-url=fromPropsCustomizer") .withPropertyValues("spring.pulsar.client.service-url=fromPropsCustomizer")
.run((context) -> { .run((context) -> {
DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class); DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class);
Customizers<PulsarClientBuilderCustomizer, ClientBuilder> customizers = Customizers Customizers<PulsarClientBuilderCustomizer, ClientBuilder> customizers = Customizers
.of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize); .of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize);
assertThat(customizers.fromField(clientFactory, "customizer")).callsInOrder( assertThat(customizers.fromField(clientFactory, "customizer")).callsInOrder(
ClientBuilder::serviceUrl, "fromPropsCustomizer", "fromCustomizer1", "fromCustomizer2"); ClientBuilder::serviceUrl, "fromPropsCustomizer", "fromConnectionDetailsCustomizer",
"fromCustomizer1", "fromCustomizer2");
}); });
} }
@ -133,17 +162,35 @@ class PulsarConfigurationTests {
.isSameAs(pulsarAdministration)); .isSameAs(pulsarAdministration));
} }
@Test
void whenConnectionDetailsAreNullTheyAreNotApplied() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarAdminUrl()).willReturn(null);
PulsarConfigurationTests.this.contextRunner.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.admin.service-url=fromPropsCustomizer")
.run((context) -> {
PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class);
Customizers<PulsarAdminBuilderCustomizer, PulsarAdminBuilder> customizers = Customizers
.of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize);
assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers"))
.callsInOrder(PulsarAdminBuilder::serviceHttpUrl, "fromPropsCustomizer");
});
}
@Test @Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarAdminUrl()).willReturn("fromConnectionDetailsCustomizer");
this.contextRunner.withUserConfiguration(PulsarAdminBuilderCustomizersConfig.class) this.contextRunner.withUserConfiguration(PulsarAdminBuilderCustomizersConfig.class)
.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.admin.service-url=fromPropsCustomizer") .withPropertyValues("spring.pulsar.admin.service-url=fromPropsCustomizer")
.run((context) -> { .run((context) -> {
PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class); PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class);
Customizers<PulsarAdminBuilderCustomizer, PulsarAdminBuilder> customizers = Customizers Customizers<PulsarAdminBuilderCustomizer, PulsarAdminBuilder> customizers = Customizers
.of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize); .of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize);
assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers")).callsInOrder( assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers")).callsInOrder(
PulsarAdminBuilder::serviceHttpUrl, "fromPropsCustomizer", "fromCustomizer1", PulsarAdminBuilder::serviceHttpUrl, "fromPropsCustomizer",
"fromCustomizer2"); "fromConnectionDetailsCustomizer", "fromCustomizer1", "fromCustomizer2");
}); });
} }

@ -0,0 +1,74 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.docker.compose.service.connection.pulsar;
import org.springframework.boot.autoconfigure.pulsar.PulsarConnectionDetails;
import org.springframework.boot.docker.compose.core.RunningService;
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionDetailsFactory;
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionSource;
/**
* {@link DockerComposeConnectionDetailsFactory} to create {@link PulsarConnectionDetails}
* for a {@code pulsar} service.
*
* @author Chris Bono
*/
class PulsarDockerComposeConnectionDetailsFactory
extends DockerComposeConnectionDetailsFactory<PulsarConnectionDetails> {
private static final int PULSAR_BROKER_PORT = 6650;
private static final int PULSAR_ADMIN_PORT = 8080;
PulsarDockerComposeConnectionDetailsFactory() {
super("apachepulsar/pulsar");
}
@Override
protected PulsarConnectionDetails getDockerComposeConnectionDetails(DockerComposeConnectionSource source) {
return new PulsarDockerComposeConnectionDetails(source.getRunningService());
}
/**
* {@link PulsarConnectionDetails} backed by a {@code pulsar} {@link RunningService}.
*/
static class PulsarDockerComposeConnectionDetails extends DockerComposeConnectionDetails
implements PulsarConnectionDetails {
private final String brokerUrl;
private final String adminUrl;
PulsarDockerComposeConnectionDetails(RunningService service) {
super(service);
this.brokerUrl = "pulsar://%s:%s".formatted(service.host(), service.ports().get(PULSAR_BROKER_PORT));
this.adminUrl = "http://%s:%s".formatted(service.host(), service.ports().get(PULSAR_ADMIN_PORT));
}
@Override
public String getPulsarBrokerUrl() {
return this.brokerUrl;
}
@Override
public String getPulsarAdminUrl() {
return this.adminUrl;
}
}
}

@ -0,0 +1,20 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Auto-configuration for docker compose Pulsar service connections.
*/
package org.springframework.boot.docker.compose.service.connection.pulsar;

@ -19,9 +19,9 @@ org.springframework.boot.docker.compose.service.connection.oracle.OracleJdbcDock
org.springframework.boot.docker.compose.service.connection.oracle.OracleR2dbcDockerComposeConnectionDetailsFactory,\ org.springframework.boot.docker.compose.service.connection.oracle.OracleR2dbcDockerComposeConnectionDetailsFactory,\
org.springframework.boot.docker.compose.service.connection.postgres.PostgresJdbcDockerComposeConnectionDetailsFactory,\ org.springframework.boot.docker.compose.service.connection.postgres.PostgresJdbcDockerComposeConnectionDetailsFactory,\
org.springframework.boot.docker.compose.service.connection.postgres.PostgresR2dbcDockerComposeConnectionDetailsFactory,\ org.springframework.boot.docker.compose.service.connection.postgres.PostgresR2dbcDockerComposeConnectionDetailsFactory,\
org.springframework.boot.docker.compose.service.connection.pulsar.PulsarDockerComposeConnectionDetailsFactory,\
org.springframework.boot.docker.compose.service.connection.rabbit.RabbitDockerComposeConnectionDetailsFactory,\ org.springframework.boot.docker.compose.service.connection.rabbit.RabbitDockerComposeConnectionDetailsFactory,\
org.springframework.boot.docker.compose.service.connection.redis.RedisDockerComposeConnectionDetailsFactory,\ org.springframework.boot.docker.compose.service.connection.redis.RedisDockerComposeConnectionDetailsFactory,\
org.springframework.boot.docker.compose.service.connection.sqlserver.SqlServerJdbcDockerComposeConnectionDetailsFactory,\ org.springframework.boot.docker.compose.service.connection.sqlserver.SqlServerJdbcDockerComposeConnectionDetailsFactory,\
org.springframework.boot.docker.compose.service.connection.sqlserver.SqlServerR2dbcDockerComposeConnectionDetailsFactory,\ org.springframework.boot.docker.compose.service.connection.sqlserver.SqlServerR2dbcDockerComposeConnectionDetailsFactory,\
org.springframework.boot.docker.compose.service.connection.zipkin.ZipkinDockerComposeConnectionDetailsFactory org.springframework.boot.docker.compose.service.connection.zipkin.ZipkinDockerComposeConnectionDetailsFactory

@ -0,0 +1,46 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.docker.compose.service.connection.pulsar;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.pulsar.PulsarConnectionDetails;
import org.springframework.boot.docker.compose.service.connection.test.AbstractDockerComposeIntegrationTests;
import org.springframework.boot.testsupport.testcontainers.DockerImageNames;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Integration test for {@link PulsarDockerComposeConnectionDetailsFactory}.
*
* @author Chris Bono
*/
class PulsarDockerComposeConnectionDetailsFactoryIntegrationTests extends AbstractDockerComposeIntegrationTests {
PulsarDockerComposeConnectionDetailsFactoryIntegrationTests() {
super("pulsar-compose.yaml", DockerImageNames.pulsar());
}
@Test
void runCreatesConnectionDetails() {
PulsarConnectionDetails connectionDetails = run(PulsarConnectionDetails.class);
assertThat(connectionDetails).isNotNull();
assertThat(connectionDetails.getPulsarBrokerUrl()).matches("^pulsar:\\/\\/\\S+:\\d+");
assertThat(connectionDetails.getPulsarAdminUrl()).matches("^http:\\/\\/\\S+:\\d+");
}
}

@ -0,0 +1,9 @@
services:
pulsar:
image: '{imageName}'
ports:
- '8080'
- '6650'
command: bin/pulsar standalone
healthcheck:
test: curl http://127.0.0.1:8080/admin/v2/namespaces/public/default

@ -76,6 +76,9 @@ The following service connections are currently supported:
| `MongoConnectionDetails` | `MongoConnectionDetails`
| Containers named "mongo" | Containers named "mongo"
| `PulsarConnectionDetails`
| Containers named "apachepulsar/pulsar"
| `R2dbcConnectionDetails` | `R2dbcConnectionDetails`
| Containers named "gvenzl/oracle-xe", "mariadb", "mssql/server", "mysql", or "postgres" | Containers named "gvenzl/oracle-xe", "mariadb", "mssql/server", "mysql", or "postgres"

@ -992,6 +992,9 @@ The following service connection factories are provided in the `spring-boot-test
| `Neo4jConnectionDetails` | `Neo4jConnectionDetails`
| Containers of type `Neo4jContainer` | Containers of type `Neo4jContainer`
| `PulsarConnectionDetails`
| Containers of type `PulsarContainer`
| `R2dbcConnectionDetails` | `R2dbcConnectionDetails`
| Containers of type `MariaDBContainer`, `MSSQLServerContainer`, `MySQLContainer`, `OracleContainer`, or `PostgreSQLContainer` | Containers of type `MariaDBContainer`, `MSSQLServerContainer`, `MySQLContainer`, `OracleContainer`, or `PostgreSQLContainer`

@ -29,6 +29,7 @@ dependencies {
optional("org.testcontainers:neo4j") optional("org.testcontainers:neo4j")
optional("org.testcontainers:oracle-xe") optional("org.testcontainers:oracle-xe")
optional("org.testcontainers:postgresql") optional("org.testcontainers:postgresql")
optional("org.testcontainers:pulsar")
optional("org.testcontainers:rabbitmq") optional("org.testcontainers:rabbitmq")
optional("org.testcontainers:redpanda") optional("org.testcontainers:redpanda")
optional("org.testcontainers:r2dbc") optional("org.testcontainers:r2dbc")
@ -50,8 +51,13 @@ dependencies {
testImplementation("org.springframework:spring-r2dbc") testImplementation("org.springframework:spring-r2dbc")
testImplementation("org.springframework.amqp:spring-rabbit") testImplementation("org.springframework.amqp:spring-rabbit")
testImplementation("org.springframework.kafka:spring-kafka") testImplementation("org.springframework.kafka:spring-kafka")
testImplementation("org.springframework.pulsar:spring-pulsar")
testImplementation("org.testcontainers:junit-jupiter") testImplementation("org.testcontainers:junit-jupiter")
testRuntimeOnly("com.oracle.database.r2dbc:oracle-r2dbc") testRuntimeOnly("com.oracle.database.r2dbc:oracle-r2dbc")
} }
test {
jvmArgs += "--add-opens=java.base/java.net=ALL-UNNAMED"
jvmArgs += "--add-opens=java.base/sun.net=ALL-UNNAMED"
}

@ -0,0 +1,62 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.testcontainers.service.connection.pulsar;
import org.testcontainers.containers.PulsarContainer;
import org.springframework.boot.autoconfigure.pulsar.PulsarConnectionDetails;
import org.springframework.boot.testcontainers.service.connection.ContainerConnectionDetailsFactory;
import org.springframework.boot.testcontainers.service.connection.ContainerConnectionSource;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
/**
* {@link ContainerConnectionDetailsFactory} to create {@link PulsarConnectionDetails}
* from a {@link ServiceConnection @ServiceConnection}-annotated {@link PulsarContainer}.
*
* @author Chris Bono
*/
class PulsarContainerConnectionDetailsFactory
extends ContainerConnectionDetailsFactory<PulsarContainer, PulsarConnectionDetails> {
@Override
protected PulsarConnectionDetails getContainerConnectionDetails(ContainerConnectionSource<PulsarContainer> source) {
return new PulsarContainerConnectionDetails(source);
}
/**
* {@link PulsarConnectionDetails} backed by a {@link ContainerConnectionSource}.
*/
private static final class PulsarContainerConnectionDetails extends ContainerConnectionDetails<PulsarContainer>
implements PulsarConnectionDetails {
private PulsarContainerConnectionDetails(ContainerConnectionSource<PulsarContainer> source) {
super(source);
}
@Override
public String getPulsarBrokerUrl() {
return getContainer().getPulsarBrokerUrl();
}
@Override
public String getPulsarAdminUrl() {
return getContainer().getHttpServiceUrl();
}
}
}

@ -0,0 +1,20 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Support for testcontainers Pulsar service connections.
*/
package org.springframework.boot.testcontainers.service.connection.pulsar;

@ -19,6 +19,7 @@ org.springframework.boot.testcontainers.service.connection.kafka.KafkaContainerC
org.springframework.boot.testcontainers.service.connection.liquibase.LiquibaseContainerConnectionDetailsFactory,\ org.springframework.boot.testcontainers.service.connection.liquibase.LiquibaseContainerConnectionDetailsFactory,\
org.springframework.boot.testcontainers.service.connection.mongo.MongoContainerConnectionDetailsFactory,\ org.springframework.boot.testcontainers.service.connection.mongo.MongoContainerConnectionDetailsFactory,\
org.springframework.boot.testcontainers.service.connection.neo4j.Neo4jContainerConnectionDetailsFactory,\ org.springframework.boot.testcontainers.service.connection.neo4j.Neo4jContainerConnectionDetailsFactory,\
org.springframework.boot.testcontainers.service.connection.pulsar.PulsarContainerConnectionDetailsFactory,\
org.springframework.boot.testcontainers.service.connection.r2dbc.MariaDbR2dbcContainerConnectionDetailsFactory,\ org.springframework.boot.testcontainers.service.connection.r2dbc.MariaDbR2dbcContainerConnectionDetailsFactory,\
org.springframework.boot.testcontainers.service.connection.r2dbc.MySqlR2dbcContainerConnectionDetailsFactory,\ org.springframework.boot.testcontainers.service.connection.r2dbc.MySqlR2dbcContainerConnectionDetailsFactory,\
org.springframework.boot.testcontainers.service.connection.r2dbc.OracleR2dbcContainerConnectionDetailsFactory,\ org.springframework.boot.testcontainers.service.connection.r2dbc.OracleR2dbcContainerConnectionDetailsFactory,\

@ -0,0 +1,95 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.testcontainers.service.connection.pulsar;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.pulsar.client.api.PulsarClientException;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.pulsar.PulsarAutoConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.boot.testsupport.testcontainers.DockerImageNames;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link PulsarContainerConnectionDetailsFactory}.
*
* @author Chris Bono
*/
@SpringJUnitConfig
@Testcontainers(disabledWithoutDocker = true)
@TestPropertySource(properties = { "spring.pulsar.consumer.subscription.initial-position=earliest" })
class PulsarContainerConnectionDetailsFactoryIntegrationTests {
@Container
@ServiceConnection
@SuppressWarnings("unused")
static final PulsarContainer PULSAR = new PulsarContainer(DockerImageNames.pulsar())
.withStartupTimeout(Duration.ofMinutes(3));
@Autowired
private PulsarTemplate<String> pulsarTemplate;
@Autowired
private TestListener listener;
@Test
void connectionCanBeMadeToPulsarContainer() throws PulsarClientException {
this.pulsarTemplate.send("test-topic", "test-data");
Awaitility.waitAtMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(this.listener.messages).containsExactly("test-data"));
}
@Configuration(proxyBeanMethods = false)
@ImportAutoConfiguration(PulsarAutoConfiguration.class)
static class TestConfiguration {
@Bean
TestListener testListener() {
return new TestListener();
}
}
static class TestListener {
private final List<String> messages = new ArrayList<>();
@PulsarListener(topics = "test-topic")
void processMessage(String message) {
this.messages.add(message);
}
}
}

@ -10,6 +10,7 @@ dependencies {
implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar-reactive")) implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar-reactive"))
testImplementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-test")) testImplementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-test"))
testImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support")) testImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support"))
testImplementation(project(":spring-boot-project:spring-boot-testcontainers"))
testImplementation("org.awaitility:awaitility") testImplementation("org.awaitility:awaitility")
testImplementation("org.testcontainers:junit-jupiter") testImplementation("org.testcontainers:junit-jupiter")
testImplementation("org.testcontainers:pulsar") testImplementation("org.testcontainers:pulsar")

@ -32,10 +32,9 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.system.CapturedOutput; import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension; import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.boot.testsupport.testcontainers.DockerImageNames; import org.springframework.boot.testsupport.testcontainers.DockerImageNames;
import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -44,15 +43,11 @@ import static org.assertj.core.api.Assertions.assertThat;
class SamplePulsarApplicationTests { class SamplePulsarApplicationTests {
@Container @Container
@ServiceConnection
@SuppressWarnings("unused")
static final PulsarContainer container = new PulsarContainer(DockerImageNames.pulsar()).withStartupAttempts(2) static final PulsarContainer container = new PulsarContainer(DockerImageNames.pulsar()).withStartupAttempts(2)
.withStartupTimeout(Duration.ofMinutes(3)); .withStartupTimeout(Duration.ofMinutes(3));
@DynamicPropertySource
static void pulsarProperties(DynamicPropertyRegistry registry) {
registry.add("spring.pulsar.client.service-url", container::getPulsarBrokerUrl);
registry.add("spring.pulsar.admin.service-url", container::getHttpServiceUrl);
}
abstract class PulsarApplication { abstract class PulsarApplication {
private final String type; private final String type;

Loading…
Cancel
Save