# HG changeset patch # User Simon Tooke # Date 1486759971 18000 # Node ID a6ba41a449c85d1d0ccce732f45b89ba6ea4b3e8 # Parent ac0f70abdcb1f9085440c7eb7eecc92d3cc60474 [PATCH] Windows Named Pipes - preliminary implementation A preliminary implementation of windows named pipes. Does not set windoes named pipes as default IPC; remains TCP reviewed-by: aazores, ebaron review-thread: http://icedtea.classpath.org/pipermail/thermostat/2017-February/022126.html diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerMain.java --- a/agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerMain.java Thu Feb 09 15:16:54 2017 +0100 +++ b/agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerMain.java Fri Feb 10 15:52:51 2017 -0500 @@ -43,7 +43,10 @@ import com.redhat.thermostat.agent.ipc.client.ClientIPCService; import com.redhat.thermostat.agent.ipc.client.ClientIPCServiceFactory; import com.redhat.thermostat.agent.ipc.client.IPCMessageChannel; +import com.redhat.thermostat.shared.config.NativeLibraryResolver; +import com.redhat.thermostat.shared.config.OS; import com.redhat.thermostat.shared.config.SSLConfiguration; +import com.redhat.thermostat.shared.config.internal.CommonPathsImpl; public class CommandChannelServerMain { @@ -69,6 +72,11 @@ } catch (NumberFormatException e) { throw new IOException("Port number must be a valid integer"); } + + // Windows named pipes has some native code - must set paths to find the DLL + if (OS.IS_WINDOWS) { + NativeLibraryResolver.setCommonPaths(new CommonPathsImpl()); + } // Get IPC configuration file location from system property String configFileStr = System.getProperty(CONFIG_FILE_PROP); diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/common/src/main/java/com/redhat/thermostat/agent/ipc/common/internal/IPCType.java --- a/agent/ipc/common/src/main/java/com/redhat/thermostat/agent/ipc/common/internal/IPCType.java Thu Feb 09 15:16:54 2017 +0100 +++ b/agent/ipc/common/src/main/java/com/redhat/thermostat/agent/ipc/common/internal/IPCType.java Fri Feb 10 15:52:51 2017 -0500 @@ -44,6 +44,7 @@ UNIX_SOCKET("unixsocket"), TCP_SOCKET("tcpsocket"), + WINDOWS_NAMED_PIPES("winpipes"), UNKNOWN(null); // Used for testing // Reverse lookup diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/pom.xml --- a/agent/ipc/pom.xml Thu Feb 09 15:16:54 2017 +0100 +++ b/agent/ipc/pom.xml Fri Feb 10 15:52:51 2017 -0500 @@ -64,6 +64,7 @@ server unix-socket tcp-socket + windows-named-pipes diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/server/pom.xml --- a/agent/ipc/server/pom.xml Thu Feb 09 15:16:54 2017 +0100 +++ b/agent/ipc/server/pom.xml Fri Feb 10 15:52:51 2017 -0500 @@ -91,11 +91,11 @@ ${project.version} test - - com.redhat.thermostat - thermostat-common-portability - ${project.version} - + + com.redhat.thermostat + thermostat-common-portability + ${project.version} + diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/server/src/main/java/com/redhat/thermostat/agent/ipc/server/internal/IPCConfigurationWriter.java --- a/agent/ipc/server/src/main/java/com/redhat/thermostat/agent/ipc/server/internal/IPCConfigurationWriter.java Thu Feb 09 15:16:54 2017 +0100 +++ b/agent/ipc/server/src/main/java/com/redhat/thermostat/agent/ipc/server/internal/IPCConfigurationWriter.java Fri Feb 10 15:52:51 2017 -0500 @@ -41,6 +41,7 @@ import java.io.IOException; import java.net.ServerSocket; import java.util.Properties; +import java.util.UUID; import com.redhat.thermostat.agent.ipc.common.internal.IPCType; import com.redhat.thermostat.common.portability.PortableProcessImpl; @@ -50,29 +51,29 @@ static final String PROP_IPC_TYPE = "type"; private static final String PROP_UNIX_SOCKET_DIR = "unixsocket.dir"; + private static final String PROP_WINPIPE_ID = "winpipe.id"; private static final String PROP_TCP_SOCKET_SUFFIX= ".tcpsocket.port"; - //private static final String TCP_SOCKET_JUMBO_FRAMES = "tcpsocket.jumboframes"; - // suggest some default vaules for TCP sockets - test this range for unused sockets - private static int TEST_SOCKET_LOW = 51200; - private static int TEST_SOCKET_HIGH = 55000; + // suggest some default values for TCP sockets - test this range for unused sockets + private static final int TEST_SOCKET_LOW = 51200; + private static final int TEST_SOCKET_HIGH = 55000; private static final String COMMENTS = - "Configuration for Inter-process Communication (IPC) used in the Thermostat agent.\n" - + "The agent is configured to use Unix sockets for IPC by default on Linux,\n" - + "or TCP sockets on Windows.\n" - + "The options below can be set to modify the defaults used by the agent:\n\n" - - + "Directory where Unix sockets are created, which may be deleted if it already exists.\n" + " Configuration for Inter-process Communication (IPC) used in the Thermostat agent.\n" + + " The agent is configured to use Unix sockets for IPC by default on Linux,\n" + + " and TCP sockets by default on Windows.\n" + + " The options below can be set to modify the defaults used by the agent:\n\n" + + " Transport type (one of unixsocket, tcpsocket, winpipes).\n" + + " On Linux, unixsocket or tcpsocket are valid choices." + + " On Windows, winpipes or tcpsocket are valid choices.\n" + + PROP_IPC_TYPE + "=transporttype\n\n" + + " Directory where Unix sockets are created, which may be deleted if it already exists.\n" + PROP_UNIX_SOCKET_DIR + "=/path/to/unix/sockets\n\n" - - + "TCP socket port numbers for various services.\n" + + " Prefix for thermostat-related named pipes - should be different for every installation.\n" + + PROP_WINPIPE_ID + "=XXXXXX\n\n" + + " TCP socket port numbers for various services.\n" + "command-channel" + PROP_TCP_SOCKET_SUFFIX + "=NNNN\n" + "agent-proxy" + PROP_TCP_SOCKET_SUFFIX + "=MMMM\n\n"; - - //in the future, will allow jumbo frames for performance - //+ "TCP parameters\n" - //+ TCP_SOCKET_JUMBO_FRAMES + "=false\n\n"; private final File configFile; private final PropertiesHelper helper; @@ -87,8 +88,11 @@ } void write() throws IOException { + // Write defaults to config file - configFile.createNewFile(); + if (!configFile.createNewFile()) { + throw new IOException("IPC configuration file '" + configFile + "' already exists"); + } Properties props = helper.createProperties(); @@ -96,10 +100,18 @@ // unix socket will work without configuration (creates sockets in tmp directory // but tcpsocket always needs ports predefined (in the future, should support service discovery) + // windows named pipes will use a random string for different Thermostat installations + // Windows named pipes id + // will be combined with other strings to make a full pipe name + // for example, winpipe.id=abc345 may five a pipe name such as \\.\thermostat-abc345-command-channel + { + final String randString = makeThermostatId(); + props.setProperty(PROP_WINPIPE_ID, randString); + } + // TCP properties (will be ignored if a different transpot is selected) // this implementation is flawed; // the unused ports might be in used by a process that simply wasn't running at thermostat setup time. - { int cmdPort = findUnusedTCPSocket(TEST_SOCKET_LOW, TEST_SOCKET_HIGH); int aport = cmdPort == 0 ? 0 : findUnusedTCPSocket(cmdPort + 1, TEST_SOCKET_HIGH); @@ -110,8 +122,8 @@ // write a property for each user on the system - currently only the current user // note: this is required for UNIX too if (OS.IS_WINDOWS) { - int uport = aport == 0 ? 0 : findUnusedTCPSocket( aport + 1, TEST_SOCKET_HIGH); - int uid = PortableProcessImpl.getInstance().getUid(0); // if pid=0, gets uid of current process + int uport = aport == 0 ? 0 : findUnusedTCPSocket(aport + 1, TEST_SOCKET_HIGH); + int uid = helper.getCurrentUid(); props.setProperty("agent-proxy-" + uid + PROP_TCP_SOCKET_SUFFIX, Integer.toString(uport)); } } @@ -121,6 +133,10 @@ } } + private static String makeThermostatId() { + return UUID.randomUUID().toString(); + } + private static int findUnusedTCPSocket(int lowPort, int highPort) { for (int port=lowPort; port<=highPort; port++) { if (isTCPPortAvailable(port)) { @@ -150,6 +166,9 @@ Properties createProperties() { return new Properties(); } + int getCurrentUid() { + return PortableProcessImpl.getInstance().getUid(0); // if pid=0, gets uid of current process + } } } diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/server/src/test/java/com/redhat/thermostat/agent/ipc/server/internal/IPCConfigurationWriterTest.java --- a/agent/ipc/server/src/test/java/com/redhat/thermostat/agent/ipc/server/internal/IPCConfigurationWriterTest.java Thu Feb 09 15:16:54 2017 +0100 +++ b/agent/ipc/server/src/test/java/com/redhat/thermostat/agent/ipc/server/internal/IPCConfigurationWriterTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -69,12 +69,14 @@ @Before public void setUp() throws Exception { helper = mock(PropertiesHelper.class); - + File configFile = mock(File.class); + when(configFile.createNewFile()).thenReturn(true); props = mock(Properties.class); when(helper.createProperties()).thenReturn(props); fos = mock(FileOutputStream.class); when(helper.createStream(configFile)).thenReturn(fos); + when(helper.getCurrentUid()).thenReturn(9876); writer = new IPCConfigurationWriter(configFile, helper); } @@ -83,7 +85,7 @@ public void testWrite() throws Exception { writer.write(); - final IPCType expectedType = OS.IS_UNIX ? IPCType.UNIX_SOCKET : IPCType.TCP_SOCKET; + final IPCType expectedType = OS.IS_UNIX ? IPCType.UNIX_SOCKET : IPCType.WINDOWS_NAMED_PIPES; verify(props).setProperty(IPCConfigurationWriter.PROP_IPC_TYPE, expectedType.getConfigValue()); verify(props).store(eq(fos), anyString()); verify(fos).close(); diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/client/pom.xml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/client/pom.xml Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,100 @@ + + + + 4.0.0 + + + com.redhat.thermostat + thermostat-agent-ipc-winpipes + 1.99.12-SNAPSHOT + + + thermostat-agent-ipc-winpipes-client + jar + + Thermostat IPC Client - Windows Named Pipes + + + + junit + junit + test + + + org.mockito + mockito-core + test + + + com.redhat.thermostat + thermostat-agent-ipc-winpipes-common + ${project.version} + + + com.redhat.thermostat + thermostat-agent-ipc-client + ${project.version} + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.6 + 1.6 + + + + + + + diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/client/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesMessageChannel.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/client/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesMessageChannel.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,88 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.client.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import com.redhat.thermostat.agent.ipc.client.IPCMessageChannel; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.SyncMessageReader; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.SyncMessageWriter; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesChannelImpl; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCProperties; + +public class WinPipesMessageChannel implements IPCMessageChannel { + + private final SyncMessageReader reader; + private final SyncMessageWriter writer; + + private final WinPipesChannelImpl channel; + + WinPipesMessageChannel(WinPipesChannelImpl channel) { + this(channel, new SyncMessageReader(channel), new SyncMessageWriter(channel)); + } + + WinPipesMessageChannel(final WinPipesChannelImpl channel, SyncMessageReader reader, SyncMessageWriter writer) { + this.channel = channel; + this.reader = reader; + this.writer = writer; + } + + WinPipesChannelImpl getByteChannel() { + return channel; + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public synchronized void close() throws IOException { + channel.close(); + } + + @Override + public ByteBuffer readMessage() throws IOException { + return reader.readData(); + } + + @Override + public void writeMessage(ByteBuffer message) throws IOException { + writer.writeData(message); + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/client/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesTransportImpl.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/client/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesTransportImpl.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,85 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.client.internal; + +import java.io.IOException; + +import com.redhat.thermostat.agent.ipc.client.IPCMessageChannel; +import com.redhat.thermostat.agent.ipc.client.internal.ClientTransport; +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesChannelImpl; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCProperties; + +public class WinPipesTransportImpl implements ClientTransport { + + private final WinPipesIPCProperties pipeProps; + private final PipeHelper pipeHelper; + + WinPipesTransportImpl(WinPipesIPCProperties props) throws IOException { + this(props, new PipeHelper()); + } + + WinPipesTransportImpl(WinPipesIPCProperties props, PipeHelper pipeHelper) throws IOException { + this.pipeProps = props; + this.pipeHelper = pipeHelper; + } + + @Override + public IPCMessageChannel connect(String serverName) throws IOException { + requireNonNull(serverName, "server name cannot be null"); + final WinPipesChannelImpl channel = pipeHelper.openChannel(pipeProps.getPipeName(serverName)); + return pipeHelper.createMessageChannel(channel); + } + + // java.lang.Objects is JDK 7+ and we need this to be JDK 6 compat. + private static void requireNonNull(Object item, String message) { + if (item == null) { + throw new NullPointerException(message); + } + } + + // Helper class for testing + static class PipeHelper { + WinPipesChannelImpl openChannel(String name) throws IOException { + return WinPipesChannelImpl.open(name); + } + + WinPipesMessageChannel createMessageChannel(WinPipesChannelImpl channel) { + return new WinPipesMessageChannel(channel); + } + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/client/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesTransportProvider.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/client/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesTransportProvider.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,62 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.client.internal; + +import java.io.IOException; + +import com.redhat.thermostat.agent.ipc.client.internal.ClientTransport; +import com.redhat.thermostat.agent.ipc.client.internal.ClientTransportProvider; +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.common.internal.IPCType; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCProperties; + +public class WinPipesTransportProvider implements ClientTransportProvider { + + @Override + public ClientTransport create(IPCProperties props) throws IOException { + if (!(props instanceof WinPipesIPCProperties)) { + throw new IOException("Unexpected IPC properties for 'winpipes' type"); + } + return new WinPipesTransportImpl((WinPipesIPCProperties)props); + } + + @Override + public IPCType getType() { + return IPCType.WINDOWS_NAMED_PIPES; + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/client/src/main/resources/META-INF/services/com.redhat.thermostat.agent.ipc.client.internal.ClientTransportProvider --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/client/src/main/resources/META-INF/services/com.redhat.thermostat.agent.ipc.client.internal.ClientTransportProvider Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,1 @@ +com.redhat.thermostat.agent.ipc.winpipes.client.internal.WinPipesTransportProvider diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/client/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesMessageChannelTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/client/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesMessageChannelTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,97 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.client.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.SyncMessageReader; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.SyncMessageWriter; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesChannelImpl; + +public class WinPipesMessageChannelTest { + + private SyncMessageReader reader; + private SyncMessageWriter writer; + private WinPipesMessageChannel channel; + private WinPipesChannelImpl sock; + + @Before + public void setUp() { + sock = mock(WinPipesChannelImpl.class); + reader = mock(SyncMessageReader.class); + writer = mock(SyncMessageWriter.class); + channel = new WinPipesMessageChannel(sock, reader, writer); + } + + @Test + public void testReadMessage() throws Exception { + ByteBuffer buf = mock(ByteBuffer.class); + when(reader.readData()).thenReturn(buf); + ByteBuffer result = channel.readMessage(); + assertEquals(buf, result); + } + + @Test + public void testWriteMessage() throws Exception { + ByteBuffer buf = mock(ByteBuffer.class); + channel.writeMessage(buf); + verify(writer).writeData(buf); + } + + @Test + public void testIsOpen() throws Exception { + when(sock.isOpen()).thenReturn(true); + assertTrue(channel.isOpen()); + verify(sock).isOpen(); + } + + @Test + public void testClose() throws Exception { + channel.close(); + verify(sock).close(); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/client/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesTransportImplTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/client/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesTransportImplTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,131 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.client.internal; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import com.redhat.thermostat.agent.ipc.client.internal.ClientTransport; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCProperties; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCPropertiesProvider; +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.client.IPCMessageChannel; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesChannelImpl; + +public class WinPipesTransportImplTest { + + private static final String SERVER_NAME = "test"; + private static final String PIPE_NAME = "/path/to/pipe/test"; + private WinPipesTransportImpl.PipeHelper pipeHelper; + private WinPipesMessageChannel messageChannel; + private WinPipesIPCProperties props; + + @Before + public void setUp() throws Exception { + + pipeHelper = mock(WinPipesTransportImpl.PipeHelper.class); + WinPipesChannelImpl sockChannel = mock(WinPipesChannelImpl.class); + when(pipeHelper.openChannel(eq(SERVER_NAME))).thenReturn(sockChannel); + + props = mock(WinPipesIPCProperties.class); + when(props.getPipeName(SERVER_NAME)).thenReturn(PIPE_NAME); + } + + @Test + public void testConnectToServer() throws Exception { + WinPipesTransportImpl service = new WinPipesTransportImpl(props, pipeHelper); + IPCMessageChannel result = service.connect(SERVER_NAME); + assertEquals(messageChannel, result); + } + + @Test + public void testConnectToServerBadSocket() throws Exception { + when(pipeHelper.openChannel(anyString())).thenThrow(new IOException()); + WinPipesTransportImpl service = new WinPipesTransportImpl(props, pipeHelper); + + try { + service.connect(SERVER_NAME); + fail("Expected IOException"); + } catch (IOException ignored) { + verify(pipeHelper).openChannel(props.getPipeName(SERVER_NAME)); + } + } + + + /** + * small test routine to open a client connection, send a message and wait for a reply + * @param args (ignored) + */ + public static void main(String args[]) { + + try { + WinPipesIPCProperties props = new WinPipesIPCPropertiesProvider().create(System.getProperties(), new File(System.getProperty("user.home") + "/.thermostat/etc/ipc.properties")); + ClientTransport tr = new WinPipesTransportImpl(props); + IPCMessageChannel ch = tr.connect("command-channel"); + if (!ch.isOpen()) { + WinPipesMessageChannel wch = (WinPipesMessageChannel)(ch); + final String name = wch.getByteChannel().getName(); + System.err.println("error opening channel '" + name + "'"); + return; + } + ByteBuffer msg = ByteBuffer.wrap("".getBytes(Charset.forName("UTF-8"))); + ch.writeMessage(msg); + try { + sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + ByteBuffer msg2 = ch.readMessage(); + ch.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/client/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesTransportProviderTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/client/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/client/internal/WinPipesTransportProviderTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,65 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.client.internal; + +import static org.mockito.Mockito.mock; + +import java.io.IOException; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; + +public class WinPipesTransportProviderTest { + + private static final String SERVER_NAME = "test"; + private static final String PIPE_NAME = "/path/to/pipe/test"; + private IPCProperties props; + + @Before + public void setUp() throws Exception { + props = mock(IPCProperties.class); + } + + @Test(expected = IOException.class) + public void testBadProperties() throws Exception { + // Not WinPipesIPCProperties + new WinPipesTransportProvider().create(props); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/Makefile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/Makefile Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,49 @@ +CC = gcc +JAVAH = javah +MYCFLAGS = -c -Wall -fPIC $(EXTRA_CFLAGS) +MYLDFLAGS = -fPIC -shared $(EXTRA_CFLAGS) +COPY = cp -a + +JNI_PLATFORM = linux +CLASSPATH = target/classes/ +TARGET_DIR = target +SO_PREFIX = lib +SO_SUFFIX = .so + +INCLUDE = -I $(TARGET_DIR) -I "$(JAVA_HOME)/include/" -I "$(JAVA_HOME)/include/$(JNI_PLATFORM)" + +WINHELPER_SOURCES = src/main/native/WinPipesNativeHelper.c +WINHELPER_TARGET = $(TARGET_DIR)/WinPipesNativeHelper.c +WINHELPER_OBJECTS = $(WINHELPER_TARGET:.c=.o) +WINHELPER_EXECUTABLE = $(SO_PREFIX)WinPipesNativeWrapper$(SO_SUFFIX) + +.PHONY: +JNI_LIST = com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesNativeHelper + +$(JNI_LIST): + $(JAVAH) -force -classpath $(CLASSPATH) -d $(TARGET_DIR) $(JNI_LIST) + +all: $(JNI_LIST) init $(WINHELPER_SOURCES) $(WINHELPER_EXECUTABLE) + +.PHONY: +init: + $(COPY) $(WINHELPER_SOURCES) $(WINHELPER_TARGET) + +$(WINHELPER_EXECUTABLE): $(WINHELPER_OBJECTS) + $(CC) $(MYLDFLAGS) $(LDFLAGS) $(PLATFORM_LIBS) $(WINHELPER_OBJECTS) -o $(TARGET_DIR)/$@ + +.cpp.o: + $(CC) $(MYCFLAGS) $(CFLAGS) $(INCLUDE) $< -o $@ + +.c.o: + $(CC) $(MYCFLAGS) $(CFLAGS) $(INCLUDE) $< -o $@ + +clean-lib: + rm -f $(TARGET_DIR)/$(WINHELPER_EXECUTABLE) + +clean-obj: + rm -f $(WINHELPER_OBJECTS) $(WINHELPER_TARGET) + +clean: clean-obj clean-lib + + diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/pom.xml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/pom.xml Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,178 @@ + + + + 4.0.0 + + + com.redhat.thermostat + thermostat-agent-ipc-winpipes + 1.99.12-SNAPSHOT + + + thermostat-agent-ipc-winpipes-common + bundle + + Thermostat IPC Common - Windows Named Pipes + + + + default + + true + + + + + linux + + Unix + + + + + + + + windows + + Windows + + + -lws2_32 + + .dll + + + + + + org.codehaus.mojo + exec-maven-plugin + + + compile + + exec + + + + + make + + all + CC=${c.compiler} + SO_PREFIX=${sharedlib.prefix} + SO_SUFFIX=${sharedlib.suffix} + EXTRA_CFLAGS=${cflags} + JNI_PLATFORM=${jni.platform} + PLATFORM_LIBS=${platform.libs} + + + + + + + + + + + com.redhat.thermostat + thermostat-agent-ipc-common + ${project.version} + + + junit + junit + test + + + org.mockito + mockito-core + test + + + com.redhat.thermostat + thermostat-shared-config + ${project.version} + compile + + + + + + + org.apache.felix + maven-bundle-plugin + true + + + Red Hat, Inc. + com.redhat.thermostat.agent.ipc.winpipes.common + + + com.redhat.thermostat.agent.ipc.winpipes.common.internal + + + <_nouses>true + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.6 + 1.6 + + + + + + + diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/AsyncMessageReader.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/AsyncMessageReader.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,76 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * The windows pipes Async reader behaves slightly differently. + * There is no read() for it to call, instead it gets "fed" data from the ClientHandler until a complete message is built up + */ +public class AsyncMessageReader extends MessageReader { + + private final MessageListener listener; + + public AsyncMessageReader(MessageListener listener) { + this(listener, new MessageLimits()); + } + + public AsyncMessageReader(MessageListener listener, MessageLimits limits) { + super(limits); + this.listener = listener; + } + + /** + * process the next incoming block of data + * + * @param data chunk of a message + * @return true if the data completed the current message and called readFullMessage() + * @throws IOException if there was a protocol error or corrupt message + */ + public boolean process(ByteBuffer data) throws IOException { + processData(data); + return getState() == ReadState.NEW_MESSAGE; + } + + @Override + protected void readFullMessage(ByteBuffer fullMessage) { + listener.messageRead(fullMessage); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/AsyncMessageWriter.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/AsyncMessageWriter.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,119 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.LinkedList; +import java.util.Queue; + +public class AsyncMessageWriter extends MessageWriter { + + private final WritableByteChannel channel; + private final Queue messages; + private final MessageLimits limits; + + private boolean headerSent; + + public AsyncMessageWriter(WritableByteChannel channel) { + this(channel, new MessageLimits()); + } + + AsyncMessageWriter(WritableByteChannel channel, MessageLimits limits) { + super(limits); + this.channel = channel; + this.limits = limits; + this.messages = new LinkedList(); + this.headerSent = false; + } + + public synchronized void writeData() throws IOException { + if (messages.isEmpty()) { + throw new IllegalStateException("Write requested, but nothing to write"); + } + + boolean done = writeMessage(messages.peek()); + if (done) { + // Reset state for next message + headerSent = false; + // Remove header and message from the queue + messages.remove(); + } + } + + private boolean writeMessage(MessageToWrite toWrite) throws IOException { + + ByteBuffer currentHeader = toWrite.getHeader(); + ByteBuffer currentMessage = toWrite.getMessage(); + + boolean done = false; + // Write the header if not yet written fully + if (!headerSent) { + channel.write(currentHeader); + // Check if the entire header was written + headerSent = !currentHeader.hasRemaining(); + } + + // Write the message, if we've sent the full header + if (headerSent) { + channel.write(currentMessage); + // Check if the message has been fully written + done = !currentMessage.hasRemaining(); + } + return done; + } + + public synchronized boolean hasMoreMessages() { + return !messages.isEmpty(); + } + + public synchronized void enqueueForWriting(ByteBuffer buf) throws IOException { + if (buf.remaining() > limits.getMaxMessageSize()) { + throw new IOException("Total message size is larger than maximum of " + + limits.getMaxMessageSize() + " bytes"); + } + // Split potentially large message into smaller messages and add headers + ByteBuffer fullMessage = buf.duplicate(); + while (fullMessage.hasRemaining()) { + MessageToWrite message = getNextMessage(fullMessage); + // Add header and message to queue + messages.add(message); + } + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageHeader.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageHeader.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,187 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +class MessageHeader { + + private static final int INTEGER_BYTES = (Integer.SIZE/Byte.SIZE); + private static final int BYTE_BYTES = 1; + private static final byte[] MAGIC = { 'T', 'H', 'E', 'R' }; + // Size upto and including header size value + // Should remain constant across protocol versions + private static final int MINIMUM_HEADER_SIZE = MAGIC.length + INTEGER_BYTES * 2; + // Size of (magic + protocolVersion + headerSize + messageSize + moreData) + private static final int DEFAULT_HEADER_SIZE = MAGIC.length + INTEGER_BYTES * 3 + BYTE_BYTES; + // Protocol version to use for writing new MessageHeaders + private static final int DEFAULT_PROTOCOL_VERSION = 1; + + private final int protocolVersion; + private final int headerSize; + private int messageSize; + // If true, then the next message is a continuation of the data in this one + private boolean moreData; + + MessageHeader() { + this(DEFAULT_PROTOCOL_VERSION, DEFAULT_HEADER_SIZE); + } + + // Only used for reading MessageHeaders + MessageHeader(int protocolVersion, int headerSize) { + this.protocolVersion = protocolVersion; + this.headerSize = headerSize; + this.messageSize = -1; + this.moreData = false; + } + + byte[] toByteArray() { + ByteBuffer buf = ByteBuffer.allocate(getHeaderSize()); + buf.put(MAGIC); + buf.putInt(protocolVersion); + buf.putInt(headerSize); + buf.putInt(messageSize); + putBoolean(buf, moreData); + return buf.array(); + } + + static MessageHeader fromByteBuffer(ByteBuffer buf) throws IOException { + checkMagic(buf); + int protoVersion = getPositiveIntOrThrow(buf, "Protocol version"); + int headerSize = getPositiveIntOrThrow(buf, "Header size"); + return new MessageHeader(protoVersion, headerSize); + } + + void setRemainingFields(ByteBuffer buf) throws IOException { + int effectiveProtocolVersion = getEffectiveProtocolVersion(); + if (effectiveProtocolVersion <= 0) { + throw new IOException("Invalid protocol version: " + effectiveProtocolVersion); + } + if (effectiveProtocolVersion >= 1) { + this.messageSize = getPositiveIntOrThrow(buf, "Message size"); + this.moreData = getBoolean(buf); + } + } + + private int getEffectiveProtocolVersion() { + // If this header's protocol version is greater than we can handle + // just use the maximum we know about + return Math.min(protocolVersion, DEFAULT_PROTOCOL_VERSION); + } + + private static void checkMagic(ByteBuffer buf) throws IOException { + checkBytesRemaining(buf, MAGIC.length); + + byte[] bufMagic = new byte[MAGIC.length]; + buf.get(bufMagic); + if (!Arrays.equals(bufMagic, MAGIC)) { + throw new IOException("MessageHeader is invalid"); + } + } + + private static void checkBytesRemaining(ByteBuffer buf, int size) throws IOException { + if (buf.remaining() < size) { + throw new IOException("MessageHeader too short"); + } + } + + private static int getPositiveIntOrThrow(ByteBuffer buf, String errorMsgPrefix) throws IOException { + checkBytesRemaining(buf, INTEGER_BYTES); + int result = buf.getInt(); + if (result <= 0) { + throw new IOException(errorMsgPrefix + " must be greater than zero"); + } + return result; + } + + static int getDefaultProtocolVersion() { + return DEFAULT_PROTOCOL_VERSION; + } + + int getProtocolVersion() { + return protocolVersion; + } + + static int getMinimumHeaderSize() { + return MINIMUM_HEADER_SIZE; + } + + static int getDefaultHeaderSize() { + return DEFAULT_HEADER_SIZE; + } + + int getHeaderSize() { + return headerSize; + } + + int getMessageSize() { + return messageSize; + } + + void setMessageSize(int messageSize) { + this.messageSize = messageSize; + } + + boolean isMoreData() { + return moreData; + } + + void setMoreData(boolean moreData) { + this.moreData = moreData; + } + + void dumpHeader(String prefix) { + System.out.println(prefix + "Protocol Version: " + protocolVersion); + System.out.println(prefix + "Header Length: " + headerSize); + System.out.println(prefix + "Message Length: " + messageSize); + System.out.println(prefix + "More Data: " + moreData); + } + + private boolean getBoolean(ByteBuffer buf) throws IOException { + checkBytesRemaining(buf, 1); + byte b = buf.get(); + return (b != 0); + } + + private void putBoolean(ByteBuffer buf, boolean value) { + byte b = (byte) (value ? 1 : 0); + buf.put(b); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageLimits.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageLimits.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,82 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +public class MessageLimits { + + private static final int DEFAULT_MAX_MESSAGE_PART_SIZE = 0x800; // 2 KiB + private static final int DEFAULT_MAX_MESSAGE_SIZE = 0x20000; // 128 KiB + private static final int DEFAULT_MAX_HEADER_SIZE = 0x80; // 128 B + private static final int DEFAULT_BUFFER_SIZE = DEFAULT_MAX_MESSAGE_PART_SIZE; + + /* + * Values below should adhere to the following: + * 0 <= MinHdr <= MaxHdr <= MaxMsgPart <= Buffer <= MaxMsg + */ + // Maximum value for a single part of a message + private final int maxMessagePartSize; + // Maximum value for the combined payload of a multi-part message + private final int maxMessageSize; + // Maximum value for a message header + private final int maxHeaderSize; + // Size used for read/write buffers + private final int bufferSize; + + public MessageLimits() { + this.maxMessagePartSize = DEFAULT_MAX_MESSAGE_PART_SIZE; + this.maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE; + this.maxHeaderSize = DEFAULT_MAX_HEADER_SIZE; + this.bufferSize = DEFAULT_BUFFER_SIZE; + } + + public int getMaxMessagePartSize() { + return maxMessagePartSize; + } + + public int getMaxMessageSize() { + return maxMessageSize; + } + + public int getMaxHeaderSize() { + return maxHeaderSize; + } + + public int getBufferSize() { + return bufferSize; + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageListener.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageListener.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,48 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface MessageListener { + + void messageRead(ByteBuffer buf); + + void writeMessage(ByteBuffer buf) throws IOException; + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageReader.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageReader.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,232 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +abstract class MessageReader { + + // States used to track how much of a message (header) we've processed + protected enum ReadState { NEW_MESSAGE, MIN_HEADER_READ, FULL_HEADER_READ, ERROR }; + // Number of bytes to read from the header to determine the total header size + protected static final int MIN_HEADER_SIZE = MessageHeader.getMinimumHeaderSize(); + // If true, dumps header information for each header read/written + private static final boolean DEBUG_HEADER = false; + + // Fixed size buffer used to read the initial part of a message header + private final ByteBuffer minHeaderBuf; + // Various message-related limits + private final MessageLimits limits; + + // Current state of data that has been processed by this reader + protected ReadState state; + // Message header for the message currently being processed + protected MessageHeader currentHeader; + // Buffer used for the remainder of the message header, + // after the minimum has been read + private ByteBuffer headerBuf; + // Buffer used for the message payload of the message being processed currently + private ByteBuffer messageBuf; + // Messages that have finished being processed, but are part of a multi-part message + // that has not been fully processed yet. + private List messages; + + protected MessageReader(MessageLimits limits) { + this.state = ReadState.NEW_MESSAGE; + this.currentHeader = null; + this.minHeaderBuf = ByteBuffer.allocate(MIN_HEADER_SIZE); + this.limits = limits; + this.headerBuf = null; + this.messageBuf = null; + this.messages = new ArrayList(); + } + + protected void processData(ByteBuffer readBuffer) throws IOException { + try { + while (readBuffer.hasRemaining()) { + switch (state) { + case NEW_MESSAGE: + // Append data to current buffer, if room + putMinRemaining(minHeaderBuf, readBuffer); + + // Is there enough data to read the minimum header size? + if (minHeaderBuf.remaining() == 0) { + minHeaderBuf.position(0); + if (currentHeader != null || headerBuf != null) { + throw new IllegalStateException("Header already processed"); + } + // Create Header + currentHeader = MessageHeader.fromByteBuffer(minHeaderBuf); + minHeaderBuf.clear(); + + // Check header size + // Set up reader to process remainder of header + int headerSize = currentHeader.getHeaderSize(); + if (headerSize > limits.getMaxHeaderSize()) { + throw new IOException("Message header size larger than maximum of " + + limits.getMaxHeaderSize() + " bytes"); + } + int remainingHeader = headerSize - MIN_HEADER_SIZE; + headerBuf = ByteBuffer.allocate(remainingHeader); + state = ReadState.MIN_HEADER_READ; + } + break; + case MIN_HEADER_READ: + if (currentHeader == null || headerBuf == null) { + throw new IllegalStateException("No header available"); + } else if (messageBuf != null) { + throw new IllegalStateException("Message already processed"); + } + + // Append data to current buffer, if room + putMinRemaining(headerBuf, readBuffer); + + // Have we read the full header? + if (headerBuf.remaining() == 0) { + headerBuf.position(0); + + // Finish setting MessageHeader object fields + currentHeader.setRemainingFields(headerBuf); + + // Set up reader to process message payload + int messageSize = currentHeader.getMessageSize(); + if (messageSize > limits.getMaxMessagePartSize()) { + throw new IOException("Message part size larger than maximum of " + + limits.getMaxMessagePartSize() + " bytes"); + } + messageBuf = ByteBuffer.allocate(messageSize); + headerBuf = null; + state = ReadState.FULL_HEADER_READ; + + // Dump header information if requested + if (DEBUG_HEADER) { + currentHeader.dumpHeader("[Read] "); + } + } + break; + case FULL_HEADER_READ: + if (currentHeader == null || messageBuf == null) { + throw new IllegalStateException("Missing header or message"); + } + // Append data to current buffer, if room + putMinRemaining(messageBuf, readBuffer); + + // Have we read the full message? + if (messageBuf.remaining() == 0) { + messageBuf.position(0); + + // Store this message until we received all parts + messages.add(messageBuf); + // Did we receive all parts of this message? + if (!currentHeader.isMoreData()) { + // Notify listener + ByteBuffer fullMessage = joinMessages(); + readFullMessage(fullMessage); + // Start new list of message parts + messages = new ArrayList(); + } + + // Reset reader state + messageBuf = null; + currentHeader = null; + state = ReadState.NEW_MESSAGE; + } + break; + case ERROR: + throw new IOException("Reader state corrupted by previous fatal error"); + default: + throw new IllegalStateException("Unknown state: " + state.name()); + } + } + } catch (IOException e) { + // Set to error state to stop this reader from processing more data + this.state = ReadState.ERROR; + throw e; + } catch (IllegalStateException e) { + // Set to error state to stop this reader from processing more data + this.state = ReadState.ERROR; + throw e; + } + } + + protected abstract void readFullMessage(ByteBuffer fullMessage); + + private ByteBuffer joinMessages() throws IOException { + // Single part shortcut + if (messages.size() == 1) { + return messages.get(0); + } + + int totalSize = 0; + int maxMessageSize = limits.getMaxMessageSize(); + for (ByteBuffer buf : messages) { + totalSize += buf.limit(); + // Check for overflow as well as size limit + if (totalSize < 0 || totalSize > maxMessageSize) { + throw new IOException("Total message size is larger than maximum of " + maxMessageSize + " bytes"); + } + } + + ByteBuffer fullMessage = ByteBuffer.allocate(totalSize); + for (ByteBuffer buf : messages) { + fullMessage.put(buf); + } + fullMessage.flip(); + return fullMessage; + } + + private void putMinRemaining(ByteBuffer dst, ByteBuffer src) { + int minRemaining = Math.min(dst.remaining(), src.remaining()); + for (int i = 0; i < minRemaining; i++) { + dst.put(src.get()); + } + } + + // For testing purposes + ReadState getState() { + return state; + } + + // For testing purposes + MessageHeader getCurrentHeader() { + return currentHeader; + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageWriter.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageWriter.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,100 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.nio.ByteBuffer; + +abstract class MessageWriter { + + // If true, dumps header information for each header read/written + private static final boolean DEBUG_HEADER = false; + + // Various message-related limits + private final MessageLimits limits; + + protected MessageWriter(MessageLimits limits) { + this.limits = limits; + } + + protected MessageToWrite getNextMessage(ByteBuffer fullMessage) { + ByteBuffer message = ByteBuffer.allocate(limits.getMaxMessagePartSize()); + putMinRemaining(message, fullMessage); + message.flip(); + + // Create a message header for this part + int messageSize = message.limit(); + boolean moreData = fullMessage.hasRemaining(); + MessageHeader header = new MessageHeader(); + header.setMessageSize(messageSize); + header.setMoreData(moreData); + + // Dump header information if requested + if (DEBUG_HEADER) { + header.dumpHeader("[Write] "); + } + + ByteBuffer headerBuf = ByteBuffer.wrap(header.toByteArray()); + return new MessageToWrite(headerBuf, message); + } + + private void putMinRemaining(ByteBuffer dst, ByteBuffer src) { + int minRemaining = Math.min(dst.remaining(), src.remaining()); + for (int i = 0; i < minRemaining; i++) { + dst.put(src.get()); + } + } + + // Container class to hold a single header and message + protected static class MessageToWrite { + private final ByteBuffer header; + private final ByteBuffer message; + + public MessageToWrite(ByteBuffer header, ByteBuffer message) { + this.header = header; + this.message = message; + } + + public ByteBuffer getHeader() { + return header; + } + + public ByteBuffer getMessage() { + return message; + } + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/SyncMessageReader.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/SyncMessageReader.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,111 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class SyncMessageReader extends MessageReader { + + private final ByteBuffer readBuffer; + private final WinPipesChannelImpl channel; + private ByteBuffer fullMessage; + + public SyncMessageReader(WinPipesChannelImpl channel) { + this(channel, new MessageLimits()); + } + + public SyncMessageReader(WinPipesChannelImpl channel, MessageLimits limits) { + super(limits); + this.readBuffer = ByteBuffer.allocate(limits.getBufferSize()); + this.channel = channel; + this.fullMessage = null; + } + + public ByteBuffer readData() throws IOException { + fullMessage = null; + boolean moreData = true; + while (moreData) { + doBlockingRead(readBuffer, MIN_HEADER_SIZE); + processData(readBuffer); + assertState(ReadState.MIN_HEADER_READ); + + int remainingHeaderSize = currentHeader.getHeaderSize() - MIN_HEADER_SIZE; + doBlockingRead(readBuffer, remainingHeaderSize); + processData(readBuffer); + assertState(ReadState.FULL_HEADER_READ); + + int messageSize = currentHeader.getMessageSize(); + moreData = currentHeader.isMoreData(); + doBlockingRead(readBuffer, messageSize); + processData(readBuffer); + assertState(ReadState.NEW_MESSAGE); + } + + if (fullMessage == null) { + throw new IllegalStateException("No message was read"); + } + return fullMessage; + } + + private void doBlockingRead(ByteBuffer buf, int expected) throws IOException { + buf.clear(); + // Use a precise limit to read exact number of bytes + buf.limit(expected); + + while (buf.hasRemaining()) { + int read = channel.read(buf); + if (read < 0) { + throw new IOException("Stream closed unexpectedly while reading message"); + } + } + buf.flip(); + } + + @Override + protected void readFullMessage(ByteBuffer fullMessage) { + this.fullMessage = fullMessage; + } + + private void assertState(ReadState expected) { + if (state != expected) { + throw new IllegalStateException("Expected ReadState \'" + expected.name() + + "\', but was \'" + state.name() + "\'"); + } + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/SyncMessageWriter.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/SyncMessageWriter.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,77 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class SyncMessageWriter extends MessageWriter { + + private final WinPipesChannelImpl channel; + private final MessageLimits limits; + + public SyncMessageWriter(WinPipesChannelImpl channel) { + this(channel, new MessageLimits()); + } + + SyncMessageWriter(WinPipesChannelImpl channel, MessageLimits limits) { + super(limits); + this.channel = channel; + this.limits = limits; + } + + public void writeData(ByteBuffer data) throws IOException { + if (data.remaining() > limits.getMaxMessageSize()) { + throw new IOException("Total message size is larger than maximum of " + + limits.getMaxMessageSize() + " bytes"); + } + // Split into messages and add headers + ByteBuffer fullMessage = data.duplicate(); + while (fullMessage.hasRemaining()) { + MessageToWrite message = getNextMessage(fullMessage); + writeFully(message.getHeader()); + writeFully(message.getMessage()); + } + } + + private void writeFully(ByteBuffer buf) throws IOException { + while (buf.hasRemaining()) { + channel.write(buf); + } + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipe.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipe.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,127 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * WinPipe performs basic operations (mostly serverside) on a Windows named pipe. + */ +public class WinPipe { + + private final WinPipesNativeHelper helper; + + public static class Id { + private final String id; + Id(final String id) { + this.id = id; + } + String getPipePath() { + return id; + } + public String toString() { + return "pipeid:" + id; + } + } + + private final Id pipeid; + private long handle; + + public WinPipe(String pipeName) { + this.pipeid = new Id(pipeName); + this.helper = WinPipesNativeHelper.INSTANCE; + this.handle = WinPipesNativeHelper.INVALID_HANDLE; + } + + WinPipe(Id id) { + this.pipeid = id; + this.helper = WinPipesNativeHelper.INSTANCE; + this.handle = WinPipesNativeHelper.INVALID_HANDLE; + } + + // for testing + public WinPipe(WinPipesNativeHelper helper, String pipeName) { + this.pipeid = new Id(pipeName); + this.helper = helper; + this.handle = WinPipesNativeHelper.INVALID_HANDLE; + } + + public WinPipe(WinPipesNativeHelper helper, Id id) { + this.pipeid = id; + this.helper = helper; + this.handle = WinPipesNativeHelper.INVALID_HANDLE; + } + + long open() throws IOException { + handle = helper.openNamedPipe(pipeid.getPipePath()); + if (!isOpen()) { + throw new IOException("Can't open pipe " + getPipeName() + " err=" + helper.getLastError()); + } + return isOpen() ? handle : WinPipesNativeHelper.INVALID_HANDLE; + } + + public long createWindowsNamedPipe(int numInstances, int bufsize) { + handle = helper.createNamedPipe(pipeid.getPipePath(), numInstances, bufsize); + return isOpen() ? handle : WinPipesNativeHelper.INVALID_HANDLE; + } + + public void close() { + helper.closeHandle(handle); + handle = WinPipesNativeHelper.INVALID_HANDLE; + } + + public boolean isOpen() { + return handle != WinPipesNativeHelper.INVALID_HANDLE; + } + + public String getPipeName() { + return pipeid.getPipePath(); + } + + public String toString() { + return "winpipe:" + pipeid; + } + + int write(ByteBuffer buffer) { + return helper.writeFile(handle, buffer); + } + + int read(ByteBuffer buffer) { + return helper.readFile(handle, buffer); + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesChannelImpl.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesChannelImpl.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,137 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; + +public class WinPipesChannelImpl implements ByteChannel { + + private final WinPipesChannelHelper channelHelper; + + private final WinPipe pipe; + + protected WinPipesChannelImpl(WinPipe pipe) { + this(pipe, new WinPipesChannelHelper()); + } + + protected WinPipesChannelImpl(WinPipe pipe, WinPipesChannelHelper helper) { + this.pipe = pipe; + this.channelHelper = helper; + } + + public static WinPipesChannelImpl open(String channelname) throws IOException { + final WinPipesChannelHelper helper = new WinPipesChannelHelper(); + final WinPipe.Id addr = helper.createAddress(channelname); + final WinPipe pipe = helper.open(addr); + return new WinPipesChannelImpl(pipe,helper); + } + + public static WinPipesChannelImpl open(String channelname, WinPipesChannelHelper helper) throws IOException { + final WinPipe.Id addr = helper.createAddress(channelname); + final WinPipe pipe = helper.open(addr); + return new WinPipesChannelImpl(pipe,helper); + } + + public String getName() { + return pipe.getPipeName(); + } + + /** + * synchronous pipe read + * @param dst data to read + * @return number of bytes read or error if < 0 + * @throws IOException if there'a n IO error + */ + @Override + public int read(ByteBuffer dst) throws IOException { + return channelHelper.read(pipe, dst); + } + + /** + * synchronous pipe write + * @param src data to write + * @return number of byte written + * @throws IOException if theree's an IO error + */ + @Override + public int write(ByteBuffer src) throws IOException { + return channelHelper.write(pipe, src); + } + + @Override + public boolean isOpen() { + return channelHelper.isOpen(pipe); + } + + @Override + public void close() throws IOException { + channelHelper.close(pipe); + } + + // ---- For testing purposes ---- + + // Wraps methods that can't be mocked + static class WinPipesChannelHelper { + WinPipe open(WinPipe.Id addr) throws IOException { + WinPipe pipe = new WinPipe(addr); + pipe.open(); + return pipe; + } + + int read(WinPipe pipe, ByteBuffer dst) throws IOException { + return pipe.read(dst); + } + + int write(WinPipe pipe, ByteBuffer src) throws IOException { + return pipe.write(src); + } + + boolean isOpen(WinPipe pipe) { + return pipe.isOpen(); + } + + void close(WinPipe pipe) throws IOException { + pipe.close(); + } + + WinPipe.Id createAddress(final String channelname) throws IOException { + return new WinPipe.Id(channelname); + } + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesIPCProperties.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesIPCProperties.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,86 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.common.internal.IPCType; + +public class WinPipesIPCProperties extends IPCProperties { + + static final String GLOBAL_WINPIPE_PREFIX = "\\\\.\\pipe\\"; + static final String DEFAULT_WINPIPE_PREFIX = "thermostat-pipe-"; + + static final String PIPE_PREFIX_PROPERTY = "winpipe.prefix"; + static final String WINPIPE_ID_PROPERTY = "winpipe.id"; + + private final String pipePrefix; + private final String pipeId; + + WinPipesIPCProperties(Properties props, File propFile) throws IOException { + this(props, propFile, new PathUtils()); + } + + WinPipesIPCProperties(final Properties props, final File propFile, final PathUtils pathUtils) throws IOException { + super(IPCType.WINDOWS_NAMED_PIPES, propFile); + this.pipePrefix = pathUtils.getProperty(props, PIPE_PREFIX_PROPERTY, DEFAULT_WINPIPE_PREFIX); + this.pipeId = pathUtils.getProperty(props, WINPIPE_ID_PROPERTY, ""); + } + + public String getPipeName(final String serverName) { + final String pipeid = getPipeId().isEmpty() ? "" : getPipeId() + '-'; + return GLOBAL_WINPIPE_PREFIX + pipePrefix + pipeid + serverName; + } + + String getPipePrefix() { + return pipePrefix; + } + + private String getPipeId() { + return pipeId; + } + + // Helper class for testing purposes + static class PathUtils { + String getProperty(final Properties props, final String name, final String dflt) { + return props.getProperty(name, dflt); + } + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesIPCPropertiesProvider.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesIPCPropertiesProvider.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCPropertiesProvider; +import com.redhat.thermostat.agent.ipc.common.internal.IPCType; + +public class WinPipesIPCPropertiesProvider implements IPCPropertiesProvider { + + @Override + public WinPipesIPCProperties create(Properties props, File propFile) throws IOException { + return new WinPipesIPCProperties(props, propFile); + } + + @Override + public IPCType getType() { + return IPCType.WINDOWS_NAMED_PIPES; + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesNativeHelper.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesNativeHelper.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,360 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import com.redhat.thermostat.shared.config.NativeLibraryResolver; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Wrapper for Windows native methods pertaining to named pipes + */ +public class WinPipesNativeHelper { + + public static WinPipesNativeHelper INSTANCE = new WinPipesNativeHelper(); + + static { + String lib = NativeLibraryResolver.getAbsoluteLibraryPath("WinPipesNativeWrapper"); + System.load(lib); + } + + // windows constants + public static final long WAIT_OBJECT_0 = getConstantWaitObject0(); + public static final long INVALID_HANDLE = 0; + public static final long INFINITE = getConstantInfinite0(); + + // windows 'error' codes + public static final int ERROR_SUCCESS = 0; + public static final int ERROR_IO_PENDING = getConstantErrorIOPending0(); + public static final int ERROR_IO_INCOMPLETE = getConstantErrorIOIncomplete0(); + public static final int ERROR_HANDLE_EOF = getConstantErrorHandleEOF0(); + public static final int ERROR_MORE_DATA = getConstantErrorMoreData0(); + public static final int ERROR_PIPE_BUSY = getConstantErrorPipeBusy0(); + public static final int ERROR_BROKEN_PIPE = getConstantErrorBrokenPipe0(); + public static final int INVALID_HANDLE_VALUE = getConstantInvalidHandle0(); + public static final int ERROR_PIPE_CONNECTED = getConstantErrorPipeConnected0(); + + private WinPipesNativeHelper() { + } + + public ByteBuffer createDirectBuffer(int buffersize) { + final ByteBuffer buf = createDirectBuffer0(buffersize); + buf.limit(0); + return buf; + } + + public ByteBuffer createDirectOverlapStruct(long eHandle) { + return createDirectOverlapStruct0(eHandle); + } + + public void freeDirectBuffer(final ByteBuffer byteBuffer) throws IOException { + ensureDirectBuffer(byteBuffer); + freeDirectBuffer0(byteBuffer); + } + + public long createNamedPipe(final String pipeName, int instances, int buffersize) { + return createNamedPipe0(pipeName, instances, buffersize); + } + + /** + * Open existing named pipe (as client, for synchronous access) + * @param pipeName name of Windows named pipe e.g. '\\.\pipe\thermostat-pipe-command-channel' + * @return windows handle or 0 if failure + */ + long openNamedPipe(final String pipeName) { + return openExistingNamedPipe0(pipeName); + } + + public long createEvent() { + return createEvent0(true, false); + } + + public void resetEvent(long eventHandle) { + resetEvent0(eventHandle); + } + + public void setEvent(long eventHandle) { + setEvent0(eventHandle); + } + + public int getLastError() { + return getLastError0(); + } + /** + * Wait for a client connection on this pipe + * + * The eventHandle will be inserted into the OVERLAP struct by the native code + * (Java can't do this as the OOVERLAP struct is opaque to Java) + * + * @param pipeHandle pipe handle + * @param ooverlapped contiguous array used to store a Windows OVERLAPPED structure + * @return 0 if sucessful, ERROR_IO_PENDING if pending, or other error code on failure + */ + public int connectNamedPipe(long pipeHandle, ByteBuffer ooverlapped) throws IOException { + ensureDirectBuffer(ooverlapped); + return connectNamedPipe0(pipeHandle, ooverlapped); + } + + public boolean disconnectNamedPipe(long pipeHandle) { + return disconnectNamedPipe0(pipeHandle); + } + + public long getNamedPipeClientProcessId(long pipeHandle) { + return getNamedPipeClientProcessId0(pipeHandle); + } + + public boolean closeHandle(long handle) { + return closeHandle0(handle); + } + + public int waitForMultipleObjects(int n, long[] ehandles, boolean waitAll, int millis) { + return waitForMultipleObjects0(n, ehandles, waitAll, millis); + } + + /** + * get result of overlapped operation + * + * @param pipeHandle handle + * @param ooverlapped buffer to store Windows OVERLAPPED structure + * @param wait true to wait for result, false to return immediately + * @return bytes transferred (>= 0) if success, (- bytes transferred -1) if failure + */ + public int getOverlappedResult(long pipeHandle, ByteBuffer ooverlapped, boolean wait) throws IOException { + ensureDirectBuffer(ooverlapped); + return getOverlappedResult0(pipeHandle, ooverlapped, wait); + } + + /** + * Read from a file in overlapped mode + * + * The eventHandle will be inserted into the OVERLAP struct by the native code + * (Java can't do this as the OOVERLAP struct is opaque to Java) + * + * @param pipeHandle pipe handle + * @param ooverlapped contiguous array used to store a Windows OVERLAPPED structure + * @param buffer byte buffer to be read into + * @return true if successful + */ + public boolean readFileOverlapped(long pipeHandle, ByteBuffer ooverlapped, ByteBuffer buffer) throws IOException { + ensureDirectBuffer(ooverlapped); + ensureDirectBuffer(buffer); + return readFileOverlapped0(pipeHandle, ooverlapped, buffer, buffer.position(), buffer.remaining()); + } + + /** + * Read from a file in syncronous mode + * + * @param pipeHandle pipe handle + * @param buffer byte buffer to be read into + * @return >= 0 if 0 or more bytes read, -1 if error + */ + int readFile(long pipeHandle, ByteBuffer buffer) { + int count = readFile0(pipeHandle, buffer.array(), buffer.position(), buffer.remaining()); + if (count >= 0) { + buffer.position(buffer.position() + count); + } + return count; + } + + /** + * Write to a file in overlapped mode + * + * The eventHandle will be inserted into the OVERLAP struct by the native code + * (Java can't do this as the OOVERLAP struct is opaque to Java) + * + * @param pipeHandle pipe handle + * @param ooverlapped contiguous array used to store a Windows OVERLAPPED structure + * @param buffer byte buffer to be written + * @return true if IO pending, false if error + */ + public boolean writeFileOverlapped(long pipeHandle, ByteBuffer ooverlapped, ByteBuffer buffer) throws IOException { + ensureDirectBuffer(ooverlapped); + ensureDirectBuffer(buffer); + return writeFileOverlapped0(pipeHandle, ooverlapped, buffer, buffer.position(), buffer.remaining()); + // don't modify buffer's position() here, becuase need to wait until the buffer has been fully written + } + + /** + * Write to a file in synchronous mode + * + * @param pipeHandle pipe handle + * @param buffer byte buffer to be written + * @return >= 0 if 0 or more bytes written, -1 if error + */ + public int writeFile(long pipeHandle, ByteBuffer buffer) { + final int count = writeFile0(pipeHandle, buffer.array(), buffer.position(), buffer.remaining()); + if (count > 0) + buffer.position(buffer.position() + count); + return count; + } + + /** + * Write to a file in synchronous mode + * + * @param pipeHandle pipe handle + * @param buffer byte array to be written + * @return >= 0 if 0 or more bytes written, -1 if error + */ + public int writeFile(long pipeHandle, byte[] buffer) { + return writeFile0(pipeHandle, buffer, 0, buffer.length); + } + + private void ensureDirectBuffer( ByteBuffer b) throws IOException { + if (!b.isDirect()) { + throw new IOException("ByteBuffer is not a DirectByteBuffer"); + } + } + + // native functions + + private native long createNamedPipe0(final String pipeName, int instances, int buffersize); + + private native long createEvent0(boolean manual, boolean initiallySignalled); + private native void resetEvent0(long handle); + private native void setEvent0(long handle); + + private native int getLastError0(); + + /** + * Connect to a namedpipe client (native code) + * + * The eventHandle will be inserted into the OVERLAP struct by the native code + * (Java can't do this as the OOVERLAP struct is opaque to Java) + * + * @param pipeHandle pipe handle + * @param ooverlapped contiguous array used to store a Windows OVERLAPPED structure + * @return return 0 if sucessfull, ERROR_IO_PENDING if still pending, or GetLastError() if failure + */ + private native int connectNamedPipe0(long pipeHandle, ByteBuffer ooverlapped); + + private native long openExistingNamedPipe0(final String pipeName); + + private native boolean disconnectNamedPipe0(long pipeHandle); + private native long getNamedPipeClientProcessId0(long pipeHandle); + private native boolean closeHandle0(long handle); + private native int waitForMultipleObjects0(int n, long[] ehandles, boolean waitAll, int millis); + private native int getOverlappedResult0(long pipeHandle, ByteBuffer ooverlapped, boolean wait); + + /** + * Read from a file in overlapped mode (native code) + * + * The eventHandle will be inserted into the OVERLAP struct by the native code + * (Java can't do this as the OOVERLAP struct is opaque to Java) + * + * @param pipeHandle pipe handle + * @param ooverlapped contiguous array used to store a Windows OVERLAPPED structure + * @param buffer byte buffer to be read into + * @param bufsize max number of bytes to read + * @return >= 0 if 0 ore more bytes read, -1 if pending IO, -2 if other error + */ + private native boolean readFileOverlapped0(long pipeHandle, ByteBuffer ooverlapped, ByteBuffer buffer, int position, int bufsize); + + /** + * Write to a file in overlapped mode (native code) + * + * The eventHandle will be inserted into the OVERLAP struct by the native code + * (Java can't do this as the OOVERLAP struct is opaque to Java) + * + * @param pipeHandle pipe handle + * @param ooverlapped contiguous array used to store a Windows OVERLAPPED structure + * @param buffer byte buffer to be written + * @param bufsize number of bytes to write + * @return true if successful + */ + private native boolean writeFileOverlapped0(long pipeHandle, ByteBuffer ooverlapped, ByteBuffer buffer, int position, int bufsize); + + /** + * Cancel pending asynchronous IO + * Will not cancel pending IO for this handle started on another thread + * + * @param pipeHandle handle with pending IO to cancel + * @return false if fails + */ + public boolean cancelIO(long pipeHandle) { + return cancelIo0(pipeHandle); + } + + /** + * Cancel pending asynchronous IO + * Cancells all pending IO on all threads + * + * @param pipeHandle handle with pending IO to cancel + * @return false if fails + */ + public boolean cancelAllIo(long pipeHandle, ByteBuffer ooverlapped) { + return cancelIoEx0(pipeHandle, ooverlapped); + } + + /** + * Cancel pending asynchronous IO + * Will not cancel pending IO for this handle started on another thread + * + * @param pipeHandle handle with pending IO to cancel + * @return false if fails + */ + private native boolean cancelIo0(long pipeHandle); + + /** + * Cancel pending asynchronous IO + * Cancells all pending IO on all threads + * + * @param pipeHandle handle with pending IO to cancel + * @return false if fails + */ + private native boolean cancelIoEx0(long pipeHandle, ByteBuffer ooverlapped); + + private native int readFile0(long pipeHandle, byte[] buffer, int position, int bufsize); + private native int writeFile0(long pipeHandle, byte[] buffer, int position, int bufsize); + + private static native long getConstantWaitObject0(); + private static native long getConstantInfinite0(); + + private static native int getConstantErrorIOPending0(); + private static native int getConstantErrorIOIncomplete0(); + private static native int getConstantErrorHandleEOF0(); + private static native int getConstantErrorMoreData0(); + private static native int getConstantErrorPipeBusy0(); + private static native int getConstantInvalidHandle0(); + private static native int getConstantErrorPipeConnected0(); + private static native int getConstantErrorBrokenPipe0(); + + // bytebuffer maniuplation + private native ByteBuffer createDirectBuffer0(int buffersize); + private native ByteBuffer createDirectOverlapStruct0(long eHandle); + private native void freeDirectBuffer0(final ByteBuffer byteBuffer); +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/native/WinPipesNativeHelper.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/native/WinPipesNativeHelper.c Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,467 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +#include "com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper.h" + +#include +#include +#include +#include + +#if defined(_WIN32) +# include +# include +#endif + +#define PIPE_TIMEOUT 5000 +//#define FILL_READ_BUFFER 1 + +static jint throw_IOException(JNIEnv *env, const char *message) { + const char *class_name = "java/io/IOException"; + jclass class = (*env)->FindClass(env, class_name); + if (class == NULL) { + return -1; + } + return (*env)->ThrowNew(env, class, message); +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: createNamedPipe0 + * Signature: (Ljava/lang/String;II)J + */ +JNIEXPORT jlong JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_createNamedPipe0 + (JNIEnv *env, jobject obj, jstring pipeName, jint instances, jint bufsize) +{ + const char *pname = (*env)->GetStringUTFChars(env, pipeName, NULL); + if (pname == NULL) { + return 0; + } + HANDLE pipeHandle = CreateNamedPipe( + pname, // pipe name + PIPE_ACCESS_DUPLEX | // read/write access + FILE_FLAG_OVERLAPPED, // overlapped mode + PIPE_TYPE_MESSAGE | // message-type pipe + PIPE_READMODE_MESSAGE | // message-read mode + PIPE_WAIT, // blocking mode + instances, // number of instances + bufsize, // output buffer size + bufsize, // input buffer size + PIPE_TIMEOUT, // client time-out + NULL); // default security attributes + + (*env)->ReleaseStringUTFChars(env, pipeName, pname); + return (jlong)pipeHandle; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: openNamedPipe0 + * Signature: (Ljava/lang/String;)J + */ +JNIEXPORT jlong JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_openExistingNamedPipe0 + (JNIEnv *env, jobject obj, jstring pipeName) +{ + const char *pname = (*env)->GetStringUTFChars(env, pipeName, NULL); + if (pname == NULL) { + return 0; + } + + jlong hnd = (jlong)CreateFile( + pname, // pipe name + GENERIC_READ | // read and write access + GENERIC_WRITE, + 0, // no sharing + NULL, // default security attributes + OPEN_EXISTING, // opens existing pipe + 0, // default attributes + NULL); // no template file + + (*env)->ReleaseStringUTFChars(env, pipeName, pname); + return (jlong)hnd; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: createEvent0 + * Signature: ()J + */ +JNIEXPORT jlong JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_createEvent0 + (JNIEnv *env, jobject obj, jboolean manual, jboolean initial) +{ + return (jlong)CreateEvent(NULL, manual, initial, NULL); +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: resetEvent0 + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_resetEvent0 + (JNIEnv *env, jobject obj, jlong eventHandle) +{ + ResetEvent((HANDLE)eventHandle); +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: setEvent0 + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_setEvent0 + (JNIEnv *env, jobject obj, jlong eventHandle) +{ + SetEvent((HANDLE)eventHandle); +} + +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getLastError0 + (JNIEnv *env, jobject obj) +{ + return GetLastError(); +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: connectNamedPipe0 + * Signature: (JJ[D)Ljava/lang/String; + * + * return 0 if sucessfull, ERROR_IO_PENDING if still pending, or GetLastError() if failure + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_connectNamedPipe0 + (JNIEnv *env, jobject obj, jlong pipeHandle, jobject ooverlapped) +{ + OVERLAPPED* overlapped = (OVERLAPPED*)(*env)->GetDirectBufferAddress(env, ooverlapped); + BOOL ret = ConnectNamedPipe((HANDLE)pipeHandle, overlapped); + return ret ? GetLastError() : 0; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: disconnectnamedPipe0 + * Signature: (J)Z + */ +JNIEXPORT jboolean JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_disconnectNamedPipe0 + (JNIEnv *env, jobject obj, jlong pipeHandle) +{ + return DisconnectNamedPipe((HANDLE)pipeHandle); +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: getNamedPipeClientProcessId0 + * Signature: (J)J + */ +JNIEXPORT jlong JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getNamedPipeClientProcessId0 + (JNIEnv *env, jobject obj, jlong pipeHandle) +{ + throw_IOException(env, "getNamedPipeClientProcessId0() not yet implemented"); + return -1; // TODO - implement if needed +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: closeHandle0 + * Signature: (J)Z + */ +JNIEXPORT jboolean JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_closeHandle0 + (JNIEnv *env, jobject obj, jlong handle) +{ + return CloseHandle((HANDLE)handle); +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: waitForMultipleObjects0 + * Signature: (I[JZI)I + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_waitForMultipleObjects0 + (JNIEnv *env, jobject obj, jint numObjects, jlongArray handles, jboolean waitForAll, jint millis) +{ + void* eventHandles = (*env)->GetLongArrayElements(env, handles, NULL); + int ret = WaitForMultipleObjects(numObjects, eventHandles, waitForAll, millis); + (*env)->ReleaseLongArrayElements(env, handles, eventHandles, 0); + return (jint) ret; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: getOverlappedResult0 + * Signature: (JJ[DZ)J + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getOverlappedResult0 + (JNIEnv *env, jobject obj, jlong pipeHandle, jobject ooverlapped, jboolean wait) +{ + OVERLAPPED* overlapped = (OVERLAPPED*)(*env)->GetDirectBufferAddress(env, ooverlapped); + + DWORD bytesTransferred = 0; + BOOL success = GetOverlappedResult( (HANDLE)pipeHandle, overlapped, &bytesTransferred, wait ); + + return success ? bytesTransferred : (-bytesTransferred) - 1; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: readFileOverlapped0 + * Signature: (JJ[D[BJ)J + */ +JNIEXPORT jboolean JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_readFileOverlapped0 + (JNIEnv *env, jobject obj, jlong handle, jobject ooverlapped, jobject buffer, jint offset, jint bufsize) +{ + OVERLAPPED* overlapped = (OVERLAPPED*)(*env)->GetDirectBufferAddress(env, ooverlapped); + HANDLE hEvent = overlapped->hEvent; + memset(overlapped, 0, sizeof(OVERLAPPED)); + overlapped->hEvent = hEvent; + jbyte* buf = (*env)->GetDirectBufferAddress(env, buffer); +#if defined(FILL_READ_BUFFER) + memset(buf + offset, 'r', (int)bufsize); +#endif + BOOL success = ReadFile( (HANDLE)handle, buf + offset, (int)bufsize, NULL, overlapped ); + return success; +} + + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: readFile0 + * Signature: (J[BJ)J + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_readFile0 + (JNIEnv *env, jobject obj, jlong handle, jbyteArray array, jint offset, jint bufsize) +{ + jbyte* buf = (*env)->GetByteArrayElements(env, array, 0); + DWORD bytesTransferred = 0; + BOOL success = ReadFile( (HANDLE)handle, buf + offset, (int)bufsize, &bytesTransferred, NULL ); + (*env)->ReleaseByteArrayElements(env, array, buf, 0); + return success ? bytesTransferred : (-bytesTransferred) - 1; +} + + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: writeFileOverlapped0 + * Signature: (JJ[D[BJ)J + */ +JNIEXPORT jboolean JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_writeFileOverlapped0 + (JNIEnv *env, jobject obj, jlong handle, jobject ooverlapped, jobject buffer, jint offset, jint bufsize) +{ + OVERLAPPED* overlapped = (OVERLAPPED*)(*env)->GetDirectBufferAddress(env, ooverlapped); + jbyte* buf = (*env)->GetDirectBufferAddress(env, buffer); + HANDLE hEvent = overlapped->hEvent; + memset(overlapped, 0, sizeof(OVERLAPPED)); + overlapped->hEvent = hEvent; + + BOOL success = WriteFile( (HANDLE)handle, buf + offset, (int)bufsize, NULL, overlapped ); + + return success; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: writeFile0 + * Signature: (J[BJ)J + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_writeFile0 + (JNIEnv *env, jobject obj, jlong handle, jobject array, jint offset, jint bufsize) +{ + jbyte* data = (*env)->GetByteArrayElements(env, array, 0); + DWORD bytesTransferred = 0; + BOOL success = WriteFile( (HANDLE)handle, data + offset, (int)bufsize, &bytesTransferred, 0 ); + (*env)->ReleaseByteArrayElements(env, array, data, JNI_ABORT); + return success ? bytesTransferred : (-bytesTransferred) - 1; +} + +JNIEXPORT jlong JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getConstantWaitObject0 + (JNIEnv *env, jobject obj) +{ + return (jlong)WAIT_OBJECT_0; +} + +JNIEXPORT jlong JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getConstantInfinite0 + (JNIEnv *env, jobject obj) +{ + return (jlong)INFINITE; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: getConstantErrorIOPending0 + * Signature: ()J + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getConstantErrorIOPending0 + (JNIEnv *env, jobject obj) +{ + return ERROR_IO_PENDING; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: getConstantErrorIOIncomplete0 + * Signature: ()J + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getConstantErrorIOIncomplete0 + (JNIEnv *env, jobject obj) +{ + return ERROR_IO_INCOMPLETE; +} +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: getConstantErrorHandleEOF0 + * Signature: ()J + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getConstantErrorHandleEOF0 + (JNIEnv *env, jobject obj) +{ + return ERROR_HANDLE_EOF; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: getConstantErrorMoreData0 + * Signature: ()J + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getConstantErrorMoreData0 + (JNIEnv *env, jobject obj) +{ + return ERROR_MORE_DATA; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: getConstantErrorPipeBusy0 + * Signature: ()J + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getConstantErrorPipeBusy0 + (JNIEnv *env, jobject obj) +{ + return ERROR_PIPE_BUSY; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: getConstantErrorPipeConnected0 + * Signature: ()J + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getConstantErrorPipeConnected0 + (JNIEnv *env, jobject obj) +{ + return ERROR_PIPE_CONNECTED; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: getConstantInvalidHandle0 + * Signature: ()J + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getConstantInvalidHandle0 + (JNIEnv *env, jobject obj) +{ + return (jint)INVALID_HANDLE_VALUE; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: getConstantErrorBrokenPipe0 + * Signature: ()J + */ +JNIEXPORT jint JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_getConstantErrorBrokenPipe0 + (JNIEnv *env, jobject obj) +{ + return (jint)ERROR_BROKEN_PIPE; +} + + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: createDirectBuffer0 + * Signature: (I)Ljava/nio/ByteBuffer; + */ +JNIEXPORT jobject JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_createDirectBuffer0 + (JNIEnv *env, jobject obj, jint bufsize) +{ + void* buffer = malloc(bufsize); + jobject bytebuffer = (*env)->NewDirectByteBuffer(env, buffer, bufsize); + return bytebuffer; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: createDirectOverlapStruct0 + * Signature: ()Ljava/nio/ByteBuffer; + */ +JNIEXPORT jobject JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_createDirectOverlapStruct0 + (JNIEnv *env, jobject obj, jlong eHandle) +{ + const int bufsize = sizeof(OVERLAPPED); + OVERLAPPED* overlapped = (OVERLAPPED*)calloc(bufsize, 1); + overlapped->hEvent = (HANDLE)eHandle; + jobject bytebuffer = (*env)->NewDirectByteBuffer(env, overlapped, bufsize); + return bytebuffer; +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: freeDirectBuffer0 + * Signature: (Ljava/nio/ByteBuffer;)V + */ +JNIEXPORT void JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_freeDirectBuffer0 + (JNIEnv *env, jobject obj, jobject bytebuffer) +{ + void *buffer = (*env)->GetDirectBufferAddress(env, bytebuffer); + free(buffer); +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: cancelIo0 + * Signature: (J)Z + */ +JNIEXPORT jboolean JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_cancelIo0 + (JNIEnv *env, jobject obj, jlong pipeHandle) +{ + return (jboolean)CancelIo(pipeHandle); +} + +/* + * Class: com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper + * Method: cancelIoEx0 + * Signature: (JLjava/nio/ByteBuffer;)Z + */ +JNIEXPORT jboolean JNICALL Java_com_redhat_thermostat_agent_ipc_winpipes_common_internal_WinPipesNativeHelper_cancelIoEx0 + (JNIEnv *env, jobject obj, jlong pipeHandle, jobject ooverlapped) +{ + OVERLAPPED* overlapped = ooverlapped == 0 ? 0 : (OVERLAPPED*)(*env)->GetDirectBufferAddress(env, ooverlapped); + return (jboolean)CancelIoEx(pipeHandle, ooverlapped); +} \ No newline at end of file diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/main/resources/META-INF/services/com.redhat.thermostat.agent.ipc.common.internal.IPCPropertiesProvider --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/main/resources/META-INF/services/com.redhat.thermostat.agent.ipc.common.internal.IPCPropertiesProvider Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,1 @@ +com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCPropertiesProvider diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/AsyncMessageWriterTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/AsyncMessageWriterTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,158 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import static com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.createByteArrays; +import static com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.createHeader; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.Charset; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.WriteAnswer; + +public class AsyncMessageWriterTest { + + private static final int MAX_MESSAGE_PART_SIZE = 5; + + private WritableByteChannel channel; + private AsyncMessageWriter writer; + private MessageLimits limits; + + @Before + public void setUp() throws Exception { + channel = mock(WritableByteChannel.class); + limits = mock(MessageLimits.class); + when(limits.getMaxMessagePartSize()).thenReturn(MAX_MESSAGE_PART_SIZE); + when(limits.getMaxMessageSize()).thenReturn(Integer.MAX_VALUE); + writer = new AsyncMessageWriter(channel, limits); + } + + @Test + public void testWriteDataNotFullHeader() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + byte[][] results = new byte[][] { new byte[headerBytes.length - 1] }; + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.enqueueForWriting(buf); + writer.writeData(); + + // Should not attempt to write message, since header was not fully written + byte[] expected = Arrays.copyOfRange(headerBytes, 0, headerBytes.length - 1); + assertArrayEquals(expected, results[0]); + assertTrue(writer.hasMoreMessages()); + verify(channel).write(any(ByteBuffer.class)); + } + + @Test + public void testWriteDataNotFullMessage() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + byte[][] results = createByteArrays(headerBytes.length, messageBytes.length - 1); + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.enqueueForWriting(buf); + writer.writeData(); + + assertArrayEquals(headerBytes, results[0]); + assertArrayEquals(Arrays.copyOfRange(messageBytes, 0, messageBytes.length - 1), results[1]); + assertTrue(writer.hasMoreMessages()); + verify(channel, times(2)).write(any(ByteBuffer.class)); + } + + @Test + public void testWriteDataMulti() throws Exception { + final String message = "hello world"; + final byte[] fullMessageBytes = message.getBytes(Charset.forName("UTF-8")); + final byte[] message1Bytes = Arrays.copyOfRange(fullMessageBytes, 0, 5); + final byte[] message2Bytes = Arrays.copyOfRange(fullMessageBytes, 5, 10); + final byte[] message3Bytes = Arrays.copyOfRange(fullMessageBytes, 10, fullMessageBytes.length); + final byte[] header1Bytes = createHeader(message1Bytes.length, true); + final byte[] header2Bytes = createHeader(message2Bytes.length, true); + final byte[] header3Bytes = createHeader(message3Bytes.length, false); + + byte[][] results = createByteArrays(header1Bytes.length, message1Bytes.length, + header2Bytes.length, message2Bytes.length, header3Bytes.length, message3Bytes.length); + + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(fullMessageBytes); + writer.enqueueForWriting(buf); + writer.writeData(); + assertTrue(writer.hasMoreMessages()); + writer.writeData(); + assertTrue(writer.hasMoreMessages()); + writer.writeData(); + assertFalse(writer.hasMoreMessages()); + + assertArrayEquals(header1Bytes, results[0]); + assertArrayEquals(message1Bytes, results[1]); + assertArrayEquals(header2Bytes, results[2]); + assertArrayEquals(message2Bytes, results[3]); + assertArrayEquals(header3Bytes, results[4]); + assertArrayEquals(message3Bytes, results[5]); + verify(channel, times(6)).write(any(ByteBuffer.class)); + } + + @Test(expected=IOException.class) + public void testWriteMessageTooLarge() throws Exception { + when(limits.getMaxMessageSize()).thenReturn(0); + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.enqueueForWriting(buf); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/ChannelTestUtils.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/ChannelTestUtils.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,147 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +class ChannelTestUtils { + + // Expected to be in-order position arguments + static byte[][] splitByteArray(byte[] data, int... splitPos) { + int numSplits = splitPos.length; + if (numSplits == 0) { + throw new IllegalArgumentException("Give at least one split position"); + } + byte[][] result = new byte[numSplits + 1][]; + + result[0] = Arrays.copyOfRange(data, 0, splitPos[0]); + for (int i = 1; i < numSplits; i++) { + result[i] = Arrays.copyOfRange(data, splitPos[i - 1], splitPos[i]); + } + result[numSplits] = Arrays.copyOfRange(data, splitPos[numSplits - 1], data.length); + return result; + } + + static byte[] joinByteArrays(byte[] first, byte[]... more) { + if (more.length == 0) { + return first; + } + + int size = first.length; + for (byte[] other : more) { + size += other.length; + } + + byte[] result = new byte[size]; + System.arraycopy(first, 0, result, 0, first.length); + int ix = first.length; + for (byte[] other : more) { + System.arraycopy(other, 0, result, ix, other.length); + ix += other.length; + } + return result; + } + + static byte[] createHeader(int messageSize, boolean moreData) { + MessageHeader header = new MessageHeader(); + header.setMessageSize(messageSize); + header.setMoreData(moreData); + return header.toByteArray(); + } + + static class ReadAnswer implements Answer { + + private byte[][] bufs; + private int count; + + ReadAnswer(byte[][] bufs) { + this.bufs = bufs; + this.count = 0; + } + + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + // read(ByteBuffer) + ByteBuffer dst = (ByteBuffer) invocation.getArguments()[0]; + byte[] buf = bufs[count++]; + dst.put(buf); + return buf.length; + } + + } + + static byte[][] createByteArrays(int firstLength, int... moreLengths) { + int numSplits = moreLengths.length + 1; + byte[][] result = new byte[numSplits][]; + + result[0] = new byte[firstLength]; + for (int i = 1; i < numSplits; i++) { + result[i] = new byte[moreLengths[i - 1]]; + } + return result; + } + + static class WriteAnswer implements Answer { + + private final byte[][] bufs; + private int count; + + WriteAnswer(byte[][] bufs) { + this.bufs = bufs; + this.count = 0; + } + + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + // write(ByteBuffer) + ByteBuffer src = (ByteBuffer) invocation.getArguments()[0]; + // Don't write all at once + byte[] buf = bufs[count++]; + int numToCopy = Math.min(buf.length, src.remaining()); + for (int i = 0; i < numToCopy; i++) { + buf[i] = src.get(); + } + return numToCopy; + } + + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageHeaderTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageHeaderTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,248 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.Test; + +public class MessageHeaderTest { + + private static final int INT_BYTES = 4; + + @Test + public void testNewHeaderBase() throws Exception { + MessageHeader header = new MessageHeader(); + + assertEquals(MessageHeader.getDefaultProtocolVersion(), header.getProtocolVersion()); + assertEquals(MessageHeader.getDefaultHeaderSize(), header.getHeaderSize()); + + // Not part of minimal header + assertEquals(-1, header.getMessageSize()); + assertFalse(header.isMoreData()); + } + + @Test + public void testNewHeaderV1() throws Exception { + MessageHeader header = new MessageHeader(); + header.setMessageSize(50); + header.setMoreData(true); + + assertEquals(MessageHeader.getDefaultProtocolVersion(), header.getProtocolVersion()); + assertEquals(MessageHeader.getDefaultHeaderSize(), header.getHeaderSize()); + + // Protocol version 1 fields + assertEquals(50, header.getMessageSize()); + assertTrue(header.isMoreData()); + } + + @Test + public void testToBytesBase() throws Exception { + final byte[] expected = getBytes(MessageHeader.getDefaultProtocolVersion(), + MessageHeader.getDefaultHeaderSize(), -1, false); + + MessageHeader header = new MessageHeader(); + assertArrayEquals(expected, header.toByteArray()); + } + + @Test + public void testToBytesV1() throws Exception { + final byte[] expected = getBytes(1, MessageHeader.getDefaultHeaderSize(), 8000, true); + + MessageHeader header = new MessageHeader(); + // Protocol version 1 fields + header.setMessageSize(8000); + header.setMoreData(true); + + assertArrayEquals(expected, header.toByteArray()); + } + + @Test + public void testFromByteBuffer() throws Exception { + ByteBuffer buf = createByteBuffer(5, 20, 8000, true); + + MessageHeader header = MessageHeader.fromByteBuffer(buf); + assertEquals(5, header.getProtocolVersion()); + assertEquals(20, header.getHeaderSize()); + } + + @Test(expected=IOException.class) + public void testFromByteBufferShortMagic() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(3); + buf.put(new byte[] { 'T', 'H', 'E' }); + buf.flip(); + + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferShortProtocolVersion() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(7); + buf.put(new byte[] { 'T', 'H', 'E', 'R' }); + buf.put(new byte[] { 0x0, 0x0, 0x1 }); + buf.flip(); + + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferShortHeaderSize() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(11); + buf.put(new byte[] { 'T', 'H', 'E', 'R' }); + buf.put(intToBytes(5)); + buf.put(new byte[] { 0x0, 0x5, 0x2 }); + buf.flip(); + + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferNegativeProtocolVersion() throws Exception { + ByteBuffer buf = createByteBuffer(-4, 25, 20, true); + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferZeroProtocolVersion() throws Exception { + ByteBuffer buf = createByteBuffer(0, 25, 20, true); + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferNegativeHeaderSize() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(MessageHeader.getDefaultHeaderSize()); + fillByteBuffer(buf, 5, -22, 20, true); + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferZeroHeaderSize() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(MessageHeader.getDefaultHeaderSize()); + fillByteBuffer(buf, 5, 0, 20, true); + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testSetRemainingFieldsV0() throws Exception { + ByteBuffer buf = createByteBuffer(0, MessageHeader.getDefaultHeaderSize(), 20, true); + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + + // Expect no change from default + assertEquals(-1, header.getMessageSize()); + assertEquals(false, header.isMoreData()); + } + + @Test + public void testSetRemainingFieldsV1() throws Exception { + ByteBuffer buf = createByteBuffer(1, MessageHeader.getDefaultHeaderSize(), 20, true); + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + + // Expect remaining fields copied from buf + assertEquals(20, header.getMessageSize()); + assertTrue(header.isMoreData()); + } + + @Test(expected=IOException.class) + public void testSetRemainingFieldsV1ShortMessageSize() throws Exception { + ByteBuffer buf = createByteBuffer(1, MessageHeader.getDefaultHeaderSize(), 20, true); + // Subtract moreData bit, and one bit of message size + buf.limit(buf.limit() - 2); + + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + } + + @Test(expected=IOException.class) + public void testSetRemainingFieldsV1NegativeMessageSize() throws Exception { + ByteBuffer buf = createByteBuffer(1, MessageHeader.getDefaultHeaderSize(), -20, true); + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + } + + @Test(expected=IOException.class) + public void testSetRemainingFieldsV1ZeroMessageSize() throws Exception { + ByteBuffer buf = createByteBuffer(1, MessageHeader.getDefaultHeaderSize(), 0, true); + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + } + + @Test(expected=IOException.class) + public void testSetRemainingFieldsV1ShortMoreData() throws Exception { + ByteBuffer buf = createByteBuffer(1, MessageHeader.getDefaultHeaderSize(), 20, true); + // Subtract moreData bit + buf.limit(buf.limit() - 1); + + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + } + + private byte[] getBytes(int protoVer, int headerSize, int messageSize, boolean moreData) { + ByteBuffer buf = createByteBuffer(protoVer, headerSize, messageSize, moreData); + return buf.array(); + } + + private ByteBuffer createByteBuffer(int protoVer, int headerSize, int messageSize, boolean moreData) { + ByteBuffer buf = ByteBuffer.allocate(headerSize); + fillByteBuffer(buf, protoVer, headerSize, messageSize, moreData); + return buf; + } + + private void fillByteBuffer(ByteBuffer buf, int protoVer, int headerSize, int messageSize, boolean moreData) { + // Magic number + buf.put(new byte[] { 'T', 'H', 'E', 'R' }); + buf.put(intToBytes(protoVer)); + buf.put(intToBytes(headerSize)); + buf.put(intToBytes(messageSize)); + buf.put((byte) (moreData ? 1 : 0)); + buf.flip(); + } + + private byte[] intToBytes(int num) { + ByteBuffer buf = ByteBuffer.allocate(INT_BYTES); + buf.putInt(num); + return buf.array(); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageReaderTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageReaderTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,354 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import static com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.createHeader; +import static com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.joinByteArrays; +import static com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.splitByteArray; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.MessageReader.ReadState; + +public class MessageReaderTest { + + private TestMessageReader reader; + private MessageLimits limits; + + @Before + public void setUp() throws Exception { + limits = mock(MessageLimits.class); + when(limits.getMaxHeaderSize()).thenReturn(Integer.MAX_VALUE); + when(limits.getMaxMessagePartSize()).thenReturn(Integer.MAX_VALUE); + when(limits.getMaxMessageSize()).thenReturn(Integer.MAX_VALUE); + reader = new TestMessageReader(limits); + } + + @Test + public void testOneRead() throws Exception { + byte[] data = createMinHeader(); + byte[][] splitData = splitByteArray(data, 3); + reader.readData(splitData[0]); + + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + assertNull(reader.getCurrentHeader()); + } + + @Test + public void testTwoRead() throws Exception { + byte[] data = createMinHeader(); + byte[][] splitData = splitByteArray(data, 3, 7); + reader.readData(splitData[0]); + reader.readData(splitData[1]); + + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + assertNull(reader.getCurrentHeader()); + } + + @Test + public void testThreeReadGotMinHeader() throws Exception { + byte[] data = createMinHeader(); + byte[][] splitData = splitByteArray(data, 3, 7, MessageReader.MIN_HEADER_SIZE); + reader.readData(splitData[0]); + reader.readData(splitData[1]); + reader.readData(splitData[2]); + + assertEquals(ReadState.MIN_HEADER_READ, reader.getState()); + assertNotNull(reader.getCurrentHeader()); + } + + @Test + public void testFourReadGotMinHeader() throws Exception { + byte[] data = createMinHeader(); + byte[][] splitData = splitByteArray(data, 3, 7, MessageReader.MIN_HEADER_SIZE, MessageReader.MIN_HEADER_SIZE + 2); + reader.readData(splitData[0]); + reader.readData(splitData[1]); + reader.readData(splitData[2]); + reader.readData(splitData[3]); + + assertEquals(ReadState.MIN_HEADER_READ, reader.getState()); + assertNotNull(reader.getCurrentHeader()); + + } + + @Test + public void testFiveReadGotFullHeader() throws Exception { + final String message = "hello"; + int messageSize = message.getBytes(Charset.forName("UTF-8")).length; + byte[] data = createHeader(messageSize, false); + byte[][] splitData = splitByteArray(data, 3, 7, MessageReader.MIN_HEADER_SIZE - 1, MessageReader.MIN_HEADER_SIZE + 2); + for (int i = 0; i < splitData.length; i++) { + reader.readData(splitData[i]); + } + + assertEquals(ReadState.FULL_HEADER_READ, reader.getState()); + MessageHeader header = reader.getCurrentHeader(); + assertNotNull(header); + assertEquals(messageSize, header.getMessageSize()); + } + + @Test + public void testSixReadGotFullMessage() throws Exception { + final String message = "hello"; + byte[] messageBytes = message.getBytes(Charset.forName("UTF-8")); + int messageSize = messageBytes.length; + byte[] headerBytes = createHeader(messageSize, false); + byte[] data = joinByteArrays(headerBytes, messageBytes); + byte[][] splitData = splitByteArray(data, 3, 7, MessageReader.MIN_HEADER_SIZE - 1, + MessageReader.MIN_HEADER_SIZE + 2, headerBytes.length + 1); + for (int i = 0; i < splitData.length; i++) { + reader.readData(splitData[i]); + } + + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + List fullMessages = reader.getFullMessages(); + assertEquals(1, fullMessages.size()); + assertEquals(ByteBuffer.wrap(messageBytes), fullMessages.get(0)); + } + + @Test + public void testTwoReadGotTwoMessages() throws Exception { + final String message1 = "hello"; + final String message2 = "world!"; + byte[] message1Bytes = message1.getBytes(Charset.forName("UTF-8")); + byte[] message2Bytes = message2.getBytes(Charset.forName("UTF-8")); + int message1Size = message1Bytes.length; + int message2Size = message2Bytes.length; + byte[] header1Bytes = createHeader(message1Size, true); + byte[] header2Bytes = createHeader(message2Size, false); + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes); + byte[][] splitData = splitByteArray(data, header1Bytes.length + message1Size); + + reader.readData(splitData[0]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + reader.readData(splitData[1]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + + byte[] expected = message1.concat(message2).getBytes(Charset.forName("UTF-8")); + List fullMessages = reader.getFullMessages(); + assertEquals(1, fullMessages.size()); + assertEquals(ByteBuffer.wrap(expected), fullMessages.get(0)); + } + + @Test + public void testThreeReadGotTwoMessages() throws Exception { + final String message1 = "hello"; + final String message2 = "world!"; + byte[] message1Bytes = message1.getBytes(Charset.forName("UTF-8")); + byte[] message2Bytes = message2.getBytes(Charset.forName("UTF-8")); + int message1Size = message1Bytes.length; + int message2Size = message2Bytes.length; + byte[] header1Bytes = createHeader(message1Size, true); + byte[] header2Bytes = createHeader(message2Size, false); + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes); + int messageBoundary = header1Bytes.length + message1Size; + byte[][] splitData = splitByteArray(data, messageBoundary - 2, messageBoundary + 2); + + reader.readData(splitData[0]); + assertEquals(ReadState.FULL_HEADER_READ, reader.getState()); + reader.readData(splitData[1]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + reader.readData(splitData[2]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + + byte[] expected = message1.concat(message2).getBytes(Charset.forName("UTF-8")); + List fullMessages = reader.getFullMessages(); + assertEquals(1, fullMessages.size()); + assertEquals(ByteBuffer.wrap(expected), fullMessages.get(0)); + } + + @Test + public void testThreeReadGotTwoMessagesNotContinued() throws Exception { + final String message1 = "hello"; + final String message2 = "world!"; + byte[] message1Bytes = message1.getBytes(Charset.forName("UTF-8")); + byte[] message2Bytes = message2.getBytes(Charset.forName("UTF-8")); + int message1Size = message1Bytes.length; + int message2Size = message2Bytes.length; + byte[] header1Bytes = createHeader(message1Size, false); + byte[] header2Bytes = createHeader(message2Size, false); + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes); + int messageBoundary = header1Bytes.length + message1Size; + byte[][] splitData = splitByteArray(data, messageBoundary - 2, messageBoundary + 2); + + reader.readData(splitData[0]); + assertEquals(ReadState.FULL_HEADER_READ, reader.getState()); + reader.readData(splitData[1]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + reader.readData(splitData[2]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + + List fullMessages = reader.getFullMessages(); + assertEquals(2, fullMessages.size()); + assertEquals(ByteBuffer.wrap(message1Bytes), fullMessages.get(0)); + assertEquals(ByteBuffer.wrap(message2Bytes), fullMessages.get(1)); + } + + @Test + public void testFourReadGotThreeMessagesOneContinued() throws Exception { + final String message1 = "hello"; + final String message2 = "world!"; + final String message3 = "test"; + byte[] message1Bytes = message1.getBytes(Charset.forName("UTF-8")); + byte[] message2Bytes = message2.getBytes(Charset.forName("UTF-8")); + byte[] message3Bytes = message3.getBytes(Charset.forName("UTF-8")); + int message1Size = message1Bytes.length; + int message2Size = message2Bytes.length; + int message3Size = message3Bytes.length; + byte[] header1Bytes = createHeader(message1Size, true); + byte[] header2Bytes = createHeader(message2Size, false); + byte[] header3Bytes = createHeader(message3Size, false); + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes, header3Bytes, message3Bytes); + int messageBoundary = header1Bytes.length + message1Size; + int messageBoundary2 = messageBoundary + header2Bytes.length + message2Size; + byte[][] splitData = splitByteArray(data, messageBoundary - 2, messageBoundary + 2, messageBoundary2); + + reader.readData(splitData[0]); + assertEquals(ReadState.FULL_HEADER_READ, reader.getState()); + reader.readData(splitData[1]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + reader.readData(splitData[2]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + reader.readData(splitData[3]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + + byte[] expected = message1.concat(message2).getBytes(Charset.forName("UTF-8")); + List fullMessages = reader.getFullMessages(); + assertEquals(2, fullMessages.size()); + assertEquals(ByteBuffer.wrap(expected), fullMessages.get(0)); + assertEquals(ByteBuffer.wrap(message3Bytes), fullMessages.get(1)); + } + + @Test + public void testHeaderTooLarge() throws Exception { + when(limits.getMaxHeaderSize()).thenReturn(0); + byte[] data = createMinHeader(); + try { + reader.readData(data); + fail("Expected IOException"); + } catch (IOException ignored) { + assertEquals(ReadState.ERROR, reader.getState()); + } + } + + @Test + public void testMessagePartTooLarge() throws Exception { + when(limits.getMaxMessagePartSize()).thenReturn(0); + + final String message = "hello"; + byte[] messageBytes = message.getBytes(Charset.forName("UTF-8")); + int messageSize = messageBytes.length; + byte[] headerBytes = createHeader(messageSize, false); + byte[] data = joinByteArrays(headerBytes, messageBytes); + try { + reader.readData(data); + fail("Expected IOException"); + } catch (IOException ignored) { + assertEquals(ReadState.ERROR, reader.getState()); + } + } + + @Test + public void testMessageTooLarge() throws Exception { + when(limits.getMaxMessageSize()).thenReturn(0); + + final String message1 = "hello"; + final String message2 = "world!"; + byte[] message1Bytes = message1.getBytes(Charset.forName("UTF-8")); + byte[] message2Bytes = message2.getBytes(Charset.forName("UTF-8")); + int message1Size = message1Bytes.length; + int message2Size = message2Bytes.length; + byte[] header1Bytes = createHeader(message1Size, true); + byte[] header2Bytes = createHeader(message2Size, false); + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes); + try { + reader.readData(data); + fail("Expected IOException"); + } catch (IOException ignored) { + assertEquals(ReadState.ERROR, reader.getState()); + assertTrue(reader.getFullMessages().isEmpty()); + } + } + + private byte[] createMinHeader() { + MessageHeader header = new MessageHeader(); + return header.toByteArray(); + } + + static class TestMessageReader extends MessageReader { + + private static final int BUFFER_SIZE = 1024; + private final ByteBuffer readBuffer; + private final List fullMessages; + + TestMessageReader(MessageLimits limits) { + super(limits); + this.readBuffer = ByteBuffer.allocate(BUFFER_SIZE); + this.fullMessages = new ArrayList(); + } + + @Override + protected void readFullMessage(ByteBuffer fullMessage) { + fullMessages.add(fullMessage); + } + + void readData(byte[] buf) throws IOException { + readBuffer.clear(); + readBuffer.put(buf); + readBuffer.flip(); + processData(readBuffer); + } + + List getFullMessages() { + return fullMessages; + } + + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageWriterTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageWriterTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,134 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.MessageWriter.MessageToWrite; + +public class MessageWriterTest { + + private static final int MAX_MESSAGE_SIZE = 5; + + private MessageWriter writer; + + @Before + public void setUp() throws Exception { + MessageLimits limits = mock(MessageLimits.class); + when(limits.getMaxMessagePartSize()).thenReturn(MAX_MESSAGE_SIZE); + writer = new TestMessageWriter(limits); + } + + @Test + public void testGetNextMessageSingle() { + String message = "Hello"; + byte[] messageBytes = message.getBytes(Charset.forName("UTF-8")); + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + MessageToWrite toWrite = writer.getNextMessage(buf); + + MessageHeader header = new MessageHeader(); + header.setMessageSize(5); + header.setMoreData(false); + ByteBuffer headerBuf = toWrite.getHeader(); + assertArrayEquals(header.toByteArray(), headerBuf.array()); + + ByteBuffer messageBuf = toWrite.getMessage(); + assertArrayEquals(messageBytes, messageBuf.array()); + } + + @Test + public void testGetNextMessageMulti() { + String message = "Hello World"; + byte[] messageBytes = message.getBytes(Charset.forName("UTF-8")); + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + + // First part + MessageToWrite toWrite = writer.getNextMessage(buf); + + MessageHeader header = new MessageHeader(); + header.setMessageSize(5); + header.setMoreData(true); + ByteBuffer headerBuf = toWrite.getHeader(); + assertArrayEquals(header.toByteArray(), headerBuf.array()); + + ByteBuffer messageBuf = toWrite.getMessage(); + ByteBuffer expectedMessage = ByteBuffer.wrap(Arrays.copyOfRange(messageBytes, 0, 5)); + assertEquals(expectedMessage, messageBuf); + + // Second part + toWrite = writer.getNextMessage(buf); + + // Header should be the same + assertArrayEquals(header.toByteArray(), headerBuf.array()); + + messageBuf = toWrite.getMessage(); + expectedMessage = ByteBuffer.wrap(Arrays.copyOfRange(messageBytes, 5, 10)); + assertEquals(expectedMessage, messageBuf); + + // Third part + toWrite = writer.getNextMessage(buf); + + header = new MessageHeader(); + header.setMessageSize(1); + header.setMoreData(false); + headerBuf = toWrite.getHeader(); + assertArrayEquals(header.toByteArray(), headerBuf.array()); + + messageBuf = toWrite.getMessage(); + expectedMessage = ByteBuffer.wrap(Arrays.copyOfRange(messageBytes, 10, 11)); + assertEquals(expectedMessage, messageBuf); + } + + private static class TestMessageWriter extends MessageWriter { + + private TestMessageWriter(MessageLimits limits) { + super(limits); + } + + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/SyncMessageReaderTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/SyncMessageReaderTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,114 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import static com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.createHeader; +import static com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.joinByteArrays; +import static com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.splitByteArray; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.ReadAnswer; + +public class SyncMessageReaderTest { + + private static final int FULL_HEADER_SIZE = MessageHeader.getDefaultHeaderSize(); + private static final int MIN_HEADER_SIZE = MessageHeader.getMinimumHeaderSize(); + + private WinPipesChannelImpl channel; + private SyncMessageReader reader; + + @Before + public void setUp() throws Exception { + channel = mock(WinPipesChannelImpl.class); + reader = new SyncMessageReader(channel); + } + + @Test + public void testReadDataSingle() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + + byte[] data = joinByteArrays(headerBytes, messageBytes); + // Split min header, remaining header and message into two pieces each + mockReads(data, MIN_HEADER_SIZE - 3, MIN_HEADER_SIZE, + FULL_HEADER_SIZE - 3, FULL_HEADER_SIZE, + FULL_HEADER_SIZE + 3); + ByteBuffer result = reader.readData(); + assertEquals(ByteBuffer.wrap(messageBytes), result); + } + + @Test + public void testReadDataMulti() throws Exception { + final String messageString = "hello world"; + final byte[] fullMessageBytes = messageString.getBytes(Charset.forName("UTF-8")); + final byte[] message1Bytes = Arrays.copyOfRange(fullMessageBytes, 0, 5); + final byte[] message2Bytes = Arrays.copyOfRange(fullMessageBytes, 5, fullMessageBytes.length); + final byte[] header1Bytes = createHeader(message1Bytes.length, true); + final byte[] header2Bytes = createHeader(message2Bytes.length, false); + + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes); + int messageBoundary = header1Bytes.length + message1Bytes.length; + // Split min header, remaining header and message into two pieces each + mockReads(data, MIN_HEADER_SIZE - 3, MIN_HEADER_SIZE, + FULL_HEADER_SIZE - 3, FULL_HEADER_SIZE, + FULL_HEADER_SIZE + 3, messageBoundary, + messageBoundary + MIN_HEADER_SIZE - 3, + messageBoundary + MIN_HEADER_SIZE, + messageBoundary + FULL_HEADER_SIZE - 3, + messageBoundary + FULL_HEADER_SIZE, + messageBoundary + FULL_HEADER_SIZE + 3); + ByteBuffer result = reader.readData(); + assertEquals(ByteBuffer.wrap(messageString.getBytes(Charset.forName("UTF-8"))), result); + } + + private void mockReads(byte[] data, int... splitPos) throws IOException { + byte[][] splitData = splitByteArray(data, splitPos); + when(channel.read(any(ByteBuffer.class))).thenAnswer(new ReadAnswer(splitData)); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/SyncMessageWriterTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/SyncMessageWriterTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,161 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import static com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.createByteArrays; +import static com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.createHeader; +import static com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.joinByteArrays; +import static org.junit.Assert.assertArrayEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.ChannelTestUtils.WriteAnswer; + +public class SyncMessageWriterTest { + + private static final int MAX_MESSAGE_PART_SIZE = 5; + + private WinPipesChannelImpl channel; + private SyncMessageWriter writer; + private MessageLimits limits; + + @Before + public void setUp() throws Exception { + channel = mock(WinPipesChannelImpl.class); + limits = mock(MessageLimits.class); + when(limits.getMaxMessagePartSize()).thenReturn(MAX_MESSAGE_PART_SIZE); + when(limits.getMaxMessageSize()).thenReturn(Integer.MAX_VALUE); + writer = new SyncMessageWriter(channel, limits); + } + + @Test + public void testWriteDataSingle() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + final byte[][] results = createByteArrays(headerBytes.length, messageBytes.length); + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.writeData(buf); + + assertArrayEquals(headerBytes, results[0]); + assertArrayEquals(messageBytes, results[1]); + verify(channel, times(2)).write(any(ByteBuffer.class)); + } + + @Test + public void testWriteDataSplitHeader() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + final byte[][] results = createByteArrays(headerBytes.length - 2, 2, messageBytes.length); + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.writeData(buf); + + final byte[] joinedHeader = joinByteArrays(results[0], results[1]); + assertArrayEquals(headerBytes, joinedHeader); + assertArrayEquals(messageBytes, results[2]); + verify(channel, times(3)).write(any(ByteBuffer.class)); + } + + @Test + public void testWriteDataSplitMessage() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + final byte[][] results = createByteArrays(headerBytes.length, messageBytes.length - 2, 2); + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.writeData(buf); + + assertArrayEquals(headerBytes, results[0]); + final byte[] joinedMessage = joinByteArrays(results[1], results[2]); + assertArrayEquals(messageBytes, joinedMessage); + verify(channel, times(3)).write(any(ByteBuffer.class)); + } + + @Test + public void testWriteDataMulti() throws Exception { + final String message = "hello world"; + final byte[] fullMessageBytes = message.getBytes(Charset.forName("UTF-8")); + final byte[] message1Bytes = Arrays.copyOfRange(fullMessageBytes, 0, 5); + final byte[] message2Bytes = Arrays.copyOfRange(fullMessageBytes, 5, 10); + final byte[] message3Bytes = Arrays.copyOfRange(fullMessageBytes, 10, fullMessageBytes.length); + final byte[] header1Bytes = createHeader(message1Bytes.length, true); + final byte[] header2Bytes = createHeader(message2Bytes.length, true); + final byte[] header3Bytes = createHeader(message3Bytes.length, false); + + final byte[][] results = createByteArrays(header1Bytes.length, message1Bytes.length, header2Bytes.length, message2Bytes.length, header3Bytes.length, message3Bytes.length); + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(fullMessageBytes); + writer.writeData(buf); + + assertArrayEquals(header1Bytes, results[0]); + assertArrayEquals(message1Bytes, results[1]); + assertArrayEquals(header2Bytes, results[2]); + assertArrayEquals(message2Bytes, results[3]); + assertArrayEquals(header3Bytes, results[4]); + assertArrayEquals(message3Bytes, results[5]); + verify(channel, times(6)).write(any(ByteBuffer.class)); + } + + @Test(expected=IOException.class) + public void testWriteMessageTooLarge() throws Exception { + when(limits.getMaxMessageSize()).thenReturn(0); + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.writeData(buf); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesChannelImplTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesChannelImplTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,147 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesChannelImpl.WinPipesChannelHelper; + +public class WinPipesChannelImplTest { + + private WinPipesChannelHelper channelHelper; + private String pipeName; + private WinPipe.Id addr; + private WinPipe impl; + + @Before + public void setUp() throws Exception { + + this.channelHelper = mock(WinPipesChannelHelper.class); + + pipeName = "test"; + addr = mock(WinPipe.Id.class); + when(addr.getPipePath()).thenReturn(pipeName); + + impl = mock(WinPipe.class); + when(impl.getPipeName()).thenReturn(pipeName); + + when(channelHelper.open(addr)).thenReturn(impl); + when(channelHelper.isOpen(impl)).thenReturn(true); + when(channelHelper.createAddress(pipeName)).thenReturn(addr); + } + + @Test + public void testOpen() throws Exception { + WinPipesChannelImpl channel = WinPipesChannelImpl.open(pipeName, channelHelper); + verify(channelHelper).createAddress(pipeName); + verify(channelHelper).open(addr); + + assertEquals(pipeName, channel.getName()); + } + + @Test + public void testReadSingle() throws IOException { + final int readSize = 20; + when(channelHelper.read(eq(impl), any(ByteBuffer.class))).thenReturn(readSize); + + WinPipesChannelImpl channel = createChannel(); + ByteBuffer buf = ByteBuffer.allocate(readSize); + int read = channel.read(buf); + + assertEquals(readSize, read); + verify(channelHelper).read(impl, buf); + } + + @Test + public void testReadEOF() throws IOException { + final int readSize = 20; + when(channelHelper.read(eq(impl), any(ByteBuffer.class))).thenReturn(-1); + + WinPipesChannelImpl channel = createChannel(); + ByteBuffer buf = ByteBuffer.allocate(readSize); + int read = channel.read(buf); + + assertEquals(-1, read); + + verify(channelHelper).read(impl, buf); + } + + @Test + public void testWriteSingle() throws Exception { + final int writeSize = 20; + when(channelHelper.write(eq(impl), any(ByteBuffer.class))).thenReturn(writeSize); + + WinPipesChannelImpl channel = createChannel(); + ByteBuffer buf = ByteBuffer.allocate(writeSize); + int written = channel.write(buf); + + assertEquals(writeSize, written); + verify(channelHelper).write(impl, buf); + } + + @Test + public void testIsOpen() throws Exception { + WinPipesChannelImpl channel = createChannel(); + boolean open = channel.isOpen(); + verify(channelHelper).isOpen(impl); + assertTrue(open); + } + + @Test + public void testClose() throws Exception { + WinPipesChannelImpl channel = createChannel(); + channel.close(); + verify(channelHelper).close(impl); + } + + private WinPipesChannelImpl createChannel() throws IOException { + return new WinPipesChannelImpl(impl, channelHelper); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesIPCPropertiesTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/common/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/WinPipesIPCPropertiesTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,105 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.common.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.Properties; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCType; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCProperties.PathUtils; + +public class WinPipesIPCPropertiesTest { + + private static final String PIPE_PREFIX = "some-prefix"; + private static final String PIPE_ID = "ABC123"; + private static final String PIPE_NAME = "pipe-name"; + private Properties jProps; + private File propFile; + private PathUtils pathUtils; + + @Before + public void setUp() throws Exception { + jProps = mock(Properties.class); + propFile = mock(File.class); + pathUtils = mock(PathUtils.class); + when(pathUtils.getProperty(jProps, WinPipesIPCProperties.PIPE_PREFIX_PROPERTY, WinPipesIPCProperties.DEFAULT_WINPIPE_PREFIX)).thenReturn(PIPE_PREFIX); + when(jProps.getProperty(WinPipesIPCProperties.PIPE_PREFIX_PROPERTY)).thenReturn(PIPE_PREFIX); + } + + @Test + public void testType() throws Exception { + WinPipesIPCProperties props = new WinPipesIPCProperties(jProps, propFile, pathUtils); + assertEquals(IPCType.WINDOWS_NAMED_PIPES, props.getType()); + } + + @Test + public void testPrefixFromProps() throws Exception { + when(pathUtils.getProperty(jProps, WinPipesIPCProperties.PIPE_PREFIX_PROPERTY, WinPipesIPCProperties.DEFAULT_WINPIPE_PREFIX)).thenReturn(PIPE_PREFIX); + + WinPipesIPCProperties props = new WinPipesIPCProperties(jProps, propFile, pathUtils); + String path = props.getPipePrefix(); + assertEquals(PIPE_PREFIX, path); + } + + @Test + public void testDefaultPrefix() throws Exception { + when(pathUtils.getProperty(jProps, WinPipesIPCProperties.PIPE_PREFIX_PROPERTY, WinPipesIPCProperties.DEFAULT_WINPIPE_PREFIX)).thenReturn(WinPipesIPCProperties.DEFAULT_WINPIPE_PREFIX); + WinPipesIPCProperties props = new WinPipesIPCProperties(jProps, propFile, pathUtils); + assertEquals(WinPipesIPCProperties.DEFAULT_WINPIPE_PREFIX, props.getPipePrefix()); + } + + @Test + public void testFullPath() throws Exception { + when(pathUtils.getProperty(jProps, WinPipesIPCProperties.PIPE_PREFIX_PROPERTY, WinPipesIPCProperties.DEFAULT_WINPIPE_PREFIX)).thenReturn(PIPE_PREFIX); + when(pathUtils.getProperty(jProps, WinPipesIPCProperties.WINPIPE_ID_PROPERTY, "")).thenReturn(PIPE_ID); + + WinPipesIPCProperties props = new WinPipesIPCProperties(jProps, propFile, pathUtils); + String path = props.getPipeName(PIPE_NAME); + assertTrue("must start with global prefix", path.startsWith(WinPipesIPCProperties.GLOBAL_WINPIPE_PREFIX)); + assertTrue("must contain pipename", path.contains(PIPE_NAME)); + assertTrue("must contain pipe prefix", path.contains(PIPE_PREFIX)); + assertTrue("must contain pipe id", path.contains(PIPE_ID)); + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/pom.xml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/pom.xml Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,68 @@ + + + + 4.0.0 + + + com.redhat.thermostat + thermostat-agent-ipc + 1.99.12-SNAPSHOT + + + thermostat-agent-ipc-winpipes + pom + + Thermostat Agent IPC Windows Named Pipes + + + + junit + junit + test + + + + + common + client + server + + + + diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/pom.xml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/pom.xml Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,119 @@ + + + + 4.0.0 + + + com.redhat.thermostat + thermostat-agent-ipc-winpipes + 1.99.12-SNAPSHOT + + + thermostat-agent-ipc-winpipes-server + bundle + + Thermostat IPC Server - Windows Named Pipes + + + + junit + junit + test + + + org.mockito + mockito-core + test + + + org.osgi + org.osgi.core + + + com.redhat.thermostat + thermostat-shared-config + ${project.version} + + + com.redhat.thermostat + thermostat-common-core + ${project.version} + + + com.redhat.thermostat + thermostat-agent-ipc-winpipes-common + ${project.version} + + + com.redhat.thermostat + thermostat-agent-ipc-server + ${project.version} + + + com.redhat.thermostat + thermostat-common-test + ${project.version} + test + + + + + + + org.apache.felix + maven-bundle-plugin + true + + + Red Hat, Inc. + com.redhat.thermostat.agent.ipc.winpipes.server.internal.Activator + com.redhat.thermostat.agent.ipc.winpipes.server + + com.redhat.thermostat.agent.ipc.winpipes.server.internal + + + <_nouses>true + + + + + + + + diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/AcceptThread.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/AcceptThread.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,142 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.MessageLimits; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipe; +import com.redhat.thermostat.common.utils.LoggingUtils; + +class AcceptThread extends Thread { + + private static final Logger logger = LoggingUtils.getLogger(AcceptThread.class); + private final ExecutorService execService; + private final WinPipe pipe; + private final WinPipesServerChannelImpl channel; + + // buffer size + private static final int BUFSIZE = new MessageLimits().getBufferSize(); + + // number of simulataneous clients + private final int NUM_INSTANCES = 1; + + private final WindowsEventSelector selector; + + // array of all pipe instances + private final ClientPipeInstance[] instances; + + private boolean shutdown; + + AcceptThread(WinPipesServerChannelImpl channel, ExecutorService execService) { + this.channel = channel; + this.execService = execService; + this.pipe = channel.getPipe(); + this.shutdown = false; + this.selector = new WindowsEventSelector(NUM_INSTANCES); + this.instances = new ClientPipeInstance[NUM_INSTANCES]; + } + + private void createInstances() throws IOException { + logger.info("AcceptThread '" + pipe.getPipeName() + "' creating " + NUM_INSTANCES + " pipe instances"); + for (int i = 0; i < NUM_INSTANCES && !shutdown; i++) { + final ClientPipeInstance pi = new ClientPipeInstance(pipe.getPipeName(), NUM_INSTANCES, BUFSIZE, execService, channel.getCallbacks()); + instances[i] = pi; + pi.connectToNewClient(); + logger.fine("AcceptThread '" + pipe.getPipeName() + "' created " + pi); + } + } + + @Override + public void run() { + + try { + createInstances(); + + logger.info("AcceptThread '" + pipe.getPipeName() + "' Ready to accept client pipe connections"); + + // normally usage would be to add on an enqueu and remove on a operation complete, + // but we reuse the events here to save cycles + for (final ClientPipeInstance pi : instances) { + selector.add(pi.getReadHandler()); + selector.add(pi.getWriteHandler()); + } + + // main loop + while (!shutdown) { + WindowsEventSelector.EventHandler pi = selector.waitForEvent(); + pi.processEvent(); + } + + } catch (IOException e) { + logger.log(Level.SEVERE, "Error occurred during selection", e); + shutdown = true; + } finally { + logger.info("Shutting down"); + for (ClientPipeInstance pi : instances) { + try { + pi.close(); + } catch (IOException e) { + logger.log(Level.SEVERE, "Error occurred during close() of " + pi.getName(), e); + } + selector.remove(pi.getReadHandler()); + selector.remove(pi.getWriteHandler()); + } + execService.shutdown(); + } + } + + void shutdown() throws IOException { + this.shutdown = true; + // Interrupt accept thread + this.interrupt(); + + try { + this.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + boolean isShutdown() { + return shutdown; + } + +} \ No newline at end of file diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/Activator.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/Activator.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,80 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCPropertiesProvider; +import com.redhat.thermostat.agent.ipc.server.ServerTransport; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCPropertiesProvider; + +public class Activator implements BundleActivator { + + private final IPCServiceHelper helper; + private WinPipesServerTransport transport; + + public Activator() { + this(new IPCServiceHelper()); + } + + Activator(IPCServiceHelper helper) { + this.helper = helper; + } + + public void start(final BundleContext context) throws Exception { + // Register the IPC properties provider + context.registerService(IPCPropertiesProvider.class.getName(), new WinPipesIPCPropertiesProvider(), null); + transport = helper.createTransport(); + + // now that it's started, register the server service + context.registerService(ServerTransport.class.getName(), transport, null); + } + + @Override + public void stop(BundleContext context) throws Exception { + // Services unregistered automatically + } + + /* For testing purposes */ + static class IPCServiceHelper { + WinPipesServerTransport createTransport() { + return new WinPipesServerTransport(); + } + } +} + diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/ClientHandler.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/ClientHandler.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,160 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.AsyncMessageReader; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.AsyncMessageWriter; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.MessageListener; +import com.redhat.thermostat.common.utils.LoggingUtils; + +class ClientHandler implements MessageListener { + + private static final Logger logger = LoggingUtils.getLogger(ClientHandler.class); + private static final boolean LOG_DATA = true; + + // Increments for each instance made. Only for debugging purposes. + private static final AtomicInteger handlerCount = new AtomicInteger(); + + private final WritableByteChannel channel; + private final ExecutorService execService; + private final ThermostatIPCCallbacks callbacks; + private final AsyncMessageReader reader; + private final AsyncMessageWriter writer; + private final MessageCreator messageCreator; + private final int handlerNum; + + ClientHandler(WritableByteChannel channel, ExecutorService execService, ThermostatIPCCallbacks callbacks) { + this.channel = channel; + this.execService = execService; + this.callbacks = callbacks; + this.reader = new AsyncMessageReader(this); + this.writer = new AsyncMessageWriter(channel); + this.messageCreator = new MessageCreator(); + this.handlerNum = handlerCount.getAndIncrement(); + } + + ClientHandler(WritableByteChannel channel, ExecutorService execService, ThermostatIPCCallbacks callbacks, + AsyncMessageReader reader, AsyncMessageWriter writer, MessageCreator messageCreator) { + this.channel = channel; + this.execService = execService; + this.callbacks = callbacks; + this.reader = reader; + this.writer = writer; + this.messageCreator = messageCreator; + this.handlerNum = handlerCount.getAndIncrement(); + } + + /** + * handle some bytes from the client. Build up a complete message over multiple calls + * + * @return true if complete message is read + */ + boolean handleRead(ByteBuffer data) throws IOException { + logger.finest("Got read from client for \"" + channel + "\" [" + handlerNum + "] bytes=" + data.remaining()); + if (LOG_DATA) { + byte[] xx = new byte[data.remaining()]; + data.get(xx); + data.position(0); + final String s = new String(xx, "UTF8"); + logger.finest("message is '" + s + "'"); + } + return reader.process(data); + } + + // only called by tests + boolean handleWrite() throws IOException { + try { + logger.finest("handleWrite() Got write for client for \"" + channel + "\" [" + handlerNum + "]"); + // write message to client + writer.writeData(); + } catch (IOException e) { + channel.close(); + throw new IOException("handleWrite() Communication error from handler " + handlerNum, e); + } + return writer.hasMoreMessages(); + } + + @Override + public void messageRead(ByteBuffer buf) { + + logger.finest("messageRead() entered"); + + // Create new message and notify caller + final MessageImpl message = messageCreator.createMessage(buf, this); + + // Execute callback in a separate thread to ensure we don't block + execService.submit(new Runnable() { + @Override + public void run() { + callbacks.messageReceived(message); + } + }); + + logger.finest("messageRead() exited"); + } + + @Override + public void writeMessage(ByteBuffer buf) throws IOException { + // Request write with selector + logger.finest("writeMessage() enquing message (length=" + buf.remaining() + ") for client \"" + channel + "\" [" + handlerNum + "]"); + + // Enqueue this message for writing when selected + writer.enqueueForWriting(buf); + + while (writer.hasMoreMessages()) { + try { + writer.writeData(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + static class MessageCreator { + MessageImpl createMessage(ByteBuffer data, MessageListener listener) { + return new MessageImpl(data, listener); + } + } +} \ No newline at end of file diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/ClientPipeInstance.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/ClientPipeInstance.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,192 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesNativeHelper; +import com.redhat.thermostat.common.utils.LoggingUtils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.logging.Logger; + +/** + * one ClientPipeInstance per expected client + * Created when pipe is created - the pipe can't handle more clients than there are ClientPipeInstances + * + * On a connect, will create a ClientHandler (destroyed on disconnect) + * On a read or write, will delegate that to the ClientHandler, which will decide if the next action is a read or write + * Disconnect destroys the active ClientHandler + * + * see https://msdn.microsoft.com/en-us/library/windows/desktop/aa365603%28v=vs.85%29.aspx + * http://www.winsocketdotnetworkprogramming.com/winsock2programming/winsock2advancednamedpipe15.html + * see http://www.winsocketdotnetworkprogramming.com/winsock2programming/winsock2advancednamedpipe15c.html + */ +class ClientPipeInstance implements WritableByteChannel, PipeManager { + + enum PipeState { UNKNOWN_STATE, CONNECTING_STATE, CONNECTED_STATE, CLOSED_STATE } + + private PipeState state; + private static final Logger logger = LoggingUtils.getLogger(ClientPipeInstance.class); + + private final ClientPipeInstanceHelper clientHandlerCreator; + private final ExecutorService execService; + private final ThermostatIPCCallbacks ipcCallbacks; + private final String pipeName; + private final long pipeHandle; + + private ReadPipeImpl readHandler; + private WritePipeImpl writeHandler; + + ClientPipeInstance(final String name, int instances, int bufsize, ExecutorService execService, ThermostatIPCCallbacks cb) throws IOException { + this(name, instances, bufsize, execService, cb, new ClientPipeInstanceHelper()); + } + + ClientPipeInstance(final String name, int instances, int bufsize, ExecutorService execService, ThermostatIPCCallbacks cb, ClientPipeInstanceHelper cpiHelper) throws IOException { + + this.state = PipeState.UNKNOWN_STATE; + this.pipeName = name; + this.execService = execService; + this.ipcCallbacks = cb; + this.clientHandlerCreator = cpiHelper; + this.pipeHandle = clientHandlerCreator.createNamedPipe(name, instances, bufsize); + if (this.pipeHandle == 0) { + throw new IOException("can't create Windows named pipe " + name + " err=" + clientHandlerCreator.getLastError()); + } + + this.readHandler = new ReadPipeImpl(this, name, pipeHandle, bufsize); + this.writeHandler = new WritePipeImpl(this, name, pipeHandle, bufsize); + } + + public String toString() { + return "ClientPipeInstance(t=" + Thread.currentThread().getId() + "hnd=" + pipeHandle + ", read=" + readHandler + " write=" + writeHandler +")"; + } + + public String getName() { + return pipeName; + } + + WindowsEventSelector.EventHandler getReadHandler() { + return readHandler; + } + + WindowsEventSelector.EventHandler getWriteHandler() { + return writeHandler; + } + + // for WritableByteChannel + @Override + public boolean isOpen() { + return state == PipeState.CONNECTED_STATE || state == PipeState.CONNECTING_STATE; + } + + @Override + public void close() throws IOException { + if (state != PipeState.CLOSED_STATE) { + logger.finest("closing " + this); + readHandler.close(); + writeHandler.close(); + clientHandlerCreator.closeHandle(pipeHandle); + state = PipeState.CLOSED_STATE; + } + } + + @Override + public int write(ByteBuffer src) throws IOException { + return writeHandler.write(src); + } + + /** + * Wait for a client to connect to this pipe + * Since we created the pipe in blocking mode, this call will block. + */ + boolean connectToNewClient() throws IOException { + state = PipeState.CONNECTING_STATE; + final boolean ret = readHandler.connectToNewClient(); + state = readHandler.getReadState() == ReadPipeImpl.ReadPipeState.READING_STATE ? PipeState.CONNECTED_STATE : PipeState.CONNECTING_STATE; + return ret; + } + + private void disconnect() throws IOException { + logger.finest("WinPipe disconnect() " + this); + if (!clientHandlerCreator.disconnectNamedPipe(pipeHandle)) { + throw new IOException("could not disconnect named pipe"); + } + } + + @Override + public void resetPipe() throws IOException { + logger.finest("WinPipe resetPipe() " + this); + disconnect(); + state = PipeState.UNKNOWN_STATE; + connectToNewClient(); + } + + @Override + public ClientHandler handleNewClientConnection() { + return this.clientHandlerCreator.createClientHandler(this, execService, ipcCallbacks); + } + + // for testing + static class ClientPipeInstanceHelper { + + private static WinPipesNativeHelper helper = WinPipesNativeHelper.INSTANCE; + + ClientHandler createClientHandler(ClientPipeInstance pi, ExecutorService execService, ThermostatIPCCallbacks callbacks) { + return new ClientHandler(pi, execService, callbacks); + } + + long createNamedPipe(final String pipeName, int instances, int buffersize) { + return helper.createNamedPipe(pipeName, instances, buffersize); + } + + boolean disconnectNamedPipe(long pipeHandle) { + return helper.disconnectNamedPipe(pipeHandle); + } + + boolean closeHandle(long handle) { + return helper.closeHandle(handle); + } + + int getLastError() { + return helper.getLastError(); + } + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/MessageImpl.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/MessageImpl.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,65 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import com.redhat.thermostat.agent.ipc.server.IPCMessage; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.MessageListener; + +public class MessageImpl implements IPCMessage { + + private final ByteBuffer data; + private final MessageListener listener; + + MessageImpl(ByteBuffer data, MessageListener listener) { + this.data = data; + this.listener = listener; + } + + @Override + public ByteBuffer get() { + return data; + } + + @Override + public void reply(ByteBuffer buf) throws IOException { + listener.writeMessage(buf); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/PipeManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/PipeManager.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,44 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import java.io.IOException; + +interface PipeManager { + void resetPipe() throws IOException; + ClientHandler handleNewClientConnection(); +} \ No newline at end of file diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/PipenameValidator.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/PipenameValidator.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,57 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +class PipenameValidator { + + private static final String REGEX = "^[A-Za-z0-9-_\\-]*$"; + + private final Pattern pattern; + + PipenameValidator() { + this.pattern = Pattern.compile(REGEX, Pattern.MULTILINE); + } + + boolean validate(String name) { + Matcher matcher = pattern.matcher(name); + return matcher.matches(); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/ReadPipeImpl.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/ReadPipeImpl.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,201 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesNativeHelper; +import com.redhat.thermostat.common.utils.LoggingUtils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.logging.Logger; + +/** + * Handle all read operations on this pipe + */ +class ReadPipeImpl implements WindowsEventSelector.EventHandler { + + private static final Logger logger = LoggingUtils.getLogger(ReadPipeImpl.class); + private static WinPipesNativeHelper helper = WinPipesNativeHelper.INSTANCE; + + enum ReadPipeState { UNKNOWN_STATE, CONNECTING_STATE, READING_STATE, ERROR_STATE, CLOSED_STATE } + + private ReadPipeState readState; + + private final String pipeName; + private final PipeManager manager; + private final long pipeHandle; + private final long readEventHandle; + private final ByteBuffer readOverlap; + private final ByteBuffer readBuffer; + private ClientHandler clientHandler; + + ReadPipeImpl(PipeManager manager, String pipeName, long pipeHandle, int bufsize) throws IOException { + this.manager = manager; + this.pipeName = pipeName; + this.readState = ReadPipeState.UNKNOWN_STATE; + this.pipeHandle = pipeHandle; + this.readEventHandle = helper.createEvent(); + if (this.readEventHandle == WinPipesNativeHelper.INVALID_HANDLE) { + throw new IOException("can't create a Windows event" + " err=" + helper.getLastError()); + } + this.readOverlap = helper.createDirectOverlapStruct(readEventHandle); + this.readBuffer = helper.createDirectBuffer(bufsize); + } + + public String toString() { + return "ReadPipeImpl(h=" + pipeHandle + " '" + pipeName + "' " + readState + ")"; + } + + ReadPipeState getReadState() { + return readState; + } + + @Override + public long getHandle() { + return readEventHandle; + } + + @Override + public void processEvent() throws IOException { + if (handlePendingRead()) { + enqueueRead(); + } + } + + public void close() throws IOException { + readState = ReadPipeState.CLOSED_STATE; + helper.cancelAllIo(pipeHandle, readOverlap); + helper.freeDirectBuffer(readOverlap); + helper.freeDirectBuffer(readBuffer); + helper.closeHandle(readEventHandle); + } + + /** + * Wait for a client to connect to this pipe + * Since we created the pipe in blocking mode, this call will block. + */ + boolean connectToNewClient() throws IOException { + + logger.info("connectToNewClient - entered " + this); + final int ret = helper.connectNamedPipe(pipeHandle, readOverlap); + logger.info("connectToNewClient on " + this + " returns " + ret); + if (ret == WinPipesNativeHelper.ERROR_IO_PENDING) { + readState = ReadPipeState.CONNECTING_STATE; + } else if (ret == WinPipesNativeHelper.ERROR_SUCCESS || ret == WinPipesNativeHelper.ERROR_PIPE_CONNECTED) { + // if it's not pending, and no exception was thrown, then we must be connected + logger.info("connectToNewClient switching to READING_STATE"); + helper.resetEvent(readEventHandle); + clientHandler = manager.handleNewClientConnection(); + readState = ReadPipeState.READING_STATE; + } else { + throw new IOException("connectNamedPipe(" + pipeName + ") returns err=" + ret); + } + logger.info("connectToNewClient - exitting " + this); + return readState == ReadPipeState.CONNECTING_STATE; + } + + /** + * process the incoming read data + * - read all data until there's nothing left + * - if there's more data expected for the current message, then enqueue a read. + * @throws IOException if there's an i/o or protocol error + */ + private void enqueueRead() throws IOException { + logger.finest("enqueueRead() - entered " + this); + readBuffer.position(0); + readBuffer.limit(readBuffer.capacity()); + logger.finest("enqueueRead() calling readFileOverlapped(" + this + ")"); + final boolean ret = helper.readFileOverlapped(pipeHandle, readOverlap, readBuffer); + final int err = ret ? 0 : helper.getLastError(); + logger.finest("enqueueRead() readFileOverlapped() returns " + ret + " err=" + err); + if (ret || (err == WinPipesNativeHelper.ERROR_SUCCESS) || err == WinPipesNativeHelper.ERROR_IO_PENDING) { + readState = ReadPipeState.READING_STATE; + } else if (err == WinPipesNativeHelper.ERROR_BROKEN_PIPE) { + // the other end closed the pipe + readState = ReadPipeState.CLOSED_STATE; + manager.resetPipe(); + } else { + readState = ReadPipeState.ERROR_STATE; + manager.resetPipe(); + } + logger.finest("enqueueRead() - exiting " + this); + } + + /** + * handlePendingIO - if there is pending I/O then retrieve and process the result + * + * @return true if queueNextOperation() should be called, false otherwise + * @throws IOException if there were any errors interacting with the pipe + */ + private boolean handlePendingRead() throws IOException { + logger.finest("handlePendingRead() - entered " + this); + if (readState == ReadPipeState.READING_STATE) { + logger.finest("handlePendingRead() waiting for overlapped result on " + this + " state=" + readState); + final int bytesRead = helper.getOverlappedResult(pipeHandle, readOverlap, false); + final int err = helper.getLastError(); + logger.finest("handlePendingRead() got overlapped result (bytes=" + bytesRead + " on " + this + " err=" + err); + + switch (readState) { + case CONNECTING_STATE: + if (bytesRead < 0) { + final String msg = "Error reading pipe " + pipeName + " err=" + err; + logger.warning(msg); + throw new IOException(msg); + } + clientHandler = manager.handleNewClientConnection(); + readState = ReadPipeState.READING_STATE; + break; + case READING_STATE: + if (bytesRead < 0) { + readState = ReadPipeState.ERROR_STATE; + manager.resetPipe(); + return false; + } + readBuffer.limit(bytesRead); + /*final boolean readFully =*/ clientHandler.handleRead(readBuffer); + // there may or may not be messages on the write queue at this point + // if readFully is true, then there will be at some point *there may be some already, and we need to wait for them. + readState = ReadPipeState.READING_STATE; + break; + default: + throw new IOException("Invalid pipe state " + readState); + } + } + logger.finest("handlePendingRead() - exited " + this); + return true; + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WinPipesServerChannelImpl.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WinPipesServerChannelImpl.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,164 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipe; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCProperties; + +/** + * Creates the named pipe. + * There is one channel per pipe name. + * Creates and starts the accept thread for this pipe. + */ +class WinPipesServerChannelImpl { + + private WinPipesServerChannelHelper channelHelper = new WinPipesServerChannelHelper(); + + private static final ThreadCreator threadCreator = new ThreadCreator(); + + // unadorned name of this server (not full pipe name) + private final String name; + + // WinPipes implementation + private final WinPipe pipe; + + // callbacks on message completion + private final ThermostatIPCCallbacks callbacks; + + private WinPipesServerChannelImpl(String name, WinPipe pipe, ThermostatIPCCallbacks callbacks) { + this(name, pipe, callbacks, new WinPipesServerChannelHelper()); + } + private WinPipesServerChannelImpl(String name, WinPipe pipe, ThermostatIPCCallbacks callbacks, WinPipesServerChannelHelper helper) { + this.name = name; + this.pipe = pipe; + this.callbacks = callbacks; + this.channelHelper = helper; + ExecutorService execService = Executors.newFixedThreadPool(determineDefaultThreadPoolSize(), new CountingThreadFactory()); + AcceptThread acceptThread = threadCreator.createAcceptThread(this, execService); + acceptThread.start(); + } + + static WinPipesServerChannelImpl createChannel(String name, ThermostatIPCCallbacks callbacks, + WinPipesIPCProperties props) throws IOException { + return createChannel(name, callbacks, props, new WinPipesServerChannelHelper()); + } + + static WinPipesServerChannelImpl createChannel(String name, ThermostatIPCCallbacks callbacks, + WinPipesIPCProperties props, WinPipesServerChannelHelper helper) throws IOException { + final String pipeName = props.getPipeName(name); + final WinPipe pipe = helper.open(pipeName); + return helper.createServerChannel(pipeName, pipe, callbacks); + } + + boolean isOpen() { + return getChannelHelper().isOpen(getPipe()); + } + + WinPipe getPipe() { + return pipe; + } + + ThermostatIPCCallbacks getCallbacks() { + return callbacks; + } + + String getName() { + return name; + } + + public void close() throws IOException { + getChannelHelper().close(getPipe()); + } + + /* For testing purposes */ + static class ThreadCreator { + AcceptThread createAcceptThread(WinPipesServerChannelImpl channel, ExecutorService execService) { + return new AcceptThread(channel, execService); + } + } + + WinPipesServerChannelHelper getChannelHelper() { + return channelHelper; + } + + // Wraps methods that can't be mocked + static class WinPipesServerChannelHelper { + WinPipe open(final String name) throws IOException { + return new WinPipe(name); + } + + WinPipesServerChannelImpl createServerChannel(String pipeName, WinPipe pipe, ThermostatIPCCallbacks callbacks) { + return new WinPipesServerChannelImpl(pipeName, pipe, callbacks, this); + } + + boolean isOpen(WinPipe pipe) { + return pipe.isOpen(); + } + + void close(WinPipe pipe) throws IOException { + pipe.close(); + } + } + + private static int determineDefaultThreadPoolSize() { + // Make the number of default thread pool size a function of available processors. + return Runtime.getRuntime().availableProcessors() * 2; + } + + private static class CountingThreadFactory implements ThreadFactory { + + private final AtomicInteger threadCount; + + private CountingThreadFactory() { + this.threadCount = new AtomicInteger(); + } + + @Override + public Thread newThread(Runnable r) { + // Create threads with a recognizable name + return new Thread(r, "AcceptThread-" + threadCount.incrementAndGet()); + } + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WinPipesServerTransport.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WinPipesServerTransport.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,170 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import java.io.IOException; +import java.nio.file.attribute.UserPrincipal; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.logging.Logger; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.common.internal.IPCType; +import com.redhat.thermostat.agent.ipc.server.ServerTransport; +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCProperties; +import com.redhat.thermostat.common.utils.LoggingUtils; + +/** + * Creates instances of Windows Pipes server channels. + * There is one channel per pipe name. + */ +class WinPipesServerTransport implements ServerTransport { + + private static final Logger logger = LoggingUtils.getLogger(WinPipesServerTransport.class); + + // Access/modification of this field should by synchronized + // key is pipename (as it appears in ipc.properties, not decorated with "\\pipe\\ ...") + private final Map pipes; + private final PipenameValidator validator; + private final ChannelUtils channelUtils; + + private WinPipesIPCProperties props; + + WinPipesServerTransport() { + this(new PipenameValidator(), new ChannelUtils()); + } + + WinPipesServerTransport(PipenameValidator validator, + ChannelUtils channelCreator) { + this.pipes = new HashMap<>(); + this.validator = validator; + this.channelUtils = channelCreator; + } + + @Override + public void start(IPCProperties props) throws IOException { + if (!(props instanceof WinPipesIPCProperties)) { + IPCType type = props.getType(); + throw new IOException("Unsupported IPC type: " + type.getConfigValue()); + } + this.props = (WinPipesIPCProperties) props; + // for windows pipes, each individual named pipe will have a thread. + // we don't staart one master accept thread here. + logger.info("Agent IPC Windows Pipes server service started"); + } + + @Override + public IPCType getType() { + return IPCType.WINDOWS_NAMED_PIPES; + } + + private void checkName(String name) throws IOException { + Objects.requireNonNull(name, "Server name cannot be null"); + if (name.isEmpty()) { + throw new IOException("Server name cannot be empty"); + } + // Require limited character set for name + boolean okay = validator.validate(name); + if (!okay) { + throw new IOException("Illegal server name"); + } + } + + @Override + public synchronized void createServer(String name, ThermostatIPCCallbacks callbacks) throws IOException { + + checkName(name); + + // Check if the pipe has already been created and we know about it + if (pipes.containsKey(name)) { + throw new IOException("Named pipe with name \"" + name + "\" already exists"); + } + + // Create and save pipe server + WinPipesServerChannelImpl channel = channelUtils.createServerChannel(name, callbacks, props); + pipes.put(name, channel); + } + + @Override + public void createServer(String name, ThermostatIPCCallbacks callbacks, UserPrincipal owner) throws IOException { + createServer(name,callbacks); + } + + @Override + public synchronized boolean serverExists(String name) throws IOException { + return pipes.containsKey(name); + } + + @Override + public synchronized void destroyServer(String name) throws IOException { + if (!pipes.containsKey(name)) { + throw new IOException("IPC server with name \"" + name + "\" does not exist"); + } + // Remove pipe from known pipes + WinPipesServerChannelImpl channel = pipes.remove(name); + + // Close channel and stop accept thread + channel.close(); + } + + @Override + public void shutdown() throws IOException { + // Stop accepting connections and close selector afterward + // make a copy since we'll be removing elements from the original + Set pipeNames = new HashSet<>(pipes.keySet()); + for (final String pipeName : pipeNames) { + destroyServer(pipeName); + } + logger.info("Agent IPC windows pipes server service stopped"); + } + + /* For testing purposes */ + Map getPipes() { + return pipes; + } + + /* For testing purposes */ + static class ChannelUtils { + WinPipesServerChannelImpl createServerChannel(String name, ThermostatIPCCallbacks callbacks, WinPipesIPCProperties props) throws IOException { + return WinPipesServerChannelImpl.createChannel(name, callbacks, props); + } + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WindowsEventSelector.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WindowsEventSelector.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,114 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesNativeHelper; +import com.redhat.thermostat.common.utils.LoggingUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Logger; + +/** + * WindowsEventSelector will keep track of a number of selectables (Windows event handles) and wait for one of them to activate + */ +class WindowsEventSelector { + + interface EventHandler { + long getHandle(); + void processEvent() throws IOException; + } + + private static final Logger logger = LoggingUtils.getLogger(WindowsEventSelector.class); + + private static final WinPipesNativeHelper helper = WinPipesNativeHelper.INSTANCE; + + private final Set eventHandlers; + + private ArrayList ehArray; + + // use donly by select() call + private long[] eventHandles = null; + + WindowsEventSelector(int maxInstances) { + eventHandlers = new HashSet<>(maxInstances); + } + + void add(EventHandler e) { + eventHandlers.add(e); + fixArray(); + } + + void remove(EventHandler e) { + eventHandlers.remove(e); + fixArray(); + } + + /** + * create an array oif handlers and handles suitable for a call to waitForMultipleObjects() + */ + private void fixArray() { + ehArray = new ArrayList<>(eventHandlers.size()); + for (EventHandler instance : eventHandlers) { + ehArray.add(instance); + } + eventHandles = new long[ehArray.size()]; + for (int i = 0; i < ehArray.size(); i++) { + eventHandles[i] = ehArray.get(i).getHandle(); + } + } + + /** + * AKA select() for events + * @return handler for event that was raied + * @throws IOException is there was an issue during the call + */ + EventHandler waitForEvent() throws IOException { + logger.finest("WinPipe waiting for one of " + eventHandles.length + " events"); + final int pipeNum = helper.waitForMultipleObjects(eventHandles.length, eventHandles, false, (int)WinPipesNativeHelper.INFINITE) - (int)WinPipesNativeHelper.WAIT_OBJECT_0; + if (pipeNum >= 0 && pipeNum < eventHandles.length) { + logger.finest("WinPipe got event on handle " + ehArray.get(pipeNum) + " err=" + helper.getLastError()); + return ehArray.get(pipeNum); + } else { + final String msg = "WinPipe waitForMultipleObjects returned " + pipeNum + " err=" + helper.getLastError(); + logger.info(msg); + throw new IOException(msg); + } + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WritePipeImpl.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WritePipeImpl.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,201 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesNativeHelper; +import com.redhat.thermostat.common.utils.LoggingUtils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.logging.Logger; + +class WritePipeImpl implements WindowsEventSelector.EventHandler { + + private static final Logger logger = LoggingUtils.getLogger(WritePipeImpl.class); + private static WinPipesNativeHelper helper = WinPipesNativeHelper.INSTANCE; + + enum WritePipeState { QUIET_STATE, WRITING_STATE, FLUSHING_WRITE, ERROR_STATE, CLOSED_STATE } + + private WritePipeState writeState; + + private final PipeManager manager; + private final String pipeName; + private final long pipeHandle; + private final Queue writeQueue; + private final long writeEventHandle; + private final ByteBuffer writeOverlap; + private final ByteBuffer writeBuffer; + + WritePipeImpl(PipeManager manager, String pipeName, long pipeHandle, int bufsize) throws IOException { + this.manager = manager; + this.pipeName = pipeName; + this.writeState = WritePipeState.QUIET_STATE; + this.pipeHandle = pipeHandle; + this.writeEventHandle = helper.createEvent(); + if (this.writeEventHandle == 0) { + throw new IOException(this.pipeName + ": can't create a Windows event" + " err=" + helper.getLastError()); + } + this.writeQueue = new ArrayDeque<>(); + + this.writeOverlap = helper.createDirectOverlapStruct(writeEventHandle); + this.writeBuffer = helper.createDirectBuffer(bufsize); + } + + public String toString() { + return "WritePipeImpl(h=" + pipeHandle + " '" + pipeName + "' " + writeState + " q=" + writeQueue.size() + ")"; + } + + @Override + public long getHandle() { + return writeEventHandle; + } + + @Override + public void processEvent() throws IOException { + if (handlePendingWrite()) { + enqueueNextOperation(); + } + } + + public int write(ByteBuffer src) throws IOException { + + // this call adds buffers to a FIFO queue. + // if no writes are in progress, this call will kickstart a write on thr front of the queue + logger.finest("write() - entered " + this + " bytes=" + src.remaining()); + synchronized (writeQueue) { + writeQueue.add(src); + logger.finest("write() - adding to writeQueue (new size= " + writeQueue.size() + ") bytes=" + src.remaining()); + } + if (writeState == WritePipeState.QUIET_STATE) { + helper.setEvent(writeEventHandle); + } + logger.finest("write() - exited " + src.remaining() + " " + this); + return src.remaining(); + } + + public void close() throws IOException { + writeState = WritePipeState.CLOSED_STATE; + helper.cancelAllIo(pipeHandle, writeOverlap); + helper.freeDirectBuffer(writeOverlap); + helper.freeDirectBuffer(writeBuffer); + helper.closeHandle(writeEventHandle); + } + + private void enqueueNextOperation() throws IOException { + if (writeState == WritePipeState.QUIET_STATE) + enqueueWrite(); + } + + /** + * handlePendingIO - if there is pending I/O then retrieve and process the result + * It's possible the even was raised with no pending IO + * + * @return true if queueNextOperation() should be called, false otherwise + * @throws IOException if there were any errors interacting with the pipe + */ + private boolean handlePendingWrite() throws IOException { + logger.finest("handlePendingWrite() - entered " + this); + if (writeState != WritePipeState.QUIET_STATE) { + logger.finest("handlePendingWrite() waiting for overlapped result on " + this + " state=" + writeState); + final int bytesWritten = helper.getOverlappedResult(pipeHandle, writeOverlap, false); + final int err = helper.getLastError(); + logger.finest("handlePendingWrite() got overlapped result (bytes=" + bytesWritten + " on " + this + " err=" + err); + + switch (writeState) { + case WRITING_STATE: + case FLUSHING_WRITE: + if (bytesWritten != writeBuffer.remaining()) { + writeState = WritePipeState.ERROR_STATE; + manager.resetPipe(); + return false; + } + writeBuffer.position(0); + writeBuffer.limit(0); + break; + default: + throw new IOException("Invalid pipe state " + writeState); + } + } + this.writeState = WritePipeState.QUIET_STATE; + logger.finest("handlePendingWrite() - exited " + this); + return true; + } + + + /** + * enqueue a write, if there's any unwritten data in the writebuffer, or in the write queue + * @return true if an operation was enqueued + * @throws IOException if an IO error occurred + */ + private boolean enqueueWrite() throws IOException { + logger.finest("enqueueWrite() - entered " + this); + if (writeBuffer.remaining() == 0 && writeQueue.isEmpty()) { + if (writeState == WritePipeState.FLUSHING_WRITE) { + // all the data that's ever going to be on that queue has been fully written + writeState = WritePipeState.QUIET_STATE; + logger.finest("enqueueWrite() - exited true " + this); + return true; + } + logger.finest("enqueueWrite() nothing to write - sleeping for a bit"); + helper.resetEvent(writeEventHandle); + logger.finest("enqueueWrite() - exited false " + this); + return false; + } + synchronized (this.writeQueue) { + if (writeBuffer.remaining() == 0) { + writeBuffer.limit(writeBuffer.capacity()); + writeBuffer.put(writeQueue.remove()); + logger.finest("enqueueWrite() - grabbing next buffer from queue (new queue size=" + writeQueue.size() + ")"); + writeBuffer.flip(); + } + } + logger.finest("enqueueWrite() - start overlapped writing " + this + " bytes=" + writeBuffer.remaining()); + final boolean ret = helper.writeFileOverlapped(pipeHandle, writeOverlap, writeBuffer); + writeState = WritePipeState.WRITING_STATE; + final int err = ret ? 0 : helper.getLastError(); + logger.finest("enqueueWrite() - finished overlapped writing " + this + " bytes written=" + ret + " err=" + err); + if (!ret && err != 0 && err != 997) { + writeState = WritePipeState.ERROR_STATE; + manager.resetPipe(); + } + + logger.finest("enqueueWrite() - exited " + this); + return writeState != WritePipeState.QUIET_STATE; + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/AcceptThreadTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/AcceptThreadTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,81 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; + +public class AcceptThreadTest { + + private ExecutorService execService; + private WinPipesServerChannelImpl serverChannel; + private ClientPipeInstance clientSock; + private ThermostatIPCCallbacks callbacks; + private ClientHandler handler; + private AcceptThread thread; + + @Before + public void setUp() throws IOException { + + // Mock sockets + serverChannel = mock(WinPipesServerChannelImpl.class); + clientSock = mock(ClientPipeInstance.class); + // when(serverSock.accept()).thenReturn(clientSock); + callbacks = mock(ThermostatIPCCallbacks.class); + when(serverChannel.getCallbacks()).thenReturn(callbacks); + + + execService = mock(ExecutorService.class); + handler = mock(ClientHandler.class); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/ActivatorTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/ActivatorTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,78 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; +import org.osgi.framework.ServiceReference; + +import com.redhat.thermostat.agent.ipc.server.ServerTransport; +import com.redhat.thermostat.agent.ipc.winpipes.server.internal.Activator.IPCServiceHelper; +import com.redhat.thermostat.testutils.StubBundleContext; + +public class ActivatorTest { + + private Activator activator; + private WinPipesServerTransport service; + + @Before + public void setUp() throws Exception { + + IPCServiceHelper helper = mock(IPCServiceHelper.class); + service = mock(WinPipesServerTransport.class); + when(helper.createTransport()).thenReturn(service); + + activator = new Activator(helper); + } + + @Test + public void verifyServiceIsRegistered() throws Exception { + StubBundleContext context = new StubBundleContext(); + + activator.start(context); + + assertEquals(2, context.getAllServices().size()); + ServiceReference ref = context.getServiceReference(ServerTransport.class); + assertEquals(service, context.getService(ref)); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/ClientHandlerTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/ClientHandlerTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,177 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import static com.redhat.thermostat.shared.config.CommonPaths.THERMOSTAT_HOME; +import static com.redhat.thermostat.shared.config.CommonPaths.USER_THERMOSTAT_HOME; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.concurrent.ExecutorService; + +import com.redhat.thermostat.shared.config.NativeLibraryResolver; +import com.redhat.thermostat.shared.config.internal.CommonPathsImpl; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.AsyncMessageReader; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.AsyncMessageWriter; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.MessageListener; +import com.redhat.thermostat.agent.ipc.winpipes.server.internal.ClientHandler.MessageCreator; + +public class ClientHandlerTest { + + private ClientPipeInstance client; + private ThermostatIPCCallbacks callbacks; + private AsyncMessageReader reader; + private AsyncMessageWriter writer; + private ExecutorService execService; + private MessageCreator messageCreator; + private MessageImpl message; + private ClientHandler handler; + + @Before + public void setup() throws Exception { + + System.setProperty(THERMOSTAT_HOME, "."); + System.setProperty(USER_THERMOSTAT_HOME, "."); + NativeLibraryResolver.setCommonPaths(new CommonPathsImpl()); + client = mock(ClientPipeInstance.class); + callbacks = mock(ThermostatIPCCallbacks.class); + reader = mock(AsyncMessageReader.class); + writer = mock(AsyncMessageWriter.class); + execService = mock(ExecutorService.class); + messageCreator = mock(MessageCreator.class); + message = mock(MessageImpl.class); + when(messageCreator.createMessage(any(ByteBuffer.class), any(MessageListener.class))).thenReturn(message); + handler = new ClientHandler(client, execService, callbacks, reader, writer, messageCreator); + } + + @Test + public void testRead() throws Exception { + ByteBuffer buff = mock(ByteBuffer.class); + handler.handleRead(buff); + verify(reader).process(buff); + + // Should not close connection + verify(client, never()).close(); + } + + @Test + public void testReadException() throws Exception { + + ByteBuffer buff = mock(ByteBuffer.class); + doThrow(new IOException()).when(reader).process(buff); + + try { + handler.handleRead(buff); + fail("Expected IOException"); + } catch (IOException e) { + verify(reader).process(buff); + } + } + + @Test + public void testWrite() throws Exception { + + handler.handleWrite(); + verify(writer).writeData(); + + // Should not close connection + verify(client, never()).close(); + } + + @Test + public void testWriteMoreMessages() throws Exception { + when(writer.hasMoreMessages()).thenReturn(true); + handler.handleWrite(); + verify(writer).writeData(); + + // Should not close connection + verify(client, never()).close(); + } + + @Test + public void testWriteException() throws Exception { + doThrow(new IOException()).when(writer).writeData(); + + try { + handler.handleWrite(); + fail("Expected IOException"); + } catch (IOException e) { + // Should close channel + verify(writer).writeData(); + verify(client).close(); + } + } + + @Test + public void testMessageRead() throws Exception { + ByteBuffer buf = mock(ByteBuffer.class); + handler.messageRead(buf); + verify(messageCreator).createMessage(buf, handler); + + // Check callback notified + ArgumentCaptor runCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(execService).submit(runCaptor.capture()); + Runnable runnable = runCaptor.getValue(); + runnable.run(); + verify(callbacks).messageReceived(message); + } + + @Test + public void testWriteMessage() throws Exception { + + ByteBuffer buf = mock(ByteBuffer.class); + handler.writeMessage(buf); + verify(writer).enqueueForWriting(buf); + + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/MessageImplTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/MessageImplTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,75 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.nio.ByteBuffer; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.MessageListener; + +public class MessageImplTest { + + private ByteBuffer data; + private MessageListener listener; + private MessageImpl message; + + @Before + public void setUp() { + data = mock(ByteBuffer.class); + listener = mock(MessageListener.class); + message = new MessageImpl(data, listener); + } + + @Test + public void testGetData() { + assertEquals(data, message.get()); + } + + @Test + public void testSendReply() throws Exception { + ByteBuffer reply = mock(ByteBuffer.class); + message.reply(reply); + verify(listener).writeMessage(reply); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/PipenameValidatorTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/PipenameValidatorTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,84 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Before; +import org.junit.Test; + +public class PipenameValidatorTest { + + private PipenameValidator validator; + + @Before + public void setUp() { + validator = new PipenameValidator(); + } + + @Test + public void testGoodName() { + boolean result = validator.validate("g00d-socKet_nam3"); + assertTrue(result); + } + + @Test + public void testSlash() { + boolean result = validator.validate("my/bad/socket"); + assertFalse(result); + } + + @Test + public void testNull() { + boolean result = validator.validate("myBadS\0cket"); + assertFalse(result); + } + + @Test + public void testDot() { + boolean result = validator.validate("myBadS.cket"); + assertFalse(result); + } + + @Test + public void testNewline() { + boolean result = validator.validate("myBadS\ncket"); + assertFalse(result); + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WinPipesChannelImplTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WinPipesChannelImplTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,122 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipe; +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCProperties; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.winpipes.server.internal.WinPipesServerChannelImpl.WinPipesServerChannelHelper; + +public class WinPipesChannelImplTest { + + private static final String SERVER_NAME = "test"; + + private WinPipesServerChannelHelper channelHelper; + private final String PIPE_NAME = "//test/pipe/path"; + private WinPipe pipe; + private ThermostatIPCCallbacks callbacks; + private WinPipesIPCProperties props; + + @Before + public void setUp() throws IOException { + + channelHelper = mock(WinPipesServerChannelHelper.class); + + pipe = mock(WinPipe.class); + when(pipe.getPipeName()).thenReturn(SERVER_NAME); + when(pipe.isOpen()).thenReturn(true); + doNothing().when(pipe).close(); + + WinPipesServerChannelImpl channel = mock(WinPipesServerChannelImpl.class); + when(channel.getPipe()).thenReturn(pipe); + doCallRealMethod().when(channel).isOpen(); + doCallRealMethod().when(channel).close(); + when(channel.getChannelHelper()).thenReturn(channelHelper); + + callbacks = mock(ThermostatIPCCallbacks.class); + + when(channelHelper.open(PIPE_NAME)).thenReturn(pipe); + doCallRealMethod().when(channelHelper).isOpen(any(WinPipe.class)); + doCallRealMethod().when(channelHelper).close(any(WinPipe.class)); + //when(channelHelper.createServerChannel(PIPE_NAME, pipe, callbacks)).thenReturn(channel); + when(channelHelper.createServerChannel(anyString(), any(WinPipe.class), any(ThermostatIPCCallbacks.class))).thenReturn(channel); + + props = mock(WinPipesIPCProperties.class); + File propFile = mock(File.class); + when(props.getPipeName(SERVER_NAME)).thenReturn(PIPE_NAME); + when(props.getPropertiesFile()).thenReturn(propFile); + } + + @Test + public void testOpen() throws Exception { + WinPipesServerChannelImpl channel = createChannel(); + } + + @Test + public void testIsOpen() throws IOException { + WinPipesServerChannelImpl channel = createChannel(); + channel.isOpen(); + verify(pipe).isOpen(); + } + + @Test + public void testClose() throws IOException { + WinPipesServerChannelImpl channel = createChannel(); + channel.close(); + verify(pipe).close(); + } + + private WinPipesServerChannelImpl createChannel() throws IOException { + return WinPipesServerChannelImpl.createChannel(SERVER_NAME, callbacks, props, channelHelper); + } +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WinPipesServerTransportTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/windows-named-pipes/server/src/test/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WinPipesServerTransportTest.java Fri Feb 10 15:52:51 2017 -0500 @@ -0,0 +1,111 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * . + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.winpipes.server.internal; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.nio.channels.spi.AbstractSelector; +import java.nio.channels.spi.SelectorProvider; +import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.UserPrincipalLookupService; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesIPCProperties; +import org.junit.Before; + +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.winpipes.server.internal.WinPipesServerTransport.ChannelUtils; + +public class WinPipesServerTransportTest { + + private static final String SERVER_NAME = "test"; + + private WinPipesServerTransport transport; + private SelectorProvider provider; + private AbstractSelector selector; + private ExecutorService execService; + private PipenameValidator validator; + private Path socketDirPath; + private FileAttribute> fileAttr; + private Path socketPath; + private ThermostatIPCCallbacks callbacks; + private ChannelUtils channelUtils; + private WinPipesServerChannelImpl channel; + private WinPipesIPCProperties props; + private UserPrincipalLookupService lookup; + + @SuppressWarnings("unchecked") + @Before + public void setup() throws Exception { + provider = mock(SelectorProvider.class); + selector = mock(AbstractSelector.class); + when(provider.openSelector()).thenReturn(selector); + + props = mock(WinPipesIPCProperties.class); + File sockDirFile = mock(File.class); + // when(props.getPipePrefix()).thenReturn(sockDirFile); + socketDirPath = mock(Path.class); + when(socketDirPath.toAbsolutePath()).thenReturn(socketDirPath); + when(socketDirPath.normalize()).thenReturn(socketDirPath); + when(sockDirFile.toPath()).thenReturn(socketDirPath); + socketPath = mock(Path.class); + //when(socketDirPath.resolve(WinPipesServerTransport. + SERVER_NAME)).thenReturn(socketPath); + + + + execService = mock(ExecutorService.class); + validator = mock(PipenameValidator.class); + when(validator.validate(any(String.class))).thenReturn(true); + + + channelUtils = mock(ChannelUtils.class); + channel = mock(WinPipesServerChannelImpl.class); + File socketFile = mock(File.class); + when(socketFile.toPath()).thenReturn(socketPath); + + + callbacks = mock(ThermostatIPCCallbacks.class); + + } + +} diff -r ac0f70abdcb1 -r a6ba41a449c8 agent/proxy/server/src/main/java/com/redhat/thermostat/agent/proxy/server/AgentProxy.java --- a/agent/proxy/server/src/main/java/com/redhat/thermostat/agent/proxy/server/AgentProxy.java Thu Feb 09 15:16:54 2017 +0100 +++ b/agent/proxy/server/src/main/java/com/redhat/thermostat/agent/proxy/server/AgentProxy.java Fri Feb 10 15:52:51 2017 -0500 @@ -49,6 +49,9 @@ import com.redhat.thermostat.agent.ipc.client.ClientIPCServiceFactory; import com.redhat.thermostat.agent.ipc.client.IPCMessageChannel; import com.redhat.thermostat.common.utils.LoggingUtils; +import com.redhat.thermostat.shared.config.NativeLibraryResolver; +import com.redhat.thermostat.shared.config.OS; +import com.redhat.thermostat.shared.config.internal.CommonPathsImpl; import com.sun.tools.attach.AttachNotSupportedException; public class AgentProxy { @@ -65,7 +68,12 @@ if (args.length < 2) { usage(); } - + + // Windows named pipes has some native code - must set paths to find the DLL + if (OS.IS_WINDOWS) { + NativeLibraryResolver.setCommonPaths(new CommonPathsImpl()); + } + // Get IPC configuration file location from system property String configFileStr = System.getProperty(CONFIG_FILE_PROP); if (configFileStr == null) { diff -r ac0f70abdcb1 -r a6ba41a449c8 config/src/main/java/com/redhat/thermostat/shared/config/NativeLibraryResolver.java --- a/config/src/main/java/com/redhat/thermostat/shared/config/NativeLibraryResolver.java Thu Feb 09 15:16:54 2017 +0100 +++ b/config/src/main/java/com/redhat/thermostat/shared/config/NativeLibraryResolver.java Fri Feb 10 15:52:51 2017 -0500 @@ -43,7 +43,7 @@ /** * Class which enables resolving of native libraries placed in - * {@link CommonPaths#getNativeLibsRoot()}. + * {@link CommonPaths#getSystemNativeLibsRoot()}. * */ public class NativeLibraryResolver { @@ -57,7 +57,7 @@ /** * Gets the absolute path of a native library. The native library must be - * placed in directory as returned by {@link CommonPathsImpl#getNativeLibsRoot()}. + * placed in directory as returned by {@link CommonPathsImpl#getSystemNativeLibsRoot()}. * * @param libraryName * The name of the library. Specified in the same fashion as for diff -r ac0f70abdcb1 -r a6ba41a449c8 distribution/assembly/core-assembly-windows.xml --- a/distribution/assembly/core-assembly-windows.xml Thu Feb 09 15:16:54 2017 +0100 +++ b/distribution/assembly/core-assembly-windows.xml Fri Feb 10 15:52:51 2017 -0500 @@ -63,6 +63,8 @@ com.redhat.thermostat:thermostat-agent-proxy-server com.redhat.thermostat:thermostat-agent-ipc-tcpsocket-server com.redhat.thermostat:thermostat-agent-ipc-tcpsocket-client + com.redhat.thermostat:thermostat-agent-ipc-winpipes-server + com.redhat.thermostat:thermostat-agent-ipc-winpipes-client com.redhat.thermostat:thermostat-common-core com.redhat.thermostat:thermostat-common-command com.redhat.thermostat:thermostat-common-portability diff -r ac0f70abdcb1 -r a6ba41a449c8 distribution/pom.xml --- a/distribution/pom.xml Thu Feb 09 15:16:54 2017 +0100 +++ b/distribution/pom.xml Fri Feb 10 15:52:51 2017 -0500 @@ -228,9 +228,21 @@ -windows - - + ,com.redhat.thermostat.agent.ipc.winpipes.server=${project.version} + ,com.redhat.thermostat.agent.ipc.winpipes.server=${project.version} + + + com.redhat.thermostat + thermostat-agent-ipc-winpipes-server + ${project.version} + + + com.redhat.thermostat + thermostat-agent-ipc-winpipes-client + ${project.version} + + @@ -250,6 +262,8 @@ todir="${project.build.directory}/image/libs/native" /> + diff -r ac0f70abdcb1 -r a6ba41a449c8 distribution/scripts/thermostat-ipc-client-common --- a/distribution/scripts/thermostat-ipc-client-common Thu Feb 09 15:16:54 2017 +0100 +++ b/distribution/scripts/thermostat-ipc-client-common Fri Feb 10 15:52:51 2017 -0500 @@ -23,3 +23,8 @@ IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/asm-util-${asm.version}.jar" IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/asm-analysis-${asm.version}.jar" IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/asm-tree-${asm.version}.jar" + +if [ $CYGWIN_MODE -eq 1 ]; then + IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/thermostat-agent-ipc-winpipes-client-${project.version}.jar" + IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/thermostat-agent-ipc-winpipes-common-${project.version}.jar" +fi diff -r ac0f70abdcb1 -r a6ba41a449c8 distribution/windows/scripts/thermostat-ipc-client-common.cmd --- a/distribution/windows/scripts/thermostat-ipc-client-common.cmd Thu Feb 09 15:16:54 2017 +0100 +++ b/distribution/windows/scripts/thermostat-ipc-client-common.cmd Fri Feb 10 15:52:51 2017 -0500 @@ -46,6 +46,8 @@ set IPC_CLASSPATH=%IPC_CLASSPATH%;%THERMOSTAT_LIBS%\thermostat-agent-ipc-common-@project.version@.jar set IPC_CLASSPATH=%IPC_CLASSPATH%;%THERMOSTAT_LIBS%\thermostat-agent-ipc-tcpsocket-client-@project.version@.jar set IPC_CLASSPATH=%IPC_CLASSPATH%;%THERMOSTAT_LIBS%\thermostat-agent-ipc-tcpsocket-common-@project.version@.jar +set IPC_CLASSPATH=%IPC_CLASSPATH%;%THERMOSTAT_LIBS%\thermostat-agent-ipc-winpipes-client-@project.version@.jar +set IPC_CLASSPATH=%IPC_CLASSPATH%;%THERMOSTAT_LIBS%\thermostat-agent-ipc-winpipes-common-@project.version@.jar set IPC_CLASSPATH=%IPC_CLASSPATH%;%THERMOSTAT_LIBS%\jnr-enxio-@jnr-enxio.version@.jar set IPC_CLASSPATH=%IPC_CLASSPATH%;%THERMOSTAT_LIBS%\jnr-constants-@jnr-constants.version@.jar set IPC_CLASSPATH=%IPC_CLASSPATH%;%THERMOSTAT_LIBS%\jnr-posix-@jnr-posix.version@.jar