changeset 2540:e2ad1b41260b

[PATCH] fix TCP IPC registration lock issue See http://php.mandelson.org/mk3/index.php/2011/10/06/better-selectablechannel-registration-in-java-nio/ This could be an issue on Linux, even for Unix Domain sockets, but it doesn't seem to be. Reviewed-by: ebaron Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2016-November/021784.html
author Simon Tooke <stooke@redhat.com>
date Thu, 01 Dec 2016 16:30:37 -0500
parents 34066083ddd5
children 9c6f95f61014
files agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/AcceptThread.java agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransport.java agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImpl.java agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransportTest.java agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImplTest.java
diffstat 5 files changed, 61 insertions(+), 121 deletions(-) [+]
line wrap: on
line diff
--- a/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/AcceptThread.java	Tue Nov 29 11:22:49 2016 +0100
+++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/AcceptThread.java	Thu Dec 01 16:30:37 2016 -0500
@@ -37,6 +37,7 @@
 package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal;
 
 import java.io.IOException;
+import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.util.HashSet;
@@ -55,6 +56,7 @@
     private final ExecutorService execService;
     private final Selector selector;
     private final ClientHandlerCreator handlerCreator;
+    private final Object registerLock = new Object();
     
     private boolean shutdown;
     
@@ -74,6 +76,7 @@
         logger.info("Ready to accept client connections");
         try {
             while (!shutdown) {
+                synchronized(registerLock) {}
                 int selected = selector.select();
                 if (selected < 0) {
                     // Something bad happened
@@ -134,7 +137,31 @@
             }
         }
     }
-    
+
+    /**
+     * Register a new channel to this acceptThread.
+     * Handles the select() loop and associated locks.
+     *
+     * See http://php.mandelson.org/mk3/index.php/2011/10/06/better-selectablechannel-registration-in-java-nio/
+     * This could also be an issue for Unix Domain Sockets, but it doesn't seem to be a problem on Linux for
+     * either UDS or TCP sockets on Linux - not sure why.
+     *
+     * May need to add a simiilar deregister() function for clean shutdown.
+     *
+     * @param channel
+     * @param ops
+     * @return
+     * @throws IOException
+     */
+    SelectionKey register(SelectableChannel channel, int ops) throws IOException {
+        final SelectionKey key;
+        synchronized(registerLock) {
+            selector.wakeup();
+            key =  channel.register(selector, ops);
+        }
+        return key;
+    }
+
     void shutdown() throws IOException {
         this.shutdown = true;
         // Interrupt accept thread
--- a/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransport.java	Tue Nov 29 11:22:49 2016 +0100
+++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransport.java	Thu Dec 01 16:30:37 2016 -0500
@@ -40,19 +40,10 @@
 import java.net.SocketAddress;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
-import java.nio.file.DirectoryStream;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
 import java.nio.file.attribute.UserPrincipal;
-import java.nio.file.attribute.UserPrincipalLookupService;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -74,7 +65,6 @@
     // Access/modification of this field should by synchronized
     private final Map<String, ThermostatServerSocketChannelImpl> sockets;
     private final ExecutorService execService;
-    private final FileUtils fileUtils;
     private final ChannelUtils channelUtils;
     private final ThreadCreator threadCreator;
     
@@ -84,15 +74,14 @@
     
     TcpSocketServerTransport(SelectorProvider selectorProvider) {
         this(selectorProvider, Executors.newFixedThreadPool(determineDefaultThreadPoolSize(), new CountingThreadFactory()), 
-                new FileUtils(), new ThreadCreator(), new ChannelUtils());
+                new ThreadCreator(), new ChannelUtils());
     }
     
     TcpSocketServerTransport(SelectorProvider selectorProvider, ExecutorService execService,
-            FileUtils fileUtils, ThreadCreator threadCreator, ChannelUtils channelCreator) {
+            ThreadCreator threadCreator, ChannelUtils channelCreator) {
         this.selectorProvider = selectorProvider;
         this.sockets = new HashMap<>();
         this.execService = execService;
-        this.fileUtils = fileUtils;
         this.channelUtils = channelCreator;
         this.threadCreator = threadCreator;
     }
@@ -139,7 +128,7 @@
         
         // Create socket
         ThermostatServerSocketChannelImpl socket = 
-                channelUtils.createServerSocketChannel(name, addr, callbacks, props, selector);
+                channelUtils.createServerSocketChannel(name, addr, callbacks, props, selector, acceptThread);
 
         sockets.put(name, socket);
         if (!acceptThread.isAlive())
