Mercurial > hg > release > thermostat-2.0
changeset 2540:e2ad1b41260b
[PATCH] fix TCP IPC registration lock issue
See http://php.mandelson.org/mk3/index.php/2011/10/06/better-selectablechannel-registration-in-java-nio/
This could be an issue on Linux, even for Unix Domain sockets, but it doesn't seem to be.
Reviewed-by: ebaron
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2016-November/021784.html
line wrap: on
line diff
--- a/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/AcceptThread.java Tue Nov 29 11:22:49 2016 +0100 +++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/AcceptThread.java Thu Dec 01 16:30:37 2016 -0500 @@ -37,6 +37,7 @@ package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; import java.io.IOException; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.HashSet; @@ -55,6 +56,7 @@ private final ExecutorService execService; private final Selector selector; private final ClientHandlerCreator handlerCreator; + private final Object registerLock = new Object(); private boolean shutdown; @@ -74,6 +76,7 @@ logger.info("Ready to accept client connections"); try { while (!shutdown) { + synchronized(registerLock) {} int selected = selector.select(); if (selected < 0) { // Something bad happened @@ -134,7 +137,31 @@ } } } - + + /** + * Register a new channel to this acceptThread. + * Handles the select() loop and associated locks. + * + * See http://php.mandelson.org/mk3/index.php/2011/10/06/better-selectablechannel-registration-in-java-nio/ + * This could also be an issue for Unix Domain Sockets, but it doesn't seem to be a problem on Linux for + * either UDS or TCP sockets on Linux - not sure why. + * + * May need to add a simiilar deregister() function for clean shutdown. + * + * @param channel + * @param ops + * @return + * @throws IOException + */ + SelectionKey register(SelectableChannel channel, int ops) throws IOException { + final SelectionKey key; + synchronized(registerLock) { + selector.wakeup(); + key = channel.register(selector, ops); + } + return key; + } + void shutdown() throws IOException { this.shutdown = true; // Interrupt accept thread
--- a/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransport.java Tue Nov 29 11:22:49 2016 +0100 +++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransport.java Thu Dec 01 16:30:37 2016 -0500 @@ -40,19 +40,10 @@ import java.net.SocketAddress; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; -import java.nio.file.DirectoryStream; -import java.nio.file.FileSystems; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.attribute.FileAttribute; -import java.nio.file.attribute.PosixFilePermission; -import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.UserPrincipal; -import java.nio.file.attribute.UserPrincipalLookupService; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -74,7 +65,6 @@ // Access/modification of this field should by synchronized private final Map<String, ThermostatServerSocketChannelImpl> sockets; private final ExecutorService execService; - private final FileUtils fileUtils; private final ChannelUtils channelUtils; private final ThreadCreator threadCreator; @@ -84,15 +74,14 @@ TcpSocketServerTransport(SelectorProvider selectorProvider) { this(selectorProvider, Executors.newFixedThreadPool(determineDefaultThreadPoolSize(), new CountingThreadFactory()), - new FileUtils(), new ThreadCreator(), new ChannelUtils()); + new ThreadCreator(), new ChannelUtils()); } TcpSocketServerTransport(SelectorProvider selectorProvider, ExecutorService execService, - FileUtils fileUtils, ThreadCreator threadCreator, ChannelUtils channelCreator) { + ThreadCreator threadCreator, ChannelUtils channelCreator) { this.selectorProvider = selectorProvider; this.sockets = new HashMap<>(); this.execService = execService; - this.fileUtils = fileUtils; this.channelUtils = channelCreator; this.threadCreator = threadCreator; } @@ -139,7 +128,7 @@ // Create socket ThermostatServerSocketChannelImpl socket = - channelUtils.createServerSocketChannel(name, addr, callbacks, props, selector); + channelUtils.createServerSocketChannel(name, addr, callbacks, props, selector, acceptThread); sockets.put(name, socket); if (!acceptThread.isAlive()) @@ -212,54 +201,6 @@ /* for testing purposes */ Thread getAcceptThread() { return acceptThread; } - /* For testing purposes */ - static class FileUtils { - - boolean exists(Path path) { - return Files.exists(path); - } - - void delete(Path path) throws IOException { - Files.delete(path); - } - - boolean isDirectory(Path path) { - return Files.isDirectory(path); - } - - Path createDirectory(Path dir, FileAttribute<?>... attrs) throws IOException { - return Files.createDirectory(dir, attrs); - } - - Path createDirectories(Path dir, FileAttribute<?>... attrs) throws IOException { - return Files.createDirectories(dir, attrs); - } - - Set<PosixFilePermission> getPosixFilePermissions(Path path) throws IOException { - return Files.getPosixFilePermissions(path); - } - - FileAttribute<Set<PosixFilePermission>> toFileAttribute(Set<PosixFilePermission> perms) { - return PosixFilePermissions.asFileAttribute(perms); - } - - DirectoryStream<Path> newDirectoryStream(Path dir) throws IOException { - return Files.newDirectoryStream(dir); - } - - UserPrincipalLookupService getUserPrincipalLookupService() { - return FileSystems.getDefault().getUserPrincipalLookupService(); - } - - String getUsername() { - return System.getProperty("user.name"); - } - - UserPrincipal getOwner(Path path) throws IOException { - return Files.getOwner(path); - } - - } /* For testing purposes */ static class ThreadCreator { @@ -271,8 +212,8 @@ /* For testing purposes */ static class ChannelUtils { ThermostatServerSocketChannelImpl createServerSocketChannel(String name, SocketAddress addr, - ThermostatIPCCallbacks callbacks, IPCProperties props, Selector selector) throws IOException { - return ThermostatServerSocketChannelImpl.open(name, addr, callbacks, props, selector); + ThermostatIPCCallbacks callbacks, IPCProperties props, Selector selector, AcceptThread acceptThread) throws IOException { + return ThermostatServerSocketChannelImpl.open(name, addr, callbacks, props, selector, acceptThread); } void closeSelector(Selector selector) throws IOException { selector.close();
--- a/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImpl.java Tue Nov 29 11:22:49 2016 +0100 +++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImpl.java Thu Dec 01 16:30:37 2016 -0500 @@ -36,7 +36,6 @@ package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; -import java.io.File; import java.io.IOException; import java.net.SocketAddress; @@ -63,22 +62,22 @@ private final ServerSocketChannel impl; private final SocketAddress addr; private final ThermostatIPCCallbacks callbacks; - private final Selector selector; - + private final AcceptThread acceptThread; + private SelectionKey key; ThermostatServerSocketChannelImpl(String name, ServerSocketChannel impl, SocketAddress addr, - ThermostatIPCCallbacks callbacks, Selector selector, SelectionKey key) { + ThermostatIPCCallbacks callbacks, AcceptThread acceptThread, SelectionKey key) { this.name = name; this.impl = impl; this.addr = addr; this.callbacks = callbacks; - this.selector = selector; + this.acceptThread = acceptThread; this.key = key; } static ThermostatServerSocketChannelImpl open(String name, SocketAddress addr, ThermostatIPCCallbacks callbacks, - IPCProperties props, Selector selector) throws IOException { + IPCProperties props, Selector selector, AcceptThread acceptThread) throws IOException { ServerSocketChannel impl = channelHelper.open(); @@ -87,10 +86,10 @@ // Set non-blocking channelHelper.configureBlocking(impl, false); // Register for selection - SelectionKey key = channelHelper.register(impl, selector, SelectionKey.OP_ACCEPT); + SelectionKey key = channelHelper.register(acceptThread, impl, SelectionKey.OP_ACCEPT); // Attach wrapper socket to key, for use in select loop ThermostatServerSocketChannelImpl sock = - new ThermostatServerSocketChannelImpl(name, impl, addr, callbacks, selector, key); + new ThermostatServerSocketChannelImpl(name, impl, addr, callbacks, acceptThread, key); channelHelper.attachToKey(key, sock); // Send wakeup to trigger re-selection selector.wakeup(); @@ -113,7 +112,7 @@ // Set non-blocking channelHelper.configureBlocking(clientImpl, false); // Register for selection - SelectionKey key = channelHelper.register(clientImpl, selector, SelectionKey.OP_READ); + SelectionKey key = channelHelper.register(acceptThread, clientImpl, SelectionKey.OP_READ); return new AcceptedSocketChannelImpl(name, clientImpl, key); } @@ -145,10 +144,9 @@ void bind(ServerSocketChannel channel, SocketAddress addr) throws IOException { channel.socket().bind(addr); } - - SelectionKey register(SelectableChannel channel, Selector sel, int ops) throws IOException { - sel.wakeup(); - return channel.register(sel, ops); + + SelectionKey register(AcceptThread acceptThread, SelectableChannel channel, int ops) throws IOException { + return acceptThread.register(channel, ops); } SelectableChannel configureBlocking(AbstractSelectableChannel channel, boolean block) throws IOException {
--- a/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransportTest.java Tue Nov 29 11:22:49 2016 +0100 +++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransportTest.java Thu Dec 01 16:30:37 2016 -0500 @@ -48,24 +48,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.File; import java.io.IOException; import java.net.SocketAddress; -import java.net.InetSocketAddress; -import java.net.InetAddress; import java.nio.channels.spi.AbstractSelector; import java.nio.channels.spi.SelectorProvider; -import java.nio.file.DirectoryStream; -import java.nio.file.InvalidPathException; -import java.nio.file.Path; -import java.nio.file.attribute.FileAttribute; -import java.nio.file.attribute.PosixFilePermission; -import java.nio.file.attribute.PosixFilePermissions; -import java.nio.file.attribute.UserPrincipal; -import java.nio.file.attribute.UserPrincipalLookupService; -import java.util.Iterator; -import java.util.Set; + import java.util.concurrent.ExecutorService; import org.junit.Before; @@ -77,7 +65,6 @@ import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.TcpSocketIPCProperties; import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.TcpSocketServerTransport.ChannelUtils; -import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.TcpSocketServerTransport.FileUtils; import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.TcpSocketServerTransport.ThreadCreator; public class TcpSocketServerTransportTest { @@ -91,14 +78,12 @@ private SelectorProvider provider; private AbstractSelector selector; private ExecutorService execService; - private FileUtils fileUtils; private AcceptThread acceptThread; private ThreadCreator threadCreator; private ThermostatIPCCallbacks callbacks; private ChannelUtils channelUtils; private ThermostatServerSocketChannelImpl channel; private TcpSocketIPCProperties props; - private UserPrincipalLookupService lookup; private SocketAddress addr; @SuppressWarnings("unchecked") @@ -107,9 +92,7 @@ provider = mock(SelectorProvider.class); selector = mock(AbstractSelector.class); when(provider.openSelector()).thenReturn(selector); - - - fileUtils = mock(FileUtils.class); + addr = mock(SocketAddress.class); props = mock(TcpSocketIPCProperties.class); @@ -118,12 +101,6 @@ when(props.getType()).thenReturn(IPCType.TCP_SOCKET); doThrow(new IOException()).when(props).getSocketAddr(BAD_SERVER_NAME); - lookup = mock(UserPrincipalLookupService.class); - when(fileUtils.getUserPrincipalLookupService()).thenReturn(lookup); - when(fileUtils.getUsername()).thenReturn(USERNAME); - UserPrincipal principal = mock(UserPrincipal.class); - when(lookup.lookupPrincipalByName(USERNAME)).thenReturn(principal); - execService = mock(ExecutorService.class); acceptThread = mock(AcceptThread.class); @@ -134,10 +111,9 @@ channel = mock(ThermostatServerSocketChannelImpl.class); callbacks = mock(ThermostatIPCCallbacks.class); - when(channelUtils.createServerSocketChannel(SERVER_NAME, addr, callbacks, props, selector)).thenReturn(channel); + when(channelUtils.createServerSocketChannel(SERVER_NAME, addr, callbacks, props, selector, acceptThread)).thenReturn(channel); - transport = new TcpSocketServerTransport(provider, execService, fileUtils, - threadCreator, channelUtils); + transport = new TcpSocketServerTransport(provider, execService, threadCreator, channelUtils); } @Test @@ -152,8 +128,7 @@ // Not TcpSocketIPCProperties IPCProperties badProps = mock(IPCProperties.class); when(badProps.getType()).thenReturn(IPCType.UNKNOWN); - transport = new TcpSocketServerTransport(provider, execService, fileUtils, - threadCreator, channelUtils); + transport = new TcpSocketServerTransport(provider, execService, threadCreator, channelUtils); transport.start(badProps); } @@ -184,12 +159,8 @@ public void testShutdownFailure() throws Exception { transport.start(props); doThrow(new IOException()).when(acceptThread).shutdown(); - - //try { - transport.shutdown(); - fail("Expected IO Exception"); - //} catch (IOException e) { - //} + transport.shutdown(); + fail("Expected IO Exception"); } @Test @@ -200,7 +171,7 @@ } private void checkChannel() throws IOException { - verify(channelUtils).createServerSocketChannel(SERVER_NAME, addr, callbacks, props, selector); + verify(channelUtils).createServerSocketChannel(SERVER_NAME, addr, callbacks, props, selector, acceptThread); ThermostatServerSocketChannelImpl result = transport.getSockets().get(SERVER_NAME); assertEquals(channel, result); }
--- a/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImplTest.java Tue Nov 29 11:22:49 2016 +0100 +++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImplTest.java Thu Dec 01 16:30:37 2016 -0500 @@ -67,6 +67,7 @@ private static final String SERVER_NAME = "test"; private TcpServerSocketChannelHelper channelHelper; + private AcceptThread acceptThread; private SocketAddress addr; private ServerSocketChannel impl; private Selector selector; @@ -79,7 +80,9 @@ channelHelper = mock(TcpServerSocketChannelHelper.class); ThermostatServerSocketChannelImpl.setChannelHelper(channelHelper); addr = mock(SocketAddress.class); - + + acceptThread = mock(AcceptThread.class); + impl = mock(ServerSocketChannel.class); when(channelHelper.open()).thenReturn(impl); when(channelHelper.isOpen(impl)).thenReturn(true); @@ -88,7 +91,7 @@ callbacks = mock(ThermostatIPCCallbacks.class); key = mock(SelectionKey.class); - when(channelHelper.register(impl, selector, SelectionKey.OP_ACCEPT)).thenReturn(key); + when(channelHelper.register(acceptThread, impl, SelectionKey.OP_ACCEPT)).thenReturn(key); props = mock(IPCProperties.class); File propFile = mock(File.class); when(props.getPropertiesFile()).thenReturn(propFile); @@ -118,7 +121,7 @@ verify(channelHelper).open(); verify(channelHelper).bind(impl, addr); verify(channelHelper).configureBlocking(impl, false); - verify(channelHelper).register(impl, selector, SelectionKey.OP_ACCEPT); + verify(channelHelper).register(acceptThread, impl, SelectionKey.OP_ACCEPT); verify(channelHelper).attachToKey(key, channel); verify(selector).wakeup(); @@ -130,7 +133,7 @@ verify(channelHelper, never()).open(); verify(channelHelper, never()).bind(impl, addr); verify(channelHelper, never()).configureBlocking(impl, false); - verify(channelHelper, never()).register(impl, selector, SelectionKey.OP_ACCEPT); + verify(channelHelper, never()).register(acceptThread, impl, SelectionKey.OP_ACCEPT); verify(channelHelper, never()).attachToKey(eq(key), any()); verify(selector, never()).wakeup(); } @@ -150,7 +153,7 @@ channel.accept(); verify(impl).accept(); verify(channelHelper).configureBlocking(clientImpl, false); - verify(channelHelper).register(clientImpl, selector, SelectionKey.OP_READ); + verify(channelHelper).register(acceptThread, clientImpl, SelectionKey.OP_READ); } @Test @@ -177,7 +180,7 @@ } private ThermostatServerSocketChannelImpl createChannel() throws IOException { - return ThermostatServerSocketChannelImpl.open(SERVER_NAME, addr, callbacks, props, selector); + return ThermostatServerSocketChannelImpl.open(SERVER_NAME, addr, callbacks, props, selector, acceptThread); } }