changeset 2567:4c926ce7235d

Fix Exception Handling in RequestQueue Sends an error response back to the listeners if the RequestQueue fails to connect to the target address. PR3184 Reviewed-by: jkang Review Thread: http://icedtea.classpath.org/pipermail/thermostat/2017-January/022012.html
author Joshua Matsuoka <jmatsuok@redhat.com>
date Fri, 20 Jan 2017 10:48:09 -0500
parents 599edeb76bfa
children b98beac5559c
files client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestQueueImpl.java client/command/src/test/java/com/redhat/thermostat/client/command/internal/RequestQueueImplTest.java
diffstat 2 files changed, 84 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestQueueImpl.java	Fri Jan 20 08:57:24 2017 -0500
+++ b/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestQueueImpl.java	Fri Jan 20 10:48:09 2017 -0500
@@ -161,18 +161,26 @@
                 if (request == null) {
                     break;
                 }
-                ChannelFuture f = ctx.getBootstrap().connect(request.getTarget()).syncUninterruptibly();
-                if (f.isSuccess()) {
-                	Channel c = f.channel();
-                	ChannelPipeline pipeline = c.pipeline();
-                	if (ctx.getSSLConfiguration().enableForCmdChannel()) {
-                	    doSSLHandShake(pipeline, request);
-                	}
-                	pipeline.addLast("responseHandler", new ResponseHandler(request));
-                	pipeline.writeAndFlush(request);
-                } else {
-                	Response response  = new Response(ResponseType.ERROR);
-                	fireComplete(request, response);
+                try {
+                    ChannelFuture f = ctx.getBootstrap().connect(request.getTarget()).syncUninterruptibly();
+                    if (f.isSuccess()) {
+                        Channel c = f.channel();
+                        ChannelPipeline pipeline = c.pipeline();
+                        if (ctx.getSSLConfiguration().enableForCmdChannel()) {
+                            doSSLHandShake(pipeline, request);
+                        }
+                        pipeline.addLast("responseHandler", new ResponseHandler(request));
+                        pipeline.writeAndFlush(request);
+                    } else {
+                        Response response = new Response(ResponseType.ERROR);
+                        fireComplete(request, response);
+                    }
+                    // If an exception is thrown it needs to be caught otherwise no
+                    // response is ever sent back to the request receiver.
+                } catch (Exception e) {
+                    logger.severe(e.toString());
+                    Response response  = new Response(ResponseType.ERROR);
+                    fireComplete(request, response);
                 }
             }
         }
--- a/client/command/src/test/java/com/redhat/thermostat/client/command/internal/RequestQueueImplTest.java	Fri Jan 20 08:57:24 2017 -0500
+++ b/client/command/src/test/java/com/redhat/thermostat/client/command/internal/RequestQueueImplTest.java	Fri Jan 20 10:48:09 2017 -0500
@@ -47,8 +47,15 @@
 import static org.mockito.Mockito.when;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -170,6 +177,63 @@
         assertTrue(queue.getQueue().contains(request));
     }
 
+    /*
+     * Ensure that if the connection to the request target fails, the exception is caught
+     * and an error response is returned.
+     */
+    @Test
+    public void testFailedConnectionIsCaught() throws InterruptedException {
+        CountDownLatch signal = new CountDownLatch(1);
+        InetSocketAddress addr = mock(InetSocketAddress.class);
+        Request req = createRequest(addr, "");
+        when(req.getTarget()).thenReturn(addr);
+        ConnectionFailedListener listener = new ConnectionFailedListener(signal);
+        List<RequestResponseListener> listeners = new ArrayList<>();
+        listeners.add(listener);
+        when(req.getListeners()).thenReturn(listeners);
+        ConfigurationRequestContext ctx = mock(ConfigurationRequestContext.class);
+        when(ctx.getBootstrap()).thenReturn(mock(Bootstrap.class));
+        when(ctx.getBootstrap().connect(any(InetSocketAddress.class))).thenReturn(mock(ChannelFuture.class));
+        when(ctx.getBootstrap()
+                .connect(any(InetSocketAddress.class))
+                .syncUninterruptibly())
+                .thenThrow(new ChannelException("Connection Refused"));
+        RequestQueueImpl queue = new RequestQueueImpl(ctx);
+        try {
+            queue.putRequest(req);
+            queue.startProcessingRequests();
+            // Wait for the response to be sent
+            signal.await(5, TimeUnit.SECONDS);
+            queue.stopProcessingRequests();
+            assertTrue(listener.isCalled());
+        } catch (InterruptedException ie) {
+            fail(ie.getMessage());
+        }
+    }
+
+    private class ConnectionFailedListener implements RequestResponseListener {
+
+        public boolean called = false;
+        private CountDownLatch signal;
+
+        public ConnectionFailedListener(CountDownLatch signal) {
+            this.signal = signal;
+        }
+
+        public boolean isCalled() {
+            return called;
+        }
+
+        @Override
+        public void fireComplete(Request request, Response response) {
+            if (response.getType() == ResponseType.ERROR) {
+                called = true;
+                signal.countDown();
+            }
+        }
+
+    }
+
     private static Request createRequest(InetSocketAddress agentAddress, String receiver) {
         Request request = mock(Request.class);
         when(request.getReceiver()).thenReturn(receiver);