Mercurial > hg > release > thermostat-0.5
changeset 864:8e16f9b4ef38
Simplify how encoders/decoders are implemented in cmd-channel.
Earlier *Encoder classes were using the IMHO rather un-intuitive
writeRequested() state. I've added some code to MessageEncoder/MessageDecoder
so that they are DownStreamHandlers/UpstreamHandlers and call an abstract
encode/decode method. Encoders/Decoders implement this encode/decode method.
This makes things a little bit easier to understand (IMHO).
I've also added a Messages utility class which helps for equal-comparison of
Requests/Responses. This helped for testing *Encoder/*Decoder classes, which
are also easier testable now. Not all of them have had tests in the first place
;-)
While I was doing all that I realized that there was no high-level description
of how things are sent over the netty channel, so I've added that too. See
javadoc of Request/Response. This should make it easier if one has to go back
understanding things again after not touching the code for months ;-)
Another reason why I've refactored this is because it makes putting in SSL
support for the commmand-channel easier.
Reviewed-by: vanaltj
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2012-December/004716.html
line wrap: on
line diff
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/RequestDecoder.java Mon Dec 17 16:37:43 2012 +0100 +++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/RequestDecoder.java Fri Dec 14 16:06:04 2012 +0100 @@ -36,30 +36,42 @@ package com.redhat.thermostat.agent.command.internal; +import java.util.logging.Level; +import java.util.logging.Logger; + import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; import com.redhat.thermostat.common.command.DecodingHelper; +import com.redhat.thermostat.common.command.InvalidMessageException; +import com.redhat.thermostat.common.command.Message; import com.redhat.thermostat.common.command.MessageDecoder; import com.redhat.thermostat.common.command.Request; import com.redhat.thermostat.common.command.Request.RequestType; +import com.redhat.thermostat.common.utils.LoggingUtils; class RequestDecoder extends MessageDecoder { + + private static final Logger logger = LoggingUtils.getLogger(RequestDecoder.class); + /* + * See the javadoc of Request for a description of the encoding. + */ @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer buffer) { + protected Message decode(Channel channel, ChannelBuffer msg) throws InvalidMessageException { + logger.log(Level.FINEST, "agent: decoding Request object"); + ChannelBuffer buffer = (ChannelBuffer) msg; buffer.markReaderIndex(); String typeAsString = DecodingHelper.decodeString(buffer); if (typeAsString == null) { buffer.resetReaderIndex(); - return null; + throw new InvalidMessageException("Could not decode message: " + ChannelBuffers.hexDump(buffer)); } Request request = new Request(RequestType.valueOf(typeAsString), channel.getRemoteAddress()); if (!DecodingHelper.decodeParameters(buffer, request)) { buffer.resetReaderIndex(); - return null; + throw new InvalidMessageException("Could not decode message: " + ChannelBuffers.hexDump(buffer)); } return request; }
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ResponseEncoder.java Mon Dec 17 16:37:43 2012 +0100 +++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ResponseEncoder.java Fri Dec 14 16:06:04 2012 +0100 @@ -36,23 +36,34 @@ package com.redhat.thermostat.agent.command.internal; +import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; + +import java.util.logging.Level; +import java.util.logging.Logger; + import org.jboss.netty.buffer.ChannelBuffer; -import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.MessageEvent; import com.redhat.thermostat.common.command.EncodingHelper; +import com.redhat.thermostat.common.command.Message; import com.redhat.thermostat.common.command.MessageEncoder; import com.redhat.thermostat.common.command.Response; +import com.redhat.thermostat.common.utils.LoggingUtils; class ResponseEncoder extends MessageEncoder { + private static final Logger logger = LoggingUtils.getLogger(ResponseEncoder.class); + + /* + * See javadoc of Response for a description of the encoding. + */ @Override - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) { - - Response response = (Response) e.getMessage(); + protected ChannelBuffer encode(Message msg) { + // At this point we are only getting Messages. Since our only + // registered MessageEncoder is the one for Responses a cast + // to Response should be safe. + logger.log(Level.FINEST, "agent: encoding Response object"); + Response response = (Response) msg; // Response Type String responseType = EncodingHelper.trimType(response.getType().toString()); @@ -60,8 +71,7 @@ // Compose the full message. ChannelBuffer buf = wrappedBuffer(typeBuffer); - Channels.write(ctx, e.getFuture(), buf); - + return buf; } } \ No newline at end of file
--- a/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/RequestDecoderTest.java Mon Dec 17 16:37:43 2012 +0100 +++ b/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/RequestDecoderTest.java Fri Dec 14 16:06:04 2012 +0100 @@ -36,6 +36,7 @@ package com.redhat.thermostat.agent.command.internal; +import java.net.SocketAddress; import java.util.Collection; import org.jboss.netty.buffer.ChannelBuffer; @@ -45,6 +46,9 @@ import org.junit.Test; import com.redhat.thermostat.agent.command.internal.RequestDecoder; +import com.redhat.thermostat.common.command.InvalidMessageException; +import com.redhat.thermostat.common.command.Message; +import com.redhat.thermostat.common.command.Messages; import com.redhat.thermostat.common.command.Request; import com.redhat.thermostat.common.command.Request.RequestType; @@ -53,6 +57,45 @@ import static org.mockito.Mockito.mock; public class RequestDecoderTest { + + /* + * This is serialized format for + * req = new Request(RequestType.RESPONSE_EXPECTED, blah) + * req.setParameter("param1", "value1"); + * req.setParameter("param2", "value2"); + */ + private static final byte[] ENCODED_REQEUEST_WITH_PARAMS = new byte[] { + 0x00, 0x00, 0x00, 0x11, 0x52, 0x45, 0x53, 0x50, 0x4f, 0x4e, 0x53, 0x45, + 0x5f, 0x45, 0x58, 0x50, 0x45, 0x43, 0x54, 0x45, 0x44, 0x00, 0x00, 0x00, + 0x02, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x06, 0x70, 0x61, 0x72, + 0x61, 0x6d, 0x31, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x31, 0x00, 0x00, 0x00, + 0x06, 0x00, 0x00, 0x00, 0x06, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x32, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x32 + }; + + /* + * This is serialized format for + * req = new Request(RequestType.RESPONSE_EXPECTED, blah) + */ + private static final byte[] ENCODED_REQUEST_WITH_NO_PARAMS = new byte[] { + 0x00, 0x00, 0x00, 0x11, 0x52, 0x45, 0x53, 0x50, 0x4f, 0x4e, 0x53, 0x45, + 0x5f, 0x45, 0x58, 0x50, 0x45, 0x43, 0x54, 0x45, 0x44, 0x00, 0x00, 0x00, + 0x00 + }; + + private static final byte[][] GARBAGE_AS_REQUEST = new byte[][] { + // general garbage + { 0x0d, 0x0b, 0x0e, 0x0e, 0x0f }, + // first two bytes are broken + { 0x0f, 0x0d, 0x00, 0x11, 0x52, 0x45, 0x53, 0x50, 0x4f, 0x4e, 0x53, + 0x45, 0x5f, 0x45, 0x58, 0x50, 0x45, 0x43, 0x54, 0x45, 0x44, + 0x00, 0x00, 0x00, 0x00 }, + // last byte indicates params, which are missing + { 0x00, 0x00, 0x00, 0x11, 0x52, 0x45, 0x53, 0x50, 0x4f, 0x4e, 0x53, + 0x45, 0x5f, 0x45, 0x58, 0x50, 0x45, 0x43, 0x54, 0x45, 0x44, + 0x00, 0x00, 0x00, 0x0f } }; + + private static final byte[] TYPE = RequestType.RESPONSE_EXPECTED.toString().getBytes(); private Channel channel; @@ -65,18 +108,18 @@ } @Test - public void testDecode() { + public void testDecode() throws InvalidMessageException { ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); buffer.writeInt(TYPE.length); buffer.writeBytes(TYPE); - Request request = (Request) decoder.decode(null, channel, buffer); + Request request = (Request) decoder.decode(channel, buffer); assertTrue(RequestType.RESPONSE_EXPECTED == (RequestType) request.getType()); } @Test - public void testDecodeWithParameters() { + public void testDecodeWithParameters() throws InvalidMessageException { String parmName = "parameter"; String parmValue = "hello"; ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); @@ -88,7 +131,7 @@ buffer.writeBytes(parmName.getBytes()); buffer.writeBytes(parmValue.getBytes()); - Request request = (Request) decoder.decode(null, channel, buffer); + Request request = (Request) decoder.decode(channel, buffer); Collection<String> parmNames = request.getParameterNames(); assertEquals(1, parmNames.size()); @@ -96,4 +139,40 @@ String decodedValue = request.getParameter(parmName); assertEquals(parmValue, decodedValue); } + + @Test + public void testDecodeWithParametersFromBytesArray() throws InvalidMessageException { + ChannelBuffer buffer = ChannelBuffers.copiedBuffer(ENCODED_REQEUEST_WITH_PARAMS); + Request expected = new Request(RequestType.RESPONSE_EXPECTED, null); + expected.setParameter("param1", "value1"); + expected.setParameter("param2", "value2"); + Message actual = new RequestDecoder().decode(channel, buffer); + assertTrue(actual instanceof Request); + assertTrue(Messages.equal(expected, (Request)actual)); + SocketAddress addr = mock(SocketAddress.class); + buffer = ChannelBuffers.copiedBuffer(ENCODED_REQUEST_WITH_NO_PARAMS); + expected = new Request(RequestType.RESPONSE_EXPECTED, addr); + actual = new RequestDecoder().decode(channel, buffer); + assertTrue(actual instanceof Request); + assertTrue(Messages.equal(expected, (Request)actual)); + } + + @Test + public void decodingOfGarbageThrowsException() + throws InvalidMessageException { + int expectedFailures = GARBAGE_AS_REQUEST.length; + int actualFailures = 0; + for (int i = 0; i < GARBAGE_AS_REQUEST.length; i++) { + ChannelBuffer buffer = ChannelBuffers + .copiedBuffer(GARBAGE_AS_REQUEST[0]); + RequestDecoder decoder = new RequestDecoder(); + try { + decoder.decode(channel, buffer); + } catch (InvalidMessageException e) { + // pass + actualFailures++; + } + } + assertEquals(expectedFailures, actualFailures); + } }
--- a/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/ResponseEncoderTest.java Mon Dec 17 16:37:43 2012 +0100 +++ b/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/ResponseEncoderTest.java Fri Dec 14 16:06:04 2012 +0100 @@ -36,63 +36,41 @@ package com.redhat.thermostat.agent.command.internal; -import java.nio.ByteBuffer; +import static org.junit.Assert.assertEquals; + +import java.nio.charset.Charset; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.MessageEvent; -import org.junit.Before; +import org.jboss.netty.buffer.ChannelBuffers; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import com.redhat.thermostat.agent.command.internal.ResponseEncoder; import com.redhat.thermostat.common.command.Response; import com.redhat.thermostat.common.command.Response.ResponseType; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(Channels.class) public class ResponseEncoderTest { - private MessageEvent e; - - @Before - public void setUp() { - Response r = new Response(ResponseType.OK); - e = mock(MessageEvent.class); - when(e.getMessage()).thenReturn(r); - when(e.getFuture()).thenReturn(null); - - } - + private static final boolean DEBUG = false; + @Test - public void testWriteRequested() { - PowerMockito.mockStatic(Channels.class); - ArgumentCaptor<Object> argument = ArgumentCaptor.forClass(Object.class); - + public void testEncode() throws Exception { ResponseEncoder encoder = new ResponseEncoder(); - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - encoder.writeRequested(ctx, e); - - PowerMockito.verifyStatic(); - Channels.write(any(ChannelHandlerContext.class), any(ChannelFuture.class), argument.capture()); - - ChannelBuffer buf = (ChannelBuffer) argument.getValue(); - int messageLength = buf.readInt(); - assertEquals(2, messageLength); - ByteBuffer bbuf = ByteBuffer.allocate(buf.readableBytes()); - buf.readBytes(bbuf); - String message = new String(bbuf.array()); - assertEquals("OK", message); + String responseExp = "OK"; + ChannelBuffer stringBuf = ChannelBuffers.copiedBuffer(responseExp, Charset.defaultCharset()); + ChannelBuffer buf = ChannelBuffers.buffer(4); + buf.writeInt(responseExp.getBytes().length); + ChannelBuffer expected = ChannelBuffers.wrappedBuffer(buf, stringBuf); + Response ok = new Response(ResponseType.OK); + ChannelBuffer actual = (ChannelBuffer)encoder.encode(ok); + if (DEBUG) { + printBuffers(actual, expected); + } + assertEquals(0, ChannelBuffers.compare(expected, actual)); + } + + private void printBuffers(ChannelBuffer actual, ChannelBuffer expected) { + System.out.println("hexdump expected\n-------------------------------------"); + System.out.println(ChannelBuffers.hexDump(expected)); + System.out.println("\nhexdump actual\n-------------------------------------"); + System.out.println(ChannelBuffers.hexDump(actual) + "\n\n"); } }
--- a/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestEncoder.java Mon Dec 17 16:37:43 2012 +0100 +++ b/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestEncoder.java Fri Dec 14 16:06:04 2012 +0100 @@ -44,12 +44,9 @@ import java.util.logging.Logger; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; import com.redhat.thermostat.common.command.EncodingHelper; +import com.redhat.thermostat.common.command.Message; import com.redhat.thermostat.common.command.MessageEncoder; import com.redhat.thermostat.common.command.Request; import com.redhat.thermostat.common.utils.LoggingUtils; @@ -57,36 +54,36 @@ class RequestEncoder extends MessageEncoder { private static final Logger logger = LoggingUtils.getLogger(RequestEncoder.class); - + + /* + * See the javadoc of Request for a description of the encoding. + */ @Override - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) { - - Request request = (Request) e.getMessage(); + protected ChannelBuffer encode(Message msg) { + // At this point we are only getting Messages. Since our only + // registered MessageEncoder is the one for Requests a cast + // to Request should be safe. + Request request = (Request) msg; + logger.log(Level.FINEST, "encoding Request object"); // Request Type - String requestType = EncodingHelper.trimType(request.getType().toString()); + String requestType = EncodingHelper.trimType(request.getType() + .toString()); ChannelBuffer typeBuffer = EncodingHelper.encode(requestType); // Parameters - // TODO: if in practice parms take up more than 256 bytes, use appropriate - // dynamicBuffer() variant to specify initial/estimated capacity. + // TODO: if in practice parms take up more than 256 bytes, use + // appropriate dynamicBuffer() variant to specify initial/estimated capacity. ChannelBuffer parmsBuffer = dynamicBuffer(); Collection<String> parmNames = request.getParameterNames(); parmsBuffer.writeInt(parmNames.size()); for (String parmName : parmNames) { - EncodingHelper.encode(parmName, request.getParameter(parmName), parmsBuffer); + EncodingHelper.encode(parmName, request.getParameter(parmName), + parmsBuffer); } - // Compose the full message. ChannelBuffer buf = wrappedBuffer(typeBuffer, parmsBuffer); - Channels.write(ctx, e.getFuture(), buf); - } - - // This must be implemented, even though we are simply passing on the exception. If - // not implemented, this exception ends up going uncaught which causes problems. - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - logger.log(Level.FINE, "Forwarding exception ", e.getCause()); - Channels.fireExceptionCaught(ctx, e.getCause()); + // Just return the channel buffer which is our encoded message + return buf; } } \ No newline at end of file
--- a/client/command/src/main/java/com/redhat/thermostat/client/command/internal/ResponseDecoder.java Mon Dec 17 16:37:43 2012 +0100 +++ b/client/command/src/main/java/com/redhat/thermostat/client/command/internal/ResponseDecoder.java Fri Dec 14 16:06:04 2012 +0100 @@ -37,22 +37,26 @@ package com.redhat.thermostat.client.command.internal; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; import com.redhat.thermostat.common.command.DecodingHelper; +import com.redhat.thermostat.common.command.InvalidMessageException; +import com.redhat.thermostat.common.command.Message; import com.redhat.thermostat.common.command.MessageDecoder; import com.redhat.thermostat.common.command.Response; import com.redhat.thermostat.common.command.Response.ResponseType; class ResponseDecoder extends MessageDecoder { + /* + * See javadoc of Response for a description of the encoding. + */ @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer buffer) { - String typeAsString = DecodingHelper.decodeString(buffer); + protected Message decode(Channel channel, ChannelBuffer originalMessage) throws InvalidMessageException { + String typeAsString = DecodingHelper.decodeString(originalMessage); if (typeAsString == null) { - return null; + throw new InvalidMessageException("Could not decode message: " + ChannelBuffers.hexDump(originalMessage)); } return new Response(ResponseType.valueOf(typeAsString)); }
--- a/client/command/src/test/java/com/redhat/thermostat/client/command/internal/RequestEncoderTest.java Mon Dec 17 16:37:43 2012 +0100 +++ b/client/command/src/test/java/com/redhat/thermostat/client/command/internal/RequestEncoderTest.java Fri Dec 14 16:06:04 2012 +0100 @@ -36,34 +36,99 @@ package com.redhat.thermostat.client.command.internal; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.Matchers.isA; + +import java.net.SocketAddress; +import java.nio.charset.Charset; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.junit.Test; +import com.redhat.thermostat.common.command.Request; +import com.redhat.thermostat.common.command.Request.RequestType; + public class RequestEncoderTest { - /** - * This test verifies that exception events are forwarded upstream. This is to ensure that - * fireComplete() events get delivered in case of Exceptions. - * - * @throws Exception - */ + private static final boolean DEBUG = false; + @Test - public void exceptionCaughtCallsFireComplete() throws Exception { + public void canEncodeSimpleRequestWithNoParams() throws Exception { + RequestEncoder encoder = new RequestEncoder(); + String responseExp = "RESPONSE_EXPECTED"; + ChannelBuffer stringBuf = ChannelBuffers.copiedBuffer(responseExp, Charset.defaultCharset()); + ChannelBuffer buf = ChannelBuffers.buffer(4); + buf.writeInt(responseExp.getBytes().length); + ChannelBuffer buf2 = ChannelBuffers.wrappedBuffer(buf, stringBuf); + buf = ChannelBuffers.buffer(4); + buf.writeInt(0); + ChannelBuffer expected = ChannelBuffers.wrappedBuffer(buf2, buf); + SocketAddress addr = mock(SocketAddress.class); + Request item = new Request(RequestType.RESPONSE_EXPECTED, addr); + ChannelBuffer actual = encoder.encode(item); + if (DEBUG) { + printBuffers(actual, expected); + } + assertEquals(0, ChannelBuffers.compare(expected, actual)); + } + + @Test + public void canEncodeRequestWithParams() throws Exception { + SocketAddress addr = mock(SocketAddress.class); + + // Prepare request we'd like to encode + Request item = new Request(RequestType.RESPONSE_EXPECTED, addr); + String param1Name = "param1"; + String param1Value = "value1"; + String param2Name = "param2"; + String param2Value = "value2"; + item.setParameter(param1Name, param1Value); + item.setParameter(param2Name, param2Value); RequestEncoder encoder = new RequestEncoder(); - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - ExceptionEvent evt = mock(ExceptionEvent.class); - when(evt.getCause()).thenReturn(new Exception()); - Channel channel = mock(Channel.class); - when(ctx.getChannel()).thenReturn(channel); - encoder.exceptionCaught(ctx, evt); - // Channels.fireExceptionCaught implicitly calls this - verify(ctx).sendUpstream(isA(ExceptionEvent.class)); + + // build expected + String responseExp = "RESPONSE_EXPECTED"; + ChannelBuffer stringBuf = ChannelBuffers.copiedBuffer(responseExp, Charset.defaultCharset()); + ChannelBuffer buf = ChannelBuffers.buffer(4); + buf.writeInt(responseExp.getBytes().length); + ChannelBuffer buf2 = ChannelBuffers.wrappedBuffer(buf, stringBuf); + buf = ChannelBuffers.buffer(4); + buf.writeInt(2); + ChannelBuffer request = ChannelBuffers.wrappedBuffer(buf2, buf); + ChannelBuffer nameLen = ChannelBuffers.buffer(4); + nameLen.writeInt(param1Name.getBytes().length); + ChannelBuffer valueLen = ChannelBuffers.buffer(4); + valueLen.writeInt(param1Value.getBytes().length); + ChannelBuffer lens = ChannelBuffers.wrappedBuffer(nameLen, valueLen); + ChannelBuffer nameBuf = ChannelBuffers.copiedBuffer(param1Name, Charset.defaultCharset()); + ChannelBuffer valueBuf = ChannelBuffers.copiedBuffer(param1Value, Charset.defaultCharset()); + ChannelBuffer payload = ChannelBuffers.wrappedBuffer(nameBuf, valueBuf); + ChannelBuffer param1Buf = ChannelBuffers.wrappedBuffer(lens, payload); + nameLen = ChannelBuffers.buffer(4); + nameLen.writeInt(param2Name.getBytes().length); + valueLen = ChannelBuffers.buffer(4); + valueLen.writeInt(param2Value.getBytes().length); + lens = ChannelBuffers.wrappedBuffer(nameLen, valueLen); + nameBuf = ChannelBuffers.copiedBuffer(param2Name, Charset.defaultCharset()); + valueBuf = ChannelBuffers.copiedBuffer(param2Value, Charset.defaultCharset()); + payload = ChannelBuffers.wrappedBuffer(nameBuf, valueBuf); + ChannelBuffer param2Buf = ChannelBuffers.wrappedBuffer(lens, payload); + ChannelBuffer params = ChannelBuffers.wrappedBuffer(param1Buf, param2Buf); + ChannelBuffer expected = ChannelBuffers.wrappedBuffer(request, params); + + // Encode item for actual + ChannelBuffer actual = encoder.encode(item); + if (DEBUG) { + printBuffers(actual, expected); + } + assertEquals(0, ChannelBuffers.compare(expected, actual)); + } + + private void printBuffers(ChannelBuffer actual, ChannelBuffer expected) { + System.out.println("hexdump expected\n-------------------------------------"); + System.out.println(ChannelBuffers.hexDump(expected)); + System.out.println("\nhexdump actual\n-------------------------------------"); + System.out.println(ChannelBuffers.hexDump(actual) + "\n\n"); } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/client/command/src/test/java/com/redhat/thermostat/client/command/internal/ResponseDecoderTest.java Fri Dec 14 16:06:04 2012 +0100 @@ -0,0 +1,83 @@ +/* + * Copyright 2012 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.client.command.internal; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.junit.Test; + +import com.redhat.thermostat.common.command.InvalidMessageException; +import com.redhat.thermostat.common.command.Message; +import com.redhat.thermostat.common.command.Messages; +import com.redhat.thermostat.common.command.Response; +import com.redhat.thermostat.common.command.Response.ResponseType; + +public class ResponseDecoderTest { + + private static final byte[] ENCODED_OK_RESP = new byte[] { + 0x00, 0x00, 0x00, 0x02, 0x4f, 0x4b + }; + + private static final byte[] GARBAGE_AS_RESPONSE = new byte[] { + 0x0d, 0x0b, 0x0e, 0x0e, 0x0f + }; + + @Test + public void testDecode() throws InvalidMessageException { + ChannelBuffer buffer = ChannelBuffers.copiedBuffer(ENCODED_OK_RESP); + Response expected = new Response(ResponseType.OK); + ResponseDecoder decoder = new ResponseDecoder(); + Message actual = decoder.decode(null, buffer); + assertTrue(actual instanceof Response); + assertTrue(Messages.equal(expected, (Response)actual)); + } + + @Test + public void verifyInvalidEncodingThrowsException() { + ResponseDecoder decoder = new ResponseDecoder(); + ChannelBuffer garbage = ChannelBuffers.copiedBuffer(GARBAGE_AS_RESPONSE); + try { + decoder.decode(null, garbage); + fail("Should have thrown decoding exception!"); + } catch (InvalidMessageException e) { + // pass + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/common/command/src/main/java/com/redhat/thermostat/common/command/InvalidMessageException.java Fri Dec 14 16:06:04 2012 +0100 @@ -0,0 +1,49 @@ +/* + * Copyright 2012 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.common.command; + +/** + * Exception thrown when an improperly encoded {@link Message} has been received. + * + */ +@SuppressWarnings("serial") +public class InvalidMessageException extends Exception { + + public InvalidMessageException(String message) { + super(message); + } +}
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/Message.java Mon Dec 17 16:37:43 2012 +0100 +++ b/common/command/src/main/java/com/redhat/thermostat/common/command/Message.java Fri Dec 14 16:06:04 2012 +0100 @@ -37,7 +37,7 @@ package com.redhat.thermostat.common.command; -interface Message { +public interface Message { interface MessageType { }
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/MessageDecoder.java Mon Dec 17 16:37:43 2012 +0100 +++ b/common/command/src/main/java/com/redhat/thermostat/common/command/MessageDecoder.java Fri Dec 14 16:06:04 2012 +0100 @@ -36,8 +36,64 @@ package com.redhat.thermostat.common.command; -import org.jboss.netty.handler.codec.frame.FrameDecoder; +import static org.jboss.netty.channel.Channels.fireMessageReceived; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; + +import com.redhat.thermostat.common.utils.LoggingUtils; + +public abstract class MessageDecoder extends SimpleChannelUpstreamHandler { + + // testing hook + boolean exceptionCaught = false; + + private static final Logger logger = LoggingUtils.getLogger(MessageDecoder.class); + + protected MessageDecoder() { + super(); + } -public abstract class MessageDecoder extends FrameDecoder { + @Override + public void handleUpstream( + ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { + if (!(evt instanceof MessageEvent)) { + ctx.sendUpstream(evt); + return; + } + MessageEvent e = (MessageEvent) evt; + Object originalMessage = e.getMessage(); + if (!(originalMessage instanceof ChannelBuffer)) { + // Skip decoding, since we've received something + // we don't know how to deal with anyway. + ctx.sendUpstream(evt); + } + Message decodedMessage = null; + try { + decodedMessage = decode(e.getChannel(), (ChannelBuffer)originalMessage); + } catch (InvalidMessageException ex) { + logger.log(Level.WARNING, "Decoding failed on received message. Possible DoS attack!", ex); + exceptionCaught = true; + } + if (decodedMessage != null) { + fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress()); + } + } + + /** + * Transforms the specified received message into another message and return + * the transformed message. Return {@code null} if the received message + * is supposed to be discarded. + * + * @throws InvalidMessageException If the received message was not properly encoded. + */ + protected abstract Message decode(Channel channel, ChannelBuffer msg) throws InvalidMessageException; } \ No newline at end of file
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/MessageEncoder.java Mon Dec 17 16:37:43 2012 +0100 +++ b/common/command/src/main/java/com/redhat/thermostat/common/command/MessageEncoder.java Fri Dec 14 16:06:04 2012 +0100 @@ -36,8 +36,48 @@ package com.redhat.thermostat.common.command; -import org.jboss.netty.channel.SimpleChannelHandler; +import static org.jboss.netty.channel.Channels.write; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelDownstreamHandler; + +public abstract class MessageEncoder extends SimpleChannelDownstreamHandler { + + protected MessageEncoder() { + super(); + } + + @Override + public void handleDownstream( + ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { + if (!(evt instanceof MessageEvent)) { + ctx.sendDownstream(evt); + return; + } -public abstract class MessageEncoder extends SimpleChannelHandler { + MessageEvent e = (MessageEvent) evt; + Object originalMessage = e.getMessage(); + // We only know how to encode Messages. Non-message types + // get sent downstream verbatim. + if (!(originalMessage instanceof Message)) { + ctx.sendDownstream(evt); + } + ChannelBuffer encodedMessage = encode((Message)originalMessage); + if (encodedMessage != null) { + write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress()); + } + } + /** + * Transforms the specified message into another message and return the + * transformed message. Note that you can not return {@code null}, unlike + * you can in + * {@link MessageDecoder#decode(org.jboss.netty.buffer.ChannelBuffer)}; you + * must return something, at least {@link ChannelBuffers#EMPTY_BUFFER}. + */ + protected abstract ChannelBuffer encode(Message originalMessage); } \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/common/command/src/main/java/com/redhat/thermostat/common/command/Messages.java Fri Dec 14 16:06:04 2012 +0100 @@ -0,0 +1,93 @@ +/* + * Copyright 2012 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.common.command; + +import java.util.Collection; + +/** + * Helper class for comparing messages, such as {@link Request} and + * {@link Response}. + * + */ +public class Messages { + + /** + * Two requests, a and b, are considered equal if and only if they are, + * not-null, of the same type and all parameters (all keys and values) + * match. Listeners and targets are ignored. + * + * @return true if a and b are both not-null and equal. false otherwise. + */ + public static boolean equal(Request a, Request b) { + if (a == null || b == null) { + return false; + } + // type needs to be the same + if (a.getType() != b.getType()) { + return false; + } + // all parameters and values need to match + Collection<String> ourParamValues = a.getParameterNames(); + Collection<String> otherParamValues = b.getParameterNames(); + if (ourParamValues.size() != otherParamValues.size()) { + return false; + } + for (String name: ourParamValues) { + String otherParamValue = b.getParameter(name); + if (otherParamValue == null) { + // other doesn't have param which we have + return false; + } else { + // both requests contain same param name + String ourParamValue = a.getParameter(name); + if (!ourParamValue.equals(otherParamValue)) { + return false; + } + } + } + return true; + } + + /** + * Two responses are equal if and only if they are of the same type. + * + * @return true if a and b are both not-null and are equal. false otherwise. + */ + public static boolean equal(Response a, Response b) { + return a.getType() == b.getType(); + } +}
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/Request.java Mon Dec 17 16:37:43 2012 +0100 +++ b/common/command/src/main/java/com/redhat/thermostat/common/command/Request.java Fri Dec 14 16:06:04 2012 +0100 @@ -43,7 +43,39 @@ import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; - +/** + * A Request object represents a request passed from a client + * to an agent. + * + * + * Request objects are serialized over the command channel in the following + * format: + * + * ------------------------- + * | A | TYPE | B | PARAMS | + * ------------------------- + * + * A is an 32 bit integer representing the length - in bytes - of TYPE. TYPE + * is a byte array representing the string of the request type (e.g. + * "RESPONSE_EXPECTED") B is a 32 bit integer representing the number of + * request parameters which follow. + * + * PARAMS (if B > 0) is a variable length stream of the following format: + * + * It is a simple encoding of name => value pairs. + * + * ----------------------------------------------------------------------------------------------- + * | I_1 | K_1 | P_1 | V_1 | ... | I_(n-1) | K_(n-1) | P_(n-1) | V_(n-1) | I_n | K_n | P_n | V_n | + * ----------------------------------------------------------------------------------------------- + * + * I_n A 32 bit integer representing the length - in bytes - of the n'th + * parameter name. + * K_n A 32 bit integer representing the length - in bytes - of the n'th + * parameter value. + * P_n A byte array representing the string of the n'th parameter name. + * V_n A byte array representing the string of the n'th parameter value. + * + */ public class Request implements Message { public enum RequestType implements MessageType {
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/Response.java Mon Dec 17 16:37:43 2012 +0100 +++ b/common/command/src/main/java/com/redhat/thermostat/common/command/Response.java Fri Dec 14 16:06:04 2012 +0100 @@ -36,6 +36,23 @@ package com.redhat.thermostat.common.command; + +/** + * A Response object represents a response message passed from an agent + * to a client. + * + * + * Response objects are serialized over the command channel in the following + * format: + * + * ------------ + * | A | TYPE | + * ------------ + * + * A is an 32 bit integer representing the length - in bytes - of TYPE. TYPE + * is a byte array representing the string of the response type (e.g. + * "OK"). + */ public class Response implements Message { // TODO add parameter support to provide more information in some of these types.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/common/command/src/test/java/com/redhat/thermostat/common/command/MessageDecoderTest.java Fri Dec 14 16:06:04 2012 +0100 @@ -0,0 +1,110 @@ +/* + * Copyright 2012 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.common.command; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.MessageEvent; +import org.junit.Test; + +public class MessageDecoderTest { + + private static final byte[] GOOD_MSG = new byte[] { + 0x00, 0x00, 0x00, 0x02, 0x4f, 0x4b + }; + + private static final byte[] BAD_MSG = new byte[] { + 0x0b, 0x0e, 0x0e, 0x0f + }; + + @Test + public void canDecodeGoodMessage() throws Exception { + MessageDecoder decoder = new DummyMessageDecoder(); + // Sanity check + assertFalse(decoder.exceptionCaught); + MessageEvent e = mock(MessageEvent.class); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + ChannelBuffer buffer = ChannelBuffers.copiedBuffer(GOOD_MSG); + when(e.getMessage()).thenReturn(buffer); + when(ctx.getChannel()).thenReturn(channel); + decoder.handleUpstream(ctx, e); + assertFalse(decoder.exceptionCaught); + } + + @Test + public void decodingBadMessageThrowsException() throws Exception { + MessageDecoder decoder = new DummyMessageDecoder(); + // Sanity check + assertFalse(decoder.exceptionCaught); + MessageEvent e = mock(MessageEvent.class); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + ChannelBuffer buffer = ChannelBuffers.copiedBuffer(BAD_MSG); + when(e.getMessage()).thenReturn(buffer); + when(ctx.getChannel()).thenReturn(channel); + decoder.handleUpstream(ctx, e); + assertTrue(decoder.exceptionCaught); + } + + private static class DummyMessageDecoder extends MessageDecoder { + + @Override + protected Message decode(Channel channel, ChannelBuffer msg) + throws InvalidMessageException { + if (msg.readInt() == 0x0b0e0e0f) { + throw new InvalidMessageException("Burn, baby, burn!"); + } + return new Message() { + + @Override + public MessageType getType() { + return new MessageType() {}; + } + + }; + } + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/common/command/src/test/java/com/redhat/thermostat/common/command/MessagesTest.java Fri Dec 14 16:06:04 2012 +0100 @@ -0,0 +1,95 @@ +/* + * Copyright 2012 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.common.command; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +import com.redhat.thermostat.common.command.Request.RequestType; +import com.redhat.thermostat.common.command.Response.ResponseType; + +public class MessagesTest { + + @Test + public void testRequestsEqual() { + // self + Request req1 = new Request(RequestType.RESPONSE_EXPECTED, null); + assertTrue(Messages.equal(req1, req1)); + req1.setParameter("test", "blah"); + assertTrue(Messages.equal(req1, req1)); + + req1 = new Request(RequestType.RESPONSE_EXPECTED, null); + // basics + assertFalse(Messages.equal((Request)null, (Request)null)); + assertFalse(Messages.equal(req1, null)); + assertFalse(Messages.equal(req1, new Request(RequestType.MULTIPART_RESPONSE_EXPECTED, null))); + + Request req2 = new Request(RequestType.RESPONSE_EXPECTED, null); + String receiverClassName = "com.example.receivers.MyReceiver"; + req1.setReceiver(receiverClassName); + req2.setReceiver(receiverClassName); + // receivers are parameters + assertTrue(Messages.equal(req1, req2)); + + // add parameters + req1.setParameter("fluff", "foo"); + req2.setParameter("fluff", "foo"); + assertTrue(Messages.equal(req1, req1)); + + // one key is different + req2.setParameter("test", "false"); + assertFalse(Messages.equal(req1, req2)); + + req1.setParameter("test", "false"); + assertTrue(Messages.equal(req1, req2)); + req2.setParameter("test", "true"); + assertFalse(Messages.equal(req1, req2)); + } + + @Test + public void testResponsesEqual() { + Response r = new Response(ResponseType.NOK); + assertTrue(Messages.equal(r, r)); + Response r2 = new Response(ResponseType.NOK); + assertTrue(Messages.equal(r, r2)); + Response r3 = new Response(ResponseType.OK); + assertFalse(Messages.equal(r2, r3)); + assertFalse(Messages.equal(r, r3)); + } +}
--- a/common/command/src/test/java/com/redhat/thermostat/common/command/ResponseTest.java Mon Dec 17 16:37:43 2012 +0100 +++ b/common/command/src/test/java/com/redhat/thermostat/common/command/ResponseTest.java Fri Dec 14 16:06:04 2012 +0100 @@ -40,7 +40,6 @@ import org.junit.Test; -import com.redhat.thermostat.common.command.Response; import com.redhat.thermostat.common.command.Response.ResponseType; public class ResponseTest {