changeset 1799:c72a9502c19d

Failing to launch the cmd-channel process should fail the agent This patch reworks the command channel process in order for the agent to shutdown when the command channel server fails to start. This is done by the server process outputting a new '<SERVER STARTED>' token once the command channel is ready to accept requests. Should the process fail to start (e.g. missing classes on classpath), the agent will get an EOF instead of this token and shut itself down. Reviewed-by: jerboaa Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2015-September/016178.html PR2630
author Elliott Baron <ebaron@redhat.com>
date Fri, 18 Sep 2015 11:24:03 -0400
parents e955837e95de
children fbe6fc7c6bc6
files agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelConstants.java agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerImpl.java agent/command-server/src/test/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerImplTest.java agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/CommandChannelConstants.java agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/CommandChannelDelegate.java agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/CommandChannelDelegateTest.java
diffstat 6 files changed, 79 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelConstants.java	Thu Sep 17 17:14:11 2015 +0200
+++ b/agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelConstants.java	Fri Sep 18 11:24:03 2015 -0400
@@ -47,5 +47,6 @@
     String END_REQUEST_TOKEN = "<END REQUEST>";
     String BEGIN_RESPONSE_TOKEN = "<BEGIN RESPONSE>";
     String END_RESPONSE_TOKEN = "<END RESPONSE>";
+    String SERVER_STARTED_TOKEN = "<SERVER STARTED>";
 
 }
--- a/agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerImpl.java	Thu Sep 17 17:14:11 2015 +0200
+++ b/agent/command-server/src/main/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerImpl.java	Fri Sep 18 11:24:03 2015 -0400
@@ -37,6 +37,7 @@
 package com.redhat.thermostat.agent.command.server.internal;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -45,16 +46,21 @@
 import org.jboss.netty.channel.ChannelException;
 
 import com.redhat.thermostat.agent.command.ConfigurationServer;
