changeset 864:8e16f9b4ef38

Simplify how encoders/decoders are implemented in cmd-channel. Earlier *Encoder classes were using the IMHO rather un-intuitive writeRequested() state. I've added some code to MessageEncoder/MessageDecoder so that they are DownStreamHandlers/UpstreamHandlers and call an abstract encode/decode method. Encoders/Decoders implement this encode/decode method. This makes things a little bit easier to understand (IMHO). I've also added a Messages utility class which helps for equal-comparison of Requests/Responses. This helped for testing *Encoder/*Decoder classes, which are also easier testable now. Not all of them have had tests in the first place ;-) While I was doing all that I realized that there was no high-level description of how things are sent over the netty channel, so I've added that too. See javadoc of Request/Response. This should make it easier if one has to go back understanding things again after not touching the code for months ;-) Another reason why I've refactored this is because it makes putting in SSL support for the commmand-channel easier. Reviewed-by: vanaltj Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2012-December/004716.html
author Severin Gehwolf <sgehwolf@redhat.com>
date Fri, 14 Dec 2012 16:06:04 +0100
parents 39987bc895d4
children 4ebc8c2c19fb
files agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/RequestDecoder.java agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ResponseEncoder.java agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/RequestDecoderTest.java agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/ResponseEncoderTest.java client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestEncoder.java client/command/src/main/java/com/redhat/thermostat/client/command/internal/ResponseDecoder.java client/command/src/test/java/com/redhat/thermostat/client/command/internal/RequestEncoderTest.java client/command/src/test/java/com/redhat/thermostat/client/command/internal/ResponseDecoderTest.java common/command/src/main/java/com/redhat/thermostat/common/command/InvalidMessageException.java common/command/src/main/java/com/redhat/thermostat/common/command/Message.java common/command/src/main/java/com/redhat/thermostat/common/command/MessageDecoder.java common/command/src/main/java/com/redhat/thermostat/common/command/MessageEncoder.java common/command/src/main/java/com/redhat/thermostat/common/command/Messages.java common/command/src/main/java/com/redhat/thermostat/common/command/Request.java common/command/src/main/java/com/redhat/thermostat/common/command/Response.java common/command/src/test/java/com/redhat/thermostat/common/command/MessageDecoderTest.java common/command/src/test/java/com/redhat/thermostat/common/command/MessagesTest.java common/command/src/test/java/com/redhat/thermostat/common/command/ResponseTest.java
diffstat 18 files changed, 839 insertions(+), 120 deletions(-) [+]
line wrap: on
line diff
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/RequestDecoder.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/RequestDecoder.java	Fri Dec 14 16:06:04 2012 +0100
@@ -36,30 +36,42 @@
 
 package com.redhat.thermostat.agent.command.internal;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
 
 import com.redhat.thermostat.common.command.DecodingHelper;
+import com.redhat.thermostat.common.command.InvalidMessageException;
+import com.redhat.thermostat.common.command.Message;
 import com.redhat.thermostat.common.command.MessageDecoder;
 import com.redhat.thermostat.common.command.Request;
 import com.redhat.thermostat.common.command.Request.RequestType;
+import com.redhat.thermostat.common.utils.LoggingUtils;
 
 class RequestDecoder extends MessageDecoder {
+    
+    private static final Logger logger = LoggingUtils.getLogger(RequestDecoder.class);
 
+    /*
+     * See the javadoc of Request for a description of the encoding.
+     */
     @Override
