@ -16,12 +16,21 @@
package org.springframework.boot.buildpack.platform.socket ;
import java.io.FileNotFoundException ;
import java.io.IOException ;
import java.io.InputStream ;
import java.io.OutputStream ;
import java.io.RandomAccessFile ;
import java.net.Socket ;
import java.nio.ByteBuffer ;
import java.nio.channels.AsynchronousByteChannel ;
import java.nio.channels.AsynchronousCloseException ;
import java.nio.channels.AsynchronousFileChannel ;
import java.nio.channels.Channels ;
import java.nio.channels.CompletionHandler ;
import java.nio.file.NoSuchFileException ;
import java.nio.file.Paths ;
import java.nio.file.StandardOpenOption ;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.Future ;
import java.util.concurrent.TimeUnit ;
import java.util.function.Consumer ;
@ -32,6 +41,7 @@ import com.sun.jna.platform.win32.Kernel32;
* A { @link Socket } implementation for named pipes .
*
* @author Phillip Webb
* @author Scott Frederick
* @since 2.3 .0
* /
public class NamedPipeSocket extends Socket {
@ -40,27 +50,22 @@ public class NamedPipeSocket extends Socket {
private static final long TIMEOUT = TimeUnit . MILLISECONDS . toNanos ( 1000 ) ;
private final RandomAccessFile file ;
private final InputStream inputStream ;
private final OutputStream outputStream ;
private final AsynchronousFileByteChannel channel ;
NamedPipeSocket ( String path ) throws IOException {
this . file = open ( path ) ;
this . inputStream = new NamedPipeInputStream ( ) ;
this . outputStream = new NamedPipeOutputStream ( ) ;
this . channel = open ( path ) ;
}
private static RandomAccessFile open ( String path ) throws IOException {
private AsynchronousFileByteChannel open ( String path ) throws IOException {
Consumer < String > awaiter = Platform . isWindows ( ) ? new WindowsAwaiter ( ) : new SleepAwaiter ( ) ;
long startTime = System . nanoTime ( ) ;
while ( true ) {
try {
return new RandomAccessFile ( path , "rw" ) ;
return new AsynchronousFileByteChannel ( AsynchronousFileChannel . open ( Paths . get ( path ) ,
StandardOpenOption . READ , StandardOpenOption . WRITE ) ) ;
}
catch ( FileNotFound Exception ex ) {
if ( System . nanoTime ( ) - startTime > TIMEOUT ) {
catch ( NoSuch FileException ex ) {
if ( System . nanoTime ( ) - startTime > = TIMEOUT ) {
throw ex ;
}
awaiter . accept ( path ) ;
@ -70,21 +75,19 @@ public class NamedPipeSocket extends Socket {
@Override
public InputStream getInputStream ( ) {
return this . inputStream ;
return Channels . newInputStream ( this . channel ) ;
}
@Override
public OutputStream getOutputStream ( ) {
return this . outputStream ;
return Channels . newOutputStream ( this . channel ) ;
}
@Override
public void close ( ) throws IOException {
this . file . close ( ) ;
}
protected final RandomAccessFile getFile ( ) {
return this . file ;
if ( this . channel ! = null ) {
this . channel . close ( ) ;
}
}
/ * *
@ -98,35 +101,81 @@ public class NamedPipeSocket extends Socket {
}
/ * *
* { @link InputStream } returned from the { @link NamedPipeSocket } .
* Adapt an { @code AsynchronousByteChannel } to an { @code AsynchronousFileChannel } .
* /
private class NamedPipeInputStream extends InputStream {
private static class AsynchronousFileByteChannel implements AsynchronousByteChannel {
private final AsynchronousFileChannel fileChannel ;
AsynchronousFileByteChannel ( AsynchronousFileChannel fileChannel ) {
this . fileChannel = fileChannel ;
}
@Override
public int read ( ) throws IOException {
return getFile ( ) . read ( ) ;
public < A > void read ( ByteBuffer dst , A attachment , CompletionHandler < Integer , ? super A > handler ) {
this . fileChannel . read ( dst , 0 , attachment , new CompletionHandler < Integer , A > ( ) {
@Override
public void completed ( Integer read , A attachment ) {
handler . completed ( ( read > 0 ) ? read : - 1 , attachment ) ;
}
@Override
public void failed ( Throwable exc , A attachment ) {
if ( exc instanceof AsynchronousCloseException ) {
handler . completed ( - 1 , attachment ) ;
return ;
}
handler . failed ( exc , attachment ) ;
}
} ) ;
}
@Override
public int read ( byte [ ] bytes , int off , int len ) throws IOException {
return getFile ( ) . read ( bytes , off , len ) ;
public Future < Integer > read ( ByteBuffer dst ) {
CompletableFutureHandler future = new CompletableFutureHandler ( ) ;
this . fileChannel . read ( dst , 0 , null , future ) ;
return future ;
}
}
@Override
public < A > void write ( ByteBuffer src , A attachment , CompletionHandler < Integer , ? super A > handler ) {
this . fileChannel . write ( src , 0 , attachment , handler ) ;
}
/ * *
* { @link InputStream } returned from the { @link NamedPipeSocket } .
* /
private class NamedPipeOutputStream extends OutputStream {
@Override
public Future < Integer > write ( ByteBuffer src ) {
return this . fileChannel . write ( src , 0 ) ;
}
@Override
public void write ( int value ) throws IOException {
NamedPipeSocket . this . file . write ( value ) ;
public void close( ) throws IOException {
this . fileChannel . close ( ) ;
}
@Override
public void write ( byte [ ] bytes , int off , int len ) throws IOException {
NamedPipeSocket . this . file . write ( bytes , off , len ) ;
public boolean isOpen ( ) {
return this . fileChannel . isOpen ( ) ;
}
private static class CompletableFutureHandler extends CompletableFuture < Integer >
implements CompletionHandler < Integer , Object > {
@Override
public void completed ( Integer read , Object attachment ) {
complete ( ( read > 0 ) ? read : - 1 ) ;
}
@Override
public void failed ( Throwable exc , Object attachment ) {
if ( exc instanceof AsynchronousCloseException ) {
complete ( - 1 ) ;
return ;
}
completeExceptionally ( exc ) ;
}
}
}