Merge branch '2.3.x' into 2.4.x

Closes gh-26607
pull/26691/head
Andy Wilkinson 4 years ago
commit 64e76badc2

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2020 the original author or authors. * Copyright 2012-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -169,6 +169,7 @@ public class JettyReactiveWebServerFactory extends AbstractReactiveWebServerFact
InetSocketAddress address = new InetSocketAddress(getAddress(), port); InetSocketAddress address = new InetSocketAddress(getAddress(), port);
Server server = new Server(getThreadPool()); Server server = new Server(getThreadPool());
server.addConnector(createConnector(address, server)); server.addConnector(createConnector(address, server));
server.setStopTimeout(0);
ServletHolder servletHolder = new ServletHolder(servlet); ServletHolder servletHolder = new ServletHolder(servlet);
servletHolder.setAsyncSupported(true); servletHolder.setAsyncSupported(true);
ServletContextHandler contextHandler = new ServletContextHandler(server, "/", false, false); ServletContextHandler contextHandler = new ServletContextHandler(server, "/", false, false);

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2020 the original author or authors. * Copyright 2012-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -169,6 +169,7 @@ public class JettyServletWebServerFactory extends AbstractServletWebServerFactor
private Server createServer(InetSocketAddress address) { private Server createServer(InetSocketAddress address) {
Server server = new Server(getThreadPool()); Server server = new Server(getThreadPool());
server.setConnectors(new Connector[] { createConnector(address, server) }); server.setConnectors(new Connector[] { createConnector(address, server) });
server.setStopTimeout(0);
return server; return server;
} }

@ -424,6 +424,30 @@ public abstract class AbstractReactiveWebServerFactoryTests {
blockingHandler.completeOne(); blockingHandler.completeOne();
} }
@Test
void whenARequestIsActiveAfterGracefulShutdownEndsThenStopWillComplete() throws InterruptedException {
AbstractReactiveWebServerFactory factory = getFactory();
factory.setShutdown(Shutdown.GRACEFUL);
BlockingHandler blockingHandler = new BlockingHandler();
this.webServer = factory.getWebServer(blockingHandler);
this.webServer.start();
Mono<ResponseEntity<Void>> request = getWebClient(this.webServer.getPort()).build().get().retrieve()
.toBodilessEntity();
AtomicReference<ResponseEntity<Void>> responseReference = new AtomicReference<>();
CountDownLatch responseLatch = new CountDownLatch(1);
request.subscribe((response) -> {
responseReference.set(response);
responseLatch.countDown();
});
blockingHandler.awaitQueue();
AtomicReference<GracefulShutdownResult> result = new AtomicReference<>();
this.webServer.shutDownGracefully(result::set);
this.webServer.stop();
Awaitility.await().atMost(Duration.ofSeconds(30))
.until(() -> GracefulShutdownResult.REQUESTS_ACTIVE == result.get());
blockingHandler.completeOne();
}
@Test @Test
void whenARequestIsActiveThenStopWillComplete() throws InterruptedException, BrokenBarrierException { void whenARequestIsActiveThenStopWillComplete() throws InterruptedException, BrokenBarrierException {
AbstractReactiveWebServerFactory factory = getFactory(); AbstractReactiveWebServerFactory factory = getFactory();

@ -1121,6 +1121,31 @@ public abstract class AbstractServletWebServerFactoryTests {
} }
} }
@Test
void whenARequestIsActiveAfterGracefulShutdownEndsThenStopWillComplete()
throws InterruptedException, BrokenBarrierException {
AbstractServletWebServerFactory factory = getFactory();
factory.setShutdown(Shutdown.GRACEFUL);
BlockingServlet blockingServlet = new BlockingServlet();
this.webServer = factory
.getWebServer((context) -> context.addServlet("blockingServlet", blockingServlet).addMapping("/"));
this.webServer.start();
int port = this.webServer.getPort();
initiateGetRequest(port, "/");
blockingServlet.awaitQueue();
AtomicReference<GracefulShutdownResult> result = new AtomicReference<>();
this.webServer.shutDownGracefully(result::set);
this.webServer.stop();
Awaitility.await().atMost(Duration.ofSeconds(30))
.until(() -> GracefulShutdownResult.REQUESTS_ACTIVE == result.get());
try {
blockingServlet.admitOne();
}
catch (RuntimeException ex) {
}
}
protected Future<Object> initiateGetRequest(int port, String path) { protected Future<Object> initiateGetRequest(int port, String path) {
return initiateGetRequest(HttpClients.createMinimal(), port, path); return initiateGetRequest(HttpClients.createMinimal(), port, path);
} }

Loading…
Cancel
Save