Rename ServerRSocketFactoryCustomizer

Rename `ServerRSocketFactoryCustomizer` to
`ServerRSocketFactoryProcessor` to better reflect that it is not a
traditional customizer.

Closes gh-18390
pull/18490/head
Phillip Webb 5 years ago
parent 96f85a40de
commit 71f8347c81

@ -37,7 +37,7 @@ import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.rsocket.context.RSocketServerBootstrap; import org.springframework.boot.rsocket.context.RSocketServerBootstrap;
import org.springframework.boot.rsocket.netty.NettyRSocketServerFactory; import org.springframework.boot.rsocket.netty.NettyRSocketServerFactory;
import org.springframework.boot.rsocket.server.RSocketServerFactory; import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer; import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -70,11 +70,9 @@ public class RSocketServerAutoConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
RSocketWebSocketNettyRouteProvider rSocketWebsocketRouteProvider(RSocketProperties properties, RSocketWebSocketNettyRouteProvider rSocketWebsocketRouteProvider(RSocketProperties properties,
RSocketMessageHandler messageHandler, ObjectProvider<ServerRSocketFactoryCustomizer> customizers) { RSocketMessageHandler messageHandler, ObjectProvider<ServerRSocketFactoryProcessor> processors) {
RSocketWebSocketNettyRouteProvider routeProvider = new RSocketWebSocketNettyRouteProvider( return new RSocketWebSocketNettyRouteProvider(properties.getServer().getMappingPath(),
properties.getServer().getMappingPath(), messageHandler.responder()); messageHandler.responder(), processors.orderedStream());
routeProvider.setCustomizers(customizers.orderedStream().collect(Collectors.toList()));
return routeProvider;
} }
} }
@ -92,14 +90,14 @@ public class RSocketServerAutoConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
RSocketServerFactory rSocketServerFactory(RSocketProperties properties, ReactorResourceFactory resourceFactory, RSocketServerFactory rSocketServerFactory(RSocketProperties properties, ReactorResourceFactory resourceFactory,
ObjectProvider<ServerRSocketFactoryCustomizer> customizers) { ObjectProvider<ServerRSocketFactoryProcessor> processors) {
NettyRSocketServerFactory factory = new NettyRSocketServerFactory(); NettyRSocketServerFactory factory = new NettyRSocketServerFactory();
factory.setResourceFactory(resourceFactory); factory.setResourceFactory(resourceFactory);
factory.setTransport(properties.getServer().getTransport()); factory.setTransport(properties.getServer().getTransport());
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties.getServer().getAddress()).to(factory::setAddress); map.from(properties.getServer().getAddress()).to(factory::setAddress);
map.from(properties.getServer().getPort()).to(factory::setPort); map.from(properties.getServer().getPort()).to(factory::setPort);
factory.setServerCustomizers(customizers.orderedStream().collect(Collectors.toList())); factory.setServerProcessors(processors.orderedStream().collect(Collectors.toList()));
return factory; return factory;
} }
@ -111,8 +109,7 @@ public class RSocketServerAutoConfiguration {
} }
@Bean @Bean
ServerRSocketFactoryCustomizer frameDecoderServerFactoryCustomizer( ServerRSocketFactoryProcessor frameDecoderServerFactoryCustomizer(RSocketMessageHandler rSocketMessageHandler) {
RSocketMessageHandler rSocketMessageHandler) {
return (serverRSocketFactory) -> { return (serverRSocketFactory) -> {
if (rSocketMessageHandler.getRSocketStrategies() if (rSocketMessageHandler.getRSocketStrategies()
.dataBufferFactory() instanceof NettyDataBufferFactory) { .dataBufferFactory() instanceof NettyDataBufferFactory) {

@ -16,8 +16,9 @@
package org.springframework.boot.autoconfigure.rsocket; package org.springframework.boot.autoconfigure.rsocket;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.rsocket.RSocketFactory; import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor; import io.rsocket.SocketAcceptor;
@ -25,7 +26,7 @@ import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.server.WebsocketRouteTransport; import io.rsocket.transport.netty.server.WebsocketRouteTransport;
import reactor.netty.http.server.HttpServerRoutes; import reactor.netty.http.server.HttpServerRoutes;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer; import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.boot.web.embedded.netty.NettyRouteProvider; import org.springframework.boot.web.embedded.netty.NettyRouteProvider;
/** /**
@ -39,22 +40,20 @@ class RSocketWebSocketNettyRouteProvider implements NettyRouteProvider {
private final SocketAcceptor socketAcceptor; private final SocketAcceptor socketAcceptor;
private List<ServerRSocketFactoryCustomizer> customizers = Collections.emptyList(); private final List<ServerRSocketFactoryProcessor> processors;
RSocketWebSocketNettyRouteProvider(String mappingPath, SocketAcceptor socketAcceptor) { RSocketWebSocketNettyRouteProvider(String mappingPath, SocketAcceptor socketAcceptor,
Stream<ServerRSocketFactoryProcessor> processors) {
this.mappingPath = mappingPath; this.mappingPath = mappingPath;
this.socketAcceptor = socketAcceptor; this.socketAcceptor = socketAcceptor;
} this.processors = processors.collect(Collectors.toList());
void setCustomizers(List<ServerRSocketFactoryCustomizer> customizers) {
this.customizers = customizers;
} }
@Override @Override
public HttpServerRoutes apply(HttpServerRoutes httpServerRoutes) { public HttpServerRoutes apply(HttpServerRoutes httpServerRoutes) {
RSocketFactory.ServerRSocketFactory server = RSocketFactory.receive(); RSocketFactory.ServerRSocketFactory server = RSocketFactory.receive();
for (ServerRSocketFactoryCustomizer customizer : this.customizers) { for (ServerRSocketFactoryProcessor processor : this.processors) {
server = customizer.apply(server); server = processor.process(server);
} }
ServerTransport.ConnectionAcceptor acceptor = server.acceptor(this.socketAcceptor).toConnectionAcceptor(); ServerTransport.ConnectionAcceptor acceptor = server.acceptor(this.socketAcceptor).toConnectionAcceptor();
return httpServerRoutes.ws(this.mappingPath, WebsocketRouteTransport.newHandler(acceptor)); return httpServerRoutes.ws(this.mappingPath, WebsocketRouteTransport.newHandler(acceptor));

@ -22,7 +22,7 @@ import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.rsocket.context.RSocketPortInfoApplicationContextInitializer; import org.springframework.boot.rsocket.context.RSocketPortInfoApplicationContextInitializer;
import org.springframework.boot.rsocket.context.RSocketServerBootstrap; import org.springframework.boot.rsocket.context.RSocketServerBootstrap;
import org.springframework.boot.rsocket.server.RSocketServerFactory; import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer; import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.test.context.runner.ReactiveWebApplicationContextRunner; import org.springframework.boot.test.context.runner.ReactiveWebApplicationContextRunner;
import org.springframework.boot.web.server.WebServerFactoryCustomizer; import org.springframework.boot.web.server.WebServerFactoryCustomizer;
@ -79,7 +79,7 @@ class RSocketServerAutoConfigurationTests {
reactiveWebContextRunner().withPropertyValues("spring.rsocket.server.port=0") reactiveWebContextRunner().withPropertyValues("spring.rsocket.server.port=0")
.run((context) -> assertThat(context).hasSingleBean(RSocketServerFactory.class) .run((context) -> assertThat(context).hasSingleBean(RSocketServerFactory.class)
.hasSingleBean(RSocketServerBootstrap.class) .hasSingleBean(RSocketServerBootstrap.class)
.hasSingleBean(ServerRSocketFactoryCustomizer.class)); .hasSingleBean(ServerRSocketFactoryProcessor.class));
} }
@Test @Test
@ -88,7 +88,7 @@ class RSocketServerAutoConfigurationTests {
.withInitializer(new RSocketPortInfoApplicationContextInitializer()).run((context) -> { .withInitializer(new RSocketPortInfoApplicationContextInitializer()).run((context) -> {
assertThat(context).hasSingleBean(RSocketServerFactory.class) assertThat(context).hasSingleBean(RSocketServerFactory.class)
.hasSingleBean(RSocketServerBootstrap.class) .hasSingleBean(RSocketServerBootstrap.class)
.hasSingleBean(ServerRSocketFactoryCustomizer.class); .hasSingleBean(ServerRSocketFactoryProcessor.class);
assertThat(context.getEnvironment().getProperty("local.rsocket.server.port")).isNotNull(); assertThat(context.getEnvironment().getProperty("local.rsocket.server.port")).isNotNull();
}); });
} }

@ -28,7 +28,7 @@ import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.boot.autoconfigure.web.reactive.HttpHandlerAutoConfiguration; import org.springframework.boot.autoconfigure.web.reactive.HttpHandlerAutoConfiguration;
import org.springframework.boot.autoconfigure.web.reactive.WebFluxAutoConfiguration; import org.springframework.boot.autoconfigure.web.reactive.WebFluxAutoConfiguration;
import org.springframework.boot.autoconfigure.web.reactive.error.ErrorWebFluxAutoConfiguration; import org.springframework.boot.autoconfigure.web.reactive.error.ErrorWebFluxAutoConfiguration;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer; import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.boot.test.context.runner.ReactiveWebApplicationContextRunner; import org.springframework.boot.test.context.runner.ReactiveWebApplicationContextRunner;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory; import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext; import org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext;
@ -52,7 +52,6 @@ import static org.assertj.core.api.Assertions.assertThat;
* *
* @author Brian Clozel * @author Brian Clozel
*/ */
class RSocketWebSocketNettyRouteProviderTests { class RSocketWebSocketNettyRouteProviderTests {
@Test @Test
@ -77,7 +76,7 @@ class RSocketWebSocketNettyRouteProviderTests {
WebTestClient client = createWebTestClient(serverContext.getWebServer()); WebTestClient client = createWebTestClient(serverContext.getWebServer());
client.get().uri("/protocol").exchange().expectStatus().isOk().expectBody().jsonPath("name", client.get().uri("/protocol").exchange().expectStatus().isOk().expectBody().jsonPath("name",
"http"); "http");
assertThat(WebConfiguration.customizerCallCount).isEqualTo(1); assertThat(WebConfiguration.processorCallCount).isEqualTo(1);
}); });
} }
@ -95,7 +94,7 @@ class RSocketWebSocketNettyRouteProviderTests {
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
static class WebConfiguration { static class WebConfiguration {
static int customizerCallCount = 0; static int processorCallCount = 0;
@Bean @Bean
WebController webController() { WebController webController() {
@ -110,9 +109,9 @@ class RSocketWebSocketNettyRouteProviderTests {
} }
@Bean @Bean
ServerRSocketFactoryCustomizer myRSocketFactoryCustomizer() { ServerRSocketFactoryProcessor myRSocketFactoryProcessor() {
return (server) -> { return (server) -> {
customizerCallCount++; processorCallCount++;
return server; return server;
}; };
} }

@ -37,7 +37,7 @@ import reactor.netty.tcp.TcpServer;
import org.springframework.boot.rsocket.server.ConfigurableRSocketServerFactory; import org.springframework.boot.rsocket.server.ConfigurableRSocketServerFactory;
import org.springframework.boot.rsocket.server.RSocketServer; import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.rsocket.server.RSocketServerFactory; import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer; import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.http.client.reactive.ReactorResourceFactory; import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -60,7 +60,7 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
private Duration lifecycleTimeout; private Duration lifecycleTimeout;
private List<ServerRSocketFactoryCustomizer> serverCustomizers = new ArrayList<>(); private List<ServerRSocketFactoryProcessor> serverProcessors = new ArrayList<>();
@Override @Override
public void setPort(int port) { public void setPort(int port) {
@ -86,23 +86,23 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
} }
/** /**
* Set {@link ServerRSocketFactoryCustomizer}s that should be applied to the RSocket * Set {@link ServerRSocketFactoryProcessor}s that should be applied to the RSocket
* server builder. Calling this method will replace any existing customizers. * server builder. Calling this method will replace any existing customizers.
* @param serverCustomizers the customizers to set * @param serverProcessors server processors to apply before the server starts
*/ */
public void setServerCustomizers(Collection<? extends ServerRSocketFactoryCustomizer> serverCustomizers) { public void setServerProcessors(Collection<? extends ServerRSocketFactoryProcessor> serverProcessors) {
Assert.notNull(serverCustomizers, "ServerCustomizers must not be null"); Assert.notNull(serverProcessors, "ServerProcessors must not be null");
this.serverCustomizers = new ArrayList<>(serverCustomizers); this.serverProcessors = new ArrayList<>(serverProcessors);
} }
/** /**
* Add {@link ServerRSocketFactoryCustomizer}s that should applied while building the * Add {@link ServerRSocketFactoryProcessor}s that should applied while building the
* server. * server.
* @param serverCustomizers the customizers to add * @param serverProcessors server processors to apply before the server starts
*/ */
public void addServerCustomizers(ServerRSocketFactoryCustomizer... serverCustomizers) { public void addServerProcessors(ServerRSocketFactoryProcessor... serverProcessors) {
Assert.notNull(serverCustomizers, "ServerCustomizer must not be null"); Assert.notNull(serverProcessors, "ServerProcessors must not be null");
this.serverCustomizers.addAll(Arrays.asList(serverCustomizers)); this.serverProcessors.addAll(Arrays.asList(serverProcessors));
} }
/** /**
@ -118,8 +118,8 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
public NettyRSocketServer create(SocketAcceptor socketAcceptor) { public NettyRSocketServer create(SocketAcceptor socketAcceptor) {
ServerTransport<CloseableChannel> transport = createTransport(); ServerTransport<CloseableChannel> transport = createTransport();
RSocketFactory.ServerRSocketFactory factory = RSocketFactory.receive(); RSocketFactory.ServerRSocketFactory factory = RSocketFactory.receive();
for (ServerRSocketFactoryCustomizer customizer : this.serverCustomizers) { for (ServerRSocketFactoryProcessor processor : this.serverProcessors) {
factory = customizer.apply(factory); factory = processor.process(factory);
} }
Mono<CloseableChannel> starter = factory.acceptor(socketAcceptor).transport(transport).start(); Mono<CloseableChannel> starter = factory.acceptor(socketAcceptor).transport(transport).start();
return new NettyRSocketServer(starter, this.lifecycleTimeout); return new NettyRSocketServer(starter, this.lifecycleTimeout);

@ -16,19 +16,25 @@
package org.springframework.boot.rsocket.server; package org.springframework.boot.rsocket.server;
import java.util.function.Function; import io.rsocket.RSocketFactory.ServerRSocketFactory;
import io.rsocket.RSocketFactory;
/** /**
* Mapping function that can be used to customize an RSocket server factory. * Processor that allows for custom modification of a {@link ServerRSocketFactory
* RSocketFactory.ServerRSocketFactory} before it is used.
* *
* @author Brian Clozel * @author Brian Clozel
* @see RSocketServerFactory * @see RSocketServerFactory
* @since 2.2.0 * @since 2.2.0
*/ */
@FunctionalInterface @FunctionalInterface
public interface ServerRSocketFactoryCustomizer public interface ServerRSocketFactoryProcessor {
extends Function<RSocketFactory.ServerRSocketFactory, RSocketFactory.ServerRSocketFactory> {
/**
* Apply this {@code ServerRSocketFactoryProcessor} to the given factory instance
* before it's used.
* @param factory the factory to process
* @return the processed factory instance
*/
ServerRSocketFactory process(ServerRSocketFactory factory);
} }

@ -36,7 +36,7 @@ import org.mockito.InOrder;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.boot.rsocket.server.RSocketServer; import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer; import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.core.codec.CharSequenceEncoder; import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder; import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -130,19 +130,19 @@ class NettyRSocketServerFactoryTests {
} }
@Test @Test
void serverCustomizers() { void serverProcessors() {
NettyRSocketServerFactory factory = getFactory(); NettyRSocketServerFactory factory = getFactory();
ServerRSocketFactoryCustomizer[] customizers = new ServerRSocketFactoryCustomizer[2]; ServerRSocketFactoryProcessor[] processors = new ServerRSocketFactoryProcessor[2];
for (int i = 0; i < customizers.length; i++) { for (int i = 0; i < processors.length; i++) {
customizers[i] = mock(ServerRSocketFactoryCustomizer.class); processors[i] = mock(ServerRSocketFactoryProcessor.class);
given(customizers[i].apply(any(RSocketFactory.ServerRSocketFactory.class))) given(processors[i].process(any(RSocketFactory.ServerRSocketFactory.class)))
.will((invocation) -> invocation.getArgument(0)); .will((invocation) -> invocation.getArgument(0));
} }
factory.setServerCustomizers(Arrays.asList(customizers[0], customizers[1])); factory.setServerProcessors(Arrays.asList(processors));
this.server = factory.create(new EchoRequestResponseAcceptor()); this.server = factory.create(new EchoRequestResponseAcceptor());
InOrder ordered = inOrder((Object[]) customizers); InOrder ordered = inOrder((Object[]) processors);
for (ServerRSocketFactoryCustomizer customizer : customizers) { for (ServerRSocketFactoryProcessor processor : processors) {
ordered.verify(customizer).apply(any(RSocketFactory.ServerRSocketFactory.class)); ordered.verify(processor).process(any(RSocketFactory.ServerRSocketFactory.class));
} }
} }

Loading…
Cancel
Save