-    protected Object decode(ChannelHandlerContext ctx, Channel channel,
-            ChannelBuffer buffer) {
+    protected Message decode(Channel channel, ChannelBuffer msg) throws InvalidMessageException {
+        logger.log(Level.FINEST, "agent: decoding Request object");
+        ChannelBuffer buffer = (ChannelBuffer) msg;
         buffer.markReaderIndex();
         String typeAsString = DecodingHelper.decodeString(buffer);
         if (typeAsString == null) {
             buffer.resetReaderIndex();
-            return null;
+            throw new InvalidMessageException("Could not decode message: " + ChannelBuffers.hexDump(buffer));
         }
         Request request = new Request(RequestType.valueOf(typeAsString), channel.getRemoteAddress());
         if (!DecodingHelper.decodeParameters(buffer, request)) {
             buffer.resetReaderIndex();
-            return null;
+            throw new InvalidMessageException("Could not decode message: " + ChannelBuffers.hexDump(buffer));
         }
         return request;
     }
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ResponseEncoder.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ResponseEncoder.java	Fri Dec 14 16:06:04 2012 +0100
@@ -36,23 +36,34 @@
 
 package com.redhat.thermostat.agent.command.internal;
 
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.jboss.netty.buffer.ChannelBuffer;
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
 
 import com.redhat.thermostat.common.command.EncodingHelper;
+import com.redhat.thermostat.common.command.Message;
 import com.redhat.thermostat.common.command.MessageEncoder;
 import com.redhat.thermostat.common.command.Response;
+import com.redhat.thermostat.common.utils.LoggingUtils;
 
 
 class ResponseEncoder extends MessageEncoder {
 
+    private static final Logger logger = LoggingUtils.getLogger(ResponseEncoder.class);
+    
+    /*
+     * See javadoc of Response for a description of the encoding.
+     */
     @Override
-    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
-
-        Response response = (Response) e.getMessage();
+    protected ChannelBuffer encode(Message msg) {
+        // At this point we are only getting Messages. Since our only
+        // registered MessageEncoder is the one for Responses a cast
+        // to Response should be safe.
+        logger.log(Level.FINEST, "agent: encoding Response object");
+        Response response = (Response) msg;
 
         // Response Type
         String responseType = EncodingHelper.trimType(response.getType().toString());
@@ -60,8 +71,7 @@
 
         // Compose the full message.
         ChannelBuffer buf = wrappedBuffer(typeBuffer);
-        Channels.write(ctx, e.getFuture(), buf);
-        
+        return buf;
     }
 
 }
\ No newline at end of file
--- a/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/RequestDecoderTest.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/RequestDecoderTest.java	Fri Dec 14 16:06:04 2012 +0100
@@ -36,6 +36,7 @@
 
 package com.redhat.thermostat.agent.command.internal;
 
+import java.net.SocketAddress;
 import java.util.Collection;
 
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -45,6 +46,9 @@
 import org.junit.Test;
 
 import com.redhat.thermostat.agent.command.internal.RequestDecoder;
+import com.redhat.thermostat.common.command.InvalidMessageException;
+import com.redhat.thermostat.common.command.Message;
+import com.redhat.thermostat.common.command.Messages;
 import com.redhat.thermostat.common.command.Request;
 import com.redhat.thermostat.common.command.Request.RequestType;
 
@@ -53,6 +57,45 @@
 import static org.mockito.Mockito.mock;
 
 public class RequestDecoderTest {
+    
+    /*
+     * This is serialized format for
+     * req = new Request(RequestType.RESPONSE_EXPECTED, blah)
+     * req.setParameter("param1", "value1");
+     * req.setParameter("param2", "value2");
+     */
+    private static final byte[] ENCODED_REQEUEST_WITH_PARAMS = new byte[] {
+        0x00, 0x00, 0x00, 0x11, 0x52, 0x45, 0x53, 0x50, 0x4f, 0x4e, 0x53, 0x45,
+        0x5f, 0x45, 0x58, 0x50, 0x45, 0x43, 0x54, 0x45, 0x44, 0x00, 0x00, 0x00,
+        0x02, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x06, 0x70, 0x61, 0x72,
+        0x61, 0x6d, 0x31, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x31, 0x00, 0x00, 0x00,
+        0x06, 0x00, 0x00, 0x00, 0x06, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x32, 0x76,
+        0x61, 0x6c, 0x75, 0x65, 0x32
+    };
+    
+    /*
+     * This is serialized format for
+     * req = new Request(RequestType.RESPONSE_EXPECTED, blah)
+     */
+    private static final byte[] ENCODED_REQUEST_WITH_NO_PARAMS = new byte[] {
+        0x00, 0x00, 0x00, 0x11, 0x52, 0x45, 0x53, 0x50, 0x4f, 0x4e, 0x53, 0x45,
+        0x5f, 0x45, 0x58, 0x50, 0x45, 0x43, 0x54, 0x45, 0x44, 0x00, 0x00, 0x00,
+        0x00
+    };
+    
+    private static final byte[][] GARBAGE_AS_REQUEST = new byte[][] {
+            // general garbage
+            { 0x0d, 0x0b, 0x0e, 0x0e, 0x0f },
+            // first two bytes are broken
+            { 0x0f, 0x0d, 0x00, 0x11, 0x52, 0x45, 0x53, 0x50, 0x4f, 0x4e, 0x53,
+                    0x45, 0x5f, 0x45, 0x58, 0x50, 0x45, 0x43, 0x54, 0x45, 0x44,
+                    0x00, 0x00, 0x00, 0x00 },
+            // last byte indicates params, which are missing
+            { 0x00, 0x00, 0x00, 0x11, 0x52, 0x45, 0x53, 0x50, 0x4f, 0x4e, 0x53,
+                    0x45, 0x5f, 0x45, 0x58, 0x50, 0x45, 0x43, 0x54, 0x45, 0x44,
+                    0x00, 0x00, 0x00, 0x0f } };
+    
+    
     private static final byte[] TYPE = RequestType.RESPONSE_EXPECTED.toString().getBytes();
 
     private Channel channel;
@@ -65,18 +108,18 @@
     }
 
     @Test
-    public void testDecode() {
+    public void testDecode() throws InvalidMessageException {
         ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
         buffer.writeInt(TYPE.length);
         buffer.writeBytes(TYPE);
 
-        Request request = (Request) decoder.decode(null, channel, buffer);
+        Request request = (Request) decoder.decode(channel, buffer);
 
         assertTrue(RequestType.RESPONSE_EXPECTED == (RequestType) request.getType());
     }
 
     @Test
-    public void testDecodeWithParameters() {
+    public void testDecodeWithParameters() throws InvalidMessageException {
         String parmName = "parameter";
         String parmValue = "hello";
         ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
@@ -88,7 +131,7 @@
         buffer.writeBytes(parmName.getBytes());
         buffer.writeBytes(parmValue.getBytes());
         
-        Request request = (Request) decoder.decode(null, channel, buffer);
+        Request request = (Request) decoder.decode(channel, buffer);
         Collection<String> parmNames = request.getParameterNames();
 
         assertEquals(1, parmNames.size());
@@ -96,4 +139,40 @@
         String decodedValue = request.getParameter(parmName);
         assertEquals(parmValue, decodedValue);
     }
+    
+    @Test
+    public void testDecodeWithParametersFromBytesArray() throws InvalidMessageException {
+        ChannelBuffer buffer = ChannelBuffers.copiedBuffer(ENCODED_REQEUEST_WITH_PARAMS);
+        Request expected = new Request(RequestType.RESPONSE_EXPECTED, null);
+        expected.setParameter("param1", "value1");
+        expected.setParameter("param2", "value2");
+        Message actual = new RequestDecoder().decode(channel, buffer);
+        assertTrue(actual instanceof Request);
+        assertTrue(Messages.equal(expected, (Request)actual));
+        SocketAddress addr = mock(SocketAddress.class);
+        buffer = ChannelBuffers.copiedBuffer(ENCODED_REQUEST_WITH_NO_PARAMS);
+        expected = new Request(RequestType.RESPONSE_EXPECTED, addr);
+        actual = new RequestDecoder().decode(channel, buffer);
+        assertTrue(actual instanceof Request);
+        assertTrue(Messages.equal(expected, (Request)actual));
+    }
+    
+    @Test
+    public void decodingOfGarbageThrowsException()
+            throws InvalidMessageException {
+        int expectedFailures = GARBAGE_AS_REQUEST.length;
+        int actualFailures = 0;
+        for (int i = 0; i < GARBAGE_AS_REQUEST.length; i++) {
+            ChannelBuffer buffer = ChannelBuffers
+                    .copiedBuffer(GARBAGE_AS_REQUEST[0]);
+            RequestDecoder decoder = new RequestDecoder();
+            try {
+                decoder.decode(channel, buffer);
+            } catch (InvalidMessageException e) {
+                // pass
+                actualFailures++;
+            }
+        }
+        assertEquals(expectedFailures, actualFailures);
+    }
 }
--- a/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/ResponseEncoderTest.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/ResponseEncoderTest.java	Fri Dec 14 16:06:04 2012 +0100
@@ -36,63 +36,41 @@
 
 package com.redhat.thermostat.agent.command.internal;
 
-import java.nio.ByteBuffer;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.charset.Charset;
 
 import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.junit.Before;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
-import com.redhat.thermostat.agent.command.internal.ResponseEncoder;
 import com.redhat.thermostat.common.command.Response;
 import com.redhat.thermostat.common.command.Response.ResponseType;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Channels.class)
 public class ResponseEncoderTest {
 
-    private MessageEvent e;
-
-    @Before
-    public void setUp() {
-        Response r = new Response(ResponseType.OK);
-        e = mock(MessageEvent.class);
-        when(e.getMessage()).thenReturn(r);
-        when(e.getFuture()).thenReturn(null);
-        
-    }
-
+    private static final boolean DEBUG = false;
+    
     @Test
-    public void testWriteRequested() {
-        PowerMockito.mockStatic(Channels.class);
-        ArgumentCaptor<Object> argument = ArgumentCaptor.forClass(Object.class);
-
+    public void testEncode() throws Exception {
         ResponseEncoder encoder = new ResponseEncoder();
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        encoder.writeRequested(ctx, e);
-        
-        PowerMockito.verifyStatic();
-        Channels.write(any(ChannelHandlerContext.class), any(ChannelFuture.class), argument.capture());
-
-        ChannelBuffer buf = (ChannelBuffer) argument.getValue();
-        int messageLength = buf.readInt();
-        assertEquals(2, messageLength);
-        ByteBuffer bbuf = ByteBuffer.allocate(buf.readableBytes());
-        buf.readBytes(bbuf);
-        String message = new String(bbuf.array());
-        assertEquals("OK", message);
+        String responseExp = "OK";
+        ChannelBuffer stringBuf = ChannelBuffers.copiedBuffer(responseExp, Charset.defaultCharset());
+        ChannelBuffer buf = ChannelBuffers.buffer(4);
+        buf.writeInt(responseExp.getBytes().length);
+        ChannelBuffer expected = ChannelBuffers.wrappedBuffer(buf, stringBuf);
+        Response ok = new Response(ResponseType.OK);
+        ChannelBuffer actual = (ChannelBuffer)encoder.encode(ok);
+        if (DEBUG) {
+            printBuffers(actual, expected);
+        }
+        assertEquals(0, ChannelBuffers.compare(expected, actual));
+    }
+    
+    private void printBuffers(ChannelBuffer actual, ChannelBuffer expected) {
+        System.out.println("hexdump expected\n-------------------------------------");
+        System.out.println(ChannelBuffers.hexDump(expected));
+        System.out.println("\nhexdump actual\n-------------------------------------");
+        System.out.println(ChannelBuffers.hexDump(actual) + "\n\n");
     }
 }
--- a/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestEncoder.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestEncoder.java	Fri Dec 14 16:06:04 2012 +0100
@@ -44,12 +44,9 @@
 import java.util.logging.Logger;
 
 import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
 
 import com.redhat.thermostat.common.command.EncodingHelper;
+import com.redhat.thermostat.common.command.Message;
 import com.redhat.thermostat.common.command.MessageEncoder;
 import com.redhat.thermostat.common.command.Request;
 import com.redhat.thermostat.common.utils.LoggingUtils;
@@ -57,36 +54,36 @@
 class RequestEncoder extends MessageEncoder {
 
     private static final Logger logger = LoggingUtils.getLogger(RequestEncoder.class);
-    
+
+    /*
+     * See the javadoc of Request for a description of the encoding.
+     */
     @Override
-    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
-
-        Request request = (Request) e.getMessage();
+    protected ChannelBuffer encode(Message msg) {
+        // At this point we are only getting Messages. Since our only
+        // registered MessageEncoder is the one for Requests a cast
+        // to Request should be safe.
+        Request request = (Request) msg;
+        logger.log(Level.FINEST, "encoding Request object");
 
         // Request Type
-        String requestType = EncodingHelper.trimType(request.getType().toString());
+        String requestType = EncodingHelper.trimType(request.getType()
+                .toString());
         ChannelBuffer typeBuffer = EncodingHelper.encode(requestType);
 
         // Parameters
-        // TODO: if in practice parms take up more than 256 bytes, use appropriate
-        // dynamicBuffer() variant to specify initial/estimated capacity.
+        // TODO: if in practice parms take up more than 256 bytes, use
+        // appropriate dynamicBuffer() variant to specify initial/estimated capacity.
         ChannelBuffer parmsBuffer = dynamicBuffer();
         Collection<String> parmNames = request.getParameterNames();
         parmsBuffer.writeInt(parmNames.size());
         for (String parmName : parmNames) {
-            EncodingHelper.encode(parmName, request.getParameter(parmName), parmsBuffer);
+            EncodingHelper.encode(parmName, request.getParameter(parmName),
+                    parmsBuffer);
         }
-
         // Compose the full message.
         ChannelBuffer buf = wrappedBuffer(typeBuffer, parmsBuffer);
-        Channels.write(ctx, e.getFuture(), buf);
-    }
-    
-    // This must be implemented, even though we are simply passing on the exception.  If
-    // not implemented, this exception ends up going uncaught which causes problems.
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-        logger.log(Level.FINE, "Forwarding exception ", e.getCause());
-        Channels.fireExceptionCaught(ctx, e.getCause());
+        // Just return the channel buffer which is our encoded message
+        return buf;
     }
 }
\ No newline at end of file
--- a/client/command/src/main/java/com/redhat/thermostat/client/command/internal/ResponseDecoder.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/client/command/src/main/java/com/redhat/thermostat/client/command/internal/ResponseDecoder.java	Fri Dec 14 16:06:04 2012 +0100
@@ -37,22 +37,26 @@
 package com.redhat.thermostat.client.command.internal;
 
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
 
 import com.redhat.thermostat.common.command.DecodingHelper;
+import com.redhat.thermostat.common.command.InvalidMessageException;
+import com.redhat.thermostat.common.command.Message;
 import com.redhat.thermostat.common.command.MessageDecoder;
 import com.redhat.thermostat.common.command.Response;
 import com.redhat.thermostat.common.command.Response.ResponseType;
 
 class ResponseDecoder extends MessageDecoder {
 
+    /*
+     * See javadoc of Response for a description of the encoding.
+     */
     @Override
-    protected Object decode(ChannelHandlerContext ctx, Channel channel,
-            ChannelBuffer buffer) {
-        String typeAsString = DecodingHelper.decodeString(buffer);
+    protected Message decode(Channel channel, ChannelBuffer originalMessage) throws InvalidMessageException {
+        String typeAsString = DecodingHelper.decodeString(originalMessage);
         if (typeAsString == null) {
-            return null;
+            throw new InvalidMessageException("Could not decode message: " + ChannelBuffers.hexDump(originalMessage));
         }
         return new Response(ResponseType.valueOf(typeAsString));
     }
--- a/client/command/src/test/java/com/redhat/thermostat/client/command/internal/RequestEncoderTest.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/client/command/src/test/java/com/redhat/thermostat/client/command/internal/RequestEncoderTest.java	Fri Dec 14 16:06:04 2012 +0100
@@ -36,34 +36,99 @@
 
 package com.redhat.thermostat.client.command.internal;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.mockito.Matchers.isA;
+
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
 
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.junit.Test;
 
+import com.redhat.thermostat.common.command.Request;
+import com.redhat.thermostat.common.command.Request.RequestType;
+
 public class RequestEncoderTest {
 
-    /**
-     * This test verifies that exception events are forwarded upstream. This is to ensure that
-     * fireComplete() events get delivered in case of Exceptions.
-     * 
-     * @throws Exception
-     */
+    private static final boolean DEBUG = false;
+    
     @Test
-    public void exceptionCaughtCallsFireComplete() throws Exception {
+    public void canEncodeSimpleRequestWithNoParams() throws Exception {
+        RequestEncoder encoder = new RequestEncoder();
+        String responseExp = "RESPONSE_EXPECTED";
+        ChannelBuffer stringBuf = ChannelBuffers.copiedBuffer(responseExp, Charset.defaultCharset());
+        ChannelBuffer buf = ChannelBuffers.buffer(4);
+        buf.writeInt(responseExp.getBytes().length);
+        ChannelBuffer buf2 = ChannelBuffers.wrappedBuffer(buf, stringBuf);
+        buf = ChannelBuffers.buffer(4);
+        buf.writeInt(0);
+        ChannelBuffer expected = ChannelBuffers.wrappedBuffer(buf2, buf);
+        SocketAddress addr = mock(SocketAddress.class);
+        Request item = new Request(RequestType.RESPONSE_EXPECTED, addr);
+        ChannelBuffer actual = encoder.encode(item);
+        if (DEBUG) {
+            printBuffers(actual, expected);
+        }
+        assertEquals(0, ChannelBuffers.compare(expected, actual));
+    }
+    
+    @Test
+    public void canEncodeRequestWithParams() throws Exception {
+        SocketAddress addr = mock(SocketAddress.class);
+
+        // Prepare request we'd like to encode
+        Request item = new Request(RequestType.RESPONSE_EXPECTED, addr);
+        String param1Name = "param1";
+        String param1Value = "value1";
+        String param2Name = "param2";
+        String param2Value = "value2";
+        item.setParameter(param1Name, param1Value);
+        item.setParameter(param2Name, param2Value);
         RequestEncoder encoder = new RequestEncoder();
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        ExceptionEvent evt = mock(ExceptionEvent.class);
-        when(evt.getCause()).thenReturn(new Exception());
-        Channel channel = mock(Channel.class);
-        when(ctx.getChannel()).thenReturn(channel);
-        encoder.exceptionCaught(ctx, evt);
-        // Channels.fireExceptionCaught implicitly calls this
-        verify(ctx).sendUpstream(isA(ExceptionEvent.class));
+        
+        // build expected
+        String responseExp = "RESPONSE_EXPECTED";
+        ChannelBuffer stringBuf = ChannelBuffers.copiedBuffer(responseExp, Charset.defaultCharset());
+        ChannelBuffer buf = ChannelBuffers.buffer(4);
+        buf.writeInt(responseExp.getBytes().length);
+        ChannelBuffer buf2 = ChannelBuffers.wrappedBuffer(buf, stringBuf);
+        buf = ChannelBuffers.buffer(4);
+        buf.writeInt(2);
+        ChannelBuffer request = ChannelBuffers.wrappedBuffer(buf2, buf);
+        ChannelBuffer nameLen = ChannelBuffers.buffer(4);
+        nameLen.writeInt(param1Name.getBytes().length);
+        ChannelBuffer valueLen = ChannelBuffers.buffer(4);
+        valueLen.writeInt(param1Value.getBytes().length);
+        ChannelBuffer lens = ChannelBuffers.wrappedBuffer(nameLen, valueLen);
+        ChannelBuffer nameBuf = ChannelBuffers.copiedBuffer(param1Name, Charset.defaultCharset());
+        ChannelBuffer valueBuf = ChannelBuffers.copiedBuffer(param1Value, Charset.defaultCharset());
+        ChannelBuffer payload = ChannelBuffers.wrappedBuffer(nameBuf, valueBuf);
+        ChannelBuffer param1Buf = ChannelBuffers.wrappedBuffer(lens, payload);
+        nameLen = ChannelBuffers.buffer(4);
+        nameLen.writeInt(param2Name.getBytes().length);
+        valueLen = ChannelBuffers.buffer(4);
+        valueLen.writeInt(param2Value.getBytes().length);
+        lens = ChannelBuffers.wrappedBuffer(nameLen, valueLen);
+        nameBuf = ChannelBuffers.copiedBuffer(param2Name, Charset.defaultCharset());
+        valueBuf = ChannelBuffers.copiedBuffer(param2Value, Charset.defaultCharset());
+        payload = ChannelBuffers.wrappedBuffer(nameBuf, valueBuf);
+        ChannelBuffer param2Buf = ChannelBuffers.wrappedBuffer(lens, payload);
+        ChannelBuffer params = ChannelBuffers.wrappedBuffer(param1Buf, param2Buf);
+        ChannelBuffer expected = ChannelBuffers.wrappedBuffer(request, params);
+        
+        // Encode item for actual
+        ChannelBuffer actual = encoder.encode(item);
+        if (DEBUG) {
+            printBuffers(actual, expected);
+        }
+        assertEquals(0, ChannelBuffers.compare(expected, actual));
+    }
+
+    private void printBuffers(ChannelBuffer actual, ChannelBuffer expected) {
+        System.out.println("hexdump expected\n-------------------------------------");
+        System.out.println(ChannelBuffers.hexDump(expected));
+        System.out.println("\nhexdump actual\n-------------------------------------");
+        System.out.println(ChannelBuffers.hexDump(actual) + "\n\n");
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/client/command/src/test/java/com/redhat/thermostat/client/command/internal/ResponseDecoderTest.java	Fri Dec 14 16:06:04 2012 +0100
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.client.command.internal;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.junit.Test;
+
+import com.redhat.thermostat.common.command.InvalidMessageException;
+import com.redhat.thermostat.common.command.Message;
+import com.redhat.thermostat.common.command.Messages;
+import com.redhat.thermostat.common.command.Response;
+import com.redhat.thermostat.common.command.Response.ResponseType;
+
+public class ResponseDecoderTest {
+
+    private static final byte[] ENCODED_OK_RESP = new byte[] {
+        0x00, 0x00, 0x00, 0x02, 0x4f, 0x4b  
+    };
+    
+    private static final byte[] GARBAGE_AS_RESPONSE = new byte[] {
+        0x0d, 0x0b, 0x0e, 0x0e, 0x0f  
+    };
+    
+    @Test
+    public void testDecode() throws InvalidMessageException {
+        ChannelBuffer buffer = ChannelBuffers.copiedBuffer(ENCODED_OK_RESP);
+        Response expected = new Response(ResponseType.OK);
+        ResponseDecoder decoder = new ResponseDecoder();
+        Message actual = decoder.decode(null, buffer);
+        assertTrue(actual instanceof Response);
+        assertTrue(Messages.equal(expected, (Response)actual));
+    }
+    
+    @Test
+    public void verifyInvalidEncodingThrowsException() {
+        ResponseDecoder decoder = new ResponseDecoder();
+        ChannelBuffer garbage = ChannelBuffers.copiedBuffer(GARBAGE_AS_RESPONSE);
+        try {
+            decoder.decode(null, garbage);
+            fail("Should have thrown decoding exception!");
+        } catch (InvalidMessageException e) {
+            // pass
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/common/command/src/main/java/com/redhat/thermostat/common/command/InvalidMessageException.java	Fri Dec 14 16:06:04 2012 +0100
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.common.command;
+
+/**
+ * Exception thrown when an improperly encoded {@link Message} has been received.
+ *
+ */
+@SuppressWarnings("serial")
+public class InvalidMessageException extends Exception {
+
+    public InvalidMessageException(String message) {
+        super(message);
+    }
+}
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/Message.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/common/command/src/main/java/com/redhat/thermostat/common/command/Message.java	Fri Dec 14 16:06:04 2012 +0100
@@ -37,7 +37,7 @@
 package com.redhat.thermostat.common.command;
 
 
-interface Message {
+public interface Message {
 
     interface MessageType {
     }
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/MessageDecoder.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/common/command/src/main/java/com/redhat/thermostat/common/command/MessageDecoder.java	Fri Dec 14 16:06:04 2012 +0100
@@ -36,8 +36,64 @@
 
 package com.redhat.thermostat.common.command;
 
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import static org.jboss.netty.channel.Channels.fireMessageReceived;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+import com.redhat.thermostat.common.utils.LoggingUtils;
+
+public abstract class MessageDecoder extends SimpleChannelUpstreamHandler {
+
+    // testing hook
+    boolean exceptionCaught = false;
+    
+    private static final Logger logger = LoggingUtils.getLogger(MessageDecoder.class);
+    
+    protected MessageDecoder() {
+        super();
+    }
 
-public abstract class MessageDecoder extends FrameDecoder {
+    @Override
+    public void handleUpstream(
+            ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
+        if (!(evt instanceof MessageEvent)) {
+            ctx.sendUpstream(evt);
+            return;
+        }
 
+        MessageEvent e = (MessageEvent) evt;
+        Object originalMessage = e.getMessage();
+        if (!(originalMessage instanceof ChannelBuffer)) {
+            // Skip decoding, since we've received something
+            // we don't know how to deal with anyway.
+            ctx.sendUpstream(evt);
+        }
+        Message decodedMessage = null;
+        try {
+            decodedMessage = decode(e.getChannel(), (ChannelBuffer)originalMessage);
+        } catch (InvalidMessageException ex) {
+            logger.log(Level.WARNING, "Decoding failed on received message. Possible DoS attack!", ex);
+            exceptionCaught = true;
+        }
+        if (decodedMessage != null) {
+            fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress());
+        }
+    }
+
+    /**
+     * Transforms the specified received message into another message and return
+     * the transformed message.  Return {@code null} if the received message
+     * is supposed to be discarded.
+     * 
+     * @throws InvalidMessageException If the received message was not properly encoded.
+     */
+    protected abstract Message decode(Channel channel, ChannelBuffer msg) throws InvalidMessageException;
 }
\ No newline at end of file
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/MessageEncoder.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/common/command/src/main/java/com/redhat/thermostat/common/command/MessageEncoder.java	Fri Dec 14 16:06:04 2012 +0100
@@ -36,8 +36,48 @@
 
 package com.redhat.thermostat.common.command;
 
-import org.jboss.netty.channel.SimpleChannelHandler;
+import static org.jboss.netty.channel.Channels.write;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
+
+public abstract class MessageEncoder extends SimpleChannelDownstreamHandler {
+
+    protected MessageEncoder() {
+        super();
+    }
+
+    @Override
+    public void handleDownstream(
+            ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
+        if (!(evt instanceof MessageEvent)) {
+            ctx.sendDownstream(evt);
+            return;
+        }
 
-public abstract class MessageEncoder extends SimpleChannelHandler {
+        MessageEvent e = (MessageEvent) evt;
+        Object originalMessage = e.getMessage();
+        // We only know how to encode Messages. Non-message types
+        // get sent downstream verbatim.
+        if (!(originalMessage instanceof Message)) {
+            ctx.sendDownstream(evt);
+        }
+        ChannelBuffer encodedMessage = encode((Message)originalMessage);
+        if (encodedMessage != null) {
+            write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
+        }
+    }
 
+    /**
+     * Transforms the specified message into another message and return the
+     * transformed message. Note that you can not return {@code null}, unlike
+     * you can in
+     * {@link MessageDecoder#decode(org.jboss.netty.buffer.ChannelBuffer)}; you
+     * must return something, at least {@link ChannelBuffers#EMPTY_BUFFER}.
+     */
+    protected abstract ChannelBuffer encode(Message originalMessage);
 }
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/common/command/src/main/java/com/redhat/thermostat/common/command/Messages.java	Fri Dec 14 16:06:04 2012 +0100
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.common.command;
+
+import java.util.Collection;
+
+/**
+ * Helper class for comparing messages, such as {@link Request} and
+ * {@link Response}.
+ * 
+ */
+public class Messages {
+
+    /**
+     * Two requests, a and b, are considered equal if and only if they are,
+     * not-null, of the same type and all parameters (all keys and values)
+     * match. Listeners and targets are ignored.
+     * 
+     * @return true if a and b are both not-null and equal. false otherwise.
+     */
+    public static boolean equal(Request a, Request b) {
+        if (a == null || b == null) {
+            return false;
+        }
+        // type needs to be the same
+        if (a.getType() != b.getType()) {
+            return false;
+        }
+        // all parameters and values need to match
+        Collection<String> ourParamValues = a.getParameterNames();
+        Collection<String> otherParamValues = b.getParameterNames();
+        if (ourParamValues.size() != otherParamValues.size()) {
+            return false;
+        }
+        for (String name: ourParamValues) {
+            String otherParamValue = b.getParameter(name);
+            if (otherParamValue == null) {
+                // other doesn't have param which we have
+                return false;
+            } else {
+                // both requests contain same param name
+                String ourParamValue = a.getParameter(name);
+                if (!ourParamValue.equals(otherParamValue)) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+    
+    /**
+     * Two responses are equal if and only if they are of the same type.
+     * 
+     * @return true if a and b are both not-null and are equal. false otherwise.
+     */
+    public static boolean equal(Response a, Response b) {
+        return a.getType() == b.getType();
+    }
+}
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/Request.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/common/command/src/main/java/com/redhat/thermostat/common/command/Request.java	Fri Dec 14 16:06:04 2012 +0100
@@ -43,7 +43,39 @@
 import java.util.TreeMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-
+/**
+ * A Request object represents a request passed from a client
+ * to an agent.
+ * 
+ * 
+ * Request objects are serialized over the command channel in the following
+ * format:
+ * 
+ * -------------------------
+ * | A | TYPE | B | PARAMS |
+ * -------------------------
+ * 
+ * A is an 32 bit integer representing the length - in bytes - of TYPE. TYPE
+ * is a byte array representing the string of the request type (e.g.
+ * "RESPONSE_EXPECTED") B is a 32 bit integer representing the number of
+ * request parameters which follow.
+ * 
+ * PARAMS (if B > 0) is a variable length stream of the following format:
+ * 
+ * It is a simple encoding of name => value pairs.
+ * 
+ * -----------------------------------------------------------------------------------------------
+ * | I_1 | K_1 | P_1 | V_1 | ... | I_(n-1) | K_(n-1) | P_(n-1) | V_(n-1) | I_n | K_n | P_n | V_n |
+ * -----------------------------------------------------------------------------------------------
+ * 
+ * I_n  A 32 bit integer representing the length - in bytes - of the n'th
+ *      parameter name.
+ * K_n  A 32 bit integer representing the length - in bytes - of the n'th
+ *      parameter value.
+ * P_n  A byte array representing the string of the n'th parameter name.
+ * V_n  A byte array representing the string of the n'th parameter value.
+ * 
+ */
 public class Request implements Message {
 
     public enum RequestType implements MessageType {
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/Response.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/common/command/src/main/java/com/redhat/thermostat/common/command/Response.java	Fri Dec 14 16:06:04 2012 +0100
@@ -36,6 +36,23 @@
 
 package com.redhat.thermostat.common.command;
 
+
+/**
+ * A Response object represents a response message passed from an agent
+ * to a client.
+ * 
+ * 
+ * Response objects are serialized over the command channel in the following
+ * format:
+ * 
+ * ------------
+ * | A | TYPE |
+ * ------------
+ * 
+ * A is an 32 bit integer representing the length - in bytes - of TYPE. TYPE
+ * is a byte array representing the string of the response type (e.g.
+ * "OK").
+ */
 public class Response implements Message {
 
     // TODO add parameter support to provide more information in some of these types.
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/common/command/src/test/java/com/redhat/thermostat/common/command/MessageDecoderTest.java	Fri Dec 14 16:06:04 2012 +0100
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.common.command;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.junit.Test;
+
+public class MessageDecoderTest {
+
+    private static final byte[] GOOD_MSG = new byte[] {
+        0x00, 0x00, 0x00, 0x02, 0x4f, 0x4b
+    };
+    
+    private static final byte[] BAD_MSG = new byte[] {
+        0x0b, 0x0e, 0x0e, 0x0f
+    };
+    
+    @Test
+    public void canDecodeGoodMessage() throws Exception {
+        MessageDecoder decoder = new DummyMessageDecoder();
+        // Sanity check
+        assertFalse(decoder.exceptionCaught);
+        MessageEvent e = mock(MessageEvent.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        ChannelBuffer buffer = ChannelBuffers.copiedBuffer(GOOD_MSG);
+        when(e.getMessage()).thenReturn(buffer);
+        when(ctx.getChannel()).thenReturn(channel);
+        decoder.handleUpstream(ctx, e);
+        assertFalse(decoder.exceptionCaught);
+    }
+    
+    @Test
+    public void decodingBadMessageThrowsException() throws Exception {
+        MessageDecoder decoder = new DummyMessageDecoder();
+        // Sanity check
+        assertFalse(decoder.exceptionCaught);
+        MessageEvent e = mock(MessageEvent.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        ChannelBuffer buffer = ChannelBuffers.copiedBuffer(BAD_MSG);
+        when(e.getMessage()).thenReturn(buffer);
+        when(ctx.getChannel()).thenReturn(channel);
+        decoder.handleUpstream(ctx, e);
+        assertTrue(decoder.exceptionCaught);
+    }
+    
+    private static class DummyMessageDecoder extends MessageDecoder {
+
+        @Override
+        protected Message decode(Channel channel, ChannelBuffer msg)
+                throws InvalidMessageException {
+            if (msg.readInt() == 0x0b0e0e0f) {
+                throw new InvalidMessageException("Burn, baby, burn!");
+            }
+            return new Message() {
+
+                @Override
+                public MessageType getType() {
+                    return new MessageType() {};
+                }
+                
+            };
+        }
+        
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/common/command/src/test/java/com/redhat/thermostat/common/command/MessagesTest.java	Fri Dec 14 16:06:04 2012 +0100
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.common.command;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+import com.redhat.thermostat.common.command.Request.RequestType;
+import com.redhat.thermostat.common.command.Response.ResponseType;
+
+public class MessagesTest {
+    
+    @Test
+    public void testRequestsEqual() {
+        // self
+        Request req1 = new Request(RequestType.RESPONSE_EXPECTED, null);
+        assertTrue(Messages.equal(req1, req1));
+        req1.setParameter("test", "blah");
+        assertTrue(Messages.equal(req1, req1));
+        
+        req1 = new Request(RequestType.RESPONSE_EXPECTED, null);
+        // basics
+        assertFalse(Messages.equal((Request)null, (Request)null));
+        assertFalse(Messages.equal(req1, null));
+        assertFalse(Messages.equal(req1, new Request(RequestType.MULTIPART_RESPONSE_EXPECTED, null)));
+        
+        Request req2 = new Request(RequestType.RESPONSE_EXPECTED, null);
+        String receiverClassName = "com.example.receivers.MyReceiver";
+        req1.setReceiver(receiverClassName);
+        req2.setReceiver(receiverClassName);
+        // receivers are parameters
+        assertTrue(Messages.equal(req1, req2));
+        
+        // add parameters
+        req1.setParameter("fluff", "foo");
+        req2.setParameter("fluff", "foo");
+        assertTrue(Messages.equal(req1, req1));
+        
+        // one key is different
+        req2.setParameter("test", "false");
+        assertFalse(Messages.equal(req1, req2));
+        
+        req1.setParameter("test", "false");
+        assertTrue(Messages.equal(req1, req2));
+        req2.setParameter("test", "true");
+        assertFalse(Messages.equal(req1, req2));
+    }
+    
+    @Test
+    public void testResponsesEqual() {
+        Response r = new Response(ResponseType.NOK);
+        assertTrue(Messages.equal(r, r));
+        Response r2 = new Response(ResponseType.NOK);
+        assertTrue(Messages.equal(r, r2));
+        Response r3 = new Response(ResponseType.OK);
+        assertFalse(Messages.equal(r2, r3));
+        assertFalse(Messages.equal(r, r3));
+    }
+}
--- a/common/command/src/test/java/com/redhat/thermostat/common/command/ResponseTest.java	Mon Dec 17 16:37:43 2012 +0100
+++ b/common/command/src/test/java/com/redhat/thermostat/common/command/ResponseTest.java	Fri Dec 14 16:06:04 2012 +0100
@@ -40,7 +40,6 @@
 
 import org.junit.Test;
 
-import com.redhat.thermostat.common.command.Response;
 import com.redhat.thermostat.common.command.Response.ResponseType;
 
 public class ResponseTest {