-import com.redhat.thermostat.agent.command.server.internal.CommandChannelServerContext;
 import com.redhat.thermostat.common.utils.LoggingUtils;
 
 class CommandChannelServerImpl implements ConfigurationServer {
 
+    private static final Logger logger = LoggingUtils.getLogger(CommandChannelServerImpl.class);
     private final CommandChannelServerContext ctx;
-    private static final Logger logger = LoggingUtils.getLogger(CommandChannelServerImpl.class);
+    private final PrintStream printer;
+    
+    CommandChannelServerImpl(CommandChannelServerContext ctx) {
+        this(ctx, System.out);
+    }
 
-    CommandChannelServerImpl(CommandChannelServerContext ctx) {
+    CommandChannelServerImpl(CommandChannelServerContext ctx, PrintStream printer) {
         this.ctx = ctx;
+        this.printer = printer;
     }
 
     @Override
@@ -67,6 +73,9 @@
         try {
             // Bind and start to accept incoming connections.
             bootstrap.bind(addr);
+            
+            // Output server started token to agent
+            printer.println(CommandChannelConstants.SERVER_STARTED_TOKEN);
         } catch (ChannelException e) {
             throw new IOException("Failed to bind command channel server (" + e.getMessage() + ")", e);
         }
--- a/agent/command-server/src/test/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerImplTest.java	Thu Sep 17 17:14:11 2015 +0200
+++ b/agent/command-server/src/test/java/com/redhat/thermostat/agent/command/server/internal/CommandChannelServerImplTest.java	Fri Sep 18 11:24:03 2015 -0400
@@ -40,10 +40,12 @@
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.InetSocketAddress;
 
 import org.jboss.netty.bootstrap.ServerBootstrap;
@@ -54,14 +56,12 @@
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import com.redhat.thermostat.agent.command.server.internal.CommandChannelServerContext;
-import com.redhat.thermostat.agent.command.server.internal.CommandChannelServerImpl;
-
 public class CommandChannelServerImplTest {
 
     private CommandChannelServerContext ctx;
-    ChannelGroup cg;
-    ServerBootstrap bootstrap;
+    private ChannelGroup cg;
+    private ServerBootstrap bootstrap;
+    private PrintStream printer;
 
     @Before
     public void setUp() {
@@ -72,15 +72,17 @@
         ctx = mock(CommandChannelServerContext.class);
         when(ctx.getBootstrap()).thenReturn(bootstrap);
         when(ctx.getChannelGroup()).thenReturn(cg);
+        printer = mock(PrintStream.class);
     }
 
     @Test
     public void testStartListening() throws IOException {
-        CommandChannelServerImpl server = new CommandChannelServerImpl(ctx);
+        CommandChannelServerImpl server = new CommandChannelServerImpl(ctx, printer);
         server.startListening("127.0.0.1", 123);
 
         ArgumentCaptor<InetSocketAddress> argument = ArgumentCaptor.forClass(InetSocketAddress.class);
         verify(bootstrap).bind(argument.capture());
+        verify(printer).println(CommandChannelConstants.SERVER_STARTED_TOKEN);
         
         InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 123);
         assertEquals(addr, argument.getValue());
@@ -89,7 +91,7 @@
     @SuppressWarnings("unchecked")
     @Test
     public void startListeningFailureThrowsException() {
-        CommandChannelServerImpl server = new CommandChannelServerImpl(ctx);
+        CommandChannelServerImpl server = new CommandChannelServerImpl(ctx, printer);
 
         when(bootstrap.bind(any(InetSocketAddress.class))).thenThrow(ChannelException.class);
         
@@ -99,6 +101,8 @@
         } catch (IOException e) {
             // pass
         }
+        
+        verify(printer, never()).println(CommandChannelConstants.SERVER_STARTED_TOKEN);
     }
 
     @Test
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/CommandChannelConstants.java	Thu Sep 17 17:14:11 2015 +0200
+++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/CommandChannelConstants.java	Fri Sep 18 11:24:03 2015 -0400
@@ -47,5 +47,6 @@
     String END_REQUEST_TOKEN = "<END REQUEST>";
     String BEGIN_RESPONSE_TOKEN = "<BEGIN RESPONSE>";
     String END_RESPONSE_TOKEN = "<END RESPONSE>";
+    String SERVER_STARTED_TOKEN = "<SERVER STARTED>";
 
 }
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/CommandChannelDelegate.java	Thu Sep 17 17:14:11 2015 +0200
+++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/CommandChannelDelegate.java	Fri Sep 18 11:24:03 2015 -0400
@@ -36,8 +36,11 @@
 
 package com.redhat.thermostat.agent.command.internal;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.lang.ProcessBuilder.Redirect;
@@ -74,22 +77,24 @@
     private final StorageGetter storageGetter;
     private final File binPath;
     private final ProcessCreator procCreator;
+    private final ReaderCreator readerCreator;
     private Process process;
     private ProcessStreamReader stdoutReader;
     private PrintWriter printer;
     
     CommandChannelDelegate(ReceiverRegistry receivers, SSLConfiguration sslConf, File binPath) {
-        this(receivers, sslConf, binPath, new StorageGetter(), new ProcessCreator());
+        this(receivers, sslConf, binPath, new StorageGetter(), new ProcessCreator(), new ReaderCreator());
     }
 
     /** For testing only */
     CommandChannelDelegate(ReceiverRegistry receivers, SSLConfiguration sslConf, File binPath, 
-            StorageGetter getter, ProcessCreator procCreator) {
+            StorageGetter getter, ProcessCreator procCreator, ReaderCreator readerCreator) {
         this.storageGetter = getter;
         this.receivers = receivers;
         this.sslConf = sslConf;
         this.binPath = binPath;
         this.procCreator = procCreator;
+        this.readerCreator = readerCreator;
     }
 
     @Override
@@ -120,23 +125,41 @@
                 writeResponse(new Response(ResponseType.ERROR));
             }
         };
-        stdoutReader = new ProcessOutputStreamReader(process.getInputStream(), this, exceptionListener);
+        
         // Must be instantiated before starting output reader
         printer = new PrintWriter(new OutputStreamWriter(process.getOutputStream(), "UTF-8"));
+        SSLConfigurationWriter sslWriter = new SSLConfigurationWriter(printer);
+        sslWriter.writeSSLConfiguration(sslConf);
+        
+        // Wait for started notification
+        waitForStarted();
+        
+        stdoutReader = new ProcessOutputStreamReader(process.getInputStream(), this, exceptionListener);
         stdoutReader.start();
         
