Mercurial > hg > release > thermostat-0.4
changeset 539:c12e371abba5
Add support for parameters in requests.
This is pretty vital for making the command channel useful. To avoid complexity
for encoding/decoding requests, parameter keys and values must be strings.
reviewed-by: sgehwolf
review-thread: http://icedtea.classpath.org/pipermail/thermostat/2012-August/002681.html
PR924
line wrap: on
line diff
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ConfigurationServerImpl.java Thu Aug 16 17:57:04 2012 -0400 +++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ConfigurationServerImpl.java Thu Aug 16 17:57:23 2012 -0400 @@ -62,6 +62,7 @@ @Override public void stopListening() { ctx.getChannelGroup().close().awaitUninterruptibly(); + ctx.getChannelGroup().clear(); ctx.getBootstrap().releaseExternalResources(); } }
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/RequestDecoder.java Thu Aug 16 17:57:04 2012 -0400 +++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/RequestDecoder.java Thu Aug 16 17:57:23 2012 -0400 @@ -50,11 +50,18 @@ @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) { + buffer.markReaderIndex(); String typeAsString = DecodingHelper.decodeString(buffer); if (typeAsString == null) { + buffer.resetReaderIndex(); return null; } - return new Request(RequestType.valueOf(typeAsString), channel.getRemoteAddress()); + Request request = new Request(RequestType.valueOf(typeAsString), channel.getRemoteAddress()); + if (!DecodingHelper.decodeParameters(buffer, request)) { + buffer.resetReaderIndex(); + return null; + } + return request; } } \ No newline at end of file
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ResponseEncoder.java Thu Aug 16 17:57:04 2012 -0400 +++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ResponseEncoder.java Thu Aug 16 17:57:23 2012 -0400 @@ -56,8 +56,7 @@ // Response Type String responseType = EncodingHelper.trimType(response.getType().toString()); - byte[] message = responseType.getBytes(); - ChannelBuffer typeBuffer = EncodingHelper.encode(message); + ChannelBuffer typeBuffer = EncodingHelper.encode(responseType); // Compose the full message. ChannelBuffer buf = wrappedBuffer(typeBuffer);
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ServerHandler.java Thu Aug 16 17:57:04 2012 -0400 +++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/internal/ServerHandler.java Thu Aug 16 17:57:23 2012 -0400 @@ -65,7 +65,6 @@ if (channel.isConnected()) { logger.info("Sending response: " + response.getType().toString()); ChannelFuture f = channel.write(response); - f.addListener(ChannelFutureListener.CLOSE); } else { logger.warning("Channel not connected.");
--- a/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/RequestDecoderTest.java Thu Aug 16 17:57:04 2012 -0400 +++ b/agent/command/src/test/java/com/redhat/thermostat/agent/command/internal/RequestDecoderTest.java Thu Aug 16 17:57:23 2012 -0400 @@ -36,6 +36,8 @@ package com.redhat.thermostat.agent.command.internal; +import java.util.Collection; + import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; @@ -46,29 +48,52 @@ import com.redhat.thermostat.common.command.Request; import com.redhat.thermostat.common.command.Request.RequestType; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; public class RequestDecoderTest { private static final byte[] PING = "PING".getBytes(); - private ChannelBuffer buffer; private Channel channel; + private RequestDecoder decoder; @Before public void setUp() { channel = mock(Channel.class); - - buffer = ChannelBuffers.dynamicBuffer(); - buffer.writeInt(PING.length); - buffer.writeBytes(PING); + decoder = new RequestDecoder(); } @Test public void testDecode() { - RequestDecoder decoder = new RequestDecoder(); + ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); + buffer.writeInt(PING.length); + buffer.writeBytes(PING); + Request request = (Request) decoder.decode(null, channel, buffer); assertTrue(RequestType.PING == (RequestType) request.getType()); } + + @Test + public void testDecodeWithParameters() { + String parmName = "parameter"; + String parmValue = "hello"; + ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); + buffer.writeInt(PING.length); + buffer.writeBytes(PING); + buffer.writeInt(1); + buffer.writeInt(parmName.getBytes().length); + buffer.writeInt(parmValue.getBytes().length); + buffer.writeBytes(parmName.getBytes()); + buffer.writeBytes(parmValue.getBytes()); + + Request request = (Request) decoder.decode(null, channel, buffer); + Collection<String> parmNames = request.getParameterNames(); + + assertEquals(1, parmNames.size()); + assertTrue(parmNames.contains(parmName)); + String decodedValue = request.getParameter(parmName); + assertEquals(parmValue, decodedValue); + } }
--- a/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestEncoder.java Thu Aug 16 17:57:04 2012 -0400 +++ b/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestEncoder.java Thu Aug 16 17:57:23 2012 -0400 @@ -36,8 +36,9 @@ package com.redhat.thermostat.client.command.internal; +import java.util.Collection; + import org.jboss.netty.buffer.ChannelBuffer; -import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; @@ -46,6 +47,8 @@ import com.redhat.thermostat.common.command.MessageEncoder; import com.redhat.thermostat.common.command.Request; +import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer; +import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; class RequestEncoder extends MessageEncoder { @@ -56,11 +59,20 @@ // Request Type String requestType = EncodingHelper.trimType(request.getType().toString()); - byte[] message = requestType.getBytes(); - ChannelBuffer typeBuffer = EncodingHelper.encode(message); + ChannelBuffer typeBuffer = EncodingHelper.encode(requestType); + + // Parameters + // TODO: if in practice parms take up more than 256 bytes, use appropriate + // dynamicBuffer() variant to specify initial/estimated capacity. + ChannelBuffer parmsBuffer = dynamicBuffer(); + Collection<String> parmNames = request.getParameterNames(); + parmsBuffer.writeInt(parmNames.size()); + for (String parmName : parmNames) { + EncodingHelper.encode(parmName, request.getParameter(parmName), parmsBuffer); + } // Compose the full message. - ChannelBuffer buf = wrappedBuffer(typeBuffer); + ChannelBuffer buf = wrappedBuffer(typeBuffer, parmsBuffer); Channels.write(ctx, e.getFuture(), buf); }
--- a/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestQueueImpl.java Thu Aug 16 17:57:04 2012 -0400 +++ b/client/command/src/main/java/com/redhat/thermostat/client/command/internal/RequestQueueImpl.java Thu Aug 16 17:57:23 2012 -0400 @@ -91,6 +91,7 @@ while (processing) { Request request = null; try { + // This will block until available (or interrupted). request = queue.take(); } catch (InterruptedException e) { if (Thread.interrupted()) {
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/DecodingHelper.java Thu Aug 16 17:57:04 2012 -0400 +++ b/common/command/src/main/java/com/redhat/thermostat/common/command/DecodingHelper.java Thu Aug 16 17:57:23 2012 -0400 @@ -44,14 +44,52 @@ if (buffer.readableBytes() < 4) { return null; } - buffer.markReaderIndex(); int length = buffer.readInt(); + return decodeString(length, buffer); + } + + public static boolean decodeParameters(ChannelBuffer buffer, Request request) { + int bytesLeft = buffer.readableBytes(); + if (bytesLeft == 0) { + // Exactly zero parameters in this request. + return true; + } + if (bytesLeft < 4) { + // Bad encoding or some stream issue. + return false; + } + int numParms = buffer.readInt(); + for (int i = 0; i < numParms; i++) { + if (!decodeParameter(buffer, request)) { + return false; + } + } + return true; + } + + private static boolean decodeParameter(ChannelBuffer buffer, Request request) { + if (buffer.readableBytes() < 8) { + return false; + } + int nameLength = buffer.readInt(); + int valueLength = buffer.readInt(); + String name = decodeString(nameLength, buffer); + if (name == null) { + return false; + } + String value = decodeString(valueLength, buffer); + if (value == null) { + return false; + } + request.setParameter(name, value); + return true; + } + + private static String decodeString(int length, ChannelBuffer buffer) { if (buffer.readableBytes() < length) { - buffer.resetReaderIndex(); return null; } byte[] stringBytes = buffer.readBytes(length).array(); return new String(stringBytes); } - }
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/EncodingHelper.java Thu Aug 16 17:57:04 2012 -0400 +++ b/common/command/src/main/java/com/redhat/thermostat/common/command/EncodingHelper.java Thu Aug 16 17:57:23 2012 -0400 @@ -41,7 +41,21 @@ public class EncodingHelper { - public static ChannelBuffer encode(byte[] message) { + public static void encode(String name, String value, ChannelBuffer dynamicBuffer) { + byte[] nameBytes = name.getBytes(); + byte[] valueBytes = value.getBytes(); + dynamicBuffer.writeInt(nameBytes.length); + dynamicBuffer.writeInt(valueBytes.length); + dynamicBuffer.writeBytes(nameBytes); + dynamicBuffer.writeBytes(valueBytes); + } + + public static ChannelBuffer encode(String message) { + byte[] messageBytes = message.getBytes(); + return encode(messageBytes); + } + + private static ChannelBuffer encode(byte[] message) { ChannelBuffer buf = buffer(4 + message.length); buf.writeInt(message.length); buf.writeBytes(message);
--- a/common/command/src/main/java/com/redhat/thermostat/common/command/Request.java Thu Aug 16 17:57:04 2012 -0400 +++ b/common/command/src/main/java/com/redhat/thermostat/common/command/Request.java Thu Aug 16 17:57:23 2012 -0400 @@ -39,6 +39,8 @@ import java.net.SocketAddress; import java.util.Collection; import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -49,11 +51,13 @@ } private final RequestType type; + private final Map<String, String> parameters; private final SocketAddress target; private final Collection<RequestResponseListener> listeners; - + public Request(RequestType type, SocketAddress target) { this.type = type; + parameters = new TreeMap<>(); this.target = target; listeners = new CopyOnWriteArrayList<>(); } @@ -63,6 +67,18 @@ return type; } + public void setParameter(String name, String value) { + parameters.put(name, value); + } + + public String getParameter(String name) { + return parameters.get(name); + } + + public Collection<String> getParameterNames() { + return parameters.keySet(); + } + public SocketAddress getTarget() { return target; }
--- a/common/command/src/test/java/com/redhat/thermostat/common/command/EncodingHelperTest.java Thu Aug 16 17:57:04 2012 -0400 +++ b/common/command/src/test/java/com/redhat/thermostat/common/command/EncodingHelperTest.java Thu Aug 16 17:57:23 2012 -0400 @@ -49,16 +49,17 @@ @Test public void testEncode() { - byte[] input = "a test string".getBytes(); + String input = "a test string"; + byte[] inputBytes = input.getBytes(); ChannelBuffer buf = EncodingHelper.encode(input); int encodedMessageLength = buf.readInt(); - assertEquals(input.length, encodedMessageLength); + assertEquals(inputBytes.length, encodedMessageLength); ByteBuffer bbuf = ByteBuffer.allocate(buf.readableBytes()); buf.readBytes(bbuf); byte[] output = bbuf.array(); - assertEquals(input.length, output.length); - for (int i = 0; i < input.length; i++) { - assertEquals(input[i], output[i]); + assertEquals(inputBytes.length, output.length); + for (int i = 0; i < inputBytes.length; i++) { + assertEquals(inputBytes[i], output[i]); } }