changeset 148:d78d32242c62

[commands] Add support for receiver keep-alive. Reviewed-by: jkang Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2017-April/022866.html
author Severin Gehwolf <sgehwolf@redhat.com>
date Tue, 25 Apr 2017 14:56:15 +0200
parents 925274a4ba55
children 98a6b5520bf5
files distribution/src/etc/commands/basic-users.properties services/commands/src/main/java/com/redhat/thermostat/service/commands/channel/AgentSocketsRegistry.java services/commands/src/main/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelAgentEndpointHandler.java services/commands/src/main/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelEndpointHandler.java services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/AgentPingSequence.java services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/AgentSocketsRegistry.java services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelAgentSocket.java services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelClientSocket.java services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelSocket.java services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelWebSocket.java services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/Debug.java services/commands/src/main/resources/agent2.html services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/CmdChannelAgentSocket.java services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelEndpointHandlerTest.java services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/NoOpMsgCallback.java services/commands/src/test/java/com/redhat/thermostat/service/commands/socket/AgentPingSequenceTest.java services/commands/src/test/java/com/redhat/thermostat/service/commands/socket/AgentSocketsRegistryTest.java
diffstat 17 files changed, 896 insertions(+), 69 deletions(-) [+]
line wrap: on
line diff
--- a/distribution/src/etc/commands/basic-users.properties	Mon May 15 17:19:26 2017 +0200
+++ b/distribution/src/etc/commands/basic-users.properties	Tue Apr 25 14:56:15 2017 +0200
@@ -1,4 +1,5 @@
-# Agent user
+# Agent user(s)
 foo-agent-user=agent-pwd,thermostat-commands-realm,thermostat-commands-provider-testAgent
+agent-user=agent-pwd,thermostat-commands-realm,thermostat-commands-provider-otherAgent
 # Client user
 bar-client-user=client-pwd,thermostat-commands-realm,thermostat-commands-grant-dump-heap,thermostat-commands-grant-jvm-abc
\ No newline at end of file
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/channel/AgentSocketsRegistry.java	Mon May 15 17:19:26 2017 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,59 +0,0 @@
-/*
- * Copyright 2012-2017 Red Hat, Inc.
- *
- * This file is part of Thermostat.
- *
- * Thermostat is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published
- * by the Free Software Foundation; either version 2, or (at your
- * option) any later version.
- *
- * Thermostat is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Thermostat; see the file COPYING.  If not see
- * <http://www.gnu.org/licenses/>.
- *
- * Linking this code with other modules is making a combined work
- * based on this code.  Thus, the terms and conditions of the GNU
- * General Public License cover the whole combination.
- *
- * As a special exception, the copyright holders of this code give
- * you permission to link this code with independent modules to
- * produce an executable, regardless of the license terms of these
- * independent modules, and to copy and distribute the resulting
- * executable under terms of your choice, provided that you also
- * meet, for each linked independent module, the terms and conditions
- * of the license of that module.  An independent module is a module
- * which is not derived from or based on this code.  If you modify
- * this code, you may extend this exception to your version of the
- * library, but you are not obligated to do so.  If you do not wish
- * to do so, delete this exception statement from your version.
- */
-
-package com.redhat.thermostat.service.commands.channel;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.websocket.Session;
-
-public class AgentSocketsRegistry {
-
-    private static final Map<String, Session> agentSockets = new ConcurrentHashMap<>();
-
-    public static void addSocket(String id, Session session) {
-        agentSockets.put(id, session);
-    }
-
-    public static Session getSession(String id) {
-        return agentSockets.get(id);
-    }
-
-    public static void removeSocket(String id) {
-        agentSockets.remove(id);
-    }
-}
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelAgentEndpointHandler.java	Mon May 15 17:19:26 2017 +0200
+++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelAgentEndpointHandler.java	Tue Apr 25 14:56:15 2017 +0200
@@ -43,6 +43,7 @@
 import javax.websocket.OnError;
 import javax.websocket.OnMessage;
 import javax.websocket.OnOpen;
+import javax.websocket.PongMessage;
 import javax.websocket.Session;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
@@ -72,6 +73,11 @@
         super.onMessage(message);
     }
 
