Merge branch '2.2.x'

Closes gh-21327
pull/21361/head
Andy Wilkinson 5 years ago
commit e03cb94190

@ -141,6 +141,7 @@ public class UndertowReactiveWebServerFactory extends AbstractReactiveWebServerF
else {
builder.addHttpListener(port, getListenAddress());
}
builder.setServerOption(UndertowOptions.SHUTDOWN_TIMEOUT, 0);
for (UndertowBuilderCustomizer customizer : this.builderCustomizers) {
customizer.customize(builder);
}

@ -241,6 +241,7 @@ public class UndertowServletWebServerFactory extends AbstractServletWebServerFac
else {
builder.addHttpListener(port, getListenAddress());
}
builder.setServerOption(UndertowOptions.SHUTDOWN_TIMEOUT, 0);
for (UndertowBuilderCustomizer customizer : this.builderCustomizers) {
customizer.customize(builder);
}

@ -25,6 +25,7 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
@ -423,6 +424,25 @@ public abstract class AbstractReactiveWebServerFactoryTests {
assertThat(responseLatch.await(5, TimeUnit.SECONDS)).isTrue();
}
@Test
void whenARequestIsActiveThenStopWillComplete() throws InterruptedException, BrokenBarrierException {
AbstractReactiveWebServerFactory factory = getFactory();
BlockingHandler blockingHandler = new BlockingHandler();
this.webServer = factory.getWebServer(blockingHandler);
this.webServer.start();
Mono<ResponseEntity<Void>> request = getWebClient(this.webServer.getPort()).build().get().retrieve()
.toBodilessEntity();
AtomicReference<ResponseEntity<Void>> responseReference = new AtomicReference<>();
CountDownLatch responseLatch = new CountDownLatch(1);
request.subscribe((response) -> {
responseReference.set(response);
responseLatch.countDown();
});
blockingHandler.awaitQueue();
this.webServer.stop();
blockingHandler.completeOne();
}
protected WebClient prepareCompressionTest() {
Compression compression = new Compression();
compression.setEnabled(true);

@ -1147,6 +1147,25 @@ public abstract class AbstractServletWebServerFactoryTests {
assertThat(request.get(30, TimeUnit.SECONDS)).isInstanceOf(HttpResponse.class);
}
@Test
void whenARequestIsActiveThenStopWillComplete() throws InterruptedException, BrokenBarrierException {
AbstractServletWebServerFactory factory = getFactory();
BlockingServlet blockingServlet = new BlockingServlet();
this.webServer = factory
.getWebServer((context) -> context.addServlet("blockingServlet", blockingServlet).addMapping("/"));
this.webServer.start();
int port = this.webServer.getPort();
initiateGetRequest(port, "/");
blockingServlet.awaitQueue();
this.webServer.stop();
try {
blockingServlet.admitOne();
}
catch (RuntimeException ex) {
}
}
protected Future<Boolean> initiateGracefulShutdown() {
RunnableFuture<Boolean> future = new FutureTask<Boolean>(() -> this.webServer.shutDownGracefully());
new Thread(future).start();
@ -1524,7 +1543,10 @@ public abstract class AbstractServletWebServerFactoryTests {
public void admitOne() {
try {
this.barriers.take().await();
CyclicBarrier barrier = this.barriers.take();
if (!barrier.isBroken()) {
barrier.await();
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();

Loading…
Cancel
Save