Mercurial > hg > thermostat-ng > agent
changeset 2468:797b8bd043f2
Add TCP IPC protocol
- expected to reimplement with security and more flexibility
- available on all platforms but only the default on Windows
Reviewed-by: omajid,ebaron,jkang
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2016-September/021120.html
line wrap: on
line diff
--- a/agent/ipc/common/src/main/java/com/redhat/thermostat/agent/ipc/common/internal/IPCPropertiesBuilder.java Fri Sep 30 10:15:14 2016 -0400 +++ b/agent/ipc/common/src/main/java/com/redhat/thermostat/agent/ipc/common/internal/IPCPropertiesBuilder.java Fri Sep 30 11:20:00 2016 -0400 @@ -68,7 +68,7 @@ } IPCType type = IPCType.fromConfigValue(typeString); if (type == null) { - throw new IOException("Unable to determine IPC type from property file"); + throw new IOException("Unable to determine IPC type from property file ('"+typeString+"' not recognized)"); } return getPropertiesForType(type, props, ipcProperties); } finally {
--- a/agent/ipc/common/src/main/java/com/redhat/thermostat/agent/ipc/common/internal/IPCType.java Fri Sep 30 10:15:14 2016 -0400 +++ b/agent/ipc/common/src/main/java/com/redhat/thermostat/agent/ipc/common/internal/IPCType.java Fri Sep 30 11:20:00 2016 -0400 @@ -43,6 +43,7 @@ public enum IPCType { UNIX_SOCKET("unixsocket"), + TCP_SOCKET("tcpsocket"), UNKNOWN(null); // Used for testing // Reverse lookup
--- a/agent/ipc/pom.xml Fri Sep 30 10:15:14 2016 -0400 +++ b/agent/ipc/pom.xml Fri Sep 30 11:20:00 2016 -0400 @@ -63,6 +63,7 @@ <module>client</module> <module>server</module> <module>unix-socket</module> + <module>tcp-socket</module> </modules> </project>
--- a/agent/ipc/server/src/main/java/com/redhat/thermostat/agent/ipc/server/internal/IPCConfigurationWriter.java Fri Sep 30 10:15:14 2016 -0400 +++ b/agent/ipc/server/src/main/java/com/redhat/thermostat/agent/ipc/server/internal/IPCConfigurationWriter.java Fri Sep 30 11:20:00 2016 -0400 @@ -39,6 +39,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.ServerSocket; import java.util.Properties; import com.redhat.thermostat.agent.ipc.common.internal.IPCType; @@ -46,12 +47,34 @@ class IPCConfigurationWriter { static final String PROP_IPC_TYPE = "type"; - static final String PROP_UNIX_SOCKET_DIR = "unixsocket.dir"; - private static final String COMMENTS = "Configuration for Inter-process Communication (IPC) used in the Thermostat agent.\n" - + "The agent is configured to use Unix sockets for IPC by default.\n" - + "The options below can be set to modify the defaults used by the agent:\n\n" - + "Directory where Unix sockets are created, which may be deleted if it already exists.\n" - + PROP_UNIX_SOCKET_DIR + "=/path/to/unix/sockets\n"; + private static final String PROP_UNIX_SOCKET_DIR = "unixsocket.dir"; + + + private static final String PROP_TCP_SOCKET_SUFFIX= ".tcpsocket.port"; + //private static final String TCP_SOCKET_JUMBO_FRAMES = "tcpsocket.jumboframes"; + + // suggest some default vaules for TCP sockets - test this range for unused sockets + private static int TEST_SOCKET_LOW = 51200; + private static int TEST_SOCKET_HIGH = 55000; + + private static final boolean IS_UNIX = !System.getProperty("os.name").contains("Windows"); + + private static final String COMMENTS = + "Configuration for Inter-process Communication (IPC) used in the Thermostat agent.\n" + + "The agent is configured to use Unix sockets for IPC by default on Linux,\n" + + "or TCP sockets on Windows.\n" + + "The options below can be set to modify the defaults used by the agent:\n\n" + + + "Directory where Unix sockets are created, which may be deleted if it already exists.\n" + + PROP_UNIX_SOCKET_DIR + "=/path/to/unix/sockets\n\n" + + + "TCP socket port numbers for various services.\n" + + "command-channel" + PROP_TCP_SOCKET_SUFFIX + "=NNNN\n" + + "agent-proxy" + PROP_TCP_SOCKET_SUFFIX + "=MMMM\n\n"; + + //in the future, will allow jumbo frames for performance + //+ "TCP parameters\n" + //+ TCP_SOCKET_JUMBO_FRAMES + "=false\n\n"; private final File configFile; private final PropertiesHelper helper; @@ -70,13 +93,48 @@ configFile.createNewFile(); Properties props = helper.createProperties(); - // Leave remainder of properties as defaults - props.setProperty(PROP_IPC_TYPE, IPCType.UNIX_SOCKET.getConfigValue()); - + + props.setProperty(PROP_IPC_TYPE, IS_UNIX ? IPCType.UNIX_SOCKET.getConfigValue() : IPCType.TCP_SOCKET.getConfigValue()); + + // unix socket will work without configuration (creates sockets in tmp directory + // but tcpsocket always needs ports (in the future, should support service discovery) + + // this implementation is flawed; + // the unused ports might be in used by a process that simply wasn't running at thermostat setup time. + + { + int cmdPort = findUnusedTCPSocket(TEST_SOCKET_LOW,TEST_SOCKET_HIGH); + int aport = cmdPort == 0 ? 0 : findUnusedTCPSocket(cmdPort+1,TEST_SOCKET_HIGH); + + props.setProperty("command-channel" + PROP_TCP_SOCKET_SUFFIX, Integer.toString(cmdPort)); + props.setProperty("agent-proxy" + PROP_TCP_SOCKET_SUFFIX, Integer.toString(aport)); + } + try (FileOutputStream fos = helper.createStream(configFile)) { props.store(fos, COMMENTS); } } + + private static int findUnusedTCPSocket(int lowPort, int highPort) { + for (int port=lowPort; port<=highPort; port++) { + if (isTCPPortAvailable(port)) { + return port; + } + } + return 0; + } + + private static boolean isTCPPortAvailable(int tcpport) { + try { + final ServerSocket socket = new ServerSocket(tcpport); + socket.close(); + return true; + } + catch (IOException e) { + // socket already in use + } + return false; + } // For testing purposes static class PropertiesHelper {
--- a/agent/ipc/server/src/test/java/com/redhat/thermostat/agent/ipc/server/internal/IPCConfigurationWriterTest.java Fri Sep 30 10:15:14 2016 -0400 +++ b/agent/ipc/server/src/test/java/com/redhat/thermostat/agent/ipc/server/internal/IPCConfigurationWriterTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -59,7 +59,9 @@ import com.redhat.thermostat.agent.ipc.server.internal.IPCConfigurationWriter.PropertiesHelper; public class IPCConfigurationWriterTest { - + + private static final boolean IS_UNIX = !System.getProperty("os.name").contains("Windows"); + private IPCConfigurationWriter writer; private Properties props; private FileOutputStream fos; @@ -81,8 +83,9 @@ @Test public void testWrite() throws Exception { writer.write(); - - verify(props).setProperty(IPCConfigurationWriter.PROP_IPC_TYPE, IPCType.UNIX_SOCKET.getConfigValue()); + + final IPCType expectedType = IS_UNIX ? IPCType.UNIX_SOCKET : IPCType.TCP_SOCKET; + verify(props).setProperty(IPCConfigurationWriter.PROP_IPC_TYPE, expectedType.getConfigValue()); verify(props).store(eq(fos), anyString()); verify(fos).close(); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/client/pom.xml Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,100 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Copyright 2012-2016 Red Hat, Inc. + + This file is part of Thermostat. + + Thermostat is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + Thermostat is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Thermostat; see the file COPYING. If not see + <http://www.gnu.org/licenses/>. + + Linking this code with other modules is making a combined work + based on this code. Thus, the terms and conditions of the GNU + General Public License cover the whole combination. + + As a special exception, the copyright holders of this code give + you permission to link this code with independent modules to + produce an executable, regardless of the license terms of these + independent modules, and to copy and distribute the resulting + executable under terms of your choice, provided that you also + meet, for each linked independent module, the terms and conditions + of the license of that module. An independent module is a module + which is not derived from or based on this code. If you modify + this code, you may extend this exception to your version of the + library, but you are not obligated to do so. If you do not wish + to do so, delete this exception statement from your version. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc-tcpsocket</artifactId> + <version>1.99.12-SNAPSHOT</version> + </parent> + + <artifactId>thermostat-agent-ipc-tcpsocket-client</artifactId> + <packaging>jar</packaging> + + <name>Thermostat IPC Client</name> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc-tcpsocket-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc-client</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <!-- We don't want to depend on Thermostat annotations, + so we override parent pom's configuration --> + <configuration combine.self="override" /> + </plugin> + <!-- This is a dependency of the Byteman helper and that needs source + level JDK 6 in order to be able to use the helper for JVMs running + on JDK 6. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + </configuration> + </plugin> + </plugins> + </build> + +</project> +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/client/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/client/internal/TcpSocketMessageChannel.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,84 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.client.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import com.redhat.thermostat.agent.ipc.client.IPCMessageChannel; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.SyncMessageReader; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.SyncMessageWriter; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ThermostatSocketChannelImpl; + +public class TcpSocketMessageChannel implements IPCMessageChannel { + + private final ThermostatSocketChannelImpl socketChannel; + private final SyncMessageReader reader; + private final SyncMessageWriter writer; + + public TcpSocketMessageChannel(ThermostatSocketChannelImpl socketChannel) { + this(socketChannel, new SyncMessageReader(socketChannel), new SyncMessageWriter(socketChannel)); + } + + TcpSocketMessageChannel(ThermostatSocketChannelImpl socketChannel, + SyncMessageReader reader, SyncMessageWriter writer) { + this.socketChannel = socketChannel; + this.reader = reader; + this.writer = writer; + } + + @Override + public boolean isOpen() { + return socketChannel.isOpen(); + } + + @Override + public void close() throws IOException { + socketChannel.close(); + } + + @Override + public ByteBuffer readMessage() throws IOException { + return reader.readData(); + } + + @Override + public void writeMessage(ByteBuffer message) throws IOException { + writer.writeData(message); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/client/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/client/internal/TcpSocketTransportImpl.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,93 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.client.internal; + +import java.net.SocketAddress; +import java.net.InetSocketAddress; +import java.net.InetAddress; + +import java.io.IOException; + +import com.redhat.thermostat.agent.ipc.client.IPCMessageChannel; +import com.redhat.thermostat.agent.ipc.client.internal.ClientTransport; +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ThermostatSocketChannelImpl; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.TcpSocketIPCProperties; + +public class TcpSocketTransportImpl implements ClientTransport { + + private final TcpSocketIPCProperties socketProps; + private final SocketHelper sockHelper; + + TcpSocketTransportImpl(IPCProperties props) throws IOException { + this(props, new SocketHelper()); + } + + TcpSocketTransportImpl(IPCProperties props, SocketHelper sockHelper) throws IOException { + if (!(props instanceof TcpSocketIPCProperties)) { + throw new IOException("Unexpected IPC properties for 'socket' type"); + } + this.socketProps = (TcpSocketIPCProperties) props; + this.sockHelper = sockHelper; + } + + @Override + public IPCMessageChannel connect(final String serverName) throws IOException { + final SocketAddress addr = socketProps.getSocketAddr(serverName); + final ThermostatSocketChannelImpl sockChannel = sockHelper.openSocketChannel(serverName, addr); + return sockHelper.createMessageChannel(sockChannel); + } + + // java.lang.Objects is JDK 7+ and we need this to be JDK 6 compat. + private static void requireNonNull(Object item, String message) { + if (item == null) { + throw new NullPointerException(message); + } + } + + // Helper class for testing + static class SocketHelper { + ThermostatSocketChannelImpl openSocketChannel(String name, SocketAddress addr) throws IOException { + return ThermostatSocketChannelImpl.open(name, addr); + } + + TcpSocketMessageChannel createMessageChannel(ThermostatSocketChannelImpl sockChannel) { + return new TcpSocketMessageChannel(sockChannel); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/client/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/client/internal/TcpSocketTransportProvider.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.client.internal; + +import java.io.IOException; + +import com.redhat.thermostat.agent.ipc.client.internal.ClientTransport; +import com.redhat.thermostat.agent.ipc.client.internal.ClientTransportProvider; +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.common.internal.IPCType; + +public class TcpSocketTransportProvider implements ClientTransportProvider { + + @Override + public ClientTransport create(IPCProperties props) throws IOException { + return new TcpSocketTransportImpl(props); + } + + @Override + public IPCType getType() { + return IPCType.TCP_SOCKET; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/client/src/main/resources/META-INF/services/com.redhat.thermostat.agent.ipc.client.internal.ClientTransportProvider Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,1 @@ +com.redhat.thermostat.agent.ipc.tcpsocket.client.internal.TcpSocketTransportProvider
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/client/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/client/internal/TcpSocketMessageChannelTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,97 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.client.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.SyncMessageReader; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.SyncMessageWriter; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ThermostatSocketChannelImpl; + +public class TcpSocketMessageChannelTest { + + private SyncMessageReader reader; + private SyncMessageWriter writer; + private TcpSocketMessageChannel channel; + private ThermostatSocketChannelImpl sock; + + @Before + public void setUp() { + sock = mock(ThermostatSocketChannelImpl.class); + reader = mock(SyncMessageReader.class); + writer = mock(SyncMessageWriter.class); + channel = new TcpSocketMessageChannel(sock, reader, writer); + } + + @Test + public void testReadMessage() throws Exception { + ByteBuffer buf = mock(ByteBuffer.class); + when(reader.readData()).thenReturn(buf); + ByteBuffer result = channel.readMessage(); + assertEquals(buf, result); + } + + @Test + public void testWriteMessage() throws Exception { + ByteBuffer buf = mock(ByteBuffer.class); + channel.writeMessage(buf); + verify(writer).writeData(buf); + } + + @Test + public void testIsOpen() throws Exception { + when(sock.isOpen()).thenReturn(true); + assertTrue(channel.isOpen()); + verify(sock).isOpen(); + } + + @Test + public void testClose() throws Exception { + channel.close(); + verify(sock).close(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/client/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/client/internal/TcpSocketTransportImplTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,121 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.client.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.net.SocketAddress; + +import java.io.File; +import java.io.IOException; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.client.IPCMessageChannel; +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.tcpsocket.client.internal.TcpSocketTransportImpl.SocketHelper; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ThermostatSocketChannelImpl; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.TcpSocketIPCProperties; + +public class TcpSocketTransportImplTest { + + private static final String SERVER_NAME = "test"; + private SocketAddress socketAddr; + private SocketHelper sockHelper; + private TcpSocketMessageChannel messageChannel; + private TcpSocketIPCProperties props; + + @Before + public void setUp() throws Exception { + socketAddr = mock(SocketAddress.class); + sockHelper = mock(SocketHelper.class); + ThermostatSocketChannelImpl sockChannel = mock(ThermostatSocketChannelImpl.class); + when(sockHelper.openSocketChannel(eq(SERVER_NAME), eq(socketAddr))).thenReturn(sockChannel); + when(sockHelper.createMessageChannel(sockChannel)).thenReturn(messageChannel); + + props = mock(TcpSocketIPCProperties.class); + when(props.getSocketAddr(SERVER_NAME)).thenReturn(socketAddr); + } + + @Test + public void testConnectToServer() throws Exception { + TcpSocketTransportImpl service = new TcpSocketTransportImpl(props, sockHelper); + IPCMessageChannel result = service.connect(SERVER_NAME); + assertEquals(messageChannel, result); + } + + @Test(expected=IOException.class) + public void testBadProperties() throws Exception { + // Not TcpSocketIPCProperties + IPCProperties props = mock(IPCProperties.class); + new TcpSocketTransportImpl(props, sockHelper); + } +/** + @Test + public void testConnectToServerNoOpen() throws Exception { + TcpSocketTransportImpl service = new TcpSocketTransportImpl(props, sockHelper); + + try { + service.connect(SERVER_NAME); + fail("Expected IOException"); + } catch (IOException ignored) { + verify(sockHelper, never()).openSocketChannel(SERVER_NAME, socketAddr); + } + } + **/ + + @Test + public void testConnectToServerBadSocket() throws Exception { + when(sockHelper.openSocketChannel(SERVER_NAME, socketAddr)).thenThrow(new IOException()); + TcpSocketTransportImpl service = new TcpSocketTransportImpl(props, sockHelper); + + try { + service.connect(SERVER_NAME); + fail("Expected IOException"); + } catch (IOException ignored) { + verify(sockHelper).openSocketChannel(SERVER_NAME, socketAddr); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/pom.xml Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,112 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Copyright 2012-2016 Red Hat, Inc. + + This file is part of Thermostat. + + Thermostat is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + Thermostat is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Thermostat; see the file COPYING. If not see + <http://www.gnu.org/licenses/>. + + Linking this code with other modules is making a combined work + based on this code. Thus, the terms and conditions of the GNU + General Public License cover the whole combination. + + As a special exception, the copyright holders of this code give + you permission to link this code with independent modules to + produce an executable, regardless of the license terms of these + independent modules, and to copy and distribute the resulting + executable under terms of your choice, provided that you also + meet, for each linked independent module, the terms and conditions + of the license of that module. An independent module is a module + which is not derived from or based on this code. If you modify + this code, you may extend this exception to your version of the + library, but you are not obligated to do so. If you do not wish + to do so, delete this exception statement from your version. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc-tcpsocket</artifactId> + <version>1.99.12-SNAPSHOT</version> + </parent> + + <artifactId>thermostat-agent-ipc-tcpsocket-common</artifactId> + <packaging>bundle</packaging> + + <name>Thermostat IPC Common</name> + + <dependencies> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <instructions> + <Bundle-Vendor>Red Hat, Inc.</Bundle-Vendor> + <Bundle-SymbolicName>com.redhat.thermostat.agent.ipc.tcpsocket.common</Bundle-SymbolicName> + <Export-Package> + <!-- Exported, but not for use outside of Agent --> + com.redhat.thermostat.agent.ipc.tcpsocket.common.internal + </Export-Package> + <!-- Do not autogenerate uses clauses in Manifests --> + <_nouses>true</_nouses> + </instructions> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <!-- We don't want to depend on Thermostat annotations, + so we override parent pom's configuration --> + <configuration combine.self="override" /> + </plugin> + <!-- This is a dependency of the Byteman helper and that needs source + level JDK 6 in order to be able to use the helper for JVMs running + on JDK 6. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + </configuration> + </plugin> + </plugins> + </build> + +</project> +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/AsyncMessageReader.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,77 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class AsyncMessageReader extends MessageReader { + + private final ByteBuffer readBuffer; + private final ThermostatSocketChannelImpl channel; + private final MessageListener listener; + + public AsyncMessageReader(ThermostatSocketChannelImpl channel, MessageListener listener) { + this(channel, listener, new MessageLimits()); + } + + public AsyncMessageReader(ThermostatSocketChannelImpl channel, MessageListener listener, MessageLimits limits) { + super(limits); + this.readBuffer = ByteBuffer.allocate(limits.getBufferSize()); + this.channel = channel; + this.listener = listener; + } + + public void readData() throws IOException { + // Read message from client + readBuffer.clear(); + int read = channel.read(readBuffer); + if (read < 0) { + // Received EOF + channel.close(); + } else { + readBuffer.flip(); + processData(readBuffer); + } + } + + @Override + protected void readFullMessage(ByteBuffer fullMessage) { + listener.messageRead(fullMessage); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/AsyncMessageWriter.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,117 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.Queue; + +public class AsyncMessageWriter extends MessageWriter { + + private final ThermostatSocketChannelImpl channel; + private final Queue<MessageToWrite> messages; + private final MessageLimits limits; + + private boolean headerSent; + + public AsyncMessageWriter(ThermostatSocketChannelImpl channel) { + this(channel, new MessageLimits()); + } + + AsyncMessageWriter(ThermostatSocketChannelImpl channel, MessageLimits limits) { + super(limits); + this.channel = channel; + this.limits = limits; + this.messages = new LinkedList<MessageToWrite>(); + this.headerSent = false; + } + + public synchronized void writeData() throws IOException { + if (messages.isEmpty()) { + throw new IllegalStateException("Write requested, but nothing to write"); + } + + boolean done = writeMessage(messages.peek()); + if (done) { + // Reset state for next message + headerSent = false; + // Remove header and message from the queue + messages.remove(); + } + } + + private boolean writeMessage(MessageToWrite toWrite) throws IOException { + boolean done = false; + ByteBuffer currentHeader = toWrite.getHeader(); + ByteBuffer currentMessage = toWrite.getMessage(); + + // Write the header if not yet written fully + if (!headerSent) { + channel.write(currentHeader); + // Check if the entire header was written + headerSent = !currentHeader.hasRemaining(); + } + + // Write the message, if we've sent the full header + if (headerSent) { + channel.write(currentMessage); + // Check if the message has been fully written + done = !currentMessage.hasRemaining(); + } + return done; + } + + public synchronized boolean hasMoreMessages() { + return !messages.isEmpty(); + } + + public synchronized void enqueueForWriting(ByteBuffer buf) throws IOException { + if (buf.remaining() > limits.getMaxMessageSize()) { + throw new IOException("Total message size is larger than maximum of " + + limits.getMaxMessageSize() + " bytes"); + } + // Split into messages and add headers + ByteBuffer fullMessage = buf.duplicate(); + while (fullMessage.hasRemaining()) { + MessageToWrite message = getNextMessage(fullMessage); + // Add header and message to queue + messages.add(message); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/MessageHeader.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,187 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +class MessageHeader { + + private static final int INTEGER_BYTES = (Integer.SIZE/Byte.SIZE); + private static final int BYTE_BYTES = 1; + private static final byte[] MAGIC = { 'T', 'H', 'E', 'R' }; + // Size upto and including header size value + // Should remain constant across protocol versions + private static final int MINIMUM_HEADER_SIZE = MAGIC.length + INTEGER_BYTES * 2; + // Size of (magic + protocolVersion + headerSize + messageSize + moreData) + private static final int DEFAULT_HEADER_SIZE = MAGIC.length + INTEGER_BYTES * 3 + BYTE_BYTES; + // Protocol version to use for writing new MessageHeaders + private static final int DEFAULT_PROTOCOL_VERSION = 1; + + private final int protocolVersion; + private final int headerSize; + private int messageSize; + // If true, then the next message is a continuation of the data in this one + private boolean moreData; + + MessageHeader() { + this(DEFAULT_PROTOCOL_VERSION, DEFAULT_HEADER_SIZE); + } + + // Only used for reading MessageHeaders + MessageHeader(int protocolVersion, int headerSize) { + this.protocolVersion = protocolVersion; + this.headerSize = headerSize; + this.messageSize = -1; + this.moreData = false; + } + + byte[] toByteArray() { + ByteBuffer buf = ByteBuffer.allocate(getHeaderSize()); + buf.put(MAGIC); + buf.putInt(protocolVersion); + buf.putInt(headerSize); + buf.putInt(messageSize); + putBoolean(buf, moreData); + return buf.array(); + } + + static MessageHeader fromByteBuffer(ByteBuffer buf) throws IOException { + checkMagic(buf); + int protoVersion = getPositiveIntOrThrow(buf, "Protocol version"); + int headerSize = getPositiveIntOrThrow(buf, "Header size"); + return new MessageHeader(protoVersion, headerSize); + } + + void setRemainingFields(ByteBuffer buf) throws IOException { + int effectiveProtocolVersion = getEffectiveProtocolVersion(); + if (effectiveProtocolVersion <= 0) { + throw new IOException("Invalid protocol version: " + effectiveProtocolVersion); + } + if (effectiveProtocolVersion >= 1) { + this.messageSize = getPositiveIntOrThrow(buf, "Message size"); + this.moreData = getBoolean(buf); + } + } + + private int getEffectiveProtocolVersion() { + // If this header's protocol version is greater than we can handle + // just use the maximum we know about + return Math.min(protocolVersion, DEFAULT_PROTOCOL_VERSION); + } + + private static void checkMagic(ByteBuffer buf) throws IOException { + checkBytesRemaining(buf, MAGIC.length); + + byte[] bufMagic = new byte[MAGIC.length]; + buf.get(bufMagic); + if (!Arrays.equals(bufMagic, MAGIC)) { + throw new IOException("MessageHeader is invalid"); + } + } + + private static void checkBytesRemaining(ByteBuffer buf, int size) throws IOException { + if (buf.remaining() < size) { + throw new IOException("MessageHeader too short"); + } + } + + private static int getPositiveIntOrThrow(ByteBuffer buf, String errorMsgPrefix) throws IOException { + checkBytesRemaining(buf, INTEGER_BYTES); + int result = buf.getInt(); + if (result <= 0) { + throw new IOException(errorMsgPrefix + " must be greater than zero"); + } + return result; + } + + static int getDefaultProtocolVersion() { + return DEFAULT_PROTOCOL_VERSION; + } + + int getProtocolVersion() { + return protocolVersion; + } + + static int getMinimumHeaderSize() { + return MINIMUM_HEADER_SIZE; + } + + static int getDefaultHeaderSize() { + return DEFAULT_HEADER_SIZE; + } + + int getHeaderSize() { + return headerSize; + } + + int getMessageSize() { + return messageSize; + } + + void setMessageSize(int messageSize) { + this.messageSize = messageSize; + } + + boolean isMoreData() { + return moreData; + } + + void setMoreData(boolean moreData) { + this.moreData = moreData; + } + + void dumpHeader(String prefix) { + System.out.println(prefix + "Protocol Version: " + protocolVersion); + System.out.println(prefix + "Header Length: " + headerSize); + System.out.println(prefix + "Message Length: " + messageSize); + System.out.println(prefix + "More Data: " + moreData); + } + + private boolean getBoolean(ByteBuffer buf) throws IOException { + checkBytesRemaining(buf, 1); + byte b = buf.get(); + return (b != 0); + } + + private void putBoolean(ByteBuffer buf, boolean value) { + byte b = (byte) (value ? 1 : 0); + buf.put(b); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/MessageLimits.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,82 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +class MessageLimits { + + private static final int DEFAULT_MAX_MESSAGE_PART_SIZE = 0x800; // 2 KiB + private static final int DEFAULT_MAX_MESSAGE_SIZE = 0x20000; // 128 KiB + private static final int DEFAULT_MAX_HEADER_SIZE = 0x80; // 128 B + private static final int DEFAULT_BUFFER_SIZE = DEFAULT_MAX_MESSAGE_PART_SIZE; + + /* + * Values below should adhere to the following: + * 0 <= MinHdr <= MaxHdr <= MaxMsgPart <= Buffer <= MaxMsg + */ + // Maximum value for a single part of a message + private final int maxMessagePartSize; + // Maximum value for the combined payload of a multi-part message + private final int maxMessageSize; + // Maximum value for a message header + private final int maxHeaderSize; + // Size used for read/write buffers + private final int bufferSize; + + MessageLimits() { + this.maxMessagePartSize = DEFAULT_MAX_MESSAGE_PART_SIZE; + this.maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE; + this.maxHeaderSize = DEFAULT_MAX_HEADER_SIZE; + this.bufferSize = DEFAULT_BUFFER_SIZE; + } + + int getMaxMessagePartSize() { + return maxMessagePartSize; + } + + int getMaxMessageSize() { + return maxMessageSize; + } + + int getMaxHeaderSize() { + return maxHeaderSize; + } + + int getBufferSize() { + return bufferSize; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/MessageListener.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,48 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface MessageListener { + + void messageRead(ByteBuffer buf); + + void writeMessage(ByteBuffer buf) throws IOException; + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/MessageReader.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,232 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +abstract class MessageReader { + + // States used to track how much of a message (header) we've processed + protected static enum ReadState { NEW_MESSAGE, MIN_HEADER_READ, FULL_HEADER_READ, ERROR }; + // Number of bytes to read from the header to determine the total header size + protected static final int MIN_HEADER_SIZE = MessageHeader.getMinimumHeaderSize(); + // If true, dumps header information for each header read/written + private static final boolean DEBUG_HEADER = false; + + // Fixed size buffer used to read the initial part of a message header + private final ByteBuffer minHeaderBuf; + // Various message-related limits + private final MessageLimits limits; + + // Current state of data that has been processed by this reader + protected ReadState state; + // Message header for the message currently being processed + protected MessageHeader currentHeader; + // Buffer used for the remainder of the message header, + // after the minimum has been read + private ByteBuffer headerBuf; + // Buffer used for the message payload of the message being processed currently + private ByteBuffer messageBuf; + // Messages that have finished being processed, but are part of a multi-part message + // that has not been fully processed yet. + private List<ByteBuffer> messages; + + protected MessageReader(MessageLimits limits) { + this.state = ReadState.NEW_MESSAGE; + this.currentHeader = null; + this.minHeaderBuf = ByteBuffer.allocate(MIN_HEADER_SIZE); + this.limits = limits; + this.headerBuf = null; + this.messageBuf = null; + this.messages = new ArrayList<ByteBuffer>(); + } + + protected void processData(ByteBuffer readBuffer) throws IOException { + try { + while (readBuffer.hasRemaining()) { + switch (state) { + case NEW_MESSAGE: + // Append data to current buffer, if room + putMinRemaining(minHeaderBuf, readBuffer); + + // Is there enough data to read the minimum header size? + if (minHeaderBuf.remaining() == 0) { + minHeaderBuf.position(0); + if (currentHeader != null || headerBuf != null) { + throw new IllegalStateException("Header already processed"); + } + // Create Header + currentHeader = MessageHeader.fromByteBuffer(minHeaderBuf); + minHeaderBuf.clear(); + + // Check header size + // Set up reader to process remainder of header + int headerSize = currentHeader.getHeaderSize(); + if (headerSize > limits.getMaxHeaderSize()) { + throw new IOException("Message header size larger than maximum of " + + limits.getMaxHeaderSize() + " bytes"); + } + int remainingHeader = headerSize - MIN_HEADER_SIZE; + headerBuf = ByteBuffer.allocate(remainingHeader); + state = ReadState.MIN_HEADER_READ; + } + break; + case MIN_HEADER_READ: + if (currentHeader == null || headerBuf == null) { + throw new IllegalStateException("No header available"); + } else if (messageBuf != null) { + throw new IllegalStateException("Message already processed"); + } + + // Append data to current buffer, if room + putMinRemaining(headerBuf, readBuffer); + + // Have we read the full header? + if (headerBuf.remaining() == 0) { + headerBuf.position(0); + + // Finish setting MessageHeader object fields + currentHeader.setRemainingFields(headerBuf); + + // Set up reader to process message payload + int messageSize = currentHeader.getMessageSize(); + if (messageSize > limits.getMaxMessagePartSize()) { + throw new IOException("Message part size larger than maximum of " + + limits.getMaxMessagePartSize() + " bytes"); + } + messageBuf = ByteBuffer.allocate(messageSize); + headerBuf = null; + state = ReadState.FULL_HEADER_READ; + + // Dump header information if requested + if (DEBUG_HEADER) { + currentHeader.dumpHeader("[Read] "); + } + } + break; + case FULL_HEADER_READ: + if (currentHeader == null || messageBuf == null) { + throw new IllegalStateException("Missing header or message"); + } + // Append data to current buffer, if room + putMinRemaining(messageBuf, readBuffer); + + // Have we read the full message? + if (messageBuf.remaining() == 0) { + messageBuf.position(0); + + // Store this message until we received all parts + messages.add(messageBuf); + // Did we receive all parts of this message? + if (!currentHeader.isMoreData()) { + // Notify listener + ByteBuffer fullMessage = joinMessages(); + readFullMessage(fullMessage); + // Start new list of message parts + messages = new ArrayList<ByteBuffer>(); + } + + // Reset reader state + messageBuf = null; + currentHeader = null; + state = ReadState.NEW_MESSAGE; + } + break; + case ERROR: + throw new IOException("Reader state corrupted by previous fatal error"); + default: + throw new IllegalStateException("Unknown state: " + state.name()); + } + } + } catch (IOException e) { + // Set to error state to stop this reader from processing more data + this.state = ReadState.ERROR; + throw e; + } catch (IllegalStateException e) { + // Set to error state to stop this reader from processing more data + this.state = ReadState.ERROR; + throw e; + } + } + + protected abstract void readFullMessage(ByteBuffer fullMessage); + + private ByteBuffer joinMessages() throws IOException { + // Single part shortcut + if (messages.size() == 1) { + return messages.get(0); + } + + int totalSize = 0; + int maxMessageSize = limits.getMaxMessageSize(); + for (ByteBuffer buf : messages) { + totalSize += buf.limit(); + // Check for overflow as well as size limit + if (totalSize < 0 || totalSize > maxMessageSize) { + throw new IOException("Total message size is larger than maximum of " + maxMessageSize + " bytes"); + } + } + + ByteBuffer fullMessage = ByteBuffer.allocate(totalSize); + for (ByteBuffer buf : messages) { + fullMessage.put(buf); + } + fullMessage.flip(); + return fullMessage; + } + + private void putMinRemaining(ByteBuffer dst, ByteBuffer src) { + int minRemaining = Math.min(dst.remaining(), src.remaining()); + for (int i = 0; i < minRemaining; i++) { + dst.put(src.get()); + } + } + + // For testing purposes + ReadState getState() { + return state; + } + + // For testing purposes + MessageHeader getCurrentHeader() { + return currentHeader; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/MessageWriter.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,100 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.nio.ByteBuffer; + +abstract class MessageWriter { + + // If true, dumps header information for each header read/written + private static final boolean DEBUG_HEADER = false; + + // Various message-related limits + private final MessageLimits limits; + + protected MessageWriter(MessageLimits limits) { + this.limits = limits; + } + + protected MessageToWrite getNextMessage(ByteBuffer fullMessage) { + ByteBuffer message = ByteBuffer.allocate(limits.getMaxMessagePartSize()); + putMinRemaining(message, fullMessage); + message.flip(); + + // Create a message header for this part + int messageSize = message.limit(); + boolean moreData = fullMessage.hasRemaining(); + MessageHeader header = new MessageHeader(); + header.setMessageSize(messageSize); + header.setMoreData(moreData); + + // Dump header information if requested + if (DEBUG_HEADER) { + header.dumpHeader("[Write] "); + } + + ByteBuffer headerBuf = ByteBuffer.wrap(header.toByteArray()); + return new MessageToWrite(headerBuf, message); + } + + private void putMinRemaining(ByteBuffer dst, ByteBuffer src) { + int minRemaining = Math.min(dst.remaining(), src.remaining()); + for (int i = 0; i < minRemaining; i++) { + dst.put(src.get()); + } + } + + // Container class to hold a single header and message + protected static class MessageToWrite { + private final ByteBuffer header; + private final ByteBuffer message; + + public MessageToWrite(ByteBuffer header, ByteBuffer message) { + this.header = header; + this.message = message; + } + + public ByteBuffer getHeader() { + return header; + } + + public ByteBuffer getMessage() { + return message; + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/SyncMessageReader.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,111 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class SyncMessageReader extends MessageReader { + + private final ByteBuffer readBuffer; + private final ThermostatSocketChannelImpl channel; + private ByteBuffer fullMessage; + + public SyncMessageReader(ThermostatSocketChannelImpl channel) { + this(channel, new MessageLimits()); + } + + public SyncMessageReader(ThermostatSocketChannelImpl channel, MessageLimits limits) { + super(limits); + this.readBuffer = ByteBuffer.allocate(limits.getBufferSize()); + this.channel = channel; + this.fullMessage = null; + } + + public ByteBuffer readData() throws IOException { + fullMessage = null; + boolean moreData = true; + while (moreData) { + doBlockingRead(readBuffer, MIN_HEADER_SIZE); + processData(readBuffer); + assertState(ReadState.MIN_HEADER_READ); + + int remainingHeaderSize = currentHeader.getHeaderSize() - MIN_HEADER_SIZE; + doBlockingRead(readBuffer, remainingHeaderSize); + processData(readBuffer); + assertState(ReadState.FULL_HEADER_READ); + + int messageSize = currentHeader.getMessageSize(); + moreData = currentHeader.isMoreData(); + doBlockingRead(readBuffer, messageSize); + processData(readBuffer); + assertState(ReadState.NEW_MESSAGE); + } + + if (fullMessage == null) { + throw new IllegalStateException("No message was read"); + } + return fullMessage; + } + + private void doBlockingRead(ByteBuffer buf, int expected) throws IOException { + buf.clear(); + // Use a precise limit to read exact number of bytes + buf.limit(expected); + + while (buf.hasRemaining()) { + int read = channel.read(buf); + if (read < 0) { + throw new IOException("Stream closed unexpectedly while reading message"); + } + } + buf.flip(); + } + + @Override + protected void readFullMessage(ByteBuffer fullMessage) { + this.fullMessage = fullMessage; + } + + private void assertState(ReadState expected) { + if (state != expected) { + throw new IllegalStateException("Expected ReadState \'" + expected.name() + + "\', but was \'" + state.name() + "\'"); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/SyncMessageWriter.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,77 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class SyncMessageWriter extends MessageWriter { + + private final ThermostatSocketChannelImpl channel; + private final MessageLimits limits; + + public SyncMessageWriter(ThermostatSocketChannelImpl channel) { + this(channel, new MessageLimits()); + } + + SyncMessageWriter(ThermostatSocketChannelImpl channel, MessageLimits limits) { + super(limits); + this.channel = channel; + this.limits = limits; + } + + public void writeData(ByteBuffer data) throws IOException { + if (data.remaining() > limits.getMaxMessageSize()) { + throw new IOException("Total message size is larger than maximum of " + + limits.getMaxMessageSize() + " bytes"); + } + // Split into messages and add headers + ByteBuffer fullMessage = data.duplicate(); + while (fullMessage.hasRemaining()) { + MessageToWrite message = getNextMessage(fullMessage); + writeFully(message.getHeader()); + writeFully(message.getMessage()); + } + } + + private void writeFully(ByteBuffer buf) throws IOException { + while (buf.hasRemaining()) { + channel.write(buf); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/TcpSocketIPCProperties.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,99 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.io.File; +import java.io.IOException; + +import java.net.SocketAddress; +import java.net.InetSocketAddress; +import java.net.InetAddress; + +import java.util.Properties; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.common.internal.IPCType; + +public class TcpSocketIPCProperties extends IPCProperties { + + private static final String PROP_TCP_PREFIX = ""; + private static final String PROP_TCP_SUFFIX = ".tcpsocket.port"; + + private final Properties props; + + TcpSocketIPCProperties(Properties props, File propFile) throws IOException { + this(props, propFile, new PathUtils()); + } + + TcpSocketIPCProperties(Properties props, File propFile, PathUtils pathUtils) throws IOException { + super(IPCType.TCP_SOCKET, propFile); + this.props = props; + } + + public SocketAddress getSocketAddr(final String serverName) throws IOException { + final String propName = getPropertyNameFromServerName(serverName); + final String sockPortStr = props.getProperty(propName); + if (sockPortStr == null) + throw new IOException("Property '"+propName+"' not found for server '"+serverName+"'."); + final int sockPort; + try { + sockPort = Integer.parseInt(sockPortStr); + } catch (NumberFormatException e) { + throw new IOException("Invalid port '"+sockPortStr+"' specified for property '"+propName+"'."); + } + // this code works on Java 1.6, but forces the use of IPV4; localaddr = InetAddress.getLoopbackAddress() is a better option for java 1.7+ + final byte[] loopbackAddr = new byte[] { 127, 0, 0, 1 }; + final InetAddress localhost = InetAddress.getByAddress(loopbackAddr); + return new InetSocketAddress(localhost,sockPort); + } + + public static String getPropertyNameFromServerName(final String serverName) { + return (serverName == null || serverName.isEmpty()) + ? PROP_TCP_PREFIX + "default" + PROP_TCP_SUFFIX + : PROP_TCP_PREFIX + serverName + PROP_TCP_SUFFIX; + } + + // Helper class for testing purposes + static class PathUtils { + String getSystemProperty(String name) { + return System.getProperty(name); + } + String getEnvironmentVariable(String name) { + return System.getenv(name); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/TcpSocketIPCPropertiesProvider.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCPropertiesProvider; +import com.redhat.thermostat.agent.ipc.common.internal.IPCType; + +public class TcpSocketIPCPropertiesProvider implements IPCPropertiesProvider { + + @Override + public TcpSocketIPCProperties create(Properties props, File propFile) throws IOException { + return new TcpSocketIPCProperties(props, propFile); + } + + @Override + public IPCType getType() { + return IPCType.TCP_SOCKET; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/ThermostatSocketChannelImpl.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,119 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.net.SocketAddress; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.AbstractInterruptibleChannel; + +public class ThermostatSocketChannelImpl implements ByteChannel { + + private static TcpSocketChannelHelper channelHelper = new TcpSocketChannelHelper(); + + protected final SocketChannel impl; + private final String name; + + protected ThermostatSocketChannelImpl(String name, SocketChannel impl) { + this.name = name; + this.impl = impl; + } + + public static ThermostatSocketChannelImpl open(String name, SocketAddress addr) throws IOException { + SocketChannel impl = channelHelper.open(addr); + return new ThermostatSocketChannelImpl(name, impl); + } + + public String getName() { + return name; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return channelHelper.read(impl, dst); + } + + public int write(ByteBuffer src) throws IOException { + return channelHelper.write(impl, src); + } + + @Override + public boolean isOpen() { + return channelHelper.isOpen(impl); + } + + @Override + public void close() throws IOException { + channelHelper.close(impl); + } + + // ---- For testing purposes ---- + + // Wraps methods that can't be mocked + static class TcpSocketChannelHelper { + SocketChannel open(final SocketAddress addr) throws IOException { + return SocketChannel.open(addr); + } + + int read(final SocketChannel channel, final ByteBuffer dst) throws IOException { + return channel.read(dst); + } + + int write(final SocketChannel channel, final ByteBuffer src) throws IOException { + return channel.write(src); + } + + boolean isOpen(final SocketChannel channel) { + return channel.isOpen(); + } + + void close(final AbstractInterruptibleChannel channel) throws IOException { + channel.close(); + } + + ////SocketAddress createAddress(File path) throws IOException { + // return new TcpSocketAddress(path); + //} + } + + public static void setChannelHelper(TcpSocketChannelHelper helper) { + ThermostatSocketChannelImpl.channelHelper = helper; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/main/resources/META-INF/services/com.redhat.thermostat.agent.ipc.common.internal.IPCPropertiesProvider Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,1 @@ +com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.TcpSocketIPCPropertiesProvider
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/AsyncMessageReaderTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,106 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.createHeader; +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.joinByteArrays; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.ReadAnswer; + +public class AsyncMessageReaderTest { + + private ThermostatSocketChannelImpl channel; + private MessageListener listener; + private AsyncMessageReader reader; + + @Before + public void setUp() throws Exception { + channel = mock(ThermostatSocketChannelImpl.class); + listener = mock(MessageListener.class); + reader = new AsyncMessageReader(channel, listener); + } + + @Test + public void testReadDataSingle() throws Exception { + final byte[] messageBytes = "Hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + + // First read header, then message + when(channel.read(any(ByteBuffer.class))).thenAnswer(new ReadAnswer(new byte[][] { headerBytes, messageBytes })); + + reader.readData(); + verify(listener, never()).messageRead(any(ByteBuffer.class)); + + reader.readData(); + ByteBuffer expected = ByteBuffer.wrap(messageBytes); + verify(listener).messageRead(expected); + } + + @Test + public void testReadDataMulti() throws Exception { + final byte[] fullMessageBytes = "Hello World.".getBytes(Charset.forName("UTF-8")); + final byte[] message1Bytes = Arrays.copyOfRange(fullMessageBytes, 0, 5); + final byte[] header1Bytes = createHeader(message1Bytes.length, true); + final byte[] message2Bytes = Arrays.copyOfRange(fullMessageBytes, 5, fullMessageBytes.length); + final byte[] header2Bytes = createHeader(message2Bytes.length, false); + + // First first header, then all remaining data + byte[] joined = joinByteArrays(message1Bytes, header2Bytes, message2Bytes); + byte[][] bufs = new byte[][] { header1Bytes, joined }; + when(channel.read(any(ByteBuffer.class))).thenAnswer(new ReadAnswer(bufs)); + + reader.readData(); + verify(listener, never()).messageRead(any(ByteBuffer.class)); + + reader.readData(); + ByteBuffer expected = ByteBuffer.wrap(fullMessageBytes); + verify(listener).messageRead(expected); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/AsyncMessageWriterTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,157 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.createByteArrays; +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.createHeader; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.WriteAnswer; + +public class AsyncMessageWriterTest { + + private static final int MAX_MESSAGE_PART_SIZE = 5; + + private ThermostatSocketChannelImpl channel; + private AsyncMessageWriter writer; + private MessageLimits limits; + + @Before + public void setUp() throws Exception { + channel = mock(ThermostatSocketChannelImpl.class); + limits = mock(MessageLimits.class); + when(limits.getMaxMessagePartSize()).thenReturn(MAX_MESSAGE_PART_SIZE); + when(limits.getMaxMessageSize()).thenReturn(Integer.MAX_VALUE); + writer = new AsyncMessageWriter(channel, limits); + } + + @Test + public void testWriteDataNotFullHeader() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + byte[][] results = new byte[][] { new byte[headerBytes.length - 1] }; + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.enqueueForWriting(buf); + writer.writeData(); + + // Should not attempt to write message, since header was not fully written + byte[] expected = Arrays.copyOfRange(headerBytes, 0, headerBytes.length - 1); + assertArrayEquals(expected, results[0]); + assertTrue(writer.hasMoreMessages()); + verify(channel).write(any(ByteBuffer.class)); + } + + @Test + public void testWriteDataNotFullMessage() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + byte[][] results = createByteArrays(headerBytes.length, messageBytes.length - 1); + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.enqueueForWriting(buf); + writer.writeData(); + + assertArrayEquals(headerBytes, results[0]); + assertArrayEquals(Arrays.copyOfRange(messageBytes, 0, messageBytes.length - 1), results[1]); + assertTrue(writer.hasMoreMessages()); + verify(channel, times(2)).write(any(ByteBuffer.class)); + } + + @Test + public void testWriteDataMulti() throws Exception { + final String message = "hello world"; + final byte[] fullMessageBytes = message.getBytes(Charset.forName("UTF-8")); + final byte[] message1Bytes = Arrays.copyOfRange(fullMessageBytes, 0, 5); + final byte[] message2Bytes = Arrays.copyOfRange(fullMessageBytes, 5, 10); + final byte[] message3Bytes = Arrays.copyOfRange(fullMessageBytes, 10, fullMessageBytes.length); + final byte[] header1Bytes = createHeader(message1Bytes.length, true); + final byte[] header2Bytes = createHeader(message2Bytes.length, true); + final byte[] header3Bytes = createHeader(message3Bytes.length, false); + + byte[][] results = createByteArrays(header1Bytes.length, message1Bytes.length, + header2Bytes.length, message2Bytes.length, header3Bytes.length, message3Bytes.length); + + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(fullMessageBytes); + writer.enqueueForWriting(buf); + writer.writeData(); + assertTrue(writer.hasMoreMessages()); + writer.writeData(); + assertTrue(writer.hasMoreMessages()); + writer.writeData(); + assertFalse(writer.hasMoreMessages()); + + assertArrayEquals(header1Bytes, results[0]); + assertArrayEquals(message1Bytes, results[1]); + assertArrayEquals(header2Bytes, results[2]); + assertArrayEquals(message2Bytes, results[3]); + assertArrayEquals(header3Bytes, results[4]); + assertArrayEquals(message3Bytes, results[5]); + verify(channel, times(6)).write(any(ByteBuffer.class)); + } + + @Test(expected=IOException.class) + public void testWriteMessageTooLarge() throws Exception { + when(limits.getMaxMessageSize()).thenReturn(0); + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.enqueueForWriting(buf); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/ChannelTestUtils.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,147 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +class ChannelTestUtils { + + // Expected to be in-order position arguments + static byte[][] splitByteArray(byte[] data, int... splitPos) { + int numSplits = splitPos.length; + if (numSplits == 0) { + throw new IllegalArgumentException("Give at least one split position"); + } + byte[][] result = new byte[numSplits + 1][]; + + result[0] = Arrays.copyOfRange(data, 0, splitPos[0]); + for (int i = 1; i < numSplits; i++) { + result[i] = Arrays.copyOfRange(data, splitPos[i - 1], splitPos[i]); + } + result[numSplits] = Arrays.copyOfRange(data, splitPos[numSplits - 1], data.length); + return result; + } + + static byte[] joinByteArrays(byte[] first, byte[]... more) { + if (more.length == 0) { + return first; + } + + int size = first.length; + for (byte[] other : more) { + size += other.length; + } + + byte[] result = new byte[size]; + System.arraycopy(first, 0, result, 0, first.length); + int ix = first.length; + for (byte[] other : more) { + System.arraycopy(other, 0, result, ix, other.length); + ix += other.length; + } + return result; + } + + static byte[] createHeader(int messageSize, boolean moreData) { + MessageHeader header = new MessageHeader(); + header.setMessageSize(messageSize); + header.setMoreData(moreData); + return header.toByteArray(); + } + + static class ReadAnswer implements Answer<Integer> { + + private byte[][] bufs; + private int count; + + ReadAnswer(byte[][] bufs) { + this.bufs = bufs; + this.count = 0; + } + + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + // read(ByteBuffer) + ByteBuffer dst = (ByteBuffer) invocation.getArguments()[0]; + byte[] buf = bufs[count++]; + dst.put(buf); + return buf.length; + } + + } + + static byte[][] createByteArrays(int firstLength, int... moreLengths) { + int numSplits = moreLengths.length + 1; + byte[][] result = new byte[numSplits][]; + + result[0] = new byte[firstLength]; + for (int i = 1; i < numSplits; i++) { + result[i] = new byte[moreLengths[i - 1]]; + } + return result; + } + + static class WriteAnswer implements Answer<Integer> { + + private final byte[][] bufs; + private int count; + + WriteAnswer(byte[][] bufs) { + this.bufs = bufs; + this.count = 0; + } + + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + // write(ByteBuffer) + ByteBuffer src = (ByteBuffer) invocation.getArguments()[0]; + // Don't write all at once + byte[] buf = bufs[count++]; + int numToCopy = Math.min(buf.length, src.remaining()); + for (int i = 0; i < numToCopy; i++) { + buf[i] = src.get(); + } + return numToCopy; + } + + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/MessageHeaderTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,250 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.MessageHeader; + +public class MessageHeaderTest { + + private static final int INT_BYTES = 4; + + @Test + public void testNewHeaderBase() throws Exception { + MessageHeader header = new MessageHeader(); + + assertEquals(MessageHeader.getDefaultProtocolVersion(), header.getProtocolVersion()); + assertEquals(MessageHeader.getDefaultHeaderSize(), header.getHeaderSize()); + + // Not part of minimal header + assertEquals(-1, header.getMessageSize()); + assertFalse(header.isMoreData()); + } + + @Test + public void testNewHeaderV1() throws Exception { + MessageHeader header = new MessageHeader(); + header.setMessageSize(50); + header.setMoreData(true); + + assertEquals(MessageHeader.getDefaultProtocolVersion(), header.getProtocolVersion()); + assertEquals(MessageHeader.getDefaultHeaderSize(), header.getHeaderSize()); + + // Protocol version 1 fields + assertEquals(50, header.getMessageSize()); + assertTrue(header.isMoreData()); + } + + @Test + public void testToBytesBase() throws Exception { + final byte[] expected = getBytes(MessageHeader.getDefaultProtocolVersion(), + MessageHeader.getDefaultHeaderSize(), -1, false); + + MessageHeader header = new MessageHeader(); + assertArrayEquals(expected, header.toByteArray()); + } + + @Test + public void testToBytesV1() throws Exception { + final byte[] expected = getBytes(1, MessageHeader.getDefaultHeaderSize(), 8000, true); + + MessageHeader header = new MessageHeader(); + // Protocol version 1 fields + header.setMessageSize(8000); + header.setMoreData(true); + + assertArrayEquals(expected, header.toByteArray()); + } + + @Test + public void testFromByteBuffer() throws Exception { + ByteBuffer buf = createByteBuffer(5, 20, 8000, true); + + MessageHeader header = MessageHeader.fromByteBuffer(buf); + assertEquals(5, header.getProtocolVersion()); + assertEquals(20, header.getHeaderSize()); + } + + @Test(expected=IOException.class) + public void testFromByteBufferShortMagic() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(3); + buf.put(new byte[] { 'T', 'H', 'E' }); + buf.flip(); + + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferShortProtocolVersion() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(7); + buf.put(new byte[] { 'T', 'H', 'E', 'R' }); + buf.put(new byte[] { 0x0, 0x0, 0x1 }); + buf.flip(); + + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferShortHeaderSize() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(11); + buf.put(new byte[] { 'T', 'H', 'E', 'R' }); + buf.put(intToBytes(5)); + buf.put(new byte[] { 0x0, 0x5, 0x2 }); + buf.flip(); + + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferNegativeProtocolVersion() throws Exception { + ByteBuffer buf = createByteBuffer(-4, 25, 20, true); + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferZeroProtocolVersion() throws Exception { + ByteBuffer buf = createByteBuffer(0, 25, 20, true); + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferNegativeHeaderSize() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(MessageHeader.getDefaultHeaderSize()); + fillByteBuffer(buf, 5, -22, 20, true); + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testFromByteBufferZeroHeaderSize() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(MessageHeader.getDefaultHeaderSize()); + fillByteBuffer(buf, 5, 0, 20, true); + MessageHeader.fromByteBuffer(buf); + } + + @Test(expected=IOException.class) + public void testSetRemainingFieldsV0() throws Exception { + ByteBuffer buf = createByteBuffer(0, MessageHeader.getDefaultHeaderSize(), 20, true); + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + + // Expect no change from default + assertEquals(-1, header.getMessageSize()); + assertEquals(false, header.isMoreData()); + } + + @Test + public void testSetRemainingFieldsV1() throws Exception { + ByteBuffer buf = createByteBuffer(1, MessageHeader.getDefaultHeaderSize(), 20, true); + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + + // Expect remaining fields copied from buf + assertEquals(20, header.getMessageSize()); + assertTrue(header.isMoreData()); + } + + @Test(expected=IOException.class) + public void testSetRemainingFieldsV1ShortMessageSize() throws Exception { + ByteBuffer buf = createByteBuffer(1, MessageHeader.getDefaultHeaderSize(), 20, true); + // Subtract moreData bit, and one bit of message size + buf.limit(buf.limit() - 2); + + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + } + + @Test(expected=IOException.class) + public void testSetRemainingFieldsV1NegativeMessageSize() throws Exception { + ByteBuffer buf = createByteBuffer(1, MessageHeader.getDefaultHeaderSize(), -20, true); + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + } + + @Test(expected=IOException.class) + public void testSetRemainingFieldsV1ZeroMessageSize() throws Exception { + ByteBuffer buf = createByteBuffer(1, MessageHeader.getDefaultHeaderSize(), 0, true); + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + } + + @Test(expected=IOException.class) + public void testSetRemainingFieldsV1ShortMoreData() throws Exception { + ByteBuffer buf = createByteBuffer(1, MessageHeader.getDefaultHeaderSize(), 20, true); + // Subtract moreData bit + buf.limit(buf.limit() - 1); + + MessageHeader header = MessageHeader.fromByteBuffer(buf); + header.setRemainingFields(buf); + } + + private byte[] getBytes(int protoVer, int headerSize, int messageSize, boolean moreData) { + ByteBuffer buf = createByteBuffer(protoVer, headerSize, messageSize, moreData); + return buf.array(); + } + + private ByteBuffer createByteBuffer(int protoVer, int headerSize, int messageSize, boolean moreData) { + ByteBuffer buf = ByteBuffer.allocate(headerSize); + fillByteBuffer(buf, protoVer, headerSize, messageSize, moreData); + return buf; + } + + private void fillByteBuffer(ByteBuffer buf, int protoVer, int headerSize, int messageSize, boolean moreData) { + // Magic number + buf.put(new byte[] { 'T', 'H', 'E', 'R' }); + buf.put(intToBytes(protoVer)); + buf.put(intToBytes(headerSize)); + buf.put(intToBytes(messageSize)); + buf.put((byte) (moreData ? 1 : 0)); + buf.flip(); + } + + private byte[] intToBytes(int num) { + ByteBuffer buf = ByteBuffer.allocate(INT_BYTES); + buf.putInt(num); + return buf.array(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/MessageReaderTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,354 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.createHeader; +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.joinByteArrays; +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.splitByteArray; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.MessageReader.ReadState; + +public class MessageReaderTest { + + private TestMessageReader reader; + private MessageLimits limits; + + @Before + public void setUp() throws Exception { + limits = mock(MessageLimits.class); + when(limits.getMaxHeaderSize()).thenReturn(Integer.MAX_VALUE); + when(limits.getMaxMessagePartSize()).thenReturn(Integer.MAX_VALUE); + when(limits.getMaxMessageSize()).thenReturn(Integer.MAX_VALUE); + reader = new TestMessageReader(limits); + } + + @Test + public void testOneRead() throws Exception { + byte[] data = createMinHeader(); + byte[][] splitData = splitByteArray(data, 3); + reader.readData(splitData[0]); + + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + assertNull(reader.getCurrentHeader()); + } + + @Test + public void testTwoRead() throws Exception { + byte[] data = createMinHeader(); + byte[][] splitData = splitByteArray(data, 3, 7); + reader.readData(splitData[0]); + reader.readData(splitData[1]); + + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + assertNull(reader.getCurrentHeader()); + } + + @Test + public void testThreeReadGotMinHeader() throws Exception { + byte[] data = createMinHeader(); + byte[][] splitData = splitByteArray(data, 3, 7, MessageReader.MIN_HEADER_SIZE); + reader.readData(splitData[0]); + reader.readData(splitData[1]); + reader.readData(splitData[2]); + + assertEquals(ReadState.MIN_HEADER_READ, reader.getState()); + assertNotNull(reader.getCurrentHeader()); + } + + @Test + public void testFourReadGotMinHeader() throws Exception { + byte[] data = createMinHeader(); + byte[][] splitData = splitByteArray(data, 3, 7, MessageReader.MIN_HEADER_SIZE, MessageReader.MIN_HEADER_SIZE + 2); + reader.readData(splitData[0]); + reader.readData(splitData[1]); + reader.readData(splitData[2]); + reader.readData(splitData[3]); + + assertEquals(ReadState.MIN_HEADER_READ, reader.getState()); + assertNotNull(reader.getCurrentHeader()); + + } + + @Test + public void testFiveReadGotFullHeader() throws Exception { + final String message = "hello"; + int messageSize = message.getBytes(Charset.forName("UTF-8")).length; + byte[] data = createHeader(messageSize, false); + byte[][] splitData = splitByteArray(data, 3, 7, MessageReader.MIN_HEADER_SIZE - 1, MessageReader.MIN_HEADER_SIZE + 2); + for (int i = 0; i < splitData.length; i++) { + reader.readData(splitData[i]); + } + + assertEquals(ReadState.FULL_HEADER_READ, reader.getState()); + MessageHeader header = reader.getCurrentHeader(); + assertNotNull(header); + assertEquals(messageSize, header.getMessageSize()); + } + + @Test + public void testSixReadGotFullMessage() throws Exception { + final String message = "hello"; + byte[] messageBytes = message.getBytes(Charset.forName("UTF-8")); + int messageSize = messageBytes.length; + byte[] headerBytes = createHeader(messageSize, false); + byte[] data = joinByteArrays(headerBytes, messageBytes); + byte[][] splitData = splitByteArray(data, 3, 7, MessageReader.MIN_HEADER_SIZE - 1, + MessageReader.MIN_HEADER_SIZE + 2, headerBytes.length + 1); + for (int i = 0; i < splitData.length; i++) { + reader.readData(splitData[i]); + } + + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + List<ByteBuffer> fullMessages = reader.getFullMessages(); + assertEquals(1, fullMessages.size()); + assertEquals(ByteBuffer.wrap(messageBytes), fullMessages.get(0)); + } + + @Test + public void testTwoReadGotTwoMessages() throws Exception { + final String message1 = "hello"; + final String message2 = "world!"; + byte[] message1Bytes = message1.getBytes(Charset.forName("UTF-8")); + byte[] message2Bytes = message2.getBytes(Charset.forName("UTF-8")); + int message1Size = message1Bytes.length; + int message2Size = message2Bytes.length; + byte[] header1Bytes = createHeader(message1Size, true); + byte[] header2Bytes = createHeader(message2Size, false); + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes); + byte[][] splitData = splitByteArray(data, header1Bytes.length + message1Size); + + reader.readData(splitData[0]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + reader.readData(splitData[1]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + + byte[] expected = message1.concat(message2).getBytes(Charset.forName("UTF-8")); + List<ByteBuffer> fullMessages = reader.getFullMessages(); + assertEquals(1, fullMessages.size()); + assertEquals(ByteBuffer.wrap(expected), fullMessages.get(0)); + } + + @Test + public void testThreeReadGotTwoMessages() throws Exception { + final String message1 = "hello"; + final String message2 = "world!"; + byte[] message1Bytes = message1.getBytes(Charset.forName("UTF-8")); + byte[] message2Bytes = message2.getBytes(Charset.forName("UTF-8")); + int message1Size = message1Bytes.length; + int message2Size = message2Bytes.length; + byte[] header1Bytes = createHeader(message1Size, true); + byte[] header2Bytes = createHeader(message2Size, false); + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes); + int messageBoundary = header1Bytes.length + message1Size; + byte[][] splitData = splitByteArray(data, messageBoundary - 2, messageBoundary + 2); + + reader.readData(splitData[0]); + assertEquals(ReadState.FULL_HEADER_READ, reader.getState()); + reader.readData(splitData[1]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + reader.readData(splitData[2]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + + byte[] expected = message1.concat(message2).getBytes(Charset.forName("UTF-8")); + List<ByteBuffer> fullMessages = reader.getFullMessages(); + assertEquals(1, fullMessages.size()); + assertEquals(ByteBuffer.wrap(expected), fullMessages.get(0)); + } + + @Test + public void testThreeReadGotTwoMessagesNotContinued() throws Exception { + final String message1 = "hello"; + final String message2 = "world!"; + byte[] message1Bytes = message1.getBytes(Charset.forName("UTF-8")); + byte[] message2Bytes = message2.getBytes(Charset.forName("UTF-8")); + int message1Size = message1Bytes.length; + int message2Size = message2Bytes.length; + byte[] header1Bytes = createHeader(message1Size, false); + byte[] header2Bytes = createHeader(message2Size, false); + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes); + int messageBoundary = header1Bytes.length + message1Size; + byte[][] splitData = splitByteArray(data, messageBoundary - 2, messageBoundary + 2); + + reader.readData(splitData[0]); + assertEquals(ReadState.FULL_HEADER_READ, reader.getState()); + reader.readData(splitData[1]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + reader.readData(splitData[2]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + + List<ByteBuffer> fullMessages = reader.getFullMessages(); + assertEquals(2, fullMessages.size()); + assertEquals(ByteBuffer.wrap(message1Bytes), fullMessages.get(0)); + assertEquals(ByteBuffer.wrap(message2Bytes), fullMessages.get(1)); + } + + @Test + public void testFourReadGotThreeMessagesOneContinued() throws Exception { + final String message1 = "hello"; + final String message2 = "world!"; + final String message3 = "test"; + byte[] message1Bytes = message1.getBytes(Charset.forName("UTF-8")); + byte[] message2Bytes = message2.getBytes(Charset.forName("UTF-8")); + byte[] message3Bytes = message3.getBytes(Charset.forName("UTF-8")); + int message1Size = message1Bytes.length; + int message2Size = message2Bytes.length; + int message3Size = message3Bytes.length; + byte[] header1Bytes = createHeader(message1Size, true); + byte[] header2Bytes = createHeader(message2Size, false); + byte[] header3Bytes = createHeader(message3Size, false); + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes, header3Bytes, message3Bytes); + int messageBoundary = header1Bytes.length + message1Size; + int messageBoundary2 = messageBoundary + header2Bytes.length + message2Size; + byte[][] splitData = splitByteArray(data, messageBoundary - 2, messageBoundary + 2, messageBoundary2); + + reader.readData(splitData[0]); + assertEquals(ReadState.FULL_HEADER_READ, reader.getState()); + reader.readData(splitData[1]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + reader.readData(splitData[2]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + reader.readData(splitData[3]); + assertEquals(ReadState.NEW_MESSAGE, reader.getState()); + + byte[] expected = message1.concat(message2).getBytes(Charset.forName("UTF-8")); + List<ByteBuffer> fullMessages = reader.getFullMessages(); + assertEquals(2, fullMessages.size()); + assertEquals(ByteBuffer.wrap(expected), fullMessages.get(0)); + assertEquals(ByteBuffer.wrap(message3Bytes), fullMessages.get(1)); + } + + @Test + public void testHeaderTooLarge() throws Exception { + when(limits.getMaxHeaderSize()).thenReturn(0); + byte[] data = createMinHeader(); + try { + reader.readData(data); + fail("Expected IOException"); + } catch (IOException ignored) { + assertEquals(ReadState.ERROR, reader.getState()); + } + } + + @Test + public void testMessagePartTooLarge() throws Exception { + when(limits.getMaxMessagePartSize()).thenReturn(0); + + final String message = "hello"; + byte[] messageBytes = message.getBytes(Charset.forName("UTF-8")); + int messageSize = messageBytes.length; + byte[] headerBytes = createHeader(messageSize, false); + byte[] data = joinByteArrays(headerBytes, messageBytes); + try { + reader.readData(data); + fail("Expected IOException"); + } catch (IOException ignored) { + assertEquals(ReadState.ERROR, reader.getState()); + } + } + + @Test + public void testMessageTooLarge() throws Exception { + when(limits.getMaxMessageSize()).thenReturn(0); + + final String message1 = "hello"; + final String message2 = "world!"; + byte[] message1Bytes = message1.getBytes(Charset.forName("UTF-8")); + byte[] message2Bytes = message2.getBytes(Charset.forName("UTF-8")); + int message1Size = message1Bytes.length; + int message2Size = message2Bytes.length; + byte[] header1Bytes = createHeader(message1Size, true); + byte[] header2Bytes = createHeader(message2Size, false); + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes); + try { + reader.readData(data); + fail("Expected IOException"); + } catch (IOException ignored) { + assertEquals(ReadState.ERROR, reader.getState()); + assertTrue(reader.getFullMessages().isEmpty()); + } + } + + private byte[] createMinHeader() { + MessageHeader header = new MessageHeader(); + return header.toByteArray(); + } + + static class TestMessageReader extends MessageReader { + + private static final int BUFFER_SIZE = 1024; + private final ByteBuffer readBuffer; + private final List<ByteBuffer> fullMessages; + + TestMessageReader(MessageLimits limits) { + super(limits); + this.readBuffer = ByteBuffer.allocate(BUFFER_SIZE); + this.fullMessages = new ArrayList<ByteBuffer>(); + } + + @Override + protected void readFullMessage(ByteBuffer fullMessage) { + fullMessages.add(fullMessage); + } + + void readData(byte[] buf) throws IOException { + readBuffer.clear(); + readBuffer.put(buf); + readBuffer.flip(); + processData(readBuffer); + } + + List<ByteBuffer> getFullMessages() { + return fullMessages; + } + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/MessageWriterTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,134 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.MessageWriter.MessageToWrite; + +public class MessageWriterTest { + + private static final int MAX_MESSAGE_SIZE = 5; + + private MessageWriter writer; + + @Before + public void setUp() throws Exception { + MessageLimits limits = mock(MessageLimits.class); + when(limits.getMaxMessagePartSize()).thenReturn(MAX_MESSAGE_SIZE); + writer = new TestMessageWriter(limits); + } + + @Test + public void testGetNextMessageSingle() { + String message = "Hello"; + byte[] messageBytes = message.getBytes(Charset.forName("UTF-8")); + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + MessageToWrite toWrite = writer.getNextMessage(buf); + + MessageHeader header = new MessageHeader(); + header.setMessageSize(5); + header.setMoreData(false); + ByteBuffer headerBuf = toWrite.getHeader(); + assertArrayEquals(header.toByteArray(), headerBuf.array()); + + ByteBuffer messageBuf = toWrite.getMessage(); + assertArrayEquals(messageBytes, messageBuf.array()); + } + + @Test + public void testGetNextMessageMulti() { + String message = "Hello World"; + byte[] messageBytes = message.getBytes(Charset.forName("UTF-8")); + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + + // First part + MessageToWrite toWrite = writer.getNextMessage(buf); + + MessageHeader header = new MessageHeader(); + header.setMessageSize(5); + header.setMoreData(true); + ByteBuffer headerBuf = toWrite.getHeader(); + assertArrayEquals(header.toByteArray(), headerBuf.array()); + + ByteBuffer messageBuf = toWrite.getMessage(); + ByteBuffer expectedMessage = ByteBuffer.wrap(Arrays.copyOfRange(messageBytes, 0, 5)); + assertEquals(expectedMessage, messageBuf); + + // Second part + toWrite = writer.getNextMessage(buf); + + // Header should be the same + assertArrayEquals(header.toByteArray(), headerBuf.array()); + + messageBuf = toWrite.getMessage(); + expectedMessage = ByteBuffer.wrap(Arrays.copyOfRange(messageBytes, 5, 10)); + assertEquals(expectedMessage, messageBuf); + + // Third part + toWrite = writer.getNextMessage(buf); + + header = new MessageHeader(); + header.setMessageSize(1); + header.setMoreData(false); + headerBuf = toWrite.getHeader(); + assertArrayEquals(header.toByteArray(), headerBuf.array()); + + messageBuf = toWrite.getMessage(); + expectedMessage = ByteBuffer.wrap(Arrays.copyOfRange(messageBytes, 10, 11)); + assertEquals(expectedMessage, messageBuf); + } + + private static class TestMessageWriter extends MessageWriter { + + private TestMessageWriter(MessageLimits limits) { + super(limits); + } + + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/SyncMessageReaderTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,114 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.createHeader; +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.joinByteArrays; +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.splitByteArray; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.ReadAnswer; + +public class SyncMessageReaderTest { + + private static final int FULL_HEADER_SIZE = MessageHeader.getDefaultHeaderSize(); + private static final int MIN_HEADER_SIZE = MessageHeader.getMinimumHeaderSize(); + + private ThermostatSocketChannelImpl channel; + private SyncMessageReader reader; + + @Before + public void setUp() throws Exception { + channel = mock(ThermostatSocketChannelImpl.class); + reader = new SyncMessageReader(channel); + } + + @Test + public void testReadDataSingle() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + + byte[] data = joinByteArrays(headerBytes, messageBytes); + // Split min header, remaining header and message into two pieces each + mockReads(data, MIN_HEADER_SIZE - 3, MIN_HEADER_SIZE, + FULL_HEADER_SIZE - 3, FULL_HEADER_SIZE, + FULL_HEADER_SIZE + 3); + ByteBuffer result = reader.readData(); + assertEquals(ByteBuffer.wrap(messageBytes), result); + } + + @Test + public void testReadDataMulti() throws Exception { + final String messageString = "hello world"; + final byte[] fullMessageBytes = messageString.getBytes(Charset.forName("UTF-8")); + final byte[] message1Bytes = Arrays.copyOfRange(fullMessageBytes, 0, 5); + final byte[] message2Bytes = Arrays.copyOfRange(fullMessageBytes, 5, fullMessageBytes.length); + final byte[] header1Bytes = createHeader(message1Bytes.length, true); + final byte[] header2Bytes = createHeader(message2Bytes.length, false); + + byte[] data = joinByteArrays(header1Bytes, message1Bytes, header2Bytes, message2Bytes); + int messageBoundary = header1Bytes.length + message1Bytes.length; + // Split min header, remaining header and message into two pieces each + mockReads(data, MIN_HEADER_SIZE - 3, MIN_HEADER_SIZE, + FULL_HEADER_SIZE - 3, FULL_HEADER_SIZE, + FULL_HEADER_SIZE + 3, messageBoundary, + messageBoundary + MIN_HEADER_SIZE - 3, + messageBoundary + MIN_HEADER_SIZE, + messageBoundary + FULL_HEADER_SIZE - 3, + messageBoundary + FULL_HEADER_SIZE, + messageBoundary + FULL_HEADER_SIZE + 3); + ByteBuffer result = reader.readData(); + assertEquals(ByteBuffer.wrap(messageString.getBytes(Charset.forName("UTF-8"))), result); + } + + private void mockReads(byte[] data, int... splitPos) throws IOException { + byte[][] splitData = splitByteArray(data, splitPos); + when(channel.read(any(ByteBuffer.class))).thenAnswer(new ReadAnswer(splitData)); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/SyncMessageWriterTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,161 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.createByteArrays; +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.createHeader; +import static com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.joinByteArrays; +import static org.junit.Assert.assertArrayEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ChannelTestUtils.WriteAnswer; + +public class SyncMessageWriterTest { + + private static final int MAX_MESSAGE_PART_SIZE = 5; + + private ThermostatSocketChannelImpl channel; + private SyncMessageWriter writer; + private MessageLimits limits; + + @Before + public void setUp() throws Exception { + channel = mock(ThermostatSocketChannelImpl.class); + limits = mock(MessageLimits.class); + when(limits.getMaxMessagePartSize()).thenReturn(MAX_MESSAGE_PART_SIZE); + when(limits.getMaxMessageSize()).thenReturn(Integer.MAX_VALUE); + writer = new SyncMessageWriter(channel, limits); + } + + @Test + public void testWriteDataSingle() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + final byte[][] results = createByteArrays(headerBytes.length, messageBytes.length); + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.writeData(buf); + + assertArrayEquals(headerBytes, results[0]); + assertArrayEquals(messageBytes, results[1]); + verify(channel, times(2)).write(any(ByteBuffer.class)); + } + + @Test + public void testWriteDataSplitHeader() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + final byte[][] results = createByteArrays(headerBytes.length - 2, 2, messageBytes.length); + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.writeData(buf); + + final byte[] joinedHeader = joinByteArrays(results[0], results[1]); + assertArrayEquals(headerBytes, joinedHeader); + assertArrayEquals(messageBytes, results[2]); + verify(channel, times(3)).write(any(ByteBuffer.class)); + } + + @Test + public void testWriteDataSplitMessage() throws Exception { + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + final byte[] headerBytes = createHeader(messageBytes.length, false); + final byte[][] results = createByteArrays(headerBytes.length, messageBytes.length - 2, 2); + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.writeData(buf); + + assertArrayEquals(headerBytes, results[0]); + final byte[] joinedMessage = joinByteArrays(results[1], results[2]); + assertArrayEquals(messageBytes, joinedMessage); + verify(channel, times(3)).write(any(ByteBuffer.class)); + } + + @Test + public void testWriteDataMulti() throws Exception { + final String message = "hello world"; + final byte[] fullMessageBytes = message.getBytes(Charset.forName("UTF-8")); + final byte[] message1Bytes = Arrays.copyOfRange(fullMessageBytes, 0, 5); + final byte[] message2Bytes = Arrays.copyOfRange(fullMessageBytes, 5, 10); + final byte[] message3Bytes = Arrays.copyOfRange(fullMessageBytes, 10, fullMessageBytes.length); + final byte[] header1Bytes = createHeader(message1Bytes.length, true); + final byte[] header2Bytes = createHeader(message2Bytes.length, true); + final byte[] header3Bytes = createHeader(message3Bytes.length, false); + + final byte[][] results = createByteArrays(header1Bytes.length, message1Bytes.length, header2Bytes.length, message2Bytes.length, header3Bytes.length, message3Bytes.length); + WriteAnswer answer = new WriteAnswer(results); + when(channel.write(any(ByteBuffer.class))).thenAnswer(answer); + + ByteBuffer buf = ByteBuffer.wrap(fullMessageBytes); + writer.writeData(buf); + + assertArrayEquals(header1Bytes, results[0]); + assertArrayEquals(message1Bytes, results[1]); + assertArrayEquals(header2Bytes, results[2]); + assertArrayEquals(message2Bytes, results[3]); + assertArrayEquals(header3Bytes, results[4]); + assertArrayEquals(message3Bytes, results[5]); + verify(channel, times(6)).write(any(ByteBuffer.class)); + } + + @Test(expected=IOException.class) + public void testWriteMessageTooLarge() throws Exception { + when(limits.getMaxMessageSize()).thenReturn(0); + final byte[] messageBytes = "hello".getBytes(Charset.forName("UTF-8")); + ByteBuffer buf = ByteBuffer.wrap(messageBytes); + writer.writeData(buf); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/TcpSocketIPCPropertiesTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,96 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCType; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.TcpSocketIPCProperties.PathUtils; + +public class TcpSocketIPCPropertiesTest { + + private static final String TEST_SERVER_NAME = "testserver"; + private final String UNKNOWN_SERVER_NAME = "unknownserver"; + + private static final String BAD_SOCKET_PORT_STRING = "not an int"; + private static final int TEST_SOCKET_PORT = 9876; + + private Properties jProps; + private File propFile; + private PathUtils pathUtils; + + @Before + public void setUp() throws Exception { + jProps = mock(Properties.class); + pathUtils = mock(PathUtils.class); + propFile = mock(File.class); + final String propName = TcpSocketIPCProperties.getPropertyNameFromServerName(TEST_SERVER_NAME); + when(jProps.getProperty(propName)).thenReturn(Integer.toString(TEST_SOCKET_PORT)); + } + + @Test + public void testType() throws Exception { + TcpSocketIPCProperties props = new TcpSocketIPCProperties(jProps, propFile, pathUtils); + assertEquals(IPCType.TCP_SOCKET, props.getType()); + } + + @Test(expected=IOException.class) + public void testBadPort() throws Exception { + final String propName = TcpSocketIPCProperties.getPropertyNameFromServerName(TEST_SERVER_NAME); + when(jProps.getProperty(propName)).thenReturn(BAD_SOCKET_PORT_STRING); + //when(pathUtils.getSystemProperty(propName)).thenReturn(BAD_SOCKET_PORT_STRING); + new TcpSocketIPCProperties(jProps, propFile, pathUtils).getSocketAddr(TEST_SERVER_NAME); + } + + @Test(expected=IOException.class) + public void testBadServer() throws Exception { + final String propName = TcpSocketIPCProperties.getPropertyNameFromServerName(UNKNOWN_SERVER_NAME); + when(jProps.getProperty(propName)).thenReturn(null); + //when(pathUtils.getSystemProperty(propName)).thenReturn(null); + new TcpSocketIPCProperties(jProps, propFile, pathUtils).getSocketAddr(UNKNOWN_SERVER_NAME); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/common/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/common/internal/ThermostatSocketChannelImplTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,149 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.common.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.net.SocketAddress; +import java.nio.channels.SocketChannel; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ThermostatSocketChannelImpl.TcpSocketChannelHelper; + + +public class ThermostatSocketChannelImplTest { + + private TcpSocketChannelHelper channelHelper; + private SocketAddress addr; + private SocketChannel impl; + + @Before + public void setUp() throws Exception { + this.channelHelper = mock(TcpSocketChannelHelper.class); + addr = mock(SocketAddress.class); + + impl = mock(SocketChannel.class); + when(channelHelper.open(addr)).thenReturn(impl); + when(channelHelper.isOpen(impl)).thenReturn(true); + + ThermostatSocketChannelImpl.setChannelHelper(channelHelper); + } + + @After + public void tearDown() throws Exception { + ThermostatSocketChannelImpl.setChannelHelper(new TcpSocketChannelHelper()); + } + + @Test + public void testOpen() throws Exception { + ThermostatSocketChannelImpl channel = ThermostatSocketChannelImpl.open("test", addr); + verify(channelHelper).open(addr); + + assertEquals("test", channel.getName()); + } + + @Test + public void testReadSingle() throws IOException { + final int readSize = 20; + when(channelHelper.read(eq(impl), any(ByteBuffer.class))).thenReturn(readSize); + + ThermostatSocketChannelImpl channel = createChannel(); + ByteBuffer buf = ByteBuffer.allocate(readSize); + int read = channel.read(buf); + + assertEquals(readSize, read); + verify(channelHelper).read(impl, buf); + } + + @Test + public void testReadEOF() throws IOException { + final int readSize = 20; + when(channelHelper.read(eq(impl), any(ByteBuffer.class))).thenReturn(-1); + + ThermostatSocketChannelImpl channel = createChannel(); + ByteBuffer buf = ByteBuffer.allocate(readSize); + int read = channel.read(buf); + + assertEquals(-1, read); + + verify(channelHelper).read(impl, buf); + } + + @Test + public void testWriteSingle() throws Exception { + final int writeSize = 20; + when(channelHelper.write(eq(impl), any(ByteBuffer.class))).thenReturn(writeSize); + + ThermostatSocketChannelImpl channel = createChannel(); + ByteBuffer buf = ByteBuffer.allocate(writeSize); + int written = channel.write(buf); + + assertEquals(writeSize, written); + verify(channelHelper).write(impl, buf); + } + + @Test + public void testIsOpen() throws Exception { + ThermostatSocketChannelImpl channel = createChannel(); + boolean open = channel.isOpen(); + verify(channelHelper).isOpen(impl); + assertTrue(open); + } + + @Test + public void testClose() throws Exception { + ThermostatSocketChannelImpl channel = createChannel(); + channel.close(); + verify(channelHelper).close(impl); + } + + private ThermostatSocketChannelImpl createChannel() throws IOException { + return new ThermostatSocketChannelImpl("test", impl); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/pom.xml Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,68 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Copyright 2012-2016 Red Hat, Inc. + + This file is part of Thermostat. + + Thermostat is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + Thermostat is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Thermostat; see the file COPYING. If not see + <http://www.gnu.org/licenses/>. + + Linking this code with other modules is making a combined work + based on this code. Thus, the terms and conditions of the GNU + General Public License cover the whole combination. + + As a special exception, the copyright holders of this code give + you permission to link this code with independent modules to + produce an executable, regardless of the license terms of these + independent modules, and to copy and distribute the resulting + executable under terms of your choice, provided that you also + meet, for each linked independent module, the terms and conditions + of the license of that module. An independent module is a module + which is not derived from or based on this code. If you modify + this code, you may extend this exception to your version of the + library, but you are not obligated to do so. If you do not wish + to do so, delete this exception statement from your version. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc</artifactId> + <version>1.99.12-SNAPSHOT</version> + </parent> + + <artifactId>thermostat-agent-ipc-tcpsocket</artifactId> + <packaging>pom</packaging> + + <name>Thermostat Agent IPC TCP Sockets</name> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <modules> + <module>common</module> + <module>client</module> + <module>server</module> + </modules> + +</project> +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/pom.xml Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,119 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Copyright 2012-2016 Red Hat, Inc. + + This file is part of Thermostat. + + Thermostat is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + Thermostat is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Thermostat; see the file COPYING. If not see + <http://www.gnu.org/licenses/>. + + Linking this code with other modules is making a combined work + based on this code. Thus, the terms and conditions of the GNU + General Public License cover the whole combination. + + As a special exception, the copyright holders of this code give + you permission to link this code with independent modules to + produce an executable, regardless of the license terms of these + independent modules, and to copy and distribute the resulting + executable under terms of your choice, provided that you also + meet, for each linked independent module, the terms and conditions + of the license of that module. An independent module is a module + which is not derived from or based on this code. If you modify + this code, you may extend this exception to your version of the + library, but you are not obligated to do so. If you do not wish + to do so, delete this exception statement from your version. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc-tcpsocket</artifactId> + <version>1.99.12-SNAPSHOT</version> + </parent> + + <artifactId>thermostat-agent-ipc-tcpsocket-server</artifactId> + <packaging>bundle</packaging> + + <name>Thermostat IPC Server</name> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.core</artifactId> + </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-shared-config</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-common-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc-tcpsocket-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc-server</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-common-test</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <instructions> + <Bundle-Vendor>Red Hat, Inc.</Bundle-Vendor> + <Bundle-Activator>com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.Activator</Bundle-Activator> + <Bundle-SymbolicName>com.redhat.thermostat.agent.ipc.tcpsocket.server</Bundle-SymbolicName> + <Private-Package> + com.redhat.thermostat.agent.ipc.tcpsocket.server.internal + </Private-Package> + <!-- Do not autogenerate uses clauses in Manifests --> + <_nouses>true</_nouses> + </instructions> + </configuration> + </plugin> + </plugins> + </build> + +</project> +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/AcceptThread.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,170 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.common.utils.LoggingUtils; + +class AcceptThread extends Thread { + + private static final Logger logger = LoggingUtils.getLogger(AcceptThread.class); + private final ExecutorService execService; + private final Selector selector; + private final ClientHandlerCreator handlerCreator; + + private boolean shutdown; + + AcceptThread(Selector selector, ExecutorService execService) { + this(selector, execService, new ClientHandlerCreator()); + } + + AcceptThread(Selector selector, ExecutorService execService, ClientHandlerCreator handlerCreator) { + this.selector = selector; + this.execService = execService; + this.handlerCreator = handlerCreator; + this.shutdown = false; + } + + @Override + public void run() { + logger.info("Ready to accept client connections"); + try { + while (!shutdown) { + int selected = selector.select(); + if (selected < 0) { + // Something bad happened + throw new IOException("Error occurred while selecting channel"); + } else if (selected > 0) { + Set<SelectionKey> selectedKeys = selector.selectedKeys(); + // Use a copy to prevent concurrent modification + Set<SelectionKey> selectedCopy = new HashSet<>(selectedKeys); + for (SelectionKey key : selectedCopy) { + // Remove this key from selected set to indicate we've processed it + selectedKeys.remove(key); + processKey(key); + } + } + } + } catch (IOException e) { + logger.log(Level.SEVERE, "Error occurred during selection", e); + shutdown = true; + } finally { + logger.info("Shutting down"); + execService.shutdown(); + } + } + + private void processKey(SelectionKey key) { + // Ensure this key is still valid and hasn't been cancelled (e.g. by closing the socket) + if (key.readyOps() > 0 && key.isValid()) { + logger.finest("Got selection operation: " + key.readyOps()); + try { + if (key.isAcceptable()) { + // We stored our wrapper socket as the attachment + ThermostatServerSocketChannelImpl channel = + safeGetAttachment(key, ThermostatServerSocketChannelImpl.class); + // Accept client connection + AcceptedSocketChannelImpl client = channel.accept(); + // Create handler for accepted client with provided callbacks + ThermostatIPCCallbacks callbacks = channel.getCallbacks(); + ClientHandler handler = handlerCreator.createHandler(client, execService, callbacks); + // Store handler as attachment + SelectionKey clientKey = client.getSelectionKey(); + clientKey.attach(handler); + logger.fine("Accepted client for \"" + channel.getName() + "\""); + } else { + if (key.isReadable()) { + // Call handler for client to perform read + ClientHandler handler = safeGetAttachment(key, ClientHandler.class); + handler.handleRead(); + } + // Check key hasn't been cancelled by a prior read operation + if (key.isValid() && key.isWritable()) { + // Call handler for client to perform write + ClientHandler handler = safeGetAttachment(key, ClientHandler.class); + handler.handleWrite(); + } + } + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to process socket event", e); + } + } + } + + void shutdown() throws IOException { + this.shutdown = true; + // Interrupt accept thread + this.interrupt(); + + try { + this.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + boolean isShutdown() { + return shutdown; + } + + private <T> T safeGetAttachment(SelectionKey key, Class<T> clazz) throws IOException { + Object attachment = key.attachment(); + Objects.requireNonNull(attachment, "Expected attachment in SelectionKey"); + if (!clazz.isAssignableFrom(attachment.getClass())) { + throw new IOException("Invalid SelectionKey"); + } + return (T) clazz.cast(attachment); + } + + static class ClientHandlerCreator { + ClientHandler createHandler(AcceptedSocketChannelImpl client, ExecutorService execService, + ThermostatIPCCallbacks callbacks) { + return new ClientHandler(client, execService, callbacks); + } + } + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/AcceptedSocketChannelImpl.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,89 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.ThermostatSocketChannelImpl; + +class AcceptedSocketChannelImpl extends ThermostatSocketChannelImpl { + + private SelectionKey key; + private SocketHelper helper; + + AcceptedSocketChannelImpl(String name, SocketChannel impl, SelectionKey key) { + this(name, impl, key, new SocketHelper()); + } + + AcceptedSocketChannelImpl(String name, SocketChannel impl, SelectionKey key, SocketHelper helper) { + super(name, impl); + this.key = key; + this.helper = helper; + } + + private void unregister() { + if (key != null) { + key.cancel(); + key = null; + } + } + + @Override + public void close() throws IOException { + unregister(); + helper.closeSocket(this); + } + + SelectionKey getSelectionKey() { + return key; + } + + // For use by SocketHelper + private void implClose() throws IOException { + super.close(); + } + + // For testing purposes, methods that can't be mocked + static class SocketHelper { + void closeSocket(AcceptedSocketChannelImpl channel) throws IOException { + channel.implClose(); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/Activator.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,84 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; + +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCPropertiesProvider; +import com.redhat.thermostat.agent.ipc.server.ServerTransport; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.TcpSocketIPCPropertiesProvider; + +public class Activator implements BundleActivator { + + private final SelectorProvider provider; + private final IPCServiceHelper helper; + private TcpSocketServerTransport transport; + + public Activator() { + this(SelectorProvider.provider(), new IPCServiceHelper()); + } + + Activator(SelectorProvider provider, IPCServiceHelper helper) { + this.provider = provider; + this.helper = helper; + } + + public void start(final BundleContext context) throws Exception { + // Register the Tcp socket IPC properties provider + context.registerService(IPCPropertiesProvider.class.getName(), new TcpSocketIPCPropertiesProvider(), null); + transport = helper.createTransport(provider); + context.registerService(ServerTransport.class.getName(), transport, null); + } + + @Override + public void stop(BundleContext context) throws Exception { + // Services unregistered automatically + } + + /* For testing purposes */ + static class IPCServiceHelper { + TcpSocketServerTransport createTransport(SelectorProvider provider) { + return new TcpSocketServerTransport(provider); + } + } + +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ClientHandler.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,149 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.AsyncMessageReader; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.AsyncMessageWriter; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.MessageListener; +import com.redhat.thermostat.common.utils.LoggingUtils; + +class ClientHandler implements MessageListener { + + private static final Logger logger = LoggingUtils.getLogger(ClientHandler.class); + + // Increments for each instance made. Only for debugging purposes. + private static final AtomicInteger handlerCount = new AtomicInteger(); + + private final AcceptedSocketChannelImpl client; + private final ExecutorService execService; + private final ThermostatIPCCallbacks callbacks; + private final AsyncMessageReader reader; + private final AsyncMessageWriter writer; + private final MessageCreator messageCreator; + private final int handlerNum; + + ClientHandler(AcceptedSocketChannelImpl client, ExecutorService execService, ThermostatIPCCallbacks callbacks) { + this.client = client; + this.execService = execService; + this.callbacks = callbacks; + this.reader = new AsyncMessageReader(client, this); + this.writer = new AsyncMessageWriter(client); + this.messageCreator = new MessageCreator(); + this.handlerNum = handlerCount.getAndIncrement(); + } + + ClientHandler(AcceptedSocketChannelImpl client, ExecutorService execService, ThermostatIPCCallbacks callbacks, + AsyncMessageReader reader, AsyncMessageWriter writer, MessageCreator messageCreator) { + this.client = client; + this.callbacks = callbacks; + this.execService = execService; + this.reader = reader; + this.writer = writer; + this.messageCreator = messageCreator; + this.handlerNum = handlerCount.getAndIncrement(); + } + + void handleRead() throws IOException { + try { + logger.fine("Got read from client for \"" + client.getName() + "\" [" + handlerNum + "]"); + // Read message from client + reader.readData(); + } catch (IOException e) { + client.close(); + throw new IOException("Communication error from handler " + handlerNum, e); + } + } + + void handleWrite() throws IOException { + try { + logger.fine("Got write for client for \"" + client.getName() + "\" [" + handlerNum + "]"); + // Read message from client + writer.writeData(); + + // If no more messages, remove write from interestOps + if (!writer.hasMoreMessages()) { + SelectionKey key = client.getSelectionKey(); + int ops = key.interestOps(); + key.interestOps(ops & ~SelectionKey.OP_WRITE); + } + } catch (IOException e) { + client.close(); + throw new IOException("Communication error from handler " + handlerNum, e); + } + } + + @Override + public void messageRead(ByteBuffer buf) { + // Create new message and notify caller + final MessageImpl message = messageCreator.createMessage(buf, this); + // Execute callback in a separate thread to ensure we don't block + execService.submit(new Runnable() { + @Override + public void run() { + callbacks.messageReceived(message); + } + }); + } + + @Override + public void writeMessage(ByteBuffer buf) throws IOException { + // Request write with selector + SelectionKey key = client.getSelectionKey(); + int ops = key.interestOps(); + key.interestOps(ops | SelectionKey.OP_WRITE); + + // Enqueue this message for writing when selected + writer.enqueueForWriting(buf); + + // Wakeup selector since we've changed this key's interest set from another thread + key.selector().wakeup(); + } + + static class MessageCreator { + MessageImpl createMessage(ByteBuffer data, MessageListener listener) { + return new MessageImpl(data, listener); + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/FilenameValidator.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,57 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +class FilenameValidator { + + private static final String REGEX = "^[A-Za-z0-9-_]*$"; + + private final Pattern pattern; + + FilenameValidator() { + this.pattern = Pattern.compile(REGEX, Pattern.MULTILINE); + } + + boolean validate(String name) { + Matcher matcher = pattern.matcher(name); + return matcher.matches(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/MessageImpl.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,65 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import com.redhat.thermostat.agent.ipc.server.IPCMessage; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.MessageListener; + +public class MessageImpl implements IPCMessage { + + private final ByteBuffer data; + private final MessageListener listener; + + MessageImpl(ByteBuffer data, MessageListener listener) { + this.data = data; + this.listener = listener; + } + + @Override + public ByteBuffer get() { + return data; + } + + @Override + public void reply(ByteBuffer buf) throws IOException { + listener.writeMessage(buf); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransport.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,286 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import java.io.File; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.nio.file.DirectoryStream; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.InvalidPathException; +import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.nio.file.attribute.UserPrincipal; +import java.nio.file.attribute.UserPrincipalLookupService; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.common.internal.IPCType; +import com.redhat.thermostat.agent.ipc.server.ServerTransport; +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.TcpSocketIPCProperties; +import com.redhat.thermostat.common.utils.LoggingUtils; + +class TcpSocketServerTransport implements ServerTransport { + + private static final Logger logger = LoggingUtils.getLogger(TcpSocketServerTransport.class); + + private final SelectorProvider selectorProvider; + // Access/modification of this field should by synchronized + private final Map<String, ThermostatServerSocketChannelImpl> sockets; + private final ExecutorService execService; + private final FileUtils fileUtils; + private final ChannelUtils channelUtils; + private final ThreadCreator threadCreator; + + private TcpSocketIPCProperties props; + private AcceptThread acceptThread; + private Selector selector; + + TcpSocketServerTransport(SelectorProvider selectorProvider) { + this(selectorProvider, Executors.newFixedThreadPool(determineDefaultThreadPoolSize(), new CountingThreadFactory()), + new FileUtils(), new ThreadCreator(), new ChannelUtils()); + } + + TcpSocketServerTransport(SelectorProvider selectorProvider, ExecutorService execService, + FileUtils fileUtils, ThreadCreator threadCreator, ChannelUtils channelCreator) { + this.selectorProvider = selectorProvider; + this.sockets = new HashMap<>(); + this.execService = execService; + this.fileUtils = fileUtils; + this.channelUtils = channelCreator; + this.threadCreator = threadCreator; + } + + @Override + public void start(IPCProperties props) throws IOException { + if (!(props instanceof TcpSocketIPCProperties)) { + IPCType type = props.getType(); + throw new IOException("Unsupported IPC type: " + type.getConfigValue()); + } + this.props = (TcpSocketIPCProperties) props; + // Prepare socket directory with strict permissions, which will contain the socket file when bound + + // Open the Selector and start accepting connections + this.selector = selectorProvider.openSelector(); + this.acceptThread = threadCreator.createAcceptThread(selector, execService); + //acceptThread.start(); + logger.info("Agent IPC service started"); + } + + @Override + public IPCType getType() { + return IPCType.TCP_SOCKET; + } + + + private void checkName(String name) throws IOException { + Objects.requireNonNull(name, "Server name cannot be null"); + if (name.isEmpty()) { + throw new IOException("Server name cannot be empty"); + } + } + + @Override + public synchronized void createServer(String name, ThermostatIPCCallbacks callbacks) throws IOException { + // Check if the socket has already been created and we know about it + if (sockets.containsKey(name)) { + throw new IOException("IPC server with name \"" + name + "\" already exists"); + } + + // TODO - check permissions, owner, etc if appropriate for protocol + + final SocketAddress addr = this.props.getSocketAddr(name); + + // Create socket + ThermostatServerSocketChannelImpl socket = + channelUtils.createServerSocketChannel(name, addr, callbacks, props, selector); + + sockets.put(name, socket); + if (!acceptThread.isAlive()) + acceptThread.start(); + } + + @Override + public synchronized boolean serverExists(String name) throws IOException { + return sockets.containsKey(name); + } + + @Override + public synchronized void destroyServer(String name) throws IOException { + if (!sockets.containsKey(name)) { + throw new IOException("IPC server with name \"" + name + "\" does not exist"); + } + // Remove socket from known sockets + ThermostatServerSocketChannelImpl socket = sockets.remove(name); + + try { + // Close socket + socket.close(); + } finally { + ; + } + } + + @Override + public void shutdown() throws IOException { + // Stop accepting connections and close selector afterward + acceptThread.shutdown(); + channelUtils.closeSelector(selector); + logger.info("Agent IPC service stopped"); + } + + private static int determineDefaultThreadPoolSize() { + // Make the number of default thread pool size a function of available + // processors. + return Runtime.getRuntime().availableProcessors() * 2; + } + + private static class CountingThreadFactory implements ThreadFactory { + + private final AtomicInteger threadCount; + + private CountingThreadFactory() { + this.threadCount = new AtomicInteger(); + } + + @Override + public Thread newThread(Runnable r) { + // Create threads with a recognizable name + return new Thread(r, "AcceptThread-" + threadCount.incrementAndGet()); + } + + } + + + /* For testing purposes */ + Map<String, ThermostatServerSocketChannelImpl> getSockets() { + return sockets; + } + + /* for testing purposes */ + Thread getAcceptThread() { return acceptThread; } + + /* For testing purposes */ + static class FileUtils { + + boolean exists(Path path) { + return Files.exists(path); + } + + void delete(Path path) throws IOException { + Files.delete(path); + } + + boolean isDirectory(Path path) { + return Files.isDirectory(path); + } + + Path createDirectory(Path dir, FileAttribute<?>... attrs) throws IOException { + return Files.createDirectory(dir, attrs); + } + + Path createDirectories(Path dir, FileAttribute<?>... attrs) throws IOException { + return Files.createDirectories(dir, attrs); + } + + Set<PosixFilePermission> getPosixFilePermissions(Path path) throws IOException { + return Files.getPosixFilePermissions(path); + } + + FileAttribute<Set<PosixFilePermission>> toFileAttribute(Set<PosixFilePermission> perms) { + return PosixFilePermissions.asFileAttribute(perms); + } + + DirectoryStream<Path> newDirectoryStream(Path dir) throws IOException { + return Files.newDirectoryStream(dir); + } + + UserPrincipalLookupService getUserPrincipalLookupService() { + return FileSystems.getDefault().getUserPrincipalLookupService(); + } + + String getUsername() { + return System.getProperty("user.name"); + } + + UserPrincipal getOwner(Path path) throws IOException { + return Files.getOwner(path); + } + + } + + /* For testing purposes */ + static class ThreadCreator { + AcceptThread createAcceptThread(Selector selector, ExecutorService execService) { + return new AcceptThread(selector, execService); + } + } + + /* For testing purposes */ + static class ChannelUtils { + ThermostatServerSocketChannelImpl createServerSocketChannel(String name, SocketAddress addr, + ThermostatIPCCallbacks callbacks, IPCProperties props, Selector selector) throws IOException { + return ThermostatServerSocketChannelImpl.open(name, addr, callbacks, props, selector); + } + void closeSelector(Selector selector) throws IOException { + selector.close(); + } + } + + /* For testing purposes */ + static class ProcessCreator { + Process startProcess(ProcessBuilder builder) throws IOException { + return builder.start(); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/main/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImpl.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,175 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import java.io.File; +import java.io.IOException; + +import java.net.SocketAddress; +import java.nio.channels.Channel; +import java.nio.channels.SocketChannel; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.AbstractInterruptibleChannel; +import java.nio.channels.spi.AbstractSelectableChannel; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; + +class ThermostatServerSocketChannelImpl implements Channel { + + // See manual for unix(7) + private static final int UNIX_PATH_MAX = 108; + + private static TcpServerSocketChannelHelper channelHelper = new TcpServerSocketChannelHelper(); + + private final String name; + private final ServerSocketChannel impl; + private final SocketAddress addr; + private final ThermostatIPCCallbacks callbacks; + private final Selector selector; + + private SelectionKey key; + + ThermostatServerSocketChannelImpl(String name, ServerSocketChannel impl, SocketAddress addr, + ThermostatIPCCallbacks callbacks, Selector selector, SelectionKey key) { + this.name = name; + this.impl = impl; + this.addr = addr; + this.callbacks = callbacks; + this.selector = selector; + this.key = key; + } + + static ThermostatServerSocketChannelImpl open(String name, SocketAddress addr, ThermostatIPCCallbacks callbacks, + IPCProperties props, Selector selector) throws IOException { + + ServerSocketChannel impl = channelHelper.open(); + + // Bind to socket file + channelHelper.bind(impl, addr); + // Set non-blocking + channelHelper.configureBlocking(impl, false); + // Register for selection + SelectionKey key = channelHelper.register(impl, selector, SelectionKey.OP_ACCEPT); + // Attach wrapper socket to key, for use in select loop + ThermostatServerSocketChannelImpl sock = + new ThermostatServerSocketChannelImpl(name, impl, addr, callbacks, selector, key); + channelHelper.attachToKey(key, sock); + // Send wakeup to trigger re-selection + selector.wakeup(); + return sock; + } + + public boolean isOpen() { + return channelHelper.isOpen(impl); + } + + private void unregister() { + key.cancel(); + } + + AcceptedSocketChannelImpl accept() throws IOException { + if (!isOpen()) { + throw new IOException("Socket is closed"); + } + SocketChannel clientImpl = impl.accept(); + // Set non-blocking + channelHelper.configureBlocking(clientImpl, false); + // Register for selection + SelectionKey key = channelHelper.register(clientImpl, selector, SelectionKey.OP_READ); + return new AcceptedSocketChannelImpl(name, clientImpl, key); + } + + SocketAddress getSocketAddr() { + return addr; + } + + ThermostatIPCCallbacks getCallbacks() { + return callbacks; + } + + String getName() { + return name; + } + + public void close() throws IOException { + unregister(); + channelHelper.close(impl); + } + + // ---- For testing purposes ---- + + // Wraps methods that can't be mocked + static class TcpServerSocketChannelHelper { + ServerSocketChannel open() throws IOException { + return ServerSocketChannel.open(); + } + + void bind(ServerSocketChannel channel, SocketAddress addr) throws IOException { + channel.socket().bind(addr); + } + + SelectionKey register(SelectableChannel channel, Selector sel, int ops) throws IOException { + sel.wakeup(); + return channel.register(sel, ops); + } + + SelectableChannel configureBlocking(AbstractSelectableChannel channel, boolean block) throws IOException { + return channel.configureBlocking(block); + } + + Object attachToKey(SelectionKey key, Object attachment) { + return key.attach(attachment); + } + + boolean isOpen(ServerSocketChannel channel) { + return channel.isOpen(); + } + + void close(AbstractInterruptibleChannel channel) throws IOException { + channel.close(); + } + } + + static void setChannelHelper(TcpServerSocketChannelHelper helper) { + ThermostatServerSocketChannelImpl.channelHelper = helper; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/AcceptThreadTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,298 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.AcceptThread.ClientHandlerCreator; + +public class AcceptThreadTest { + + private Selector selector; + private ExecutorService execService; + private ClientHandlerCreator handlerCreator; + private ThermostatServerSocketChannelImpl serverSock; + private AcceptedSocketChannelImpl clientSock; + private ThermostatIPCCallbacks callbacks; + private ClientHandler handler; + private SelectionKey clientKey; + private SelectionKey acceptKey; + private AcceptThread thread; + + @Before + public void setUp() throws IOException { + selector = mock(Selector.class); + + // Mock sockets + serverSock = mock(ThermostatServerSocketChannelImpl.class); + clientSock = mock(AcceptedSocketChannelImpl.class); + when(serverSock.accept()).thenReturn(clientSock); + callbacks = mock(ThermostatIPCCallbacks.class); + when(serverSock.getCallbacks()).thenReturn(callbacks); + + // Mock selection keys + clientKey = mock(SelectionKey.class); + when(clientKey.readyOps()).thenReturn(SelectionKey.OP_READ); + when(clientKey.isValid()).thenReturn(true); + when(clientSock.getSelectionKey()).thenReturn(clientKey); + acceptKey = mock(SelectionKey.class); + acceptKey.attach(serverSock); + when(acceptKey.readyOps()).thenReturn(SelectionKey.OP_ACCEPT); + when(acceptKey.isValid()).thenReturn(true); + + execService = mock(ExecutorService.class); + handlerCreator = mock(ClientHandlerCreator.class); + handler = mock(ClientHandler.class); + clientKey.attach(handler); + when(handlerCreator.createHandler(clientSock, execService, callbacks)).thenReturn(handler); + thread = new AcceptThread(selector, execService, handlerCreator); + } + + @Test + public void testSelectOneAccept() throws IOException { + mockSelectionKeys(acceptKey); + selectAndShutdown(thread, 1); + thread.run(); + + verify(selector).select(); + verify(serverSock).accept(); + verify(handlerCreator).createHandler(clientSock, execService, callbacks); + assertEquals(handler, clientKey.attachment()); + + verify(handler, never()).handleRead(); + verify(handler, never()).handleWrite(); + } + + @Test + public void testSelectOneAcceptInvalid() throws IOException { + when(acceptKey.isValid()).thenReturn(false); + mockSelectionKeys(acceptKey); + selectAndShutdown(thread, 1); + thread.run(); + + verify(selector).select(); + verify(serverSock, never()).accept(); + assertEquals(handler, clientKey.attachment()); + + verify(handler, never()).handleRead(); + verify(handler, never()).handleWrite(); + } + + private void selectAndShutdown(AcceptThread thread, int returnValue) throws IOException { + selectAndShutdown(thread, 1, new int[] { returnValue }); + } + + private void selectAndShutdown(AcceptThread thread, int numIterations, int[] returnValues) throws IOException { + when(selector.select()).thenAnswer(new SelectAnswer(thread, numIterations, returnValues)); + } + + @Test + public void testSelectOneRead() throws IOException { + mockSelectionKeys(clientKey); + selectAndShutdown(thread, 1); + thread.run(); + + verify(selector).select(); + verify(handler).handleRead(); + + verify(serverSock, never()).accept(); + } + + @Test + public void testSelectOneAcceptAndRead() throws IOException { + mockSelectionKeys(acceptKey, clientKey); + selectAndShutdown(thread, 2, new int[] { 1, 1 }); + thread.run(); + + verify(selector, times(2)).select(); + verify(serverSock).accept(); + verify(handlerCreator).createHandler(clientSock, execService, callbacks); + assertEquals(handler, clientKey.attachment()); + + verify(handler).handleRead(); + } + + @Test + public void testSelectTwoAcceptAndRead() throws IOException { + mockSelectionKeys(acceptKey, clientKey); + selectAndShutdown(thread, 2); + thread.run(); + + verify(selector).select(); + verify(serverSock).accept(); + verify(handlerCreator).createHandler(clientSock, execService, callbacks); + assertEquals(handler, clientKey.attachment()); + + verify(handler).handleRead(); + } + + @Test + public void testSelectOneAcceptAndReadWithZeroReturn() throws IOException { + mockSelectionKeys(acceptKey, clientKey); + selectAndShutdown(thread, 3, new int[] { 1, 0, 1 }); + thread.run(); + + verify(selector, times(3)).select(); + verify(serverSock).accept(); + verify(handlerCreator).createHandler(clientSock, execService, callbacks); + assertEquals(handler, clientKey.attachment()); + + verify(handler).handleRead(); + } + + @Test + public void testSelectOneWrite() throws IOException { + when(clientKey.readyOps()).thenReturn(SelectionKey.OP_WRITE); + mockSelectionKeys(clientKey); + selectAndShutdown(thread, 1); + thread.run(); + + verify(selector).select(); + verify(handler).handleWrite(); + + verify(serverSock, never()).accept(); + verify(handler, never()).handleRead(); + } + + @Test + public void testSelectOneAcceptAndWrite() throws IOException { + when(clientKey.readyOps()).thenReturn(SelectionKey.OP_WRITE); + mockSelectionKeys(acceptKey, clientKey); + selectAndShutdown(thread, 2, new int[] { 1, 1 }); + thread.run(); + + verify(selector, times(2)).select(); + verify(serverSock).accept(); + verify(handlerCreator).createHandler(clientSock, execService, callbacks); + assertEquals(handler, clientKey.attachment()); + + verify(handler).handleWrite(); + } + + @Test + public void testSelectReadAndWrite() throws IOException { + when(clientKey.readyOps()).thenReturn(SelectionKey.OP_READ | SelectionKey.OP_WRITE); + mockSelectionKeys(clientKey); + selectAndShutdown(thread, 1); + thread.run(); + + verify(selector).select(); + verify(handler).handleRead(); + verify(handler).handleWrite(); + verify(serverSock, never()).accept(); + } + + @Test + public void testSelectInvalidWrite() throws IOException { + when(clientKey.isValid()).thenReturn(true).thenReturn(false); + when(clientKey.readyOps()).thenReturn(SelectionKey.OP_READ | SelectionKey.OP_WRITE); + mockSelectionKeys(clientKey); + selectAndShutdown(thread, 1); + thread.run(); + + verify(selector).select(); + verify(handler).handleRead(); + verify(handler, never()).handleWrite(); + verify(serverSock, never()).accept(); + } + + @Test + public void testSelectError() throws IOException { + mockSelectionKeys(acceptKey); + // Use numIterations == 2 to bypass normal shutdown + selectAndShutdown(thread, 2, new int[] { -1 }); + + thread.run(); + + verify(selector).select(); + assertTrue(thread.isShutdown()); + } + + private void mockSelectionKeys(SelectionKey... key) { + Set<SelectionKey> keys = new HashSet<>(); + for (SelectionKey k : key) { + keys.add(k); + } + when(selector.selectedKeys()).thenReturn(keys); + } + + private static class SelectAnswer implements Answer<Integer> { + + private final AcceptThread thread; + private final int numIterations; + private final int[] returnValues; + + private int count; + + private SelectAnswer(AcceptThread thread, int numIterations, int[] returnValues) { + this.thread = thread; + this.numIterations = numIterations; + this.returnValues = returnValues; + this.count = 0; + } + + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + if (count + 1 >= numIterations) { + // Call shutdown to end select loop after this iteration + thread.shutdown(); + } + int retVal = returnValues[count]; + count++; + return retVal; + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/AcceptedSocketChannelImplTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,82 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.AcceptedSocketChannelImpl; +import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.AcceptedSocketChannelImpl.SocketHelper; + +public class AcceptedSocketChannelImplTest { + + private AcceptedSocketChannelImpl channel; + private SelectionKey key; + private SocketHelper helper; + + @Before + public void setup() throws Exception { + key = mock(SelectionKey.class); + SocketChannel impl = mock(SocketChannel.class); + helper = mock(SocketHelper.class); + channel = new AcceptedSocketChannelImpl("test", impl, key, helper); + } + + @Test + public void testUnregisterOnClose() throws Exception { + // Verify selection key is cancelled when channel is closed + channel.close(); + verify(key).cancel(); + verify(helper).closeSocket(channel); + } + + @Test + public void testGetSelectionKey() throws Exception { + assertEquals(key, channel.getSelectionKey()); + channel.close(); + assertNull(channel.getSelectionKey()); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ActivatorTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,80 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.channels.spi.SelectorProvider; + +import org.junit.Before; +import org.junit.Test; +import org.osgi.framework.ServiceReference; + +import com.redhat.thermostat.agent.ipc.server.ServerTransport; +import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.Activator.IPCServiceHelper; +import com.redhat.thermostat.testutils.StubBundleContext; + +public class ActivatorTest { + + private Activator activator; + private TcpSocketServerTransport service; + + @Before + public void setUp() throws Exception { + SelectorProvider provider = mock(SelectorProvider.class); + IPCServiceHelper helper = mock(IPCServiceHelper.class); + service = mock(TcpSocketServerTransport.class); + when(helper.createTransport(eq(provider))).thenReturn(service); + + activator = new Activator(provider, helper); + } + + @Test + public void verifyServiceIsRegistered() throws Exception { + StubBundleContext context = new StubBundleContext(); + + activator.start(context); + + assertEquals(2, context.getAllServices().size()); + ServiceReference ref = context.getServiceReference(ServerTransport.class); + assertEquals(service, context.getService(ref)); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ClientHandlerTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,188 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.concurrent.ExecutorService; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.AsyncMessageReader; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.AsyncMessageWriter; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.MessageListener; +import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.ClientHandler.MessageCreator; + +public class ClientHandlerTest { + + private AcceptedSocketChannelImpl client; + private ThermostatIPCCallbacks callbacks; + private SelectionKey key; + private Selector selector; + private AsyncMessageReader reader; + private AsyncMessageWriter writer; + private ExecutorService execService; + private MessageCreator messageCreator; + private MessageImpl message; + private ClientHandler handler; + + @Before + public void setup() throws Exception { + client = mock(AcceptedSocketChannelImpl.class); + + key = mock(SelectionKey.class); + selector = mock(Selector.class); + when(key.selector()).thenReturn(selector); + when(client.getSelectionKey()).thenReturn(key); + + callbacks = mock(ThermostatIPCCallbacks.class); + reader = mock(AsyncMessageReader.class); + writer = mock(AsyncMessageWriter.class); + execService = mock(ExecutorService.class); + messageCreator = mock(MessageCreator.class); + message = mock(MessageImpl.class); + when(messageCreator.createMessage(any(ByteBuffer.class), any(MessageListener.class))).thenReturn(message); + handler = new ClientHandler(client, execService, callbacks, reader, writer, messageCreator); + } + + @Test + public void testRead() throws Exception { + handler.handleRead(); + verify(reader).readData(); + + // Should not close connection + verify(client, never()).close(); + } + + @Test + public void testReadException() throws Exception { + doThrow(new IOException()).when(reader).readData(); + + try { + handler.handleRead(); + fail("Expected IOException"); + } catch (IOException e) { + // Should close channel + verify(reader).readData(); + verify(client).close(); + } + } + + @Test + public void testWrite() throws Exception { + final int ops = 8000; + when(key.interestOps()).thenReturn(ops); + handler.handleWrite(); + verify(writer).writeData(); + + // Check write removed from interest set + verify(key).interestOps(ops & ~SelectionKey.OP_WRITE); + + // Should not close connection + verify(client, never()).close(); + } + + @Test + public void testWriteMoreMessages() throws Exception { + when(writer.hasMoreMessages()).thenReturn(true); + handler.handleWrite(); + verify(writer).writeData(); + + // Check write not removed from interest set + verify(key, never()).interestOps(anyInt()); + + // Should not close connection + verify(client, never()).close(); + } + + @Test + public void testWriteException() throws Exception { + doThrow(new IOException()).when(writer).writeData(); + + try { + handler.handleWrite(); + fail("Expected IOException"); + } catch (IOException e) { + // Should close channel + verify(writer).writeData(); + verify(client).close(); + } + } + + @Test + public void testMessageRead() throws Exception { + ByteBuffer buf = mock(ByteBuffer.class); + handler.messageRead(buf); + verify(messageCreator).createMessage(buf, handler); + + // Check callback notified + ArgumentCaptor<Runnable> runCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(execService).submit(runCaptor.capture()); + Runnable runnable = runCaptor.getValue(); + runnable.run(); + verify(callbacks).messageReceived(message); + } + + @Test + public void testWriteMessage() throws Exception { + final int ops = 8000; + when(key.interestOps()).thenReturn(ops); + ByteBuffer buf = mock(ByteBuffer.class); + handler.writeMessage(buf); + + // Check write added to interest set + verify(key).interestOps(ops | SelectionKey.OP_WRITE); + + verify(writer).enqueueForWriting(buf); + verify(selector).wakeup(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/FilenameValidatorTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,86 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.FilenameValidator; + +public class FilenameValidatorTest { + + private FilenameValidator validator; + + @Before + public void setUp() { + validator = new FilenameValidator(); + } + + @Test + public void testGoodName() { + boolean result = validator.validate("g00d-socKet_nam3"); + assertTrue(result); + } + + @Test + public void testSlash() { + boolean result = validator.validate("my/bad/socket"); + assertFalse(result); + } + + @Test + public void testNull() { + boolean result = validator.validate("myBadS\0cket"); + assertFalse(result); + } + + @Test + public void testDot() { + boolean result = validator.validate("myBadS.cket"); + assertFalse(result); + } + + @Test + public void testNewline() { + boolean result = validator.validate("myBadS\ncket"); + assertFalse(result); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/MessageImplTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,75 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.nio.ByteBuffer; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.MessageListener; + +public class MessageImplTest { + + private ByteBuffer data; + private MessageListener listener; + private MessageImpl message; + + @Before + public void setUp() { + data = mock(ByteBuffer.class); + listener = mock(MessageListener.class); + message = new MessageImpl(data, listener); + } + + @Test + public void testGetData() { + assertEquals(data, message.get()); + } + + @Test + public void testSendReply() throws Exception { + ByteBuffer reply = mock(ByteBuffer.class); + message.reply(reply); + verify(listener).writeMessage(reply); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/TcpSocketServerTransportTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,249 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anySetOf; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.net.SocketAddress; +import java.net.InetSocketAddress; +import java.net.InetAddress; + +import java.nio.channels.spi.AbstractSelector; +import java.nio.channels.spi.SelectorProvider; +import java.nio.file.DirectoryStream; +import java.nio.file.InvalidPathException; +import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.nio.file.attribute.UserPrincipal; +import java.nio.file.attribute.UserPrincipalLookupService; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.common.internal.IPCType; +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.tcpsocket.common.internal.TcpSocketIPCProperties; +import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.TcpSocketServerTransport.ChannelUtils; +import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.TcpSocketServerTransport.FileUtils; +import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.TcpSocketServerTransport.ThreadCreator; + +public class TcpSocketServerTransportTest { + + private static final String SERVER_NAME = "test"; + private static final String BAD_SERVER_NAME = "junk"; + private static final String USERNAME = "testUser"; + private static final int TEST_PORT = 9876; + + private TcpSocketServerTransport transport; + private SelectorProvider provider; + private AbstractSelector selector; + private ExecutorService execService; + private FileUtils fileUtils; + private AcceptThread acceptThread; + private ThreadCreator threadCreator; + private ThermostatIPCCallbacks callbacks; + private ChannelUtils channelUtils; + private ThermostatServerSocketChannelImpl channel; + private TcpSocketIPCProperties props; + private UserPrincipalLookupService lookup; + private SocketAddress addr; + + @SuppressWarnings("unchecked") + @Before + public void setup() throws Exception { + provider = mock(SelectorProvider.class); + selector = mock(AbstractSelector.class); + when(provider.openSelector()).thenReturn(selector); + + + fileUtils = mock(FileUtils.class); + addr = mock(SocketAddress.class); + + props = mock(TcpSocketIPCProperties.class); + //when(props.getSocketAddr(SERVER_NAME)).thenReturn(new InetSocketAddress(InetAddress.getLoopbackAddress(),TEST_PORT)); + when(props.getSocketAddr(SERVER_NAME)).thenReturn(addr); + when(props.getType()).thenReturn(IPCType.TCP_SOCKET); + doThrow(new IOException()).when(props).getSocketAddr(BAD_SERVER_NAME); + + lookup = mock(UserPrincipalLookupService.class); + when(fileUtils.getUserPrincipalLookupService()).thenReturn(lookup); + when(fileUtils.getUsername()).thenReturn(USERNAME); + UserPrincipal principal = mock(UserPrincipal.class); + when(lookup.lookupPrincipalByName(USERNAME)).thenReturn(principal); + + execService = mock(ExecutorService.class); + + acceptThread = mock(AcceptThread.class); + threadCreator = mock(ThreadCreator.class); + when(threadCreator.createAcceptThread(selector, execService)).thenReturn(acceptThread); + + channelUtils = mock(ChannelUtils.class); + channel = mock(ThermostatServerSocketChannelImpl.class); + + callbacks = mock(ThermostatIPCCallbacks.class); + when(channelUtils.createServerSocketChannel(SERVER_NAME, addr, callbacks, props, selector)).thenReturn(channel); + + transport = new TcpSocketServerTransport(provider, execService, fileUtils, + threadCreator, channelUtils); + } + + @Test + public void testInit() throws Exception { + transport.start(props); + verify(provider).openSelector(); + verify(threadCreator).createAcceptThread(selector, execService); + } + + @Test(expected=IOException.class) + public void testStartBadProperties() throws Exception { + // Not TcpSocketIPCProperties + IPCProperties badProps = mock(IPCProperties.class); + when(badProps.getType()).thenReturn(IPCType.UNKNOWN); + transport = new TcpSocketServerTransport(provider, execService, fileUtils, + threadCreator, channelUtils); + transport.start(badProps); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testStartSuccess() throws Exception { + transport.start(props); + + // this test (in unixsocket) used to verify the thread to accept connections is started + // this is not true for TCP - locks up verify(acceptThread).start(); + + // instead, make sure we haven't started the new thread + verify(acceptThread,never()).start(); + + assertEquals(acceptThread, transport.getAcceptThread()); + } + + @Test + public void testShutdownSuccess() throws Exception { + transport.start(props); + transport.shutdown(); + verify(acceptThread).shutdown(); + verify(channelUtils).closeSelector(selector); + } + + //@Test + @Test(expected=IOException.class) + public void testShutdownFailure() throws Exception { + transport.start(props); + doThrow(new IOException()).when(acceptThread).shutdown(); + + //try { + transport.shutdown(); + fail("Expected IO Exception"); + //} catch (IOException e) { + //} + } + + @Test + public void testCreateServer() throws Exception { + transport.start(props); + transport.createServer(SERVER_NAME, callbacks); + checkChannel(); + } + + private void checkChannel() throws IOException { + verify(channelUtils).createServerSocketChannel(SERVER_NAME, addr, callbacks, props, selector); + ThermostatServerSocketChannelImpl result = transport.getSockets().get(SERVER_NAME); + assertEquals(channel, result); + } + + @Test(expected=IOException.class) + public void testCreateServerServerExists() throws Exception { + ThermostatServerSocketChannelImpl channel = mock(ThermostatServerSocketChannelImpl.class); + transport.getSockets().put(SERVER_NAME, channel); + transport.createServer(SERVER_NAME, callbacks); + } + + @Test(expected=IOException.class) + public void testCreateServerUnknownName() throws Exception { + transport.start(props); + transport.createServer(BAD_SERVER_NAME, callbacks); + } + + @Test + public void testServerExists() throws Exception { + assertFalse(transport.serverExists(SERVER_NAME)); + transport.getSockets().put(SERVER_NAME, channel); + assertTrue(transport.serverExists(SERVER_NAME)); + } + + @Test + public void testDestroyServer() throws Exception { + transport.getSockets().put(SERVER_NAME, channel); + transport.destroyServer(SERVER_NAME); + verify(channel).close(); + } + + @Test(expected=IOException.class) + public void testDestroyServerNotExist() throws Exception { + transport.destroyServer(BAD_SERVER_NAME); + } + + //@Test + @Test(expected=IOException.class) + public void testDestroyServerCloseFails() throws Exception { + doThrow(new IOException()).when(channel).close(); + transport.start(props); + transport.getSockets().put(SERVER_NAME, channel); + transport.destroyServer(SERVER_NAME); + //fail("Expected IOException"); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/agent/ipc/tcp-socket/server/src/test/java/com/redhat/thermostat/agent/ipc/tcpsocket/server/internal/ThermostatServerSocketChannelImplTest.java Fri Sep 30 11:20:00 2016 -0400 @@ -0,0 +1,183 @@ +/* + * Copyright 2012-2016 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.agent.ipc.tcpsocket.server.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.net.SocketAddress; +import java.nio.channels.SocketChannel; +import java.nio.channels.ServerSocketChannel; + +import java.io.File; +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.agent.ipc.common.internal.IPCProperties; +import com.redhat.thermostat.agent.ipc.server.ThermostatIPCCallbacks; +import com.redhat.thermostat.agent.ipc.tcpsocket.server.internal.ThermostatServerSocketChannelImpl.TcpServerSocketChannelHelper; + +public class ThermostatServerSocketChannelImplTest { + + private static final String SERVER_NAME = "test"; + + private TcpServerSocketChannelHelper channelHelper; + private SocketAddress addr; + private ServerSocketChannel impl; + private Selector selector; + private SelectionKey key; + private ThermostatIPCCallbacks callbacks; + private IPCProperties props; + + @Before + public void setUp() throws IOException { + channelHelper = mock(TcpServerSocketChannelHelper.class); + ThermostatServerSocketChannelImpl.setChannelHelper(channelHelper); + addr = mock(SocketAddress.class); + + impl = mock(ServerSocketChannel.class); + when(channelHelper.open()).thenReturn(impl); + when(channelHelper.isOpen(impl)).thenReturn(true); + + selector = mock(Selector.class); + callbacks = mock(ThermostatIPCCallbacks.class); + + key = mock(SelectionKey.class); + when(channelHelper.register(impl, selector, SelectionKey.OP_ACCEPT)).thenReturn(key); + props = mock(IPCProperties.class); + File propFile = mock(File.class); + when(props.getPropertiesFile()).thenReturn(propFile); + } + + @After + public void tearDown() { + ThermostatServerSocketChannelImpl.setChannelHelper(new TcpServerSocketChannelHelper()); + } + + @Test + public void testOpen() throws Exception { + ThermostatServerSocketChannelImpl channel = createChannel(); + verifyOpenSuccess(channel); + } + + @Test + public void testCreateChannel() throws Exception { + try { + createChannel(); + } catch (IOException e) { + verifyOpenFailure(); + } + } + + private void verifyOpenSuccess(ThermostatServerSocketChannelImpl channel) throws IOException { + verify(channelHelper).open(); + verify(channelHelper).bind(impl, addr); + verify(channelHelper).configureBlocking(impl, false); + verify(channelHelper).register(impl, selector, SelectionKey.OP_ACCEPT); + verify(channelHelper).attachToKey(key, channel); + verify(selector).wakeup(); + + assertEquals(SERVER_NAME, channel.getName()); + assertEquals(callbacks, channel.getCallbacks()); + } + + private void verifyOpenFailure() throws IOException { + verify(channelHelper, never()).open(); + verify(channelHelper, never()).bind(impl, addr); + verify(channelHelper, never()).configureBlocking(impl, false); + verify(channelHelper, never()).register(impl, selector, SelectionKey.OP_ACCEPT); + verify(channelHelper, never()).attachToKey(eq(key), any()); + verify(selector, never()).wakeup(); + } + + @Test + public void testIsOpen() throws IOException { + ThermostatServerSocketChannelImpl channel = createChannel(); + channel.isOpen(); + verify(channelHelper).isOpen(impl); + } + + @Test + public void testAccept() throws IOException { + ThermostatServerSocketChannelImpl channel = createChannel(); + SocketChannel clientImpl = mock(SocketChannel.class); + when(impl.accept()).thenReturn(clientImpl); + channel.accept(); + verify(impl).accept(); + verify(channelHelper).configureBlocking(clientImpl, false); + verify(channelHelper).register(clientImpl, selector, SelectionKey.OP_READ); + } + + @Test + public void testAcceptClosed() throws IOException { + when(channelHelper.isOpen(impl)).thenReturn(false); + ThermostatServerSocketChannelImpl channel = createChannel(); + SocketChannel clientImpl = mock(SocketChannel.class); + when(impl.accept()).thenReturn(clientImpl); + + try { + channel.accept(); + fail("Expected IOException"); + } catch (IOException e) { + verify(impl, never()).accept(); + } + } + + @Test + public void testClose() throws IOException { + ThermostatServerSocketChannelImpl channel = createChannel(); + channel.close(); + verify(key).cancel(); + verify(channelHelper).close(impl); + } + + private ThermostatServerSocketChannelImpl createChannel() throws IOException { + return ThermostatServerSocketChannelImpl.open(SERVER_NAME, addr, callbacks, props, selector); + } + +}
--- a/dev/ipc-test/test-launcher/src/main/resources/com/redhat/thermostat/dev/ipc/test/launcher/internal/bundles.properties Fri Sep 30 10:15:14 2016 -0400 +++ b/dev/ipc-test/test-launcher/src/main/resources/com/redhat/thermostat/dev/ipc/test/launcher/internal/bundles.properties Fri Sep 30 11:20:00 2016 -0400 @@ -3,6 +3,8 @@ ${thermostat.home}/libs/thermostat-agent-ipc-unixsocket-server-${project.version}.jar, \ ${thermostat.home}/libs/thermostat-agent-ipc-common-${project.version}.jar, \ ${thermostat.home}/libs/thermostat-agent-ipc-unixsocket-common-${project.version}.jar, \ + ${thermostat.home}/libs/thermostat-agent-ipc-tcpsocket-server-${project.version}.jar, \ + ${thermostat.home}/libs/thermostat-agent-ipc-tcpsocket-common-${project.version}.jar, \ ${thermostat.home}/libs/thermostat-common-core-${project.version}.jar, \ ${thermostat.home}/libs/thermostat-shared-config-${project.version}.jar, \ ${thermostat.home}/libs/jnr-unixsocket-${jnr-unixsocket.version}.jar, \
--- a/distribution/assembly/core-assembly-windows.xml Fri Sep 30 10:15:14 2016 -0400 +++ b/distribution/assembly/core-assembly-windows.xml Fri Sep 30 11:20:00 2016 -0400 @@ -61,16 +61,14 @@ <include>com.redhat.thermostat:thermostat-agent-command</include> <include>com.redhat.thermostat:thermostat-agent-command-server</include> <include>com.redhat.thermostat:thermostat-agent-proxy-server</include> - <include>com.redhat.thermostat:thermostat-agent-ipc-unixsocket-server</include> - <include>com.redhat.thermostat:thermostat-agent-ipc-unixsocket-client</include> - <!--include>com.redhat.thermostat:thermostat-agent-ipc-tcpsocket-server</include> - <include>com.redhat.thermostat:thermostat-agent-ipc-tcpsocket-client</include--> + <include>com.redhat.thermostat:thermostat-agent-ipc-tcpsocket-server</include> + <include>com.redhat.thermostat:thermostat-agent-ipc-tcpsocket-client</include> <include>com.redhat.thermostat:thermostat-common-core</include> <include>com.redhat.thermostat:thermostat-common-command</include> <include>com.redhat.thermostat:thermostat-process-handler</include> <include>com.redhat.thermostat:thermostat-storage-cli</include> <include>com.redhat.thermostat:thermostat-storage-mongodb</include> - <!--include>com.redhat.thermostat:thermostat-keyring</include--> + <include>com.redhat.thermostat:thermostat-keyring</include> <include>com.redhat.thermostat:thermostat-web-client</include> <include>com.redhat.thermostat:thermostat-system-backend</include> <include>com.redhat.thermostat:thermostat-laf-utils</include>
--- a/distribution/assembly/core-assembly.xml Fri Sep 30 10:15:14 2016 -0400 +++ b/distribution/assembly/core-assembly.xml Fri Sep 30 11:20:00 2016 -0400 @@ -61,6 +61,8 @@ <include>com.redhat.thermostat:thermostat-agent-command</include> <include>com.redhat.thermostat:thermostat-agent-command-server</include> <include>com.redhat.thermostat:thermostat-agent-proxy-server</include> + <include>com.redhat.thermostat:thermostat-agent-ipc-tcpsocket-server</include> + <include>com.redhat.thermostat:thermostat-agent-ipc-tcpsocket-client</include> <include>com.redhat.thermostat:thermostat-agent-ipc-unixsocket-server</include> <include>com.redhat.thermostat:thermostat-agent-ipc-unixsocket-client</include> <include>com.redhat.thermostat:thermostat-common-core</include>
--- a/distribution/config/commands/agent.properties Fri Sep 30 10:15:14 2016 -0400 +++ b/distribution/config/commands/agent.properties Fri Sep 30 11:20:00 2016 -0400 @@ -1,6 +1,7 @@ bundles = com.redhat.thermostat.web.client=${project.version}, \ com.redhat.thermostat.agent.cli=${project.version}, \ com.redhat.thermostat.backend.system=${project.version}, \ + com.redhat.thermostat.agent.ipc.tcpsocket.server=${project.version}, \ com.redhat.thermostat.agent.ipc.unixsocket.server=${project.version}, \ ${jffi-native.bundle.symbolic.name}=${jffi.version}, \ com.redhat.thermostat.storage.mongodb=${project.version}, \
--- a/distribution/config/commands/service.properties Fri Sep 30 10:15:14 2016 -0400 +++ b/distribution/config/commands/service.properties Fri Sep 30 11:20:00 2016 -0400 @@ -3,6 +3,7 @@ com.redhat.thermostat.agent.command=${project.version}, \ com.redhat.thermostat.storage.cli=${project.version}, \ com.redhat.thermostat.agent.cli=${project.version}, \ + com.redhat.thermostat.agent.ipc.tcpsocket.server=${project.version}, \ com.redhat.thermostat.agent.ipc.unixsocket.server=${project.version}, \ ${jffi-native.bundle.symbolic.name}=${jffi.version}, \
--- a/distribution/pom.xml Fri Sep 30 10:15:14 2016 -0400 +++ b/distribution/pom.xml Fri Sep 30 11:20:00 2016 -0400 @@ -84,6 +84,16 @@ <artifactId>thermostat-keyring</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc-unixsocket-server</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc-unixsocket-client</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <plugins> @@ -501,12 +511,12 @@ </dependency> <dependency> <groupId>com.redhat.thermostat</groupId> - <artifactId>thermostat-agent-ipc-unixsocket-server</artifactId> + <artifactId>thermostat-agent-ipc-tcpsocket-server</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>com.redhat.thermostat</groupId> - <artifactId>thermostat-agent-ipc-unixsocket-client</artifactId> + <artifactId>thermostat-agent-ipc-tcpsocket-client</artifactId> <version>${project.version}</version> </dependency> <dependency>
--- a/distribution/scripts/thermostat-ipc-client-common Fri Sep 30 10:15:14 2016 -0400 +++ b/distribution/scripts/thermostat-ipc-client-common Fri Sep 30 11:20:00 2016 -0400 @@ -8,6 +8,8 @@ IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/thermostat-agent-ipc-common-${project.version}.jar" IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/thermostat-agent-ipc-unixsocket-client-${project.version}.jar" IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/thermostat-agent-ipc-unixsocket-common-${project.version}.jar" +IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/thermostat-agent-ipc-tcpsocket-client-${project.version}.jar" +IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/thermostat-agent-ipc-tcpsocket-common-${project.version}.jar" IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/jnr-unixsocket-${jnr-unixsocket.version}.jar" IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/jnr-enxio-${jnr-enxio.version}.jar" IPC_CLASSPATH="${IPC_CLASSPATH}:${THERMOSTAT_LIBS}/jnr-constants-${jnr-constants.version}.jar"
--- a/vm-byteman/byteman-helper/pom.xml Fri Sep 30 10:15:14 2016 -0400 +++ b/vm-byteman/byteman-helper/pom.xml Fri Sep 30 11:20:00 2016 -0400 @@ -68,6 +68,8 @@ <include>com.redhat.thermostat:thermostat-agent-ipc-common</include> <include>com.redhat.thermostat:thermostat-agent-ipc-unixsocket-client</include> <include>com.redhat.thermostat:thermostat-agent-ipc-unixsocket-common</include> + <include>com.redhat.thermostat:thermostat-agent-ipc-tcpsocket-client</include> + <include>com.redhat.thermostat:thermostat-agent-ipc-tcpsocket-common</include> <include>com.github.jnr:jnr-ffi</include> <include>com.github.jnr:jnr-enxio</include> <include>com.github.jnr:jnr-constants</include> @@ -205,6 +207,11 @@ </dependency> <dependency> <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-ipc-tcpsocket-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> <artifactId>thermostat-annotations</artifactId> <version>${project.version}</version> <scope>test</scope>