+    @OnMessage
+    public void onPongMessage(PongMessage pong) {
+        super.onPongMessage(pong);
+    }
+
     @OnClose
     public void onClose(CloseReason reason) {
         super.onClose(reason.getCloseCode().getCode(),
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelEndpointHandler.java	Mon May 15 17:19:26 2017 +0200
+++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelEndpointHandler.java	Tue Apr 25 14:56:15 2017 +0200
@@ -37,7 +37,9 @@
 package com.redhat.thermostat.service.commands.http.handlers;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
+import javax.websocket.PongMessage;
 import javax.websocket.Session;
 
 import com.redhat.thermostat.service.commands.channel.model.Message;
@@ -47,9 +49,14 @@
 
 class CommandChannelEndpointHandler {
 
+    // Default socket timeout will be 10 minutes. For agent sockets they should never
+    // time out since we are sending regular pings to those sockets periodically. The
+    // ping interval is strictly less than the timout set here.
+    private static final long DEFAULT_SOCKET_TIMEOUT = TimeUnit.MINUTES.toMillis(10);
     private CommandChannelWebSocket socket;
 
     protected void onConnect(WebSocketType type, String agentId, Session session) throws IOException {
+        session.setMaxIdleTimeout(DEFAULT_SOCKET_TIMEOUT);
         socket = CommandChannelSocketFactory.createWebSocketChannel(type, session, agentId);
         socket.onConnect();
     }
@@ -65,4 +72,8 @@
     protected void onClose(int code, String reason) {
         socket.onClose(code, reason);
     }
+
+    protected void onPongMessage(PongMessage msg) {
+        socket.onPongMessage(msg);
+    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/AgentPingSequence.java	Tue Apr 25 14:56:15 2017 +0200
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2012-2017 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.service.commands.socket;
+
+class AgentPingSequence {
+
+    private final String agentId;
+    private int currentSequence;
+
+    AgentPingSequence(String agentId) {
+        this(0, agentId);
+    }
+
+    AgentPingSequence(int initialVal, String agentId) {
+        this.currentSequence = initialVal;
+        this.agentId = agentId;
+    }
+
+    private synchronized int getNextSequence() {
+        int next = currentSequence + 1;
+        next = Math.max(0, next); // handle roll-overs
+        currentSequence = next;
+        return next;
+    }
+
+    synchronized int getCurrentSequence() {
+        return currentSequence;
+    }
+
+    String getNextPingPayload() {
+        return String.format("%d|%s", getNextSequence(), agentId);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/AgentSocketsRegistry.java	Tue Apr 25 14:56:15 2017 +0200
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2012-2017 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.service.commands.socket;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.Session;
+
+class AgentSocketsRegistry {
+
+    private static final double PERCENT_85 = 0.85;
+    private static final Map<String, AgentSessionHolder> agentSockets = new ConcurrentHashMap<>();
+    private static final String TIMER_NAME = "com.redhat.thermostat.service.commands.ReceiverPingTimer";
+    private static final long PING_BOUND_BELOW_SECONDS = 3;
+    private static AgentSocketsRegistry INSTANCE;
+    private final long pingInterval;
+    private final TimerTask pingTask;
+    private final Timer pingTimer;
+    private boolean isTimerStarted = false;
+
+    AgentSocketsRegistry(long socketTimeOut) {
+       this(new Timer(TIMER_NAME), new PingTimerTask(), socketTimeOut);
+    }
+
+    AgentSocketsRegistry(Timer timer, TimerTask pingTask, long socketTimeOut) {
+        this.pingTimer = timer;
+        this.pingTask = pingTask;
+        this.pingInterval = calculatePeriod(socketTimeOut);
+    }
+
+    public static synchronized AgentSocketsRegistry getInstance(long socketTimeout) {
+        if (INSTANCE == null) {
+            INSTANCE = new AgentSocketsRegistry(socketTimeout);
+        }
+        return INSTANCE;
+    }
+
+    public void addSocket(String id, Session session) {
+        AgentPingSequence sequence = new AgentPingSequence(id);
+        AgentSessionHolder holder = new AgentSessionHolder(session, sequence);
+        agentSockets.put(id, holder);
+        sendPing(holder);
+        if (agentSockets.size() == 1) {
+            startPingTimer();
+        }
+    }
+
+    public Session getSession(String id) {
+        AgentSessionHolder holder = agentSockets.get(id);
+        if (holder == null) {
+            return null;
+        }
+        return holder.session;
+    }
+
+    public void removeSocket(String id) {
+        agentSockets.remove(id);
+    }
+
+    private static void sendPing(AgentSessionHolder holder) {
+        try {
+            Session session = holder.session;
+            String pingPayload = holder.sequence.getNextPingPayload();
+            synchronized (session) {
+                if (session.isOpen()) {
+                    ByteBuffer payload = ByteBuffer.wrap(pingPayload.getBytes("UTF-8"));
+                    if (Debug.isOn()) {
+                        System.err.println("Server: sending ping msg <<" + pingPayload + ">>");
+                    }
+                    session.getBasicRemote().sendPing(payload);
+                }
+            }
+        } catch (IOException e) {
+            // ignore
+        }
+    }
+
+    static long calculatePeriod(long sockTimeOut) {
+        // Heuristic: Use 85% of the socket timeout for the ping timer period
+        long periodCandidate = (long)(sockTimeOut * PERCENT_85);
+        if (periodCandidate < TimeUnit.SECONDS.toMillis(PING_BOUND_BELOW_SECONDS)) {
+            long socketTimeOutSecs = TimeUnit.MILLISECONDS.toSeconds(sockTimeOut);
+            throw new IllegalStateException("Agent socket timeout (" + socketTimeOutSecs + "s) too short."+
+                                            " Is the socket timeout configured correctly?");
+        }
+        return periodCandidate;
+    }
+
+    private synchronized void startPingTimer() {
+        if (!isTimerStarted) {
+            pingTimer.scheduleAtFixedRate(pingTask, pingInterval, pingInterval);
+            isTimerStarted = true;
+        }
+    }
+
+    private static class PingTimerTask extends TimerTask {
+
+        @Override
+        public void run() {
+            for (AgentSessionHolder holder: agentSockets.values()) {
+                sendPing(holder);
+            }
+        }
+
+    }
+
+    private static class AgentSessionHolder {
+        private final Session session;
+        private final AgentPingSequence sequence;
+
+        AgentSessionHolder(Session session, AgentPingSequence sequence) {
+            this.session = session;
+            this.sequence = sequence;
+        }
+    }
+}
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelAgentSocket.java	Mon May 15 17:19:26 2017 +0200
+++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelAgentSocket.java	Tue Apr 25 14:56:15 2017 +0200
@@ -37,11 +37,13 @@
 package com.redhat.thermostat.service.commands.socket;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 
+import javax.websocket.PongMessage;
 import javax.websocket.Session;
 
 import com.redhat.thermostat.gateway.common.core.auth.basic.RoleAwareUser;
-import com.redhat.thermostat.service.commands.channel.AgentSocketsRegistry;
 import com.redhat.thermostat.service.commands.channel.ClientAgentCommunication;
 import com.redhat.thermostat.service.commands.channel.CommunicationsRegistry;
 import com.redhat.thermostat.service.commands.channel.model.Message;
@@ -49,10 +51,13 @@
 
 class CommandChannelAgentSocket extends CommandChannelSocket {
 
+    private static final String UNKNOWN_PAYLOAD = "UNKNOWN";
     private static final String AGENT_PROVIDER_PREFIX = "thermostat-commands-provider-";
+    private final long socketTimeout;
 
     CommandChannelAgentSocket(String id, Session session) {
         super(id, session);
+        this.socketTimeout = session.getMaxIdleTimeout();
     }
 
     @Override
@@ -65,13 +70,40 @@
         //        connects in that window it will get an error back,
         //        believing that the agent it wants to talk to has not
         //        connected.
-        AgentSocketsRegistry.addSocket(agentId, this.session);
+        AgentSocketsRegistry reg = AgentSocketsRegistry.getInstance(socketTimeout);
+        reg.addSocket(agentId, this.session);
     }
 
     @Override
     public void onClose(int closeCode, String reason) {
         super.onClose(closeCode, reason);
-        AgentSocketsRegistry.removeSocket(agentId);
+        AgentSocketsRegistry reg = AgentSocketsRegistry.getInstance(socketTimeout);
+        reg.removeSocket(agentId);
+    }
+
+    @Override
+    public void onPongMessage(PongMessage message) {
+        if (Debug.isOn()) {
+            String payload = extractPayload(message);
+            System.err.println("Server: Got pong message <<" + payload + ">>");
+        }
+    }
+
+    private String extractPayload(PongMessage message) {
+        ByteBuffer payload = message.getApplicationData();
+        int limit = payload.limit();
+        int position = payload.position();
+        int length = limit - position;
+        byte[] buf = new byte[length];
+        for (int i = 0; position < limit; position++, i++) {
+            buf[i] = payload.get(position);
+        }
+        try {
+            return new String(buf, "UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            System.err.println("ERROR: Failed to extract payload from pong message: " + e.getMessage());
+            return UNKNOWN_PAYLOAD;
+        }
     }
 
     @Override
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelClientSocket.java	Mon May 15 17:19:26 2017 +0200
+++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelClientSocket.java	Tue Apr 25 14:56:15 2017 +0200
@@ -43,7 +43,6 @@
 import javax.websocket.Session;
 
 import com.redhat.thermostat.gateway.common.core.auth.basic.RoleAwareUser;
-import com.redhat.thermostat.service.commands.channel.AgentSocketsRegistry;
 import com.redhat.thermostat.service.commands.channel.ClientAgentCommunication;
 import com.redhat.thermostat.service.commands.channel.CommunicationsRegistry;
 import com.redhat.thermostat.service.commands.channel.WebSocketCommunicationBuilder;
@@ -59,9 +58,11 @@
     private static final String PATH_PARAM_ACTION = "action";
     private static final String PATH_PARAM_JVM = "jvmId";
     private static final String PATH_PARAM_SEQ_ID = "seqId";
+    private final long socketTimeout;
 
     CommandChannelClientSocket(String id, Session session) {
         super(id, session);
+        this.socketTimeout = session.getMaxIdleTimeout();
     }
 
     @Override
@@ -75,7 +76,11 @@
         } catch (NumberFormatException e) {
             sendErrorResponse(sequence /* will be unknown */);
         }
-        Session agentSession = AgentSocketsRegistry.getSession(agentId);
+        // Note: agent/client session will have the same default timeout.
+        //       Thus, it's fine to use the client's set timeout value for
+        //       retrieving the agent registry.
+        AgentSocketsRegistry agentRegistry = AgentSocketsRegistry.getInstance(socketTimeout);
+        Session agentSession = agentRegistry.getSession(agentId);
         if (agentSession == null) {
             // the agent the client wants to talk to has not connected yet.
             sendErrorResponse(sequence);
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelSocket.java	Mon May 15 17:19:26 2017 +0200
+++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelSocket.java	Tue Apr 25 14:56:15 2017 +0200
@@ -42,6 +42,7 @@
 import javax.websocket.CloseReason;
 import javax.websocket.CloseReason.CloseCodes;
 import javax.websocket.EncodeException;
+import javax.websocket.PongMessage;
 import javax.websocket.Session;
 
 import com.redhat.thermostat.service.commands.channel.model.Message;
@@ -103,6 +104,11 @@
     }
 
     @Override
+    public void onPongMessage(PongMessage message) {
+        // no-op
+    }
+
+    @Override
     public String toString() {
         Principal p = session.getUserPrincipal();
         String user = p == null ? null : p.getName();
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelWebSocket.java	Mon May 15 17:19:26 2017 +0200
+++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelWebSocket.java	Tue Apr 25 14:56:15 2017 +0200
@@ -38,6 +38,8 @@
 
 import java.io.IOException;
 
+import javax.websocket.PongMessage;
+
 import com.redhat.thermostat.service.commands.channel.model.Message;
 
 public interface CommandChannelWebSocket {
@@ -49,4 +51,6 @@
     void onSocketMessage(Message msg);
 
     void onError(Throwable cause);
+
+    void onPongMessage(PongMessage pongMessage);
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/Debug.java	Tue Apr 25 14:56:15 2017 +0200
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2012-2017 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.service.commands.socket;
+
+class Debug {
+
+    static boolean isOn() {
+        return false;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/services/commands/src/main/resources/agent2.html	Tue Apr 25 14:56:15 2017 +0200
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
+<head>
+    <title>Thermostat Commands Agent (HTML Receiver)</title>
+    <style type="text/css"><![CDATA[
+        input#cmd-chan {
+            width: 410px
+        }
+
+        #console-container {
+            width: 400px;
+        }
+
+        #console {
+            border: 1px solid #CCCCCC;
+            border-right-color: #999999;
+            border-bottom-color: #999999;
+            height: 170px;
+            overflow-y: scroll;
+            padding: 5px;
+            width: 100%;
+        }
+
+        #console p {
+            padding: 0;
+            margin: 0;
+        }
+    ]]></style>
+    <script type="application/javascript">
+    //<![CDATA[
+        var CmdChan = {};
+
+        var Response = {};
+
+        Response.asJson = (function(sequence, resp) {
+            var r = {};
+            r.type = 100;
+            r.sequence = sequence;
+            r.payload = {};
+            r.payload.respType = resp;
+            return JSON.stringify(r);
+        });
+
+        var Msg = {};
+
+        Msg.fromRequest = (function(rawMsg) {
+            var jsonObj = JSON.parse(rawMsg);
+            Msg.sequence = jsonObj.sequence;
+            Msg.paramStr = jsonObj.payload.param1;
+        });
+
+        CmdChan.socket = null;
+
+        CmdChan.connect = (function(host) {
+            if ('WebSocket' in window) {
+                CmdChan.socket = new WebSocket(host);
+            } else if ('MozWebSocket' in window) {
+                CmdChan.socket = new MozWebSocket(host);
+            } else {
+                Console.log('Error: WebSocket is not supported by this browser.');
+                return;
+            }
+
+            CmdChan.socket.onopen = function () {
+                Console.log('Info: WebSocket connection opened.');
+            };
+
+            CmdChan.socket.onclose = function () {
+                Console.log('Info: WebSocket closed.');
+            };
+
+            CmdChan.socket.onmessage = function (message) {
+                Console.log('Got: ' + message.data);
+                // parse message
+                Msg.fromRequest(message.data);
+                
+                // send the always-ok reply
+                CmdChan.socket.send(Response.asJson(Msg.sequence, 'OK'));
+                Console.log('Info: OK reply sent.');
+            };
+        });
+
+        CmdChan.initialize = function() {
+            if (window.location.protocol == 'http:') {
+                CmdChan.connect('ws://' + window.location.host + '/commands/v1/systems/foo/agents/otherAgent');
+            } else {
+                CmdChan.connect('wss://' + window.location.host + '/commands/v1/systems/foo/agents/otherAgent');
+            }
+        };
+
+        var Console = {};
+
+        Console.log = (function(message) {
+            var console = document.getElementById('console');
+            var p = document.createElement('p');
+            p.style.wordWrap = 'break-word';
+            p.innerHTML = message;
+            console.appendChild(p);
+            while (console.childNodes.length > 25) {
+                console.removeChild(console.firstChild);
+            }
+            console.scrollTop = console.scrollHeight;
+        });
+
+        CmdChan.initialize();
+
+
+        document.addEventListener("DOMContentLoaded", function() {
+            // Remove elements with "noscript" class - <noscript> is not allowed in XHTML
+            var noscripts = document.getElementsByClassName("noscript");
+            for (var i = 0; i < noscripts.length; i++) {
+                noscripts[i].parentNode.removeChild(noscripts[i]);
+            }
+        }, false);
+
+    //]]>
+    </script>
+</head>
+<body>
+<div class="noscript"><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being enabled. Please enable
+    Javascript and reload this page!</h2></div>
+<div>
+    <p>
+    Thermostat Commands: Agent Receiver
+    </p>
+    <div id="console-container">
+        <div id="console"/>
+    </div>
+</div>
+</body>
+</html>
--- a/services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/CmdChannelAgentSocket.java	Mon May 15 17:19:26 2017 +0200
+++ b/services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/CmdChannelAgentSocket.java	Tue Apr 25 14:56:15 2017 +0200
@@ -36,14 +36,19 @@
 
 package com.redhat.thermostat.service.commands.http.handlers;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
 
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.api.extensions.Frame;
 
 import com.google.gson.Gson;
 import com.redhat.thermostat.service.commands.channel.model.Message;
@@ -59,24 +64,88 @@
     private final Gson gson;
     private final CountDownLatch closeLatch;
     private final CountDownLatch connectLatch;
+    private final CountDownLatch pongReceived;
+    private final CountDownLatch pingReceived;
     private final OnMessageCallBack onMessage;
     private Session session;
+    private String pongMsg;
+    private String pingMsg;
 
     public CmdChannelAgentSocket(OnMessageCallBack onMessage, CountDownLatch connect, Gson gson) {
-        this.closeLatch = new CountDownLatch(1);
-        this.onMessage = onMessage;
-        this.connectLatch = connect;
-        this.gson = gson;
+        this(onMessage, connect, new CountDownLatch(1), new CountDownLatch(1), gson);
     }
 
     public CmdChannelAgentSocket(OnMessageCallBack onMessage, Gson gson) {
         this(onMessage, new CountDownLatch(1), gson);
     }
 
+    public CmdChannelAgentSocket(CountDownLatch connect, CountDownLatch pingPongSignal, boolean isPing) {
+        this(new NoOpMsgCallback(), connect, isPing ? new CountDownLatch(1) : pingPongSignal, isPing ? pingPongSignal : new CountDownLatch(1), null);
+    }
+
+    private CmdChannelAgentSocket(OnMessageCallBack onMessage, CountDownLatch connect, CountDownLatch pongReceived, CountDownLatch pingReceived, Gson gson) {
+        this.closeLatch = new CountDownLatch(1);
+        this.onMessage = onMessage;
+        this.connectLatch = connect;
+        this.gson = gson;
+        this.pongReceived = pongReceived;
+        this.pingReceived = pingReceived;
+    }
+
+
     public void awaitClose() throws InterruptedException {
         this.closeLatch.await();
     }
 
+    @OnWebSocketFrame
+    public void onFrame(Frame frame) {
+        switch (frame.getType()) {
+        case PONG:
+            handlePong(frame.getPayload());
+            break;
+        case PING:
+            ByteBuffer payload = null;
+            if (frame.hasPayload()) {
+                payload = frame.getPayload();
+            }
+            handlePing(payload);
+            break;
+        default:
+            // nothing to do
+        }
+    }
+
+    private void handlePing(ByteBuffer payload) {
+        RemoteEndpoint endPoint = session.getRemote();
+        try {
+            // Note: sendPong() will change the position of the payload
+            //       so the string needs to get retrieved before sendPong()
+            //       is called.
+            pingMsg = getStringFromPayload(payload);
+            endPoint.sendPong(payload);
+            pingReceived.countDown();
+        } catch (IOException e) {
+            System.err.println("Failed to send pong response!");
+            e.printStackTrace();
+        }
+    }
+
+    private void handlePong(ByteBuffer payload) {
+        pongMsg = getStringFromPayload(payload);
+        pongReceived.countDown();
+    }
+
+    private String getStringFromPayload(ByteBuffer payload) {
+        int limit = payload.limit();
+        int position = payload.position();
+        int length = limit - position;
+        byte[] buf = new byte[length];
+        for (int i = 0; position < limit; position++, i++) {
+            buf[i] = payload.get(position);
+        }
+        return new String(buf);
+    }
+
     @OnWebSocketClose
     public void onClose(int statusCode, String reason) {
         this.closeLatch.countDown(); // trigger latch
@@ -109,6 +178,24 @@
         }
     }
 
+    public void sendPingToServer(String msgPayload) throws IOException {
+        if (this.session == null) {
+            throw new NullPointerException("Session null. Agent not connected?");
+        }
+        RemoteEndpoint endpoint = session.getRemote();
+        ByteBuffer pingPayload = ByteBuffer.wrap(msgPayload.getBytes());
+        endpoint.sendPing(pingPayload);
+        System.err.println("Client: Ping msg sent <<" + msgPayload + ">>");
+    }
+
+    public String getPongMsg() {
+        return pongMsg;
+    }
+
+    public String getPingMsg() {
+        return pingMsg;
+    }
+
     public interface OnMessageCallBack {
         public void run(Session session, Message msg);
     }
--- a/services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelEndpointHandlerTest.java	Mon May 15 17:19:26 2017 +0200
+++ b/services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelEndpointHandlerTest.java	Tue Apr 25 14:56:15 2017 +0200
@@ -313,6 +313,77 @@
         doNoAuthTestAgent(agentUser);
     }
 
+    /**
+     * Tests whether the client can ping the server end-point. A server response is
+     * expected.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 2000)
+    public void testAgentPing() throws Exception {
+        String agentUser = "foo-agent-user";
+        String agentId = "testAgent";
+        URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId);
+        final CountDownLatch waitForAgentConnect = new CountDownLatch(1);
+        final CountDownLatch pongSignal = new CountDownLatch(1);
+        CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket(waitForAgentConnect, pongSignal, false);
+        ClientUpgradeRequest agentRequest = new ClientUpgradeRequest();
+        agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(),
+                getBasicAuthHeaderValue(agentUser, "agent-pwd"));
+
+        // Ensure agent conneted before we actually do anything
+        client.connect(agentSocket, agentUri, agentRequest);
+        waitForAgentConnect.await();
+
+        // Initiate ping from client
+        String pingMsgPayload = "testAgent ping message";
+        agentSocket.sendPingToServer(pingMsgPayload);
+
+        // Wait for server pong to come back
+        pongSignal.await();
+
+        assertEquals(pingMsgPayload, agentSocket.getPongMsg());
+
+        // now we are ready to close the agent socket too
+        agentSocket.closeSession();
+        agentSocket.awaitClose();
+    }
+
+    /**
+     * Tests whether a server ping is properly responded to by the client by sending a
+     * pong response.
+     *
+     * Note: The service sends a ping onConnect() and then every subsequent X minutes
+     *       where X is strictly less than the currently set timeout value for the
+     *       agent/receiver sockets.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 2000)
+    public void testAgentPong() throws Exception {
+        String agentUser = "foo-agent-user";
+        String agentId = "testAgent";
+        URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId);
+        final CountDownLatch waitForAgentConnect = new CountDownLatch(1);
+        final CountDownLatch pongResponseSent = new CountDownLatch(1);
+        CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket(waitForAgentConnect, pongResponseSent, true);
+        ClientUpgradeRequest agentRequest = new ClientUpgradeRequest();
+        agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(),
+                getBasicAuthHeaderValue(agentUser, "agent-pwd"));
+
+        // Ensure agent conneted before we actually do anything
+        client.connect(agentSocket, agentUri, agentRequest);
+        waitForAgentConnect.await();
+
+        String expectedPingPayload = "1|" + agentId;
+        pongResponseSent.await();
+        assertEquals(expectedPingPayload, agentSocket.getPingMsg());
+
+        // now we are ready to close the agent socket too
+        agentSocket.closeSession();
+        agentSocket.awaitClose();
+    }
+
     private void doNoAuthTestClient(long clientSequence, TestUser clientUser) throws Exception {
         ClientRequest noMatter = new ClientRequest(clientSequence);
         String agentId = "testAgent";
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/NoOpMsgCallback.java	Tue Apr 25 14:56:15 2017 +0200
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2012-2017 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.service.commands.http.handlers;
+
+import org.eclipse.jetty.websocket.api.Session;
+
+import com.redhat.thermostat.service.commands.channel.model.Message;
+import com.redhat.thermostat.service.commands.http.handlers.CmdChannelAgentSocket.OnMessageCallBack;
+
+class NoOpMsgCallback implements OnMessageCallBack {
+
+    @Override
+    public void run(Session session, Message msg) {
+        // nothing
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/services/commands/src/test/java/com/redhat/thermostat/service/commands/socket/AgentPingSequenceTest.java	Tue Apr 25 14:56:15 2017 +0200
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2012-2017 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.service.commands.socket;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class AgentPingSequenceTest {
+
+    private static final String AGENT_ID = "foo";
+
+    @Test
+    public void testRollOver() {
+        int initial = Integer.MAX_VALUE - 1;
+        AgentPingSequence sequence = new AgentPingSequence(initial, AGENT_ID);
+        assertEquals(initial, sequence.getCurrentSequence());
+        assertEquals(String.format("%d|foo", Integer.MAX_VALUE), sequence.getNextPingPayload());
+        assertEquals(Integer.MAX_VALUE, sequence.getCurrentSequence());
+        assertEquals("0|foo", sequence.getNextPingPayload());
+        assertEquals(0, sequence.getCurrentSequence());
+        assertEquals("1|foo", sequence.getNextPingPayload());
+        assertEquals(1, sequence.getCurrentSequence());
+    }
+
+    @Test
+    public void canGetPingPayload() {
+        String expected = "1|foo";
+        String nextExpected = "2|foo";
+        AgentPingSequence sequence = new AgentPingSequence(AGENT_ID);
+        assertEquals(expected, sequence.getNextPingPayload());
+        assertEquals(nextExpected, sequence.getNextPingPayload());
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/services/commands/src/test/java/com/redhat/thermostat/service/commands/socket/AgentSocketsRegistryTest.java	Tue Apr 25 14:56:15 2017 +0200
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2012-2017 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.service.commands.socket;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.RemoteEndpoint.Basic;
+import javax.websocket.Session;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class AgentSocketsRegistryTest {
+
+    private static final long SOCKET_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
+    private AgentSocketsRegistry reg;
+    private Timer timer;
+    private TimerTask pingTask;
+
+    @Before
+    public void setup() {
+        timer = mock(Timer.class);
+        pingTask = mock(TimerTask.class);
+        reg = new AgentSocketsRegistry(timer, pingTask, SOCKET_TIMEOUT);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void illegalSocketTimeout() {
+        new AgentSocketsRegistry(TimeUnit.SECONDS.toMillis(3));
+    }
+
+    @Test
+    public void addingSocketToRegistrySendsPing() throws IOException {
+        Session session = mock(Session.class);
+        Basic basic = mock(Basic.class);
+        when(session.getBasicRemote()).thenReturn(basic);
+        when(session.isOpen()).thenReturn(true);
+
+        reg.addSocket("fooAgent", session);
+
+        ArgumentCaptor<ByteBuffer> payloadCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
+        verify(basic).sendPing(payloadCaptor.capture());
+        verify(session).isOpen();
+        ByteBuffer payload = payloadCaptor.getValue();
+        String actualPayload = extractPayload(payload);
+        assertEquals("1|fooAgent", actualPayload);
+    }
+
+    private String extractPayload(ByteBuffer payload) throws UnsupportedEncodingException {
+        int limit = payload.limit();
+        int position = payload.position();
+        int length = limit - position;
+        byte[] buf = new byte[length];
+        for (int i = 0; position < limit; position++, i++) {
+            buf[i] = payload.get(position);
+        }
+        return new String(buf, "UTF-8");
+    }
+
+    @Test
+    public void getSessionNonExistentReturnsNull() {
+        // Must not throw exception
+        assertNull(reg.getSession("do-not-exist"));
+    }
+
+    @Test
+    public void addingSocketToRegistryStartsTimer() {
+        reg = new AgentSocketsRegistry(timer, pingTask, SOCKET_TIMEOUT);
+        long expectedPeriod = (long)(SOCKET_TIMEOUT * 0.85);
+        Session session = mock(Session.class);
+        Basic basic = mock(Basic.class);
+        when(session.getBasicRemote()).thenReturn(basic);
+
+        reg.addSocket("fooAgent", session);
+        verify(timer).scheduleAtFixedRate(eq(pingTask), eq(expectedPeriod), eq(expectedPeriod));
+    }
+
+    @Test
+    public void testIsSingleton() {
+        AgentSocketsRegistry reg = AgentSocketsRegistry.getInstance(SOCKET_TIMEOUT);
+        AgentSocketsRegistry other = AgentSocketsRegistry.getInstance(SOCKET_TIMEOUT);
+        assertSame(reg, other);
+        // differnent timeout doesn't matter for subsequent calls.
+        AgentSocketsRegistry third = AgentSocketsRegistry.getInstance(SOCKET_TIMEOUT + 3);
+        assertSame(reg, third);
+    }
+
+    @Test
+    public void calculatesTimerPeriodCorrectly() {
+        long actual = AgentSocketsRegistry.calculatePeriod(100 * 1000);
+        long expectedPeriod = 85 * 1000; // 85% of socket timeout
+        assertEquals(expectedPeriod, actual);
+        actual = AgentSocketsRegistry.calculatePeriod(TimeUnit.MINUTES.toMillis(2));
+        long expected = (long)(TimeUnit.MINUTES.toMillis(2) * 0.85);
+        assertEquals(expected, actual);
+    }
+
+}