|
|
@ -1,5 +1,5 @@
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Copyright 2012-2019 the original author or authors.
|
|
|
|
* Copyright 2012-2020 the original author or authors.
|
|
|
|
*
|
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
@ -19,6 +19,7 @@ package org.springframework.boot.rsocket.netty;
|
|
|
|
import java.net.InetSocketAddress;
|
|
|
|
import java.net.InetSocketAddress;
|
|
|
|
import java.time.Duration;
|
|
|
|
import java.time.Duration;
|
|
|
|
import java.util.Arrays;
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
|
|
|
import java.util.concurrent.Callable;
|
|
|
|
|
|
|
|
|
|
|
|
import io.netty.buffer.PooledByteBufAllocator;
|
|
|
|
import io.netty.buffer.PooledByteBufAllocator;
|
|
|
|
import io.rsocket.AbstractRSocket;
|
|
|
|
import io.rsocket.AbstractRSocket;
|
|
|
@ -87,10 +88,13 @@ class NettyRSocketServerFactoryTests {
|
|
|
|
@Test
|
|
|
|
@Test
|
|
|
|
void specificPort() {
|
|
|
|
void specificPort() {
|
|
|
|
NettyRSocketServerFactory factory = getFactory();
|
|
|
|
NettyRSocketServerFactory factory = getFactory();
|
|
|
|
int specificPort = SocketUtils.findAvailableTcpPort(41000);
|
|
|
|
int specificPort = doWithRetry(() -> {
|
|
|
|
factory.setPort(specificPort);
|
|
|
|
int port = SocketUtils.findAvailableTcpPort(41000);
|
|
|
|
this.server = factory.create(new EchoRequestResponseAcceptor());
|
|
|
|
factory.setPort(port);
|
|
|
|
this.server.start();
|
|
|
|
this.server = factory.create(new EchoRequestResponseAcceptor());
|
|
|
|
|
|
|
|
this.server.start();
|
|
|
|
|
|
|
|
return port;
|
|
|
|
|
|
|
|
});
|
|
|
|
this.requester = createRSocketTcpClient();
|
|
|
|
this.requester = createRSocketTcpClient();
|
|
|
|
String payload = "test payload";
|
|
|
|
String payload = "test payload";
|
|
|
|
String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT);
|
|
|
|
String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT);
|
|
|
@ -117,15 +121,12 @@ class NettyRSocketServerFactoryTests {
|
|
|
|
ReactorResourceFactory resourceFactory = new ReactorResourceFactory();
|
|
|
|
ReactorResourceFactory resourceFactory = new ReactorResourceFactory();
|
|
|
|
resourceFactory.afterPropertiesSet();
|
|
|
|
resourceFactory.afterPropertiesSet();
|
|
|
|
factory.setResourceFactory(resourceFactory);
|
|
|
|
factory.setResourceFactory(resourceFactory);
|
|
|
|
int specificPort = SocketUtils.findAvailableTcpPort(41000);
|
|
|
|
|
|
|
|
factory.setPort(specificPort);
|
|
|
|
|
|
|
|
this.server = factory.create(new EchoRequestResponseAcceptor());
|
|
|
|
this.server = factory.create(new EchoRequestResponseAcceptor());
|
|
|
|
this.server.start();
|
|
|
|
this.server.start();
|
|
|
|
this.requester = createRSocketWebSocketClient();
|
|
|
|
this.requester = createRSocketWebSocketClient();
|
|
|
|
String payload = "test payload";
|
|
|
|
String payload = "test payload";
|
|
|
|
String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT);
|
|
|
|
String response = this.requester.route("test").data(payload).retrieveMono(String.class).block(TIMEOUT);
|
|
|
|
assertThat(response).isEqualTo(payload);
|
|
|
|
assertThat(response).isEqualTo(payload);
|
|
|
|
assertThat(this.server.address().getPort()).isEqualTo(specificPort);
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
@Test
|
|
|
@ -164,6 +165,19 @@ class NettyRSocketServerFactoryTests {
|
|
|
|
return RSocketRequester.builder().rsocketStrategies(strategies);
|
|
|
|
return RSocketRequester.builder().rsocketStrategies(strategies);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private <T> T doWithRetry(Callable<T> action) {
|
|
|
|
|
|
|
|
Exception lastFailure = null;
|
|
|
|
|
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
return action.call();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
catch (Exception ex) {
|
|
|
|
|
|
|
|
lastFailure = ex;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
throw new IllegalStateException("Action was not successful in 10 attempts", lastFailure);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static class EchoRequestResponseAcceptor implements SocketAcceptor {
|
|
|
|
static class EchoRequestResponseAcceptor implements SocketAcceptor {
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|