@@ -212,54 +201,6 @@
     /* for testing purposes */
     Thread getAcceptThread() { return acceptThread; }
 
-    /* For testing purposes */
-    static class FileUtils {
-        
-        boolean exists(Path path) {
-            return Files.exists(path);
-        }
-        
-        void delete(Path path) throws IOException {
-            Files.delete(path);
-        }
-        
-        boolean isDirectory(Path path) {
-            return Files.isDirectory(path);
-        }
-        
-        Path createDirectory(Path dir, FileAttribute<?>... attrs) throws IOException {
-            return Files.createDirectory(dir, attrs);
-        }
-        
-        Path createDirectories(Path dir, FileAttribute<?>... attrs) throws IOException {
-            return Files.createDirectories(dir, attrs);
-        }
-        
-        Set<PosixFilePermission> getPosixFilePermissions(Path path) throws IOException {
-            return Files.getPosixFilePermissions(path);
-        }
-        
-        FileAttribute<Set<PosixFilePermission>> toFileAttribute(Set<PosixFilePermission> perms) {
-            return PosixFilePermissions.asFileAttribute(perms);
-        }
-        
-        DirectoryStream<Path> newDirectoryStream(Path dir) throws IOException {
-            return Files.newDirectoryStream(dir);
-        }
-        
-        UserPrincipalLookupService getUserPrincipalLookupService() {
-            return FileSystems.getDefault().getUserPrincipalLookupService();
-        }
-        
-        String getUsername() {
-            return System.getProperty("user.name");
-        }
-        
-        UserPrincipal getOwner(Path path) throws IOException {
-            return Files.getOwner(path);
-        }
-        
-    }
     
     /* For testing purposes */
     static class ThreadCreator {
@@ -271,8 +212,8 @@
     /* For testing purposes */
     static class ChannelUtils {
         ThermostatServerSocketChannelImpl createServerSocketChannel(String name, SocketAddress addr, 
-                ThermostatIPCCallbacks callbacks, IPCProperties props, Selector selector) throws IOException {
-            return ThermostatServerSocketChannelImpl.open(name, addr, callbacks, props, selector);
+                ThermostatIPCCallbacks callbacks, IPCProperties props, Selector selector, AcceptThread acceptThread) throws IOException {
+            return ThermostatServerSocketChannelImpl.open(name, addr, callbacks, props, selector, acceptThread);
         }
         void closeSelector(Selector selector) throws IOException {
             selector.close();
--- a/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImpl.java	Tue Nov 29 11:22:49 2016 +0100
+++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImpl.java	Thu Dec 01 16:30:37 2016 -0500
@@ -36,7 +36,6 @@
 
 package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal;
 
-import java.io.File;
 import java.io.IOException;
 
 import java.net.SocketAddress;
@@ -63,22 +62,22 @@
     private final ServerSocketChannel impl;
     private final SocketAddress addr;
     private final ThermostatIPCCallbacks callbacks;
-    private final Selector selector;
-    
+    private final AcceptThread acceptThread;
+
     private SelectionKey key;
     
     ThermostatServerSocketChannelImpl(String name, ServerSocketChannel impl, SocketAddress addr, 
-            ThermostatIPCCallbacks callbacks, Selector selector, SelectionKey key) {
+            ThermostatIPCCallbacks callbacks, AcceptThread acceptThread, SelectionKey key) {
         this.name = name;
         this.impl = impl;
         this.addr = addr;
         this.callbacks = callbacks;
-        this.selector = selector;
+        this.acceptThread = acceptThread;
         this.key = key;
     }
 
     static ThermostatServerSocketChannelImpl open(String name, SocketAddress addr, ThermostatIPCCallbacks callbacks, 
-            IPCProperties props, Selector selector) throws IOException {
+            IPCProperties props, Selector selector, AcceptThread acceptThread) throws IOException {
         
         ServerSocketChannel impl = channelHelper.open();
 
@@ -87,10 +86,10 @@
         // Set non-blocking
         channelHelper.configureBlocking(impl, false);
         // Register for selection
-        SelectionKey key = channelHelper.register(impl, selector, SelectionKey.OP_ACCEPT);
+        SelectionKey key = channelHelper.register(acceptThread, impl, SelectionKey.OP_ACCEPT);
         // Attach wrapper socket to key, for use in select loop
         ThermostatServerSocketChannelImpl sock =
-                new ThermostatServerSocketChannelImpl(name, impl, addr, callbacks, selector, key);
+                new ThermostatServerSocketChannelImpl(name, impl, addr, callbacks, acceptThread, key);
         channelHelper.attachToKey(key, sock);
         // Send wakeup to trigger re-selection
         selector.wakeup();
@@ -113,7 +112,7 @@
         // Set non-blocking
         channelHelper.configureBlocking(clientImpl, false);
         // Register for selection
-        SelectionKey key = channelHelper.register(clientImpl, selector, SelectionKey.OP_READ);
+        SelectionKey key = channelHelper.register(acceptThread, clientImpl, SelectionKey.OP_READ);
         return new AcceptedSocketChannelImpl(name, clientImpl, key);
     }
 
@@ -145,10 +144,9 @@
         void bind(ServerSocketChannel channel, SocketAddress addr) throws IOException {
             channel.socket().bind(addr);
         }
-        
-        SelectionKey register(SelectableChannel channel, Selector sel, int ops) throws IOException {
-            sel.wakeup();
-            return channel.register(sel, ops);
+
+        SelectionKey register(AcceptThread acceptThread, SelectableChannel channel, int ops) throws IOException {
+            return acceptThread.register(channel, ops);
         }
         
         SelectableChannel configureBlocking(AbstractSelectableChannel channel, boolean block) throws IOException {
--- a/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransportTest.java	Tue Nov 29 11:22:49 2016 +0100
+++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransportTest.java	Thu Dec 01 16:30:37 2016 -0500
@@ -48,24 +48,12 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.SocketAddress;
-import java.net.InetSocketAddress;
-import java.net.InetAddress;
 
 import java.nio.channels.spi.AbstractSelector;
 import java.nio.channels.spi.SelectorProvider;
-import java.nio.file.DirectoryStream;
-import java.nio.file.InvalidPathException;
-import java.nio.file.Path;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
-import java.nio.file.attribute.UserPrincipal;
-import java.nio.file.attribute.UserPrincipalLookupService;
-import java.util.Iterator;
-import java.util.Set;
+
 import java.util.concurrent.ExecutorService;
 
 import org.junit.Before;
@@ -77,7 +65,6 @@
 import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks;
 import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.TcpSocketIPCProperties;
 import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.TcpSocketServerTransport.ChannelUtils;
-import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.TcpSocketServerTransport.FileUtils;
 import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.TcpSocketServerTransport.ThreadCreator;
 
 public class TcpSocketServerTransportTest {
@@ -91,14 +78,12 @@
     private SelectorProvider provider;
     private AbstractSelector selector;
     private ExecutorService execService;
-    private FileUtils fileUtils;
     private AcceptThread acceptThread;
     private ThreadCreator threadCreator;
     private ThermostatIPCCallbacks callbacks;
     private ChannelUtils channelUtils;
     private ThermostatServerSocketChannelImpl channel;
     private TcpSocketIPCProperties props;
-    private UserPrincipalLookupService lookup;
     private SocketAddress addr;
     
     @SuppressWarnings("unchecked")
@@ -107,9 +92,7 @@
         provider = mock(SelectorProvider.class);
         selector = mock(AbstractSelector.class);
         when(provider.openSelector()).thenReturn(selector);
-        
-               
-        fileUtils = mock(FileUtils.class);
+
         addr = mock(SocketAddress.class);
         
         props = mock(TcpSocketIPCProperties.class);
@@ -118,12 +101,6 @@
         when(props.getType()).thenReturn(IPCType.TCP_SOCKET);
         doThrow(new IOException()).when(props).getSocketAddr(BAD_SERVER_NAME);
         
-        lookup = mock(UserPrincipalLookupService.class);
-        when(fileUtils.getUserPrincipalLookupService()).thenReturn(lookup);
-        when(fileUtils.getUsername()).thenReturn(USERNAME);
-        UserPrincipal principal = mock(UserPrincipal.class);
-        when(lookup.lookupPrincipalByName(USERNAME)).thenReturn(principal);
-        
         execService = mock(ExecutorService.class);
         
         acceptThread = mock(AcceptThread.class);
@@ -134,10 +111,9 @@
         channel = mock(ThermostatServerSocketChannelImpl.class);
         
         callbacks = mock(ThermostatIPCCallbacks.class);
-        when(channelUtils.createServerSocketChannel(SERVER_NAME, addr, callbacks, props, selector)).thenReturn(channel);
+        when(channelUtils.createServerSocketChannel(SERVER_NAME, addr, callbacks, props, selector, acceptThread)).thenReturn(channel);
         
-        transport = new TcpSocketServerTransport(provider, execService, fileUtils, 
-                threadCreator, channelUtils);
+        transport = new TcpSocketServerTransport(provider, execService, threadCreator, channelUtils);
     }
     
     @Test
@@ -152,8 +128,7 @@
         // Not TcpSocketIPCProperties
         IPCProperties badProps = mock(IPCProperties.class);
         when(badProps.getType()).thenReturn(IPCType.UNKNOWN);
-        transport = new TcpSocketServerTransport(provider, execService, fileUtils, 
-                threadCreator, channelUtils);
+        transport = new TcpSocketServerTransport(provider, execService, threadCreator, channelUtils);
         transport.start(badProps);
     }
     
@@ -184,12 +159,8 @@
     public void testShutdownFailure() throws Exception {
         transport.start(props);
         doThrow(new IOException()).when(acceptThread).shutdown();
-        
-        //try {
-            transport.shutdown();
-            fail("Expected IO Exception");
-        //} catch (IOException e) {
-        //}
+        transport.shutdown();
+        fail("Expected IO Exception");
     }
     
     @Test
@@ -200,7 +171,7 @@
     }
 
     private void checkChannel() throws IOException {
-        verify(channelUtils).createServerSocketChannel(SERVER_NAME, addr, callbacks, props, selector);
+        verify(channelUtils).createServerSocketChannel(SERVER_NAME, addr, callbacks, props, selector, acceptThread);
         ThermostatServerSocketChannelImpl result = transport.getSockets().get(SERVER_NAME);
         assertEquals(channel, result);
     }
--- a/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImplTest.java	Tue Nov 29 11:22:49 2016 +0100
+++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImplTest.java	Thu Dec 01 16:30:37 2016 -0500
@@ -67,6 +67,7 @@
     private static final String SERVER_NAME = "test";
     
     private TcpServerSocketChannelHelper channelHelper;
+    private AcceptThread acceptThread;
     private SocketAddress addr;
     private ServerSocketChannel impl;
     private Selector selector;
@@ -79,7 +80,9 @@
         channelHelper = mock(TcpServerSocketChannelHelper.class);
         ThermostatServerSocketChannelImpl.setChannelHelper(channelHelper);
         addr = mock(SocketAddress.class);
-        
+
+        acceptThread = mock(AcceptThread.class);
+
         impl = mock(ServerSocketChannel.class);
         when(channelHelper.open()).thenReturn(impl);
         when(channelHelper.isOpen(impl)).thenReturn(true);
@@ -88,7 +91,7 @@
         callbacks = mock(ThermostatIPCCallbacks.class);
 
         key = mock(SelectionKey.class);
-        when(channelHelper.register(impl, selector, SelectionKey.OP_ACCEPT)).thenReturn(key);
+        when(channelHelper.register(acceptThread, impl, SelectionKey.OP_ACCEPT)).thenReturn(key);
         props = mock(IPCProperties.class);
         File propFile = mock(File.class);
         when(props.getPropertiesFile()).thenReturn(propFile);
@@ -118,7 +121,7 @@
         verify(channelHelper).open();
         verify(channelHelper).bind(impl, addr);
         verify(channelHelper).configureBlocking(impl, false);
-        verify(channelHelper).register(impl, selector, SelectionKey.OP_ACCEPT);
+        verify(channelHelper).register(acceptThread, impl, SelectionKey.OP_ACCEPT);
         verify(channelHelper).attachToKey(key, channel);
         verify(selector).wakeup();
         
@@ -130,7 +133,7 @@
         verify(channelHelper, never()).open();
         verify(channelHelper, never()).bind(impl, addr);
         verify(channelHelper, never()).configureBlocking(impl, false);
-        verify(channelHelper, never()).register(impl, selector, SelectionKey.OP_ACCEPT);
+        verify(channelHelper, never()).register(acceptThread, impl, SelectionKey.OP_ACCEPT);
         verify(channelHelper, never()).attachToKey(eq(key), any());
         verify(selector, never()).wakeup();
     }
@@ -150,7 +153,7 @@
         channel.accept();
         verify(impl).accept();
         verify(channelHelper).configureBlocking(clientImpl, false);
-        verify(channelHelper).register(clientImpl, selector, SelectionKey.OP_READ);
+        verify(channelHelper).register(acceptThread, clientImpl, SelectionKey.OP_READ);
     }
     
     @Test
@@ -177,7 +180,7 @@
     }
 
     private ThermostatServerSocketChannelImpl createChannel() throws IOException {
-        return ThermostatServerSocketChannelImpl.open(SERVER_NAME, addr, callbacks, props, selector);
+        return ThermostatServerSocketChannelImpl.open(SERVER_NAME, addr, callbacks, props, selector, acceptThread);
     }
 
 }