From 98f1a2ed1683fad4407e7ca948440bfca4d0e379 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 10 Mar 2016 05:19:32 +1100 Subject: [PATCH 1/3] unsuccessful fix for #21 --- .../unixsocket/UnixServerSocketChannel.java | 9 +- .../jnr/unixsocket/AcceptInterruptTest.java | 118 ++++++++++++++++++ .../jnr/unixsocket/example/UnixServer.java | 94 +++++++++----- 3 files changed, 191 insertions(+), 30 deletions(-) create mode 100644 src/test/java/jnr/unixsocket/AcceptInterruptTest.java diff --git a/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java b/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java index aa85ff7..b15f15c 100644 --- a/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java +++ b/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java @@ -53,7 +53,14 @@ public UnixSocketChannel accept() throws IOException { SockAddrUnix addr = remote.getStruct(); IntByReference len = new IntByReference(addr.getMaximumLength()); - int clientfd = Native.accept(getFD(), addr, len); + int clientfd=-1; + try { + begin(); + clientfd = Native.accept(getFD(), addr, len); + } + finally { + end(clientfd>=0); + } if (clientfd < 0) { throw new IOException("accept failed: " + Native.getLastErrorString()); diff --git a/src/test/java/jnr/unixsocket/AcceptInterruptTest.java b/src/test/java/jnr/unixsocket/AcceptInterruptTest.java new file mode 100644 index 0000000..2bd1824 --- /dev/null +++ b/src/test/java/jnr/unixsocket/AcceptInterruptTest.java @@ -0,0 +1,118 @@ +package jnr.unixsocket; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import jnr.unixsocket.UnixServerSocketChannel; +import jnr.unixsocket.UnixSocketAddress; +import junit.framework.Assert; + +import org.junit.Test; + +public class AcceptInterruptTest +{ + @Test + public void testAcceptCloseInterrupt() throws Exception + { + File file = File.createTempFile("test", ".sock"); + file.delete(); + file.deleteOnExit(); + + final UnixServerSocketChannel channel = UnixServerSocketChannel.open(); + channel.socket().bind(new UnixSocketAddress(file)); + + final AtomicBoolean run = new AtomicBoolean(true); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch complete = new CountDownLatch(1); + Thread accept = new Acceptor(complete, start, channel, run); + + // Start accepting thread + accept.setDaemon(true); + accept.start(); + Assert.assertTrue(start.await(5,TimeUnit.SECONDS)); + + // Mark as no longer running + run.set(false); + + // Close and Interrupt + channel.close(); + accept.interrupt(); + Assert.assertTrue(complete.await(5,TimeUnit.SECONDS)); + } + + @Test + public void testAcceptInterrupt() throws Exception + { + File file = File.createTempFile("test", ".sock"); + file.delete(); + file.deleteOnExit(); + + final UnixServerSocketChannel channel = UnixServerSocketChannel.open(); + channel.socket().bind(new UnixSocketAddress(file)); + + final AtomicBoolean run = new AtomicBoolean(true); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch complete = new CountDownLatch(1); + Thread accept = new Acceptor(complete, start, channel, run); + + // Start accepting thread + accept.setDaemon(true); + accept.start(); + Assert.assertTrue(start.await(5,TimeUnit.SECONDS)); + + // Mark as no longer running + run.set(false); + + accept.interrupt(); + Assert.assertTrue(complete.await(5,TimeUnit.SECONDS)); + + } + + + private final class Acceptor extends Thread { + private final CountDownLatch complete; + private final CountDownLatch start; + private final UnixServerSocketChannel channel; + private final AtomicBoolean run; + + private Acceptor(CountDownLatch complete, CountDownLatch start, UnixServerSocketChannel channel, + AtomicBoolean run) { + this.complete = complete; + this.start = start; + this.channel = channel; + this.run = run; + } + + @Override public void run() + { + try + { + while(run.get()) + { + if (start.getCount()>0) + start.countDown(); + try + { + channel.accept(); + System.err.println("accepted"); + } + catch (IOException e) + { + e.printStackTrace(); + } + finally + { + System.err.println("finally"); + } + } + } + finally + { + complete.countDown(); + } + } + } +} diff --git a/src/test/java/jnr/unixsocket/example/UnixServer.java b/src/test/java/jnr/unixsocket/example/UnixServer.java index 2358d42..536d604 100644 --- a/src/test/java/jnr/unixsocket/example/UnixServer.java +++ b/src/test/java/jnr/unixsocket/example/UnixServer.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.util.Iterator; import java.util.Set; import java.util.Iterator; import java.util.logging.Level; @@ -30,84 +31,119 @@ import jnr.unixsocket.UnixSocketAddress; import jnr.unixsocket.UnixSocketChannel; -public class UnixServer { +public class UnixServer +{ - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws IOException + { java.io.File path = new java.io.File("/tmp/fubar.sock"); path.deleteOnExit(); UnixSocketAddress address = new UnixSocketAddress(path); UnixServerSocketChannel channel = UnixServerSocketChannel.open(); - try { + try + { Selector sel = NativeSelectorProvider.getInstance().openSelector(); channel.configureBlocking(false); channel.socket().bind(address); - channel.register(sel, SelectionKey.OP_ACCEPT, new ServerActor(channel, sel)); + channel.register(sel,SelectionKey.OP_ACCEPT,new ServerActor(channel,sel)); - while (sel.select() > 0) { + System.err.printf("Selecting ...%n"); + while (sel.select() > 0) + { Set keys = sel.selectedKeys(); - Iterator iterator = keys.iterator(); - while ( iterator.hasNext() ) { - SelectionKey k = iterator.next(); - Actor a = (Actor) k.attachment(); - if (!a.rxready()) { + System.err.printf("Selected %d%n",keys.size()); + + for (Iterator i = keys.iterator();i.hasNext();) + { + SelectionKey k = i.next(); + i.remove(); + Actor a = (Actor)k.attachment(); + System.err.printf("Key %s actor=%s%n",k,a); + if (!a.rxready()) + { k.cancel(); } iterator.remove(); } + System.err.printf("selecting ...%n"); } - } catch (IOException ex) { - Logger.getLogger(UnixServerSocket.class.getName()).log(Level.SEVERE, null, ex); + } + catch (IOException ex) + { + Logger.getLogger(UnixServerSocket.class.getName()).log(Level.SEVERE,null,ex); } } - static interface Actor { + static interface Actor + { public boolean rxready(); } - static final class ServerActor implements Actor { + static final class ServerActor implements Actor + { private final UnixServerSocketChannel channel; private final Selector selector; - public ServerActor(UnixServerSocketChannel channel, Selector selector) { + public ServerActor(UnixServerSocketChannel channel, Selector selector) + { this.channel = channel; this.selector = selector; } - public final boolean rxready() { - try { + + public final boolean rxready() + { + try + { + System.err.printf("%x rxready()%n",hashCode()); UnixSocketChannel client = channel.accept(); client.configureBlocking(false); - client.register(selector, SelectionKey.OP_READ, new ClientActor(client)); + ClientActor actor = new ClientActor(client); + System.err.printf("%x accepted%n",actor.hashCode()); + client.register(selector,SelectionKey.OP_READ,actor); return true; - } catch (IOException ex) { + } + catch (IOException ex) + { + ex.printStackTrace(); return false; } } } - static final class ClientActor implements Actor { + + static final class ClientActor implements Actor + { private final UnixSocketChannel channel; - public ClientActor(UnixSocketChannel channel) { + public ClientActor(UnixSocketChannel channel) + { this.channel = channel; } - public final boolean rxready() { - try { + public final boolean rxready() + { + try + { ByteBuffer buf = ByteBuffer.allocate(1024); int n = channel.read(buf); - UnixSocketAddress remote = channel.getRemoteSocketAddress(); - System.out.printf("Read in %d bytes from %s\n", n, remote); - - if (n > 0) { + UnixSocketAddress local = channel.getLocalSocketAddress(); + System.err.printf("%x Read in %d bytes from %s%n",hashCode(),n,local); + + if (n > 0) + { buf.flip(); channel.write(buf); return true; - } else if (n < 0) { + } + else if (n < 0) + { return false; } - } catch (IOException ex) { + } + catch (IOException ex) + { ex.printStackTrace(); return false; } From 5672ba127db9434f54b318e7bf7ae3f618874846 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 13 Dec 2016 16:10:08 +1100 Subject: [PATCH 2/3] fixed formatting --- .../jnr/unixsocket/AcceptInterruptTest.java | 96 ++++++++----------- 1 file changed, 42 insertions(+), 54 deletions(-) diff --git a/src/test/java/jnr/unixsocket/AcceptInterruptTest.java b/src/test/java/jnr/unixsocket/AcceptInterruptTest.java index 2bd1824..2837efb 100644 --- a/src/test/java/jnr/unixsocket/AcceptInterruptTest.java +++ b/src/test/java/jnr/unixsocket/AcceptInterruptTest.java @@ -12,14 +12,12 @@ import org.junit.Test; -public class AcceptInterruptTest -{ - @Test - public void testAcceptCloseInterrupt() throws Exception - { - File file = File.createTempFile("test", ".sock"); - file.delete(); - file.deleteOnExit(); +public class AcceptInterruptTest { + @Test + public void testAcceptCloseInterrupt() throws Exception { + File file = File.createTempFile("test", ".sock"); + file.delete(); + file.deleteOnExit(); final UnixServerSocketChannel channel = UnixServerSocketChannel.open(); channel.socket().bind(new UnixSocketAddress(file)); @@ -43,12 +41,11 @@ public void testAcceptCloseInterrupt() throws Exception Assert.assertTrue(complete.await(5,TimeUnit.SECONDS)); } - @Test - public void testAcceptInterrupt() throws Exception - { - File file = File.createTempFile("test", ".sock"); - file.delete(); - file.deleteOnExit(); + @Test + public void testAcceptInterrupt() throws Exception { + File file = File.createTempFile("test", ".sock"); + file.delete(); + file.deleteOnExit(); final UnixServerSocketChannel channel = UnixServerSocketChannel.open(); channel.socket().bind(new UnixSocketAddress(file)); @@ -68,51 +65,42 @@ public void testAcceptInterrupt() throws Exception accept.interrupt(); Assert.assertTrue(complete.await(5,TimeUnit.SECONDS)); - } - - - private final class Acceptor extends Thread { - private final CountDownLatch complete; - private final CountDownLatch start; - private final UnixServerSocketChannel channel; - private final AtomicBoolean run; + + private final class Acceptor extends Thread { + private final CountDownLatch complete; + private final CountDownLatch start; + private final UnixServerSocketChannel channel; + private final AtomicBoolean run; - private Acceptor(CountDownLatch complete, CountDownLatch start, UnixServerSocketChannel channel, - AtomicBoolean run) { - this.complete = complete; - this.start = start; - this.channel = channel; - this.run = run; - } + private Acceptor(CountDownLatch complete, CountDownLatch start, UnixServerSocketChannel channel, + AtomicBoolean run) { + this.complete = complete; + this.start = start; + this.channel = channel; + this.run = run; + } - @Override public void run() - { - try - { - while(run.get()) - { - if (start.getCount()>0) - start.countDown(); - try - { - channel.accept(); - System.err.println("accepted"); - } - catch (IOException e) - { - e.printStackTrace(); - } - finally - { - System.err.println("finally"); - } - } + @Override public void run() { + try { + while(run.get()) { + if (start.getCount()>0) + start.countDown(); + try { + channel.accept(); + System.err.println("accepted"); + } + catch (IOException e) { + e.printStackTrace(); } - finally - { - complete.countDown(); + finally { + System.err.println("finally"); } } + } + finally { + complete.countDown(); + } } + } } From 0c2ec2ad167aad3dbd44b510bf517391ccd9d088 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 22 Nov 2018 10:35:31 +0100 Subject: [PATCH 3/3] fixed formatting Signed-off-by: Greg Wilkins --- .../unixsocket/UnixServerSocketChannel.java | 6 +- .../jnr/unixsocket/AcceptInterruptTest.java | 66 +++++++++---------- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java b/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java index ee988d7..3e8f8a9 100644 --- a/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java +++ b/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java @@ -54,13 +54,13 @@ public UnixSocketChannel accept() throws IOException { int maxLength = addr.getMaximumLength(); IntByReference len = new IntByReference(maxLength); - int clientfd=-1; + int clientfd=-1; try { - begin(); + begin(); clientfd = Native.accept(getFD(), addr, len); } finally { - end(clientfd>=0); + end(clientfd>=0); } if (clientfd < 0) { diff --git a/src/test/java/jnr/unixsocket/AcceptInterruptTest.java b/src/test/java/jnr/unixsocket/AcceptInterruptTest.java index 2837efb..4c80f37 100644 --- a/src/test/java/jnr/unixsocket/AcceptInterruptTest.java +++ b/src/test/java/jnr/unixsocket/AcceptInterruptTest.java @@ -66,41 +66,41 @@ public void testAcceptInterrupt() throws Exception { accept.interrupt(); Assert.assertTrue(complete.await(5,TimeUnit.SECONDS)); } - + private final class Acceptor extends Thread { - private final CountDownLatch complete; - private final CountDownLatch start; - private final UnixServerSocketChannel channel; - private final AtomicBoolean run; + private final CountDownLatch complete; + private final CountDownLatch start; + private final UnixServerSocketChannel channel; + private final AtomicBoolean run; - private Acceptor(CountDownLatch complete, CountDownLatch start, UnixServerSocketChannel channel, - AtomicBoolean run) { - this.complete = complete; - this.start = start; - this.channel = channel; - this.run = run; - } + private Acceptor(CountDownLatch complete, CountDownLatch start, UnixServerSocketChannel channel, + AtomicBoolean run) { + this.complete = complete; + this.start = start; + this.channel = channel; + this.run = run; + } - @Override public void run() { - try { - while(run.get()) { - if (start.getCount()>0) - start.countDown(); - try { - channel.accept(); - System.err.println("accepted"); - } - catch (IOException e) { - e.printStackTrace(); - } - finally { - System.err.println("finally"); - } - } - } - finally { - complete.countDown(); - } - } + @Override public void run() { + try { + while(run.get()) { + if (start.getCount()>0) + start.countDown(); + try { + channel.accept(); + System.err.println("accepted"); + } + catch (IOException e) { + e.printStackTrace(); + } + finally { + System.err.println("finally"); + } + } + } + finally { + complete.countDown(); + } + } } }