@ -1,5 +1,5 @@
/ *
* Copyright 2012 - 202 2 the original author or authors .
* Copyright 2012 - 202 3 the original author or authors .
*
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
@ -25,6 +25,9 @@ import java.util.Deque;
import java.util.Iterator ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicLong ;
import java.util.concurrent.locks.Condition ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
import org.apache.commons.logging.Log ;
import org.apache.commons.logging.LogFactory ;
@ -122,7 +125,12 @@ public class HttpTunnelServer {
private long disconnectTimeout = DEFAULT_DISCONNECT_TIMEOUT ;
private volatile ServerThread serverThread ;
/ * *
* Guards access to { @link # serverThread } .
* /
private final Lock serverThreadLock = new ReentrantLock ( ) ;
private ServerThread serverThread ;
/ * *
* Creates a new { @link HttpTunnelServer } instance .
@ -164,7 +172,8 @@ public class HttpTunnelServer {
* @throws IOException in case of I / O errors
* /
protected ServerThread getServerThread ( ) throws IOException {
synchronized ( this ) {
this . serverThreadLock . lock ( ) ;
try {
if ( this . serverThread = = null ) {
ByteChannel channel = this . serverConnection . open ( this . longPollTimeout ) ;
this . serverThread = new ServerThread ( channel ) ;
@ -172,15 +181,22 @@ public class HttpTunnelServer {
}
return this . serverThread ;
}
finally {
this . serverThreadLock . unlock ( ) ;
}
}
/ * *
* Called when the server thread exits .
* /
void clearServerThread ( ) {
synchronized ( this ) {
this . serverThreadLock . lock ( ) ;
try {
this . serverThread = null ;
}
finally {
this . serverThreadLock . unlock ( ) ;
}
}
/ * *
@ -210,6 +226,13 @@ public class HttpTunnelServer {
private final Deque < HttpConnection > httpConnections ;
/ * *
* Guards access to { @link # httpConnections } .
* /
private final Lock httpConnectionsLock = new ReentrantLock ( ) ;
private final Condition httpConnectionsCondition = this . httpConnectionsLock . newCondition ( ) ;
private final HttpTunnelPayloadForwarder payloadForwarder ;
private boolean closed ;
@ -247,7 +270,8 @@ public class HttpTunnelServer {
while ( this . targetServer . isOpen ( ) ) {
closeStaleHttpConnections ( ) ;
ByteBuffer data = HttpTunnelPayload . getPayloadData ( this . targetServer ) ;
synchronized ( this . httpConnections ) {
this . httpConnectionsLock . lock ( ) ;
try {
if ( data ! = null ) {
HttpTunnelPayload payload = new HttpTunnelPayload ( this . responseSeq . incrementAndGet ( ) , data ) ;
payload . logIncoming ( ) ;
@ -255,15 +279,20 @@ public class HttpTunnelServer {
connection . respond ( payload ) ;
}
}
finally {
this . httpConnectionsLock . unlock ( ) ;
}
}
}
private HttpConnection getOrWaitForHttpConnection ( ) {
synchronized ( this . httpConnections ) {
this . httpConnectionsLock . lock ( ) ;
try {
HttpConnection httpConnection = this . httpConnections . pollFirst ( ) ;
while ( httpConnection = = null ) {
try {
this . httpConnections . wait ( HttpTunnelServer . this . longPollTimeout ) ;
this . httpConnectionsCondition . await ( HttpTunnelServer . this . longPollTimeout ,
TimeUnit . MILLISECONDS ) ;
}
catch ( InterruptedException ex ) {
Thread . currentThread ( ) . interrupt ( ) ;
@ -273,10 +302,14 @@ public class HttpTunnelServer {
}
return httpConnection ;
}
finally {
this . httpConnectionsLock . unlock ( ) ;
}
}
private void closeStaleHttpConnections ( ) throws IOException {
synchronized ( this . httpConnections ) {
this . httpConnectionsLock . lock ( ) ;
try {
checkNotDisconnected ( ) ;
Iterator < HttpConnection > iterator = this . httpConnections . iterator ( ) ;
while ( iterator . hasNext ( ) ) {
@ -287,6 +320,9 @@ public class HttpTunnelServer {
}
}
}
finally {
this . httpConnectionsLock . unlock ( ) ;
}
}
private void checkNotDisconnected ( ) {
@ -298,7 +334,8 @@ public class HttpTunnelServer {
}
private void closeHttpConnections ( ) {
synchronized ( this . httpConnections ) {
this . httpConnectionsLock . lock ( ) ;
try {
while ( ! this . httpConnections . isEmpty ( ) ) {
try {
this . httpConnections . removeFirst ( ) . respond ( HttpStatus . GONE ) ;
@ -308,6 +345,9 @@ public class HttpTunnelServer {
}
}
}
finally {
this . httpConnectionsLock . unlock ( ) ;
}
}
private void closeTargetServer ( ) {
@ -328,13 +368,17 @@ public class HttpTunnelServer {
if ( this . closed ) {
httpConnection . respond ( HttpStatus . GONE ) ;
}
synchronized ( this . httpConnections ) {
this . httpConnectionsLock . lock ( ) ;
try {
while ( this . httpConnections . size ( ) > 1 ) {
this . httpConnections . removeFirst ( ) . respond ( HttpStatus . TOO_MANY_REQUESTS ) ;
}
this . lastHttpRequestTime = System . currentTimeMillis ( ) ;
this . httpConnections . addLast ( httpConnection ) ;
this . httpConnections . notify ( ) ;
this . httpConnectionsCondition . signal ( ) ;
}
finally {
this . httpConnectionsLock . unlock ( ) ;
}
forwardToTargetServer ( httpConnection ) ;
}
@ -368,6 +412,10 @@ public class HttpTunnelServer {
private volatile boolean complete = false ;
private final Lock lock = new ReentrantLock ( ) ;
private final Condition lockCondition = this . lock . newCondition ( ) ;
public HttpConnection ( ServerHttpRequest request , ServerHttpResponse response ) {
this . createTime = System . currentTimeMillis ( ) ;
this . request = request ;
@ -426,8 +474,12 @@ public class HttpTunnelServer {
if ( this . async = = null ) {
while ( ! this . complete ) {
try {
synchronized ( this ) {
wait ( 1000 ) ;
this . lock . lock ( ) ;
try {
this . lockCondition . await ( 1 , TimeUnit . SECONDS ) ;
}
finally {
this . lock . unlock ( ) ;
}
}
catch ( InterruptedException ex ) {
@ -476,9 +528,13 @@ public class HttpTunnelServer {
this . async . complete ( ) ;
}
else {
synchronized ( this ) {
this . lock . lock ( ) ;
try {
this . complete = true ;
notifyAll ( ) ;
this . lockCondition . signalAll ( ) ;
}
finally {
this . lock . unlock ( ) ;
}
}
}