Mercurial > hg > release > thermostat-2.0
changeset 2567:4c926ce7235d
Fix Exception Handling in RequestQueue
Sends an error response back to the listeners if the RequestQueue fails to
connect to the target address.
PR3184
Reviewed-by: jkang
Review Thread: http://icedtea.classpath.org/pipermail/thermostat/2017-January/022012.html
author | Joshua Matsuoka <jmatsuok@redhat.com> |
---|---|
date | Fri, 20 Jan 2017 10:48:09 -0500 |
parents | 599edeb76bfa |
children | b98beac5559c |
files | client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestQueueImpl.java client/command/src/test/java/com/redhat/thermostat/client/command/internal/RequestQueueImplTest.java |
diffstat | 2 files changed, 84 insertions(+), 12 deletions(-) [+] |
line wrap: on
line diff
--- a/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestQueueImpl.java Fri Jan 20 08:57:24 2017 -0500 +++ b/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestQueueImpl.java Fri Jan 20 10:48:09 2017 -0500 @@ -161,18 +161,26 @@ if (request == null) { break; } - ChannelFuture f = ctx.getBootstrap().connect(request.getTarget()).syncUninterruptibly(); - if (f.isSuccess()) { - Channel c = f.channel(); - ChannelPipeline pipeline = c.pipeline(); - if (ctx.getSSLConfiguration().enableForCmdChannel()) { - doSSLHandShake(pipeline, request); - } - pipeline.addLast("responseHandler", new ResponseHandler(request)); - pipeline.writeAndFlush(request); - } else { - Response response = new Response(ResponseType.ERROR); - fireComplete(request, response); + try { + ChannelFuture f = ctx.getBootstrap().connect(request.getTarget()).syncUninterruptibly(); + if (f.isSuccess()) { + Channel c = f.channel(); + ChannelPipeline pipeline = c.pipeline(); + if (ctx.getSSLConfiguration().enableForCmdChannel()) { + doSSLHandShake(pipeline, request); + } + pipeline.addLast("responseHandler", new ResponseHandler(request)); + pipeline.writeAndFlush(request); + } else { + Response response = new Response(ResponseType.ERROR); + fireComplete(request, response); + } + // If an exception is thrown it needs to be caught otherwise no + // response is ever sent back to the request receiver. + } catch (Exception e) { + logger.severe(e.toString()); + Response response = new Response(ResponseType.ERROR); + fireComplete(request, response); } } }
--- a/client/command/src/test/java/com/redhat/thermostat/client/command/internal/RequestQueueImplTest.java Fri Jan 20 08:57:24 2017 -0500 +++ b/client/command/src/test/java/com/redhat/thermostat/client/command/internal/RequestQueueImplTest.java Fri Jan 20 10:48:09 2017 -0500 @@ -47,8 +47,15 @@ import static org.mockito.Mockito.when; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelException; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -170,6 +177,63 @@ assertTrue(queue.getQueue().contains(request)); } + /* + * Ensure that if the connection to the request target fails, the exception is caught + * and an error response is returned. + */ + @Test + public void testFailedConnectionIsCaught() throws InterruptedException { + CountDownLatch signal = new CountDownLatch(1); + InetSocketAddress addr = mock(InetSocketAddress.class); + Request req = createRequest(addr, ""); + when(req.getTarget()).thenReturn(addr); + ConnectionFailedListener listener = new ConnectionFailedListener(signal); + List<RequestResponseListener> listeners = new ArrayList<>(); + listeners.add(listener); + when(req.getListeners()).thenReturn(listeners); + ConfigurationRequestContext ctx = mock(ConfigurationRequestContext.class); + when(ctx.getBootstrap()).thenReturn(mock(Bootstrap.class)); + when(ctx.getBootstrap().connect(any(InetSocketAddress.class))).thenReturn(mock(ChannelFuture.class)); + when(ctx.getBootstrap() + .connect(any(InetSocketAddress.class)) + .syncUninterruptibly()) + .thenThrow(new ChannelException("Connection Refused")); + RequestQueueImpl queue = new RequestQueueImpl(ctx); + try { + queue.putRequest(req); + queue.startProcessingRequests(); + // Wait for the response to be sent + signal.await(5, TimeUnit.SECONDS); + queue.stopProcessingRequests(); + assertTrue(listener.isCalled()); + } catch (InterruptedException ie) { + fail(ie.getMessage()); + } + } + + private class ConnectionFailedListener implements RequestResponseListener { + + public boolean called = false; + private CountDownLatch signal; + + public ConnectionFailedListener(CountDownLatch signal) { + this.signal = signal; + } + + public boolean isCalled() { + return called; + } + + @Override + public void fireComplete(Request request, Response response) { + if (response.getType() == ResponseType.ERROR) { + called = true; + signal.countDown(); + } + } + + } + private static Request createRequest(InetSocketAddress agentAddress, String receiver) { Request request = mock(Request.class); when(request.getReceiver()).thenReturn(receiver);