changeset 539:c12e371abba5

Add support for parameters in requests. This is pretty vital for making the command channel useful. To avoid complexity for encoding/decoding requests, parameter keys and values must be strings. reviewed-by: sgehwolf review-thread: http://icedtea.classpath.org/pipermail/thermostat/2012-August/002681.html PR924
author Jon VanAlten <jon.vanalten@redhat.com>
date Thu, 16 Aug 2012 17:57:23 -0400
parents 896613f76b83
children 1b323d4b416f
files agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ConfigurationServerImpl.java agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/RequestDecoder.java agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ResponseEncoder.java agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ServerHandler.java agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/RequestDecoderTest.java client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestEncoder.java client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestQueueImpl.java common/command/src/main/java/com/redhat/thermostat/common/command/DecodingHelper.java common/command/src/main/java/com/redhat/thermostat/common/command/EncodingHelper.java common/command/src/main/java/com/redhat/thermostat/common/command/Request.java common/command/src/test/java/com/redhat/thermostat/common/command/EncodingHelperTest.java
diffstat 11 files changed, 137 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ConfigurationServerImpl.java	Thu Aug 16 17:57:04 2012 -0400
+++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ConfigurationServerImpl.java	Thu Aug 16 17:57:23 2012 -0400
@@ -62,6 +62,7 @@
     @Override
     public void stopListening() {
         ctx.getChannelGroup().close().awaitUninterruptibly();
+        ctx.getChannelGroup().clear();
         ctx.getBootstrap().releaseExternalResources();
     }
 }
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/RequestDecoder.java	Thu Aug 16 17:57:04 2012 -0400
+++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/RequestDecoder.java	Thu Aug 16 17:57:23 2012 -0400
@@ -50,11 +50,18 @@
     @Override
     protected Object decode(ChannelHandlerContext ctx, Channel channel,
             ChannelBuffer buffer) {
+        buffer.markReaderIndex();
         String typeAsString = DecodingHelper.decodeString(buffer);
         if (typeAsString == null) {
+            buffer.resetReaderIndex();
             return null;
         }
-        return new Request(RequestType.valueOf(typeAsString), channel.getRemoteAddress());
+        Request request = new Request(RequestType.valueOf(typeAsString), channel.getRemoteAddress());
+        if (!DecodingHelper.decodeParameters(buffer, request)) {
+            buffer.resetReaderIndex();
+            return null;
+        }
+        return request;
     }
 
 }
\ No newline at end of file
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ResponseEncoder.java	Thu Aug 16 17:57:04 2012 -0400
+++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ResponseEncoder.java	Thu Aug 16 17:57:23 2012 -0400
@@ -56,8 +56,7 @@
 
         // Response Type
         String responseType = EncodingHelper.trimType(response.getType().toString());