-        SSLConfigurationWriter sslWriter = new SSLConfigurationWriter(printer);
-        sslWriter.writeSSLConfiguration(sslConf);
+    }
+
+    private void waitForStarted() throws IOException {
+        BufferedReader br = readerCreator.createReader(process.getInputStream());
+        String token = br.readLine();
+        if (token == null || !CommandChannelConstants.SERVER_STARTED_TOKEN.equals(token)) {
+            throw new IOException("Command channel server failed to start");
+        }
+        logger.info("Command channel server ready to accept requests");
     }
 
     private void shutdownProcess() throws IOException {
         // Interrupt the reader thread to stop processing
-        stdoutReader.interrupt();
+        if (stdoutReader != null) {
+            stdoutReader.interrupt();
+        }
         
         process.destroy();
         
         try {
-            stdoutReader.join();
+            if (stdoutReader != null) {
+                stdoutReader.join();
+            }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         }
@@ -248,5 +271,12 @@
             return builder.start();
         }
     }
+    
+    /** for testing only */
+    static class ReaderCreator {
+        BufferedReader createReader(InputStream in) throws IOException {
+            return new BufferedReader(new InputStreamReader(in, "UTF-8"));
+        }
+    }
 }
 
--- a/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/CommandChannelDelegateTest.java	Thu Sep 17 17:14:11 2015 +0200
+++ b/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/CommandChannelDelegateTest.java	Fri Sep 18 11:24:03 2015 -0400
@@ -46,6 +46,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -61,6 +62,7 @@
 import com.redhat.thermostat.agent.command.ReceiverRegistry;
 import com.redhat.thermostat.agent.command.RequestReceiver;
 import com.redhat.thermostat.agent.command.internal.CommandChannelDelegate.ProcessCreator;
+import com.redhat.thermostat.agent.command.internal.CommandChannelDelegate.ReaderCreator;
 import com.redhat.thermostat.agent.command.internal.CommandChannelDelegate.StorageGetter;
 import com.redhat.thermostat.common.command.Request;
 import com.redhat.thermostat.common.command.Request.RequestType;
@@ -74,6 +76,7 @@
     
     private StorageGetter storageGetter;
     private ProcessCreator processCreator;
+    private ReaderCreator readerCreator;
     private ReceiverRegistry receivers;
     private File binPath;
     private CommandChannelDelegate delegate;
@@ -90,16 +93,21 @@
         storageGetter = mock(StorageGetter.class);
         processCreator = mock(ProcessCreator.class);
         process = mock(Process.class);
+        
+        readerCreator = mock(ReaderCreator.class);
         stdout = mock(InputStream.class);
-        when(stdout.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
+        BufferedReader br = mock(BufferedReader.class);
+        when(br.readLine()).thenReturn(CommandChannelConstants.SERVER_STARTED_TOKEN);
+        when(readerCreator.createReader(stdout)).thenReturn(br);
         stderr = mock(InputStream.class);
         when(stderr.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
         stdin = mock(OutputStream.class);
+        
         when(process.getInputStream()).thenReturn(stdout);
         when(process.getErrorStream()).thenReturn(stderr);
         when(process.getOutputStream()).thenReturn(stdin);
         when(processCreator.startProcess(any(String[].class))).thenReturn(process);
-        delegate = new CommandChannelDelegate(receivers, sslConf, binPath, storageGetter, processCreator);
+        delegate = new CommandChannelDelegate(receivers, sslConf, binPath, storageGetter, processCreator, readerCreator);
     }
 
     @Test
@@ -130,6 +138,13 @@
         }).when(stdin).write(any(byte[].class), anyInt(), anyInt());
     }
     
+    @Test(expected=IOException.class)
+    public void testServerFailsToStart() throws IOException {
+        BufferedReader br = mock(BufferedReader.class);
+        when(readerCreator.createReader(stdout)).thenReturn(br);
+        delegate.startListening("127.0.0.1", 123);
+    }
+    
     @Test
     public void testStopListening() throws IOException {
         delegate.startListening("127.0.0.1", 123);