Mercurial > hg > release > thermostat-1.4
changeset 1799:c72a9502c19d
Failing to launch the cmd-channel process should fail the agent
This patch reworks the command channel process in order for the agent to
shutdown when the command channel server fails to start. This is done by
the server process outputting a new '<SERVER STARTED>' token once the
command channel is ready to accept requests. Should the process fail to
start (e.g. missing classes on classpath), the agent will get an EOF
instead of this token and shut itself down.
Reviewed-by: jerboaa
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2015-September/016178.html
PR2630
line wrap: on
line diff
--- a/agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelConstants.java Thu Sep 17 17:14:11 2015 +0200 +++ b/agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelConstants.java Fri Sep 18 11:24:03 2015 -0400 @@ -47,5 +47,6 @@ String END_REQUEST_TOKEN = "<END REQUEST>"; String BEGIN_RESPONSE_TOKEN = "<BEGIN RESPONSE>"; String END_RESPONSE_TOKEN = "<END RESPONSE>"; + String SERVER_STARTED_TOKEN = "<SERVER STARTED>"; }
--- a/agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerImpl.java Thu Sep 17 17:14:11 2015 +0200 +++ b/agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerImpl.java Fri Sep 18 11:24:03 2015 -0400 @@ -37,6 +37,7 @@ package com.redhat.thermostat.agent.command.server.internal; import java.io.IOException; +import java.io.PrintStream; import java.net.InetSocketAddress; import java.util.logging.Level; import java.util.logging.Logger; @@ -45,16 +46,21 @@ import org.jboss.netty.channel.ChannelException; import com.redhat.thermostat.agent.command.ConfigurationServer; -import com.redhat.thermostat.agent.command.server.internal.CommandChannelServerContext; import com.redhat.thermostat.common.utils.LoggingUtils; class CommandChannelServerImpl implements ConfigurationServer { + private static final Logger logger = LoggingUtils.getLogger(CommandChannelServerImpl.class); private final CommandChannelServerContext ctx; - private static final Logger logger = LoggingUtils.getLogger(CommandChannelServerImpl.class); + private final PrintStream printer; + + CommandChannelServerImpl(CommandChannelServerContext ctx) { + this(ctx, System.out); + } - CommandChannelServerImpl(CommandChannelServerContext ctx) { + CommandChannelServerImpl(CommandChannelServerContext ctx, PrintStream printer) { this.ctx = ctx; + this.printer = printer; } @Override @@ -67,6 +73,9 @@ try { // Bind and start to accept incoming connections. bootstrap.bind(addr); + + // Output server started token to agent + printer.println(CommandChannelConstants.SERVER_STARTED_TOKEN); } catch (ChannelException e) { throw new IOException("Failed to bind command channel server (" + e.getMessage() + ")", e); }
--- a/agent/command-server/src/test/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerImplTest.java Thu Sep 17 17:14:11 2015 +0200 +++ b/agent/command-server/src/test/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerImplTest.java Fri Sep 18 11:24:03 2015 -0400 @@ -40,10 +40,12 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.io.PrintStream; import java.net.InetSocketAddress; import org.jboss.netty.bootstrap.ServerBootstrap; @@ -54,14 +56,12 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; -import com.redhat.thermostat.agent.command.server.internal.CommandChannelServerContext; -import com.redhat.thermostat.agent.command.server.internal.CommandChannelServerImpl; - public class CommandChannelServerImplTest { private CommandChannelServerContext ctx; - ChannelGroup cg; - ServerBootstrap bootstrap; + private ChannelGroup cg; + private ServerBootstrap bootstrap; + private PrintStream printer; @Before public void setUp() { @@ -72,15 +72,17 @@ ctx = mock(CommandChannelServerContext.class); when(ctx.getBootstrap()).thenReturn(bootstrap); when(ctx.getChannelGroup()).thenReturn(cg); + printer = mock(PrintStream.class); } @Test public void testStartListening() throws IOException { - CommandChannelServerImpl server = new CommandChannelServerImpl(ctx); + CommandChannelServerImpl server = new CommandChannelServerImpl(ctx, printer); server.startListening("127.0.0.1", 123); ArgumentCaptor<InetSocketAddress> argument = ArgumentCaptor.forClass(InetSocketAddress.class); verify(bootstrap).bind(argument.capture()); + verify(printer).println(CommandChannelConstants.SERVER_STARTED_TOKEN); InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 123); assertEquals(addr, argument.getValue()); @@ -89,7 +91,7 @@ @SuppressWarnings("unchecked") @Test public void startListeningFailureThrowsException() { - CommandChannelServerImpl server = new CommandChannelServerImpl(ctx); + CommandChannelServerImpl server = new CommandChannelServerImpl(ctx, printer); when(bootstrap.bind(any(InetSocketAddress.class))).thenThrow(ChannelException.class); @@ -99,6 +101,8 @@ } catch (IOException e) { // pass } + + verify(printer, never()).println(CommandChannelConstants.SERVER_STARTED_TOKEN); } @Test
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/CommandChannelConstants.java Thu Sep 17 17:14:11 2015 +0200 +++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/CommandChannelConstants.java Fri Sep 18 11:24:03 2015 -0400 @@ -47,5 +47,6 @@ String END_REQUEST_TOKEN = "<END REQUEST>"; String BEGIN_RESPONSE_TOKEN = "<BEGIN RESPONSE>"; String END_RESPONSE_TOKEN = "<END RESPONSE>"; + String SERVER_STARTED_TOKEN = "<SERVER STARTED>"; }
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/CommandChannelDelegate.java Thu Sep 17 17:14:11 2015 +0200 +++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/CommandChannelDelegate.java Fri Sep 18 11:24:03 2015 -0400 @@ -36,8 +36,11 @@ package com.redhat.thermostat.agent.command.internal; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.lang.ProcessBuilder.Redirect; @@ -74,22 +77,24 @@ private final StorageGetter storageGetter; private final File binPath; private final ProcessCreator procCreator; + private final ReaderCreator readerCreator; private Process process; private ProcessStreamReader stdoutReader; private PrintWriter printer; CommandChannelDelegate(ReceiverRegistry receivers, SSLConfiguration sslConf, File binPath) { - this(receivers, sslConf, binPath, new StorageGetter(), new ProcessCreator()); + this(receivers, sslConf, binPath, new StorageGetter(), new ProcessCreator(), new ReaderCreator()); } /** For testing only */ CommandChannelDelegate(ReceiverRegistry receivers, SSLConfiguration sslConf, File binPath, - StorageGetter getter, ProcessCreator procCreator) { + StorageGetter getter, ProcessCreator procCreator, ReaderCreator readerCreator) { this.storageGetter = getter; this.receivers = receivers; this.sslConf = sslConf; this.binPath = binPath; this.procCreator = procCreator; + this.readerCreator = readerCreator; } @Override @@ -120,23 +125,41 @@ writeResponse(new Response(ResponseType.ERROR)); } }; - stdoutReader = new ProcessOutputStreamReader(process.getInputStream(), this, exceptionListener); + // Must be instantiated before starting output reader printer = new PrintWriter(new OutputStreamWriter(process.getOutputStream(), "UTF-8")); + SSLConfigurationWriter sslWriter = new SSLConfigurationWriter(printer); + sslWriter.writeSSLConfiguration(sslConf); + + // Wait for started notification + waitForStarted(); + + stdoutReader = new ProcessOutputStreamReader(process.getInputStream(), this, exceptionListener); stdoutReader.start(); - SSLConfigurationWriter sslWriter = new SSLConfigurationWriter(printer); - sslWriter.writeSSLConfiguration(sslConf); + } + + private void waitForStarted() throws IOException { + BufferedReader br = readerCreator.createReader(process.getInputStream()); + String token = br.readLine(); + if (token == null || !CommandChannelConstants.SERVER_STARTED_TOKEN.equals(token)) { + throw new IOException("Command channel server failed to start"); + } + logger.info("Command channel server ready to accept requests"); } private void shutdownProcess() throws IOException { // Interrupt the reader thread to stop processing - stdoutReader.interrupt(); + if (stdoutReader != null) { + stdoutReader.interrupt(); + } process.destroy(); try { - stdoutReader.join(); + if (stdoutReader != null) { + stdoutReader.join(); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -248,5 +271,12 @@ return builder.start(); } } + + /** for testing only */ + static class ReaderCreator { + BufferedReader createReader(InputStream in) throws IOException { + return new BufferedReader(new InputStreamReader(in, "UTF-8")); + } + } }
--- a/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/CommandChannelDelegateTest.java Thu Sep 17 17:14:11 2015 +0200 +++ b/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/CommandChannelDelegateTest.java Fri Sep 18 11:24:03 2015 -0400 @@ -46,6 +46,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -61,6 +62,7 @@ import com.redhat.thermostat.agent.command.ReceiverRegistry; import com.redhat.thermostat.agent.command.RequestReceiver; import com.redhat.thermostat.agent.command.internal.CommandChannelDelegate.ProcessCreator; +import com.redhat.thermostat.agent.command.internal.CommandChannelDelegate.ReaderCreator; import com.redhat.thermostat.agent.command.internal.CommandChannelDelegate.StorageGetter; import com.redhat.thermostat.common.command.Request; import com.redhat.thermostat.common.command.Request.RequestType; @@ -74,6 +76,7 @@ private StorageGetter storageGetter; private ProcessCreator processCreator; + private ReaderCreator readerCreator; private ReceiverRegistry receivers; private File binPath; private CommandChannelDelegate delegate; @@ -90,16 +93,21 @@ storageGetter = mock(StorageGetter.class); processCreator = mock(ProcessCreator.class); process = mock(Process.class); + + readerCreator = mock(ReaderCreator.class); stdout = mock(InputStream.class); - when(stdout.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1); + BufferedReader br = mock(BufferedReader.class); + when(br.readLine()).thenReturn(CommandChannelConstants.SERVER_STARTED_TOKEN); + when(readerCreator.createReader(stdout)).thenReturn(br); stderr = mock(InputStream.class); when(stderr.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1); stdin = mock(OutputStream.class); + when(process.getInputStream()).thenReturn(stdout); when(process.getErrorStream()).thenReturn(stderr); when(process.getOutputStream()).thenReturn(stdin); when(processCreator.startProcess(any(String[].class))).thenReturn(process); - delegate = new CommandChannelDelegate(receivers, sslConf, binPath, storageGetter, processCreator); + delegate = new CommandChannelDelegate(receivers, sslConf, binPath, storageGetter, processCreator, readerCreator); } @Test @@ -130,6 +138,13 @@ }).when(stdin).write(any(byte[].class), anyInt(), anyInt()); } + @Test(expected=IOException.class) + public void testServerFailsToStart() throws IOException { + BufferedReader br = mock(BufferedReader.class); + when(readerCreator.createReader(stdout)).thenReturn(br); + delegate.startListening("127.0.0.1", 123); + } + @Test public void testStopListening() throws IOException { delegate.startListening("127.0.0.1", 123);