Mercurial > hg > thermostat-ng > web-gateway
changeset 148:d78d32242c62
[commands] Add support for receiver keep-alive.
Reviewed-by: jkang
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2017-April/022866.html
line wrap: on
line diff
--- a/distribution/src/etc/commands/basic-users.properties Mon May 15 17:19:26 2017 +0200 +++ b/distribution/src/etc/commands/basic-users.properties Tue Apr 25 14:56:15 2017 +0200 @@ -1,4 +1,5 @@ -# Agent user +# Agent user(s) foo-agent-user=agent-pwd,thermostat-commands-realm,thermostat-commands-provider-testAgent +agent-user=agent-pwd,thermostat-commands-realm,thermostat-commands-provider-otherAgent # Client user bar-client-user=client-pwd,thermostat-commands-realm,thermostat-commands-grant-dump-heap,thermostat-commands-grant-jvm-abc \ No newline at end of file
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/channel/AgentSocketsRegistry.java Mon May 15 17:19:26 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,59 +0,0 @@ -/* - * Copyright 2012-2017 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.service.commands.channel; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.websocket.Session; - -public class AgentSocketsRegistry { - - private static final Map<String, Session> agentSockets = new ConcurrentHashMap<>(); - - public static void addSocket(String id, Session session) { - agentSockets.put(id, session); - } - - public static Session getSession(String id) { - return agentSockets.get(id); - } - - public static void removeSocket(String id) { - agentSockets.remove(id); - } -}
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelAgentEndpointHandler.java Mon May 15 17:19:26 2017 +0200 +++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelAgentEndpointHandler.java Tue Apr 25 14:56:15 2017 +0200 @@ -43,6 +43,7 @@ import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; +import javax.websocket.PongMessage; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; @@ -72,6 +73,11 @@ super.onMessage(message); } + @OnMessage + public void onPongMessage(PongMessage pong) { + super.onPongMessage(pong); + } + @OnClose public void onClose(CloseReason reason) { super.onClose(reason.getCloseCode().getCode(),
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelEndpointHandler.java Mon May 15 17:19:26 2017 +0200 +++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelEndpointHandler.java Tue Apr 25 14:56:15 2017 +0200 @@ -37,7 +37,9 @@ package com.redhat.thermostat.service.commands.http.handlers; import java.io.IOException; +import java.util.concurrent.TimeUnit; +import javax.websocket.PongMessage; import javax.websocket.Session; import com.redhat.thermostat.service.commands.channel.model.Message; @@ -47,9 +49,14 @@ class CommandChannelEndpointHandler { + // Default socket timeout will be 10 minutes. For agent sockets they should never + // time out since we are sending regular pings to those sockets periodically. The + // ping interval is strictly less than the timout set here. + private static final long DEFAULT_SOCKET_TIMEOUT = TimeUnit.MINUTES.toMillis(10); private CommandChannelWebSocket socket; protected void onConnect(WebSocketType type, String agentId, Session session) throws IOException { + session.setMaxIdleTimeout(DEFAULT_SOCKET_TIMEOUT); socket = CommandChannelSocketFactory.createWebSocketChannel(type, session, agentId); socket.onConnect(); } @@ -65,4 +72,8 @@ protected void onClose(int code, String reason) { socket.onClose(code, reason); } + + protected void onPongMessage(PongMessage msg) { + socket.onPongMessage(msg); + } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/AgentPingSequence.java Tue Apr 25 14:56:15 2017 +0200 @@ -0,0 +1,67 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.service.commands.socket; + +class AgentPingSequence { + + private final String agentId; + private int currentSequence; + + AgentPingSequence(String agentId) { + this(0, agentId); + } + + AgentPingSequence(int initialVal, String agentId) { + this.currentSequence = initialVal; + this.agentId = agentId; + } + + private synchronized int getNextSequence() { + int next = currentSequence + 1; + next = Math.max(0, next); // handle roll-overs + currentSequence = next; + return next; + } + + synchronized int getCurrentSequence() { + return currentSequence; + } + + String getNextPingPayload() { + return String.format("%d|%s", getNextSequence(), agentId); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/AgentSocketsRegistry.java Tue Apr 25 14:56:15 2017 +0200 @@ -0,0 +1,156 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.service.commands.socket; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import javax.websocket.Session; + +class AgentSocketsRegistry { + + private static final double PERCENT_85 = 0.85; + private static final Map<String, AgentSessionHolder> agentSockets = new ConcurrentHashMap<>(); + private static final String TIMER_NAME = "com.redhat.thermostat.service.commands.ReceiverPingTimer"; + private static final long PING_BOUND_BELOW_SECONDS = 3; + private static AgentSocketsRegistry INSTANCE; + private final long pingInterval; + private final TimerTask pingTask; + private final Timer pingTimer; + private boolean isTimerStarted = false; + + AgentSocketsRegistry(long socketTimeOut) { + this(new Timer(TIMER_NAME), new PingTimerTask(), socketTimeOut); + } + + AgentSocketsRegistry(Timer timer, TimerTask pingTask, long socketTimeOut) { + this.pingTimer = timer; + this.pingTask = pingTask; + this.pingInterval = calculatePeriod(socketTimeOut); + } + + public static synchronized AgentSocketsRegistry getInstance(long socketTimeout) { + if (INSTANCE == null) { + INSTANCE = new AgentSocketsRegistry(socketTimeout); + } + return INSTANCE; + } + + public void addSocket(String id, Session session) { + AgentPingSequence sequence = new AgentPingSequence(id); + AgentSessionHolder holder = new AgentSessionHolder(session, sequence); + agentSockets.put(id, holder); + sendPing(holder); + if (agentSockets.size() == 1) { + startPingTimer(); + } + } + + public Session getSession(String id) { + AgentSessionHolder holder = agentSockets.get(id); + if (holder == null) { + return null; + } + return holder.session; + } + + public void removeSocket(String id) { + agentSockets.remove(id); + } + + private static void sendPing(AgentSessionHolder holder) { + try { + Session session = holder.session; + String pingPayload = holder.sequence.getNextPingPayload(); + synchronized (session) { + if (session.isOpen()) { + ByteBuffer payload = ByteBuffer.wrap(pingPayload.getBytes("UTF-8")); + if (Debug.isOn()) { + System.err.println("Server: sending ping msg <<" + pingPayload + ">>"); + } + session.getBasicRemote().sendPing(payload); + } + } + } catch (IOException e) { + // ignore + } + } + + static long calculatePeriod(long sockTimeOut) { + // Heuristic: Use 85% of the socket timeout for the ping timer period + long periodCandidate = (long)(sockTimeOut * PERCENT_85); + if (periodCandidate < TimeUnit.SECONDS.toMillis(PING_BOUND_BELOW_SECONDS)) { + long socketTimeOutSecs = TimeUnit.MILLISECONDS.toSeconds(sockTimeOut); + throw new IllegalStateException("Agent socket timeout (" + socketTimeOutSecs + "s) too short."+ + " Is the socket timeout configured correctly?"); + } + return periodCandidate; + } + + private synchronized void startPingTimer() { + if (!isTimerStarted) { + pingTimer.scheduleAtFixedRate(pingTask, pingInterval, pingInterval); + isTimerStarted = true; + } + } + + private static class PingTimerTask extends TimerTask { + + @Override + public void run() { + for (AgentSessionHolder holder: agentSockets.values()) { + sendPing(holder); + } + } + + } + + private static class AgentSessionHolder { + private final Session session; + private final AgentPingSequence sequence; + + AgentSessionHolder(Session session, AgentPingSequence sequence) { + this.session = session; + this.sequence = sequence; + } + } +}
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelAgentSocket.java Mon May 15 17:19:26 2017 +0200 +++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelAgentSocket.java Tue Apr 25 14:56:15 2017 +0200 @@ -37,11 +37,13 @@ package com.redhat.thermostat.service.commands.socket; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import javax.websocket.PongMessage; import javax.websocket.Session; import com.redhat.thermostat.gateway.common.core.auth.basic.RoleAwareUser; -import com.redhat.thermostat.service.commands.channel.AgentSocketsRegistry; import com.redhat.thermostat.service.commands.channel.ClientAgentCommunication; import com.redhat.thermostat.service.commands.channel.CommunicationsRegistry; import com.redhat.thermostat.service.commands.channel.model.Message; @@ -49,10 +51,13 @@ class CommandChannelAgentSocket extends CommandChannelSocket { + private static final String UNKNOWN_PAYLOAD = "UNKNOWN"; private static final String AGENT_PROVIDER_PREFIX = "thermostat-commands-provider-"; + private final long socketTimeout; CommandChannelAgentSocket(String id, Session session) { super(id, session); + this.socketTimeout = session.getMaxIdleTimeout(); } @Override @@ -65,13 +70,40 @@ // connects in that window it will get an error back, // believing that the agent it wants to talk to has not // connected. - AgentSocketsRegistry.addSocket(agentId, this.session); + AgentSocketsRegistry reg = AgentSocketsRegistry.getInstance(socketTimeout); + reg.addSocket(agentId, this.session); } @Override public void onClose(int closeCode, String reason) { super.onClose(closeCode, reason); - AgentSocketsRegistry.removeSocket(agentId); + AgentSocketsRegistry reg = AgentSocketsRegistry.getInstance(socketTimeout); + reg.removeSocket(agentId); + } + + @Override + public void onPongMessage(PongMessage message) { + if (Debug.isOn()) { + String payload = extractPayload(message); + System.err.println("Server: Got pong message <<" + payload + ">>"); + } + } + + private String extractPayload(PongMessage message) { + ByteBuffer payload = message.getApplicationData(); + int limit = payload.limit(); + int position = payload.position(); + int length = limit - position; + byte[] buf = new byte[length]; + for (int i = 0; position < limit; position++, i++) { + buf[i] = payload.get(position); + } + try { + return new String(buf, "UTF-8"); + } catch (UnsupportedEncodingException e) { + System.err.println("ERROR: Failed to extract payload from pong message: " + e.getMessage()); + return UNKNOWN_PAYLOAD; + } } @Override
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelClientSocket.java Mon May 15 17:19:26 2017 +0200 +++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelClientSocket.java Tue Apr 25 14:56:15 2017 +0200 @@ -43,7 +43,6 @@ import javax.websocket.Session; import com.redhat.thermostat.gateway.common.core.auth.basic.RoleAwareUser; -import com.redhat.thermostat.service.commands.channel.AgentSocketsRegistry; import com.redhat.thermostat.service.commands.channel.ClientAgentCommunication; import com.redhat.thermostat.service.commands.channel.CommunicationsRegistry; import com.redhat.thermostat.service.commands.channel.WebSocketCommunicationBuilder; @@ -59,9 +58,11 @@ private static final String PATH_PARAM_ACTION = "action"; private static final String PATH_PARAM_JVM = "jvmId"; private static final String PATH_PARAM_SEQ_ID = "seqId"; + private final long socketTimeout; CommandChannelClientSocket(String id, Session session) { super(id, session); + this.socketTimeout = session.getMaxIdleTimeout(); } @Override @@ -75,7 +76,11 @@ } catch (NumberFormatException e) { sendErrorResponse(sequence /* will be unknown */); } - Session agentSession = AgentSocketsRegistry.getSession(agentId); + // Note: agent/client session will have the same default timeout. + // Thus, it's fine to use the client's set timeout value for + // retrieving the agent registry. + AgentSocketsRegistry agentRegistry = AgentSocketsRegistry.getInstance(socketTimeout); + Session agentSession = agentRegistry.getSession(agentId); if (agentSession == null) { // the agent the client wants to talk to has not connected yet. sendErrorResponse(sequence);
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelSocket.java Mon May 15 17:19:26 2017 +0200 +++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelSocket.java Tue Apr 25 14:56:15 2017 +0200 @@ -42,6 +42,7 @@ import javax.websocket.CloseReason; import javax.websocket.CloseReason.CloseCodes; import javax.websocket.EncodeException; +import javax.websocket.PongMessage; import javax.websocket.Session; import com.redhat.thermostat.service.commands.channel.model.Message; @@ -103,6 +104,11 @@ } @Override + public void onPongMessage(PongMessage message) { + // no-op + } + + @Override public String toString() { Principal p = session.getUserPrincipal(); String user = p == null ? null : p.getName();
--- a/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelWebSocket.java Mon May 15 17:19:26 2017 +0200 +++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/CommandChannelWebSocket.java Tue Apr 25 14:56:15 2017 +0200 @@ -38,6 +38,8 @@ import java.io.IOException; +import javax.websocket.PongMessage; + import com.redhat.thermostat.service.commands.channel.model.Message; public interface CommandChannelWebSocket { @@ -49,4 +51,6 @@ void onSocketMessage(Message msg); void onError(Throwable cause); + + void onPongMessage(PongMessage pongMessage); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/main/java/com/redhat/thermostat/service/commands/socket/Debug.java Tue Apr 25 14:56:15 2017 +0200 @@ -0,0 +1,44 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.service.commands.socket; + +class Debug { + + static boolean isOn() { + return false; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/main/resources/agent2.html Tue Apr 25 14:56:15 2017 +0200 @@ -0,0 +1,131 @@ +<?xml version="1.0" encoding="UTF-8"?> +<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"> +<head> + <title>Thermostat Commands Agent (HTML Receiver)</title> + <style type="text/css"><![CDATA[ + input#cmd-chan { + width: 410px + } + + #console-container { + width: 400px; + } + + #console { + border: 1px solid #CCCCCC; + border-right-color: #999999; + border-bottom-color: #999999; + height: 170px; + overflow-y: scroll; + padding: 5px; + width: 100%; + } + + #console p { + padding: 0; + margin: 0; + } + ]]></style> + <script type="application/javascript"> + //<![CDATA[ + var CmdChan = {}; + + var Response = {}; + + Response.asJson = (function(sequence, resp) { + var r = {}; + r.type = 100; + r.sequence = sequence; + r.payload = {}; + r.payload.respType = resp; + return JSON.stringify(r); + }); + + var Msg = {}; + + Msg.fromRequest = (function(rawMsg) { + var jsonObj = JSON.parse(rawMsg); + Msg.sequence = jsonObj.sequence; + Msg.paramStr = jsonObj.payload.param1; + }); + + CmdChan.socket = null; + + CmdChan.connect = (function(host) { + if ('WebSocket' in window) { + CmdChan.socket = new WebSocket(host); + } else if ('MozWebSocket' in window) { + CmdChan.socket = new MozWebSocket(host); + } else { + Console.log('Error: WebSocket is not supported by this browser.'); + return; + } + + CmdChan.socket.onopen = function () { + Console.log('Info: WebSocket connection opened.'); + }; + + CmdChan.socket.onclose = function () { + Console.log('Info: WebSocket closed.'); + }; + + CmdChan.socket.onmessage = function (message) { + Console.log('Got: ' + message.data); + // parse message + Msg.fromRequest(message.data); + + // send the always-ok reply + CmdChan.socket.send(Response.asJson(Msg.sequence, 'OK')); + Console.log('Info: OK reply sent.'); + }; + }); + + CmdChan.initialize = function() { + if (window.location.protocol == 'http:') { + CmdChan.connect('ws://' + window.location.host + '/commands/v1/systems/foo/agents/otherAgent'); + } else { + CmdChan.connect('wss://' + window.location.host + '/commands/v1/systems/foo/agents/otherAgent'); + } + }; + + var Console = {}; + + Console.log = (function(message) { + var console = document.getElementById('console'); + var p = document.createElement('p'); + p.style.wordWrap = 'break-word'; + p.innerHTML = message; + console.appendChild(p); + while (console.childNodes.length > 25) { + console.removeChild(console.firstChild); + } + console.scrollTop = console.scrollHeight; + }); + + CmdChan.initialize(); + + + document.addEventListener("DOMContentLoaded", function() { + // Remove elements with "noscript" class - <noscript> is not allowed in XHTML + var noscripts = document.getElementsByClassName("noscript"); + for (var i = 0; i < noscripts.length; i++) { + noscripts[i].parentNode.removeChild(noscripts[i]); + } + }, false); + + //]]> + </script> +</head> +<body> +<div class="noscript"><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being enabled. Please enable + Javascript and reload this page!</h2></div> +<div> + <p> + Thermostat Commands: Agent Receiver + </p> + <div id="console-container"> + <div id="console"/> + </div> +</div> +</body> +</html>
--- a/services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/CmdChannelAgentSocket.java Mon May 15 17:19:26 2017 +0200 +++ b/services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/CmdChannelAgentSocket.java Tue Apr 25 14:56:15 2017 +0200 @@ -36,14 +36,19 @@ package com.redhat.thermostat.service.commands.http.handlers; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.extensions.Frame; import com.google.gson.Gson; import com.redhat.thermostat.service.commands.channel.model.Message; @@ -59,24 +64,88 @@ private final Gson gson; private final CountDownLatch closeLatch; private final CountDownLatch connectLatch; + private final CountDownLatch pongReceived; + private final CountDownLatch pingReceived; private final OnMessageCallBack onMessage; private Session session; + private String pongMsg; + private String pingMsg; public CmdChannelAgentSocket(OnMessageCallBack onMessage, CountDownLatch connect, Gson gson) { - this.closeLatch = new CountDownLatch(1); - this.onMessage = onMessage; - this.connectLatch = connect; - this.gson = gson; + this(onMessage, connect, new CountDownLatch(1), new CountDownLatch(1), gson); } public CmdChannelAgentSocket(OnMessageCallBack onMessage, Gson gson) { this(onMessage, new CountDownLatch(1), gson); } + public CmdChannelAgentSocket(CountDownLatch connect, CountDownLatch pingPongSignal, boolean isPing) { + this(new NoOpMsgCallback(), connect, isPing ? new CountDownLatch(1) : pingPongSignal, isPing ? pingPongSignal : new CountDownLatch(1), null); + } + + private CmdChannelAgentSocket(OnMessageCallBack onMessage, CountDownLatch connect, CountDownLatch pongReceived, CountDownLatch pingReceived, Gson gson) { + this.closeLatch = new CountDownLatch(1); + this.onMessage = onMessage; + this.connectLatch = connect; + this.gson = gson; + this.pongReceived = pongReceived; + this.pingReceived = pingReceived; + } + + public void awaitClose() throws InterruptedException { this.closeLatch.await(); } + @OnWebSocketFrame + public void onFrame(Frame frame) { + switch (frame.getType()) { + case PONG: + handlePong(frame.getPayload()); + break; + case PING: + ByteBuffer payload = null; + if (frame.hasPayload()) { + payload = frame.getPayload(); + } + handlePing(payload); + break; + default: + // nothing to do + } + } + + private void handlePing(ByteBuffer payload) { + RemoteEndpoint endPoint = session.getRemote(); + try { + // Note: sendPong() will change the position of the payload + // so the string needs to get retrieved before sendPong() + // is called. + pingMsg = getStringFromPayload(payload); + endPoint.sendPong(payload); + pingReceived.countDown(); + } catch (IOException e) { + System.err.println("Failed to send pong response!"); + e.printStackTrace(); + } + } + + private void handlePong(ByteBuffer payload) { + pongMsg = getStringFromPayload(payload); + pongReceived.countDown(); + } + + private String getStringFromPayload(ByteBuffer payload) { + int limit = payload.limit(); + int position = payload.position(); + int length = limit - position; + byte[] buf = new byte[length]; + for (int i = 0; position < limit; position++, i++) { + buf[i] = payload.get(position); + } + return new String(buf); + } + @OnWebSocketClose public void onClose(int statusCode, String reason) { this.closeLatch.countDown(); // trigger latch @@ -109,6 +178,24 @@ } } + public void sendPingToServer(String msgPayload) throws IOException { + if (this.session == null) { + throw new NullPointerException("Session null. Agent not connected?"); + } + RemoteEndpoint endpoint = session.getRemote(); + ByteBuffer pingPayload = ByteBuffer.wrap(msgPayload.getBytes()); + endpoint.sendPing(pingPayload); + System.err.println("Client: Ping msg sent <<" + msgPayload + ">>"); + } + + public String getPongMsg() { + return pongMsg; + } + + public String getPingMsg() { + return pingMsg; + } + public interface OnMessageCallBack { public void run(Session session, Message msg); }
--- a/services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelEndpointHandlerTest.java Mon May 15 17:19:26 2017 +0200 +++ b/services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/CommandChannelEndpointHandlerTest.java Tue Apr 25 14:56:15 2017 +0200 @@ -313,6 +313,77 @@ doNoAuthTestAgent(agentUser); } + /** + * Tests whether the client can ping the server end-point. A server response is + * expected. + * + * @throws Exception + */ + @Test(timeout = 2000) + public void testAgentPing() throws Exception { + String agentUser = "foo-agent-user"; + String agentId = "testAgent"; + URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); + final CountDownLatch waitForAgentConnect = new CountDownLatch(1); + final CountDownLatch pongSignal = new CountDownLatch(1); + CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket(waitForAgentConnect, pongSignal, false); + ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); + agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(agentUser, "agent-pwd")); + + // Ensure agent conneted before we actually do anything + client.connect(agentSocket, agentUri, agentRequest); + waitForAgentConnect.await(); + + // Initiate ping from client + String pingMsgPayload = "testAgent ping message"; + agentSocket.sendPingToServer(pingMsgPayload); + + // Wait for server pong to come back + pongSignal.await(); + + assertEquals(pingMsgPayload, agentSocket.getPongMsg()); + + // now we are ready to close the agent socket too + agentSocket.closeSession(); + agentSocket.awaitClose(); + } + + /** + * Tests whether a server ping is properly responded to by the client by sending a + * pong response. + * + * Note: The service sends a ping onConnect() and then every subsequent X minutes + * where X is strictly less than the currently set timeout value for the + * agent/receiver sockets. + * + * @throws Exception + */ + @Test(timeout = 2000) + public void testAgentPong() throws Exception { + String agentUser = "foo-agent-user"; + String agentId = "testAgent"; + URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); + final CountDownLatch waitForAgentConnect = new CountDownLatch(1); + final CountDownLatch pongResponseSent = new CountDownLatch(1); + CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket(waitForAgentConnect, pongResponseSent, true); + ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); + agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(agentUser, "agent-pwd")); + + // Ensure agent conneted before we actually do anything + client.connect(agentSocket, agentUri, agentRequest); + waitForAgentConnect.await(); + + String expectedPingPayload = "1|" + agentId; + pongResponseSent.await(); + assertEquals(expectedPingPayload, agentSocket.getPingMsg()); + + // now we are ready to close the agent socket too + agentSocket.closeSession(); + agentSocket.awaitClose(); + } + private void doNoAuthTestClient(long clientSequence, TestUser clientUser) throws Exception { ClientRequest noMatter = new ClientRequest(clientSequence); String agentId = "testAgent";
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/test/java/com/redhat/thermostat/service/commands/http/handlers/NoOpMsgCallback.java Tue Apr 25 14:56:15 2017 +0200 @@ -0,0 +1,51 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.service.commands.http.handlers; + +import org.eclipse.jetty.websocket.api.Session; + +import com.redhat.thermostat.service.commands.channel.model.Message; +import com.redhat.thermostat.service.commands.http.handlers.CmdChannelAgentSocket.OnMessageCallBack; + +class NoOpMsgCallback implements OnMessageCallBack { + + @Override + public void run(Session session, Message msg) { + // nothing + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/test/java/com/redhat/thermostat/service/commands/socket/AgentPingSequenceTest.java Tue Apr 25 14:56:15 2017 +0200 @@ -0,0 +1,68 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.service.commands.socket; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class AgentPingSequenceTest { + + private static final String AGENT_ID = "foo"; + + @Test + public void testRollOver() { + int initial = Integer.MAX_VALUE - 1; + AgentPingSequence sequence = new AgentPingSequence(initial, AGENT_ID); + assertEquals(initial, sequence.getCurrentSequence()); + assertEquals(String.format("%d|foo", Integer.MAX_VALUE), sequence.getNextPingPayload()); + assertEquals(Integer.MAX_VALUE, sequence.getCurrentSequence()); + assertEquals("0|foo", sequence.getNextPingPayload()); + assertEquals(0, sequence.getCurrentSequence()); + assertEquals("1|foo", sequence.getNextPingPayload()); + assertEquals(1, sequence.getCurrentSequence()); + } + + @Test + public void canGetPingPayload() { + String expected = "1|foo"; + String nextExpected = "2|foo"; + AgentPingSequence sequence = new AgentPingSequence(AGENT_ID); + assertEquals(expected, sequence.getNextPingPayload()); + assertEquals(nextExpected, sequence.getNextPingPayload()); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/test/java/com/redhat/thermostat/service/commands/socket/AgentSocketsRegistryTest.java Tue Apr 25 14:56:15 2017 +0200 @@ -0,0 +1,146 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.service.commands.socket; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; + +import javax.websocket.RemoteEndpoint.Basic; +import javax.websocket.Session; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class AgentSocketsRegistryTest { + + private static final long SOCKET_TIMEOUT = TimeUnit.SECONDS.toMillis(30); + private AgentSocketsRegistry reg; + private Timer timer; + private TimerTask pingTask; + + @Before + public void setup() { + timer = mock(Timer.class); + pingTask = mock(TimerTask.class); + reg = new AgentSocketsRegistry(timer, pingTask, SOCKET_TIMEOUT); + } + + @Test(expected = IllegalStateException.class) + public void illegalSocketTimeout() { + new AgentSocketsRegistry(TimeUnit.SECONDS.toMillis(3)); + } + + @Test + public void addingSocketToRegistrySendsPing() throws IOException { + Session session = mock(Session.class); + Basic basic = mock(Basic.class); + when(session.getBasicRemote()).thenReturn(basic); + when(session.isOpen()).thenReturn(true); + + reg.addSocket("fooAgent", session); + + ArgumentCaptor<ByteBuffer> payloadCaptor = ArgumentCaptor.forClass(ByteBuffer.class); + verify(basic).sendPing(payloadCaptor.capture()); + verify(session).isOpen(); + ByteBuffer payload = payloadCaptor.getValue(); + String actualPayload = extractPayload(payload); + assertEquals("1|fooAgent", actualPayload); + } + + private String extractPayload(ByteBuffer payload) throws UnsupportedEncodingException { + int limit = payload.limit(); + int position = payload.position(); + int length = limit - position; + byte[] buf = new byte[length]; + for (int i = 0; position < limit; position++, i++) { + buf[i] = payload.get(position); + } + return new String(buf, "UTF-8"); + } + + @Test + public void getSessionNonExistentReturnsNull() { + // Must not throw exception + assertNull(reg.getSession("do-not-exist")); + } + + @Test + public void addingSocketToRegistryStartsTimer() { + reg = new AgentSocketsRegistry(timer, pingTask, SOCKET_TIMEOUT); + long expectedPeriod = (long)(SOCKET_TIMEOUT * 0.85); + Session session = mock(Session.class); + Basic basic = mock(Basic.class); + when(session.getBasicRemote()).thenReturn(basic); + + reg.addSocket("fooAgent", session); + verify(timer).scheduleAtFixedRate(eq(pingTask), eq(expectedPeriod), eq(expectedPeriod)); + } + + @Test + public void testIsSingleton() { + AgentSocketsRegistry reg = AgentSocketsRegistry.getInstance(SOCKET_TIMEOUT); + AgentSocketsRegistry other = AgentSocketsRegistry.getInstance(SOCKET_TIMEOUT); + assertSame(reg, other); + // differnent timeout doesn't matter for subsequent calls. + AgentSocketsRegistry third = AgentSocketsRegistry.getInstance(SOCKET_TIMEOUT + 3); + assertSame(reg, third); + } + + @Test + public void calculatesTimerPeriodCorrectly() { + long actual = AgentSocketsRegistry.calculatePeriod(100 * 1000); + long expectedPeriod = 85 * 1000; // 85% of socket timeout + assertEquals(expectedPeriod, actual); + actual = AgentSocketsRegistry.calculatePeriod(TimeUnit.MINUTES.toMillis(2)); + long expected = (long)(TimeUnit.MINUTES.toMillis(2) * 0.85); + assertEquals(expected, actual); + } + +}