-        byte[] message = responseType.getBytes();
-        ChannelBuffer typeBuffer = EncodingHelper.encode(message);
+        ChannelBuffer typeBuffer = EncodingHelper.encode(responseType);
 
         // Compose the full message.
         ChannelBuffer buf = wrappedBuffer(typeBuffer);
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ServerHandler.java	Thu Aug 16 17:57:04 2012 -0400
+++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ServerHandler.java	Thu Aug 16 17:57:23 2012 -0400
@@ -65,7 +65,6 @@
         if (channel.isConnected()) {
             logger.info("Sending response: " + response.getType().toString());
             ChannelFuture f = channel.write(response);
-
             f.addListener(ChannelFutureListener.CLOSE);
         } else {
             logger.warning("Channel not connected.");
--- a/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/RequestDecoderTest.java	Thu Aug 16 17:57:04 2012 -0400
+++ b/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/RequestDecoderTest.java	Thu Aug 16 17:57:23 2012 -0400
@@ -36,6 +36,8 @@
 
 package com.redhat.thermostat.agent.command.internal;
 
+import java.util.Collection;
+
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -46,29 +48,52 @@
 import com.redhat.thermostat.common.command.Request;
 import com.redhat.thermostat.common.command.Request.RequestType;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 public class RequestDecoderTest {
     private static final byte[] PING = "PING".getBytes();
 
-    private ChannelBuffer buffer;
     private Channel channel;
+    private RequestDecoder decoder;
 
     @Before
     public void setUp() {
         channel = mock(Channel.class);
-        
-        buffer = ChannelBuffers.dynamicBuffer();
-        buffer.writeInt(PING.length);
-        buffer.writeBytes(PING);
+        decoder = new RequestDecoder();
     }
 
     @Test
     public void testDecode() {
-        RequestDecoder decoder = new RequestDecoder();
+        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
+        buffer.writeInt(PING.length);
+        buffer.writeBytes(PING);
+
         Request request = (Request) decoder.decode(null, channel, buffer);
 
         assertTrue(RequestType.PING == (RequestType) request.getType());
     }
+
+    @Test
+    public void testDecodeWithParameters() {
+        String parmName = "parameter";
+        String parmValue = "hello";
+        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
+        buffer.writeInt(PING.length);
+        buffer.writeBytes(PING);
+        buffer.writeInt(1);
+        buffer.writeInt(parmName.getBytes().length);
+        buffer.writeInt(parmValue.getBytes().length);
+        buffer.writeBytes(parmName.getBytes());
+        buffer.writeBytes(parmValue.getBytes());
+        
+        Request request = (Request) decoder.decode(null, channel, buffer);
+        Collection<String> parmNames = request.getParameterNames();
+
+        assertEquals(1, parmNames.size());
+        assertTrue(parmNames.contains(parmName));
+        String decodedValue = request.getParameter(parmName);
+        assertEquals(parmValue, decodedValue);
+    }
 }
--- a/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestEncoder.java	Thu Aug 16 17:57:04 2012 -0400
+++ b/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestEncoder.java	Thu Aug 16 17:57:23 2012 -0400
@@ -36,8 +36,9 @@
 
 package com.redhat.thermostat.client.command.internal;
 
+import java.util.Collection;
+
 import org.jboss.netty.buffer.ChannelBuffer;
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.MessageEvent;
@@ -46,6 +47,8 @@
 import com.redhat.thermostat.common.command.MessageEncoder;
 import com.redhat.thermostat.common.command.Request;
 
+import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
 
 class RequestEncoder extends MessageEncoder {
 
@@ -56,11 +59,20 @@
 
         // Request Type
         String requestType = EncodingHelper.trimType(request.getType().toString());
-        byte[] message = requestType.getBytes();
-        ChannelBuffer typeBuffer = EncodingHelper.encode(message);
+        ChannelBuffer typeBuffer = EncodingHelper.encode(requestType);
+
+        // Parameters
+        // TODO: if in practice parms take up more than 256 bytes, use appropriate
+        // dynamicBuffer() variant to specify initial/estimated capacity.
+        ChannelBuffer parmsBuffer = dynamicBuffer();
+        Collection<String> parmNames = request.getParameterNames();
+        parmsBuffer.writeInt(parmNames.size());
+        for (String parmName : parmNames) {
+            EncodingHelper.encode(parmName, request.getParameter(parmName), parmsBuffer);
+        }
 
         // Compose the full message.
-        ChannelBuffer buf = wrappedBuffer(typeBuffer);
+        ChannelBuffer buf = wrappedBuffer(typeBuffer, parmsBuffer);
         Channels.write(ctx, e.getFuture(), buf);
     }
 
--- a/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestQueueImpl.java	Thu Aug 16 17:57:04 2012 -0400
+++ b/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestQueueImpl.java	Thu Aug 16 17:57:23 2012 -0400
@@ -91,6 +91,7 @@
             while (processing) {
                 Request request = null;
                 try {
+                    // This will block until available (or interrupted).
                     request = queue.take();
                 } catch (InterruptedException e) {
                     if (Thread.interrupted()) {
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/DecodingHelper.java	Thu Aug 16 17:57:04 2012 -0400
+++ b/common/command/src/main/java/com/redhat/thermostat/common/command/DecodingHelper.java	Thu Aug 16 17:57:23 2012 -0400
@@ -44,14 +44,52 @@
         if (buffer.readableBytes() < 4) {
             return null;
         }
-        buffer.markReaderIndex();
         int length = buffer.readInt();
+        return decodeString(length, buffer);
+    }
+
+    public static boolean decodeParameters(ChannelBuffer buffer, Request request) {
+        int bytesLeft = buffer.readableBytes();
+        if (bytesLeft == 0) {
+            // Exactly zero parameters in this request.
+            return true;
+        }
+        if (bytesLeft < 4) {
+            // Bad encoding or some stream issue.
+            return false;
+        }
+        int numParms = buffer.readInt();
+        for (int i = 0; i < numParms; i++) {
+            if (!decodeParameter(buffer, request)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static boolean decodeParameter(ChannelBuffer buffer, Request request) {
+        if (buffer.readableBytes() < 8) {
+            return false;
+        }
+        int nameLength = buffer.readInt();
+        int valueLength = buffer.readInt();
+        String name = decodeString(nameLength, buffer);
+        if (name == null) {
+            return false;
+        }
+        String value = decodeString(valueLength, buffer);
+        if (value == null) {
+            return false;
+        }
+        request.setParameter(name, value);
+        return true;
+    }
+
+    private static String decodeString(int length, ChannelBuffer buffer) {
         if (buffer.readableBytes() < length) {
-            buffer.resetReaderIndex();
             return null;
         }
         byte[] stringBytes = buffer.readBytes(length).array();
         return new String(stringBytes);
     }
-
 }
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/EncodingHelper.java	Thu Aug 16 17:57:04 2012 -0400
+++ b/common/command/src/main/java/com/redhat/thermostat/common/command/EncodingHelper.java	Thu Aug 16 17:57:23 2012 -0400
@@ -41,7 +41,21 @@
 
 public class EncodingHelper {
 
-    public static ChannelBuffer encode(byte[] message) {
+    public static void encode(String name, String value, ChannelBuffer dynamicBuffer) {
+        byte[] nameBytes = name.getBytes();
+        byte[] valueBytes = value.getBytes();
+        dynamicBuffer.writeInt(nameBytes.length);
+        dynamicBuffer.writeInt(valueBytes.length);
+        dynamicBuffer.writeBytes(nameBytes);
+        dynamicBuffer.writeBytes(valueBytes);
+    }
+
+    public static ChannelBuffer encode(String message) {
+        byte[] messageBytes = message.getBytes();
+        return encode(messageBytes);
+    }
+    
+    private static ChannelBuffer encode(byte[] message) {
         ChannelBuffer buf = buffer(4 + message.length);
         buf.writeInt(message.length);
         buf.writeBytes(message);
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/Request.java	Thu Aug 16 17:57:04 2012 -0400
+++ b/common/command/src/main/java/com/redhat/thermostat/common/command/Request.java	Thu Aug 16 17:57:23 2012 -0400
@@ -39,6 +39,8 @@
 import java.net.SocketAddress;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 
@@ -49,11 +51,13 @@
     }
 
     private final RequestType type;
+    private final Map<String, String> parameters;
     private final SocketAddress target;
     private final Collection<RequestResponseListener> listeners;
- 
+
     public Request(RequestType type, SocketAddress target) {
         this.type = type;
+        parameters = new TreeMap<>();
         this.target = target;
         listeners = new CopyOnWriteArrayList<>();
     }
@@ -63,6 +67,18 @@
         return type;
     }
 
+    public void setParameter(String name, String value) {
+        parameters.put(name, value);
+    }
+
+    public String getParameter(String name) {
+        return parameters.get(name);
+    }
+
+    public Collection<String> getParameterNames() {
+        return parameters.keySet();
+    }
+
     public SocketAddress getTarget() {
         return target;
     }
--- a/common/command/src/test/java/com/redhat/thermostat/common/command/EncodingHelperTest.java	Thu Aug 16 17:57:04 2012 -0400
+++ b/common/command/src/test/java/com/redhat/thermostat/common/command/EncodingHelperTest.java	Thu Aug 16 17:57:23 2012 -0400
@@ -49,16 +49,17 @@
 
     @Test
     public void testEncode() {
-        byte[] input = "a test string".getBytes();
+        String input = "a test string";
+        byte[] inputBytes = input.getBytes();
         ChannelBuffer buf = EncodingHelper.encode(input);
         int encodedMessageLength = buf.readInt();
-        assertEquals(input.length, encodedMessageLength);
+        assertEquals(inputBytes.length, encodedMessageLength);
         ByteBuffer bbuf = ByteBuffer.allocate(buf.readableBytes());
         buf.readBytes(bbuf);
         byte[] output = bbuf.array();
-        assertEquals(input.length, output.length);
-        for (int i = 0; i < input.length; i++) {
-            assertEquals(input[i], output[i]);
+        assertEquals(inputBytes.length, output.length);
+        for (int i = 0; i < inputBytes.length; i++) {
+            assertEquals(inputBytes[i], output[i]);
         }
     }