Mercurial > hg > thermostat-ng > web-gateway
changeset 242:186646ba5e7b
[commands] Preparation for config aware configurator.
Reviewed-by: neugens
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2017-September/024816.html
line wrap: on
line diff
--- a/common/core/src/main/java/com/redhat/thermostat/gateway/common/core/servlet/GlobalConstants.java Thu Aug 31 15:34:39 2017 +0200 +++ b/common/core/src/main/java/com/redhat/thermostat/gateway/common/core/servlet/GlobalConstants.java Mon Sep 04 08:46:20 2017 +0200 @@ -42,4 +42,5 @@ public static final String GATEWAY_HOME_ENV = "THERMOSTAT_GATEWAY_HOME"; public static final String SERVICE_NAME_KEY = GATEWAY_PREFIX + ".SERVICE_NAME"; public static final String SERVICE_CONFIG_KEY = GATEWAY_PREFIX + ".SERVICE_CONFIG"; + public static final String SERVICE_VERSION_KEY = GATEWAY_PREFIX + ".SERVICE_VERSION"; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/main/java/com/redhat/thermostat/gateway/service/commands/channel/endpoints/CommandChannelAgentEndpointHandler.java Mon Sep 04 08:46:20 2017 +0200 @@ -0,0 +1,100 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.gateway.service.commands.channel.endpoints; + +import java.io.IOException; + +import javax.websocket.CloseReason; +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.PongMessage; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; + +import com.redhat.thermostat.gateway.service.commands.channel.coders.AgentRequestEncoder; +import com.redhat.thermostat.gateway.service.commands.channel.coders.MessageDecoder; +import com.redhat.thermostat.gateway.service.commands.channel.coders.WebSocketResponseEncoder; +import com.redhat.thermostat.gateway.service.commands.channel.model.Message; +import com.redhat.thermostat.gateway.service.commands.servlet.SocketRegistrationListener; +import com.redhat.thermostat.gateway.service.commands.socket.WebSocketType; + +/** + * Agent endpoints: a.k.a Receivers. + * + * @see SocketRegistrationListener where endpoints are set up. + * + */ +@ServerEndpoint( decoders = MessageDecoder.class, + encoders = { AgentRequestEncoder.class, WebSocketResponseEncoder.class }, + value = "ignored" ) // Actual path is set up in SocketRegistrationListener +public class CommandChannelAgentEndpointHandler extends CommandChannelEndpointHandler { + + /** + * The path this handler is registered for. Must not include version prefix. + */ + public static final String PATH = "/systems/{systemId}/agents/{agentId}"; + + @OnOpen + public void onConnect(Session session, + @PathParam("agentId") final String agentId) throws IOException { + super.onConnect(WebSocketType.AGENT, agentId, session); + } + + @OnMessage + public void onMessage(Message message) { + super.onMessage(message); + } + + @OnMessage + public void onPongMessage(PongMessage pong) { + super.onPongMessage(pong); + } + + @OnClose + public void onClose(CloseReason reason) { + super.onClose(reason.getCloseCode().getCode(), + reason.getReasonPhrase()); + } + + @OnError + public void onErrorThrown(Throwable cause) { + super.onError(cause); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/main/java/com/redhat/thermostat/gateway/service/commands/channel/endpoints/CommandChannelClientEndpointHandler.java Mon Sep 04 08:46:20 2017 +0200 @@ -0,0 +1,110 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.gateway.service.commands.channel.endpoints; + +import java.io.IOException; + +import javax.websocket.CloseReason; +import javax.websocket.CloseReason.CloseCodes; +import javax.websocket.EncodeException; +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; + +import com.redhat.thermostat.gateway.service.commands.channel.coders.AgentRequestEncoder; +import com.redhat.thermostat.gateway.service.commands.channel.coders.MessageDecoder; +import com.redhat.thermostat.gateway.service.commands.channel.coders.WebSocketResponseEncoder; +import com.redhat.thermostat.gateway.service.commands.channel.model.Message; +import com.redhat.thermostat.gateway.service.commands.channel.model.WebSocketResponse; +import com.redhat.thermostat.gateway.service.commands.channel.model.WebSocketResponse.ResponseType; +import com.redhat.thermostat.gateway.service.commands.servlet.SocketRegistrationListener; +import com.redhat.thermostat.gateway.service.commands.socket.WebSocketType; + +/** + * Client endpoints: a.k.a Initiators. + * + * @see SocketRegistrationListener where endpoints are set up. + * + */ +@ServerEndpoint( decoders = MessageDecoder.class, + encoders = { AgentRequestEncoder.class, WebSocketResponseEncoder.class }, + value = "ignored") // Actual path is set up in SocketRegistrationListener +public class CommandChannelClientEndpointHandler extends CommandChannelEndpointHandler { + + /** + * The path this handler is registered for + */ + public static final String PATH = "/actions/{action}/systems/{systemId}/agents/{agentId}/jvms/{jvmId}/sequence/{seqId}"; + + @OnOpen + public void onConnect(Session session, + @PathParam("agentId") final String agentId) throws IOException { + super.onConnect(WebSocketType.CLIENT, agentId, session); + } + + @OnMessage + public void onMessage(Message msg) { + super.onMessage(msg); + } + + @OnClose + public void onClose(CloseReason reason) { + super.onClose(reason.getCloseCode().getCode(), + reason.getReasonPhrase()); + } + + @OnError + public void onErrorThrown(Session session, + Throwable cause, + @PathParam("seqId") final long seqId) { + try { + // Let the client know something failed + WebSocketResponse resp = new WebSocketResponse(seqId, ResponseType.ERROR); + session.getBasicRemote().sendObject(resp); + session.close(new CloseReason(CloseCodes.UNEXPECTED_CONDITION, + "Server error")); + } catch (IOException|EncodeException e) { + // Not much we can do + e.printStackTrace(); + } + super.onError(cause); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/main/java/com/redhat/thermostat/gateway/service/commands/channel/endpoints/CommandChannelEndpointHandler.java Mon Sep 04 08:46:20 2017 +0200 @@ -0,0 +1,79 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.gateway.service.commands.channel.endpoints; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import javax.websocket.PongMessage; +import javax.websocket.Session; + +import com.redhat.thermostat.gateway.service.commands.channel.model.Message; +import com.redhat.thermostat.gateway.service.commands.socket.CommandChannelSocketFactory; +import com.redhat.thermostat.gateway.service.commands.socket.CommandChannelWebSocket; +import com.redhat.thermostat.gateway.service.commands.socket.WebSocketType; + +class CommandChannelEndpointHandler { + + // Default socket timeout will be 10 minutes. For agent sockets they should never + // time out since we are sending regular pings to those sockets periodically. The + // ping interval is strictly less than the timout set here. + private static final long DEFAULT_SOCKET_TIMEOUT = TimeUnit.MINUTES.toMillis(10); + private CommandChannelWebSocket socket; + + protected void onConnect(WebSocketType type, String agentId, Session session) throws IOException { + session.setMaxIdleTimeout(DEFAULT_SOCKET_TIMEOUT); + socket = CommandChannelSocketFactory.createWebSocketChannel(type, session, agentId); + socket.onConnect(); + } + + protected void onMessage(Message msg) { + socket.onSocketMessage(msg); + } + + protected void onError(Throwable cause) { + socket.onError(cause); + } + + protected void onClose(int code, String reason) { + socket.onClose(code, reason); + } + + protected void onPongMessage(PongMessage msg) { + socket.onPongMessage(msg); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/main/java/com/redhat/thermostat/gateway/service/commands/channel/endpoints/CommandChannelEndpointHandlerFactory.java Mon Sep 04 08:46:20 2017 +0200 @@ -0,0 +1,75 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.gateway.service.commands.channel.endpoints; + +import java.util.ArrayList; +import java.util.List; + +import javax.websocket.Decoder; +import javax.websocket.Encoder; +import javax.websocket.server.ServerEndpointConfig; + +import com.redhat.thermostat.gateway.common.core.config.Configuration; +import com.redhat.thermostat.gateway.common.core.servlet.GlobalConstants; +import com.redhat.thermostat.gateway.service.commands.channel.coders.AgentRequestEncoder; +import com.redhat.thermostat.gateway.service.commands.channel.coders.MessageDecoder; +import com.redhat.thermostat.gateway.service.commands.channel.coders.WebSocketResponseEncoder; + +public class CommandChannelEndpointHandlerFactory { + + private final List<Class<? extends Decoder>> decoders; + private final List<Class<? extends Encoder>> encoders; + + public CommandChannelEndpointHandlerFactory() { + this.decoders = new ArrayList<Class<? extends Decoder>>(); + decoders.add(MessageDecoder.class); + this.encoders = new ArrayList<Class<? extends Encoder>>(); + encoders.add(AgentRequestEncoder.class); + encoders.add(WebSocketResponseEncoder.class); + } + + public <T extends CommandChannelEndpointHandler> ServerEndpointConfig createEndpointConfig(Class<T> endpointClass, String path, Configuration serviceConfig) { + ServerEndpointConfig.Builder configBuilder = ServerEndpointConfig.Builder.create(endpointClass, path); + configBuilder.configurator(new RealmAuthorizerConfigurator()); + configBuilder.decoders(decoders); + configBuilder.encoders(encoders); + ServerEndpointConfig config = configBuilder.build(); + config.getUserProperties().put(GlobalConstants.SERVICE_CONFIG_KEY, serviceConfig); + return config; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/main/java/com/redhat/thermostat/gateway/service/commands/channel/endpoints/RealmAuthorizerConfigurator.java Mon Sep 04 08:46:20 2017 +0200 @@ -0,0 +1,66 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.gateway.service.commands.channel.endpoints; + +import javax.websocket.HandshakeResponse; +import javax.websocket.server.HandshakeRequest; +import javax.websocket.server.ServerEndpointConfig; +import javax.websocket.server.ServerEndpointConfig.Configurator; + +import com.redhat.thermostat.gateway.common.core.auth.RealmAuthorizer; +import com.redhat.thermostat.gateway.common.core.auth.basic.BasicRealmAuthorizer; +import com.redhat.thermostat.gateway.common.core.auth.basic.BasicWebUser; +import com.redhat.thermostat.gateway.common.core.config.Configuration; +import com.redhat.thermostat.gateway.common.core.servlet.GlobalConstants; + +public class RealmAuthorizerConfigurator extends Configurator { + + @Override + public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) { + Configuration serviceConfig = (Configuration)config.getUserProperties().get(GlobalConstants.SERVICE_CONFIG_KEY); + + // FIXME: Set up proper realm authorizer based on config + BasicWebUser user = (BasicWebUser)request.getUserPrincipal(); + RealmAuthorizer realmAuthorizer; + if (user == null) { + realmAuthorizer = new RealmAuthorizer() {}; // deny-all authorizer + } else { + realmAuthorizer = new BasicRealmAuthorizer(user); + } + config.getUserProperties().put(RealmAuthorizer.class.getName(), realmAuthorizer); + } +}
--- a/services/commands/src/main/java/com/redhat/thermostat/gateway/service/commands/http/handlers/CommandChannelAgentEndpointHandler.java Thu Aug 31 15:34:39 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,92 +0,0 @@ -/* - * Copyright 2012-2017 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.gateway.service.commands.http.handlers; - -import java.io.IOException; - -import javax.websocket.CloseReason; -import javax.websocket.OnClose; -import javax.websocket.OnError; -import javax.websocket.OnMessage; -import javax.websocket.OnOpen; -import javax.websocket.PongMessage; -import javax.websocket.Session; -import javax.websocket.server.PathParam; -import javax.websocket.server.ServerEndpoint; - -import com.redhat.thermostat.gateway.service.commands.channel.coders.AgentRequestEncoder; -import com.redhat.thermostat.gateway.service.commands.channel.coders.MessageDecoder; -import com.redhat.thermostat.gateway.service.commands.channel.coders.WebSocketResponseEncoder; -import com.redhat.thermostat.gateway.service.commands.channel.model.Message; -import com.redhat.thermostat.gateway.service.commands.socket.WebSocketType; - -// Agent endpoints; Receivers -@ServerEndpoint( - value = "/v1/systems/{systemId}/agents/{agentId}", - encoders = { AgentRequestEncoder.class, WebSocketResponseEncoder.class }, - decoders = { MessageDecoder.class }, - configurator = RealmAuthorizerConfigurator.class -) -public class CommandChannelAgentEndpointHandler extends CommandChannelEndpointHandler { - - @OnOpen - public void onConnect(Session session, - @PathParam("agentId") final String agentId) throws IOException { - super.onConnect(WebSocketType.AGENT, agentId, session); - } - - @OnMessage - public void onMessage(Message message) { - super.onMessage(message); - } - - @OnMessage - public void onPongMessage(PongMessage pong) { - super.onPongMessage(pong); - } - - @OnClose - public void onClose(CloseReason reason) { - super.onClose(reason.getCloseCode().getCode(), - reason.getReasonPhrase()); - } - - @OnError - public void onErrorThrown(Throwable cause) { - super.onError(cause); - } -}
--- a/services/commands/src/main/java/com/redhat/thermostat/gateway/service/commands/http/handlers/CommandChannelClientEndpointHandler.java Thu Aug 31 15:34:39 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,102 +0,0 @@ -/* - * Copyright 2012-2017 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.gateway.service.commands.http.handlers; - -import java.io.IOException; - -import javax.websocket.CloseReason; -import javax.websocket.CloseReason.CloseCodes; -import javax.websocket.EncodeException; -import javax.websocket.OnClose; -import javax.websocket.OnError; -import javax.websocket.OnMessage; -import javax.websocket.OnOpen; -import javax.websocket.Session; -import javax.websocket.server.PathParam; -import javax.websocket.server.ServerEndpoint; - -import com.redhat.thermostat.gateway.service.commands.channel.coders.AgentRequestEncoder; -import com.redhat.thermostat.gateway.service.commands.channel.coders.MessageDecoder; -import com.redhat.thermostat.gateway.service.commands.channel.coders.WebSocketResponseEncoder; -import com.redhat.thermostat.gateway.service.commands.channel.model.Message; -import com.redhat.thermostat.gateway.service.commands.channel.model.WebSocketResponse; -import com.redhat.thermostat.gateway.service.commands.channel.model.WebSocketResponse.ResponseType; -import com.redhat.thermostat.gateway.service.commands.socket.WebSocketType; - -// Client endpoints; Initiators -@ServerEndpoint( - value = "/v1/actions/{action}/systems/{systemId}/agents/{agentId}/jvms/{jvmId}/sequence/{seqId}", - encoders = { AgentRequestEncoder.class, WebSocketResponseEncoder.class }, - decoders = { MessageDecoder.class }, - configurator = RealmAuthorizerConfigurator.class -) -public class CommandChannelClientEndpointHandler extends CommandChannelEndpointHandler { - - @OnOpen - public void onConnect(Session session, - @PathParam("agentId") final String agentId) throws IOException { - super.onConnect(WebSocketType.CLIENT, agentId, session); - } - - @OnMessage - public void onMessage(Message msg) { - super.onMessage(msg); - } - - @OnClose - public void onClose(CloseReason reason) { - super.onClose(reason.getCloseCode().getCode(), - reason.getReasonPhrase()); - } - - @OnError - public void onErrorThrown(Session session, - Throwable cause, - @PathParam("seqId") final long seqId) { - try { - // Let the client know something failed - WebSocketResponse resp = new WebSocketResponse(seqId, ResponseType.ERROR); - session.getBasicRemote().sendObject(resp); - session.close(new CloseReason(CloseCodes.UNEXPECTED_CONDITION, - "Server error")); - } catch (IOException|EncodeException e) { - // Not much we can do - e.printStackTrace(); - } - super.onError(cause); - } -}
--- a/services/commands/src/main/java/com/redhat/thermostat/gateway/service/commands/http/handlers/CommandChannelEndpointHandler.java Thu Aug 31 15:34:39 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,79 +0,0 @@ -/* - * Copyright 2012-2017 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.gateway.service.commands.http.handlers; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import javax.websocket.PongMessage; -import javax.websocket.Session; - -import com.redhat.thermostat.gateway.service.commands.channel.model.Message; -import com.redhat.thermostat.gateway.service.commands.socket.CommandChannelSocketFactory; -import com.redhat.thermostat.gateway.service.commands.socket.CommandChannelWebSocket; -import com.redhat.thermostat.gateway.service.commands.socket.WebSocketType; - -class CommandChannelEndpointHandler { - - // Default socket timeout will be 10 minutes. For agent sockets they should never - // time out since we are sending regular pings to those sockets periodically. The - // ping interval is strictly less than the timout set here. - private static final long DEFAULT_SOCKET_TIMEOUT = TimeUnit.MINUTES.toMillis(10); - private CommandChannelWebSocket socket; - - protected void onConnect(WebSocketType type, String agentId, Session session) throws IOException { - session.setMaxIdleTimeout(DEFAULT_SOCKET_TIMEOUT); - socket = CommandChannelSocketFactory.createWebSocketChannel(type, session, agentId); - socket.onConnect(); - } - - protected void onMessage(Message msg) { - socket.onSocketMessage(msg); - } - - protected void onError(Throwable cause) { - socket.onError(cause); - } - - protected void onClose(int code, String reason) { - socket.onClose(code, reason); - } - - protected void onPongMessage(PongMessage msg) { - socket.onPongMessage(msg); - } -}
--- a/services/commands/src/main/java/com/redhat/thermostat/gateway/service/commands/http/handlers/RealmAuthorizerConfigurator.java Thu Aug 31 15:34:39 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,62 +0,0 @@ -/* - * Copyright 2012-2017 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.gateway.service.commands.http.handlers; - -import javax.websocket.HandshakeResponse; -import javax.websocket.server.HandshakeRequest; -import javax.websocket.server.ServerEndpointConfig; -import javax.websocket.server.ServerEndpointConfig.Configurator; - -import com.redhat.thermostat.gateway.common.core.auth.RealmAuthorizer; -import com.redhat.thermostat.gateway.common.core.auth.basic.BasicRealmAuthorizer; -import com.redhat.thermostat.gateway.common.core.auth.basic.BasicWebUser; - -public class RealmAuthorizerConfigurator extends Configurator { - - @Override - public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) { - // FIXME: Set up proper realm authorizer based on config - BasicWebUser user = (BasicWebUser)request.getUserPrincipal(); - RealmAuthorizer realmAuthorizer; - if (user == null) { - realmAuthorizer = new RealmAuthorizer() {}; // deny-all authorizer - } else { - realmAuthorizer = new BasicRealmAuthorizer(user); - } - config.getUserProperties().put(RealmAuthorizer.class.getName(), realmAuthorizer); - } -}
--- a/services/commands/src/main/java/com/redhat/thermostat/gateway/service/commands/servlet/SocketRegistrationListener.java Thu Aug 31 15:34:39 2017 +0200 +++ b/services/commands/src/main/java/com/redhat/thermostat/gateway/service/commands/servlet/SocketRegistrationListener.java Mon Sep 04 08:46:20 2017 +0200 @@ -36,27 +36,48 @@ package com.redhat.thermostat.gateway.service.commands.servlet; +import java.util.logging.Level; +import java.util.logging.Logger; + import javax.servlet.ServletContext; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import javax.websocket.DeploymentException; import javax.websocket.server.ServerContainer; +import javax.websocket.server.ServerEndpointConfig; -import com.redhat.thermostat.gateway.service.commands.http.handlers.CommandChannelAgentEndpointHandler; -import com.redhat.thermostat.gateway.service.commands.http.handlers.CommandChannelClientEndpointHandler; +import com.redhat.thermostat.gateway.common.core.config.Configuration; +import com.redhat.thermostat.gateway.common.core.servlet.GlobalConstants; +import com.redhat.thermostat.gateway.common.util.LoggingUtil; +import com.redhat.thermostat.gateway.service.commands.channel.endpoints.CommandChannelAgentEndpointHandler; +import com.redhat.thermostat.gateway.service.commands.channel.endpoints.CommandChannelClientEndpointHandler; +import com.redhat.thermostat.gateway.service.commands.channel.endpoints.CommandChannelEndpointHandlerFactory; public class SocketRegistrationListener implements ServletContextListener { - // javax.websocket.server.ServerContainer + private static final Logger logger = LoggingUtil.getLogger(SocketRegistrationListener.class); private static final String SERVER_CONTAINER_ATTR = "javax.websocket.server.ServerContainer"; @Override public void contextInitialized(ServletContextEvent sce) { ServletContext ctxt = sce.getServletContext(); + Configuration serviceConfig = (Configuration)ctxt.getAttribute(GlobalConstants.SERVICE_CONFIG_KEY); + String version = ctxt.getInitParameter(GlobalConstants.SERVICE_VERSION_KEY); ServerContainer container = (ServerContainer)ctxt.getAttribute(SERVER_CONTAINER_ATTR); + String agentPath = buildPathWithVersion(version, CommandChannelAgentEndpointHandler.PATH); + String clientPath = buildPathWithVersion(version, CommandChannelClientEndpointHandler.PATH); + logger.log(Level.CONFIG, "Setting up agent (receiver) web-socket endpoint at: " + agentPath); + logger.log(Level.CONFIG, "Setting up client (initiator) web-socket endpoint at: " + clientPath); + CommandChannelEndpointHandlerFactory configFactory = new CommandChannelEndpointHandlerFactory(); + ServerEndpointConfig agentConf = configFactory.createEndpointConfig(CommandChannelAgentEndpointHandler.class, + agentPath, + serviceConfig); + ServerEndpointConfig clientConf = configFactory.createEndpointConfig(CommandChannelClientEndpointHandler.class, + clientPath, + serviceConfig); try { - container.addEndpoint(CommandChannelAgentEndpointHandler.class); - container.addEndpoint(CommandChannelClientEndpointHandler.class); + container.addEndpoint(agentConf); + container.addEndpoint(clientConf); } catch (DeploymentException e) { throw new RuntimeException(e); } @@ -67,4 +88,8 @@ // no-op } + private String buildPathWithVersion(final String version, final String path) { + return "/" + version + path; + } + }
--- a/services/commands/src/main/webapp/WEB-INF/web.xml Thu Aug 31 15:34:39 2017 +0200 +++ b/services/commands/src/main/webapp/WEB-INF/web.xml Mon Sep 04 08:46:20 2017 +0200 @@ -60,6 +60,11 @@ <listener> <listener-class>com.redhat.thermostat.gateway.service.commands.servlet.SocketRegistrationListener</listener-class> </listener> + <!-- Context configuration --> + <context-param> + <param-name>com.redhat.thermostat.gateway.SERVICE_VERSION</param-name> + <param-value>v1</param-value> + </context-param> <!-- Allow viewing of API spec without authentication --> <security-constraint> <web-resource-collection>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/channel/endpoints/AuthBasicCoreServerTest.java Mon Sep 04 08:46:20 2017 +0200 @@ -0,0 +1,219 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.gateway.service.commands.channel.endpoints; + +import static org.mockito.Mockito.mock; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.servlet.Servlet; +import javax.websocket.server.ServerContainer; +import javax.websocket.server.ServerEndpointConfig; + +import org.eclipse.jetty.security.ConstraintMapping; +import org.eclipse.jetty.security.ConstraintSecurityHandler; +import org.eclipse.jetty.security.authentication.BasicAuthenticator; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.security.Constraint; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import com.redhat.thermostat.gateway.common.core.config.Configuration; +import com.redhat.thermostat.gateway.common.core.config.GlobalConfiguration; +import com.redhat.thermostat.gateway.server.CoreServerBuilder; +import com.redhat.thermostat.gateway.server.auth.basic.BasicLoginService; +import com.redhat.thermostat.gateway.server.auth.basic.BasicUserStore; +import com.redhat.thermostat.gateway.server.services.CoreService; +import com.redhat.thermostat.gateway.server.services.CoreServiceBuilder; + +public class AuthBasicCoreServerTest { + + private static final String SERVICE_NAME = "commands"; + private static final String CONTEXT_NAME = "/" + SERVICE_NAME; + private static final int TEST_PORT = 32039; + private static final String TEST_ADDRESS = "127.0.0.1"; + protected WebSocketClient client; + protected final String baseUrl = "ws://" + TEST_ADDRESS + ":" + TEST_PORT + CONTEXT_NAME +"/v1/"; + + private static Thread thread; + private static Server server; + + protected static CountDownLatch serverReadyLatch = new CountDownLatch(1); + + @BeforeClass + public static void setupClass() { + CoreServerBuilder builder = new CoreServerBuilder(); + builder.setServiceBuilder(new CoreServiceBuilder() { + + @Override + public CoreServiceBuilder setConfiguration(Configuration config) { + // nothing + return null; + } + + @Override + public List<CoreService> build() { + return Arrays.<CoreService>asList(new TestCoreService(getUserConfig())); + } + }); + builder.setServerConfiguration(new Configuration() { + + @Override + public Map<String, Object> asMap() { + Map<String, Object> config = new HashMap<>(); + config.put(GlobalConfiguration.ConfigurationKey.IP.name(), TEST_ADDRESS); + config.put(GlobalConfiguration.ConfigurationKey.PORT.name(), Integer.toString(TEST_PORT)); + config.put(GlobalConfiguration.ConfigurationKey.WITH_SWAGGER_UI.name(), Boolean.FALSE.toString()); + config.put(GlobalConfiguration.ConfigurationKey.WITH_WEB_CLIENT.name(), Boolean.FALSE.toString()); + return config; + } + }); + server = builder.build(); + thread = new Thread(new Runnable() { + @Override + public void run() { + try { + server.start(); + serverReadyLatch.countDown(); + server.join(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + thread.start(); + } + + @Before + public void setup() throws Exception { + boolean expired = !serverReadyLatch.await(2, TimeUnit.SECONDS); + if (expired) { + throw new RuntimeException("Server not becoming available"); + } + client = new WebSocketClient(); + client.start(); + } + + @AfterClass + public static void teardownClass() throws Exception { + server.stop(); + thread.join(); + } + + protected static Map<String, String> getUserConfig() { + Map<String, String> userConfig = new HashMap<>(); + userConfig.put("foo-agent-user", "agent-pwd,receiver_provider-commands"); + userConfig.put("bar-client-user", "client-pwd,dump_heap-commands"); + userConfig.put("insufficient-roles-agent", "agent-pwd"); + userConfig.put("insufficient-roles-client", "client-pwd"); + return userConfig; + }; + + public static class TestCoreService implements CoreService { + + private final Map<String, String> userConfig; + + public TestCoreService(Map<String, String> userConfig) { + this.userConfig = userConfig; + } + + @Override + public ServletContextHandler createServletContextHandler(Server server) { + ServletContextHandler contextHandler = createContext(server); + addWebSocketsHandlers(server, contextHandler); + setupAuthForContext(server, contextHandler); + return contextHandler; + } + + private ServletContextHandler createContext(Server server) { + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SECURITY|ServletContextHandler.SESSIONS); + context.setServer(server); + context.setContextPath(CONTEXT_NAME); + context.addServlet(new ServletHolder(mock(Servlet.class)), "/v1/*"); + return context; + } + + private void setupAuthForContext(Server server, ServletContextHandler contextHandler) { + // FIXME: Filter for websockets does not seem to be called. + // Set up basic auth directly using jetty API + ConstraintSecurityHandler security = new ConstraintSecurityHandler(); + String realmName = "Thermostat Realm"; + Constraint cons = new Constraint(realmName, "thermostat-realm"); + cons.setAuthenticate(true); + ConstraintMapping mapping = new ConstraintMapping(); + mapping.setConstraint(cons); + mapping.setMethod("Basic"); + mapping.setPathSpec("/*"); + security.setConstraintMappings(Collections.singletonList(mapping)); + security.setAuthenticator(new BasicAuthenticator()); + security.setLoginService(new BasicLoginService(new BasicUserStore(userConfig), realmName)); + contextHandler.setSecurityHandler(security); + } + + private void addWebSocketsHandlers(Server server, ServletContextHandler contextHandler) { + // Initialize javax.websocket layer + try { + contextHandler.setServer(server); + ServerContainer container = WebSocketServerContainerInitializer.configureContext(contextHandler); + CommandChannelEndpointHandlerFactory configFactory = new CommandChannelEndpointHandlerFactory(); + Configuration serviceConfig = mock(Configuration.class); + ServerEndpointConfig agentConf = configFactory.createEndpointConfig(CommandChannelAgentEndpointHandler.class, + "/v1" + CommandChannelAgentEndpointHandler.PATH, + serviceConfig); + ServerEndpointConfig clientConf = configFactory.createEndpointConfig(CommandChannelClientEndpointHandler.class, + "/v1" + CommandChannelClientEndpointHandler.PATH, + serviceConfig); + container.addEndpoint(agentConf); + container.addEndpoint(clientConf); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/channel/endpoints/CmdChannelAgentSocket.java Mon Sep 04 08:46:20 2017 +0200 @@ -0,0 +1,201 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.gateway.service.commands.channel.endpoints; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.extensions.Frame; + +import com.google.gson.Gson; +import com.redhat.thermostat.gateway.service.commands.channel.model.Message; + +/** + * Handles the agent connections from the Web endpoint. + * + */ +@WebSocket(maxTextMessageSize = 64 * 1024) +public class CmdChannelAgentSocket { + + private final Gson gson; + private final CountDownLatch closeLatch; + private final CountDownLatch connectLatch; + private final CountDownLatch pongReceived; + private final CountDownLatch pingReceived; + private final OnMessageCallBack onMessage; + private Session session; + private String pongMsg; + private String pingMsg; + + public CmdChannelAgentSocket(OnMessageCallBack onMessage, CountDownLatch connect, Gson gson) { + this(onMessage, connect, new CountDownLatch(1), new CountDownLatch(1), gson); + } + + public CmdChannelAgentSocket(OnMessageCallBack onMessage, Gson gson) { + this(onMessage, new CountDownLatch(1), gson); + } + + public CmdChannelAgentSocket(CountDownLatch connect, CountDownLatch pingPongSignal, boolean isPing) { + this(new NoOpMsgCallback(), connect, isPing ? new CountDownLatch(1) : pingPongSignal, isPing ? pingPongSignal : new CountDownLatch(1), null); + } + + private CmdChannelAgentSocket(OnMessageCallBack onMessage, CountDownLatch connect, CountDownLatch pongReceived, CountDownLatch pingReceived, Gson gson) { + this.closeLatch = new CountDownLatch(1); + this.onMessage = onMessage; + this.connectLatch = connect; + this.gson = gson; + this.pongReceived = pongReceived; + this.pingReceived = pingReceived; + } + + + public void awaitClose() throws InterruptedException { + this.closeLatch.await(); + } + + @OnWebSocketFrame + public void onFrame(Frame frame) { + switch (frame.getType()) { + case PONG: + handlePong(frame.getPayload()); + break; + case PING: + ByteBuffer payload = null; + if (frame.hasPayload()) { + payload = frame.getPayload(); + } + handlePing(payload); + break; + default: + // nothing to do + } + } + + private void handlePing(ByteBuffer payload) { + RemoteEndpoint endPoint = session.getRemote(); + try { + // Note: sendPong() will change the position of the payload + // so the string needs to get retrieved before sendPong() + // is called. + pingMsg = getStringFromPayload(payload); + endPoint.sendPong(payload); + pingReceived.countDown(); + } catch (IOException e) { + System.err.println("Failed to send pong response!"); + e.printStackTrace(); + } + } + + private void handlePong(ByteBuffer payload) { + pongMsg = getStringFromPayload(payload); + pongReceived.countDown(); + } + + private String getStringFromPayload(ByteBuffer payload) { + int limit = payload.limit(); + int position = payload.position(); + int length = limit - position; + byte[] buf = new byte[length]; + for (int i = 0; position < limit; position++, i++) { + buf[i] = payload.get(position); + } + return new String(buf); + } + + @OnWebSocketClose + public void onClose(int statusCode, String reason) { + this.closeLatch.countDown(); // trigger latch + } + + @OnWebSocketConnect + public void onConnect(Session session) { + this.session = session; + this.connectLatch.countDown(); + } + + @OnWebSocketError + public void onError(Throwable cause) { + Throwable realCause = cause.getCause(); + while (realCause != null) { + realCause.printStackTrace(); + realCause = realCause.getCause(); + } + } + + @OnWebSocketMessage + public void onMessage(final Session session, final String msg) { + final Message message = gson.fromJson(msg, Message.class); + onMessage.run(session, message); + } + + public void closeSession() { + if (session != null) { + this.session.close(); + } + } + + public void sendPingToServer(String msgPayload) throws IOException { + if (this.session == null) { + throw new NullPointerException("Session null. Agent not connected?"); + } + RemoteEndpoint endpoint = session.getRemote(); + ByteBuffer pingPayload = ByteBuffer.wrap(msgPayload.getBytes()); + endpoint.sendPing(pingPayload); + System.err.println("Client: Ping msg sent <<" + msgPayload + ">>"); + } + + public String getPongMsg() { + return pongMsg; + } + + public String getPingMsg() { + return pingMsg; + } + + public interface OnMessageCallBack { + public void run(Session session, Message msg); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/channel/endpoints/CmdChannelClientSocket.java Mon Sep 04 08:46:20 2017 +0200 @@ -0,0 +1,131 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.gateway.service.commands.channel.endpoints; + +import java.util.concurrent.CountDownLatch; + +import com.redhat.thermostat.gateway.service.commands.channel.coders.typeadapters.MessageTypeAdapterFactory; +import com.redhat.thermostat.gateway.service.commands.channel.model.ClientRequest; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.redhat.thermostat.gateway.service.commands.channel.model.Message; +import com.redhat.thermostat.gateway.service.commands.channel.model.Message.MessageType; +import com.redhat.thermostat.gateway.service.commands.channel.model.WebSocketResponse; + +/** + * Handles the client initiated actions. E.g. triggering a cmd channel request + * to some agent. + * + */ +@WebSocket(maxTextMessageSize = 64 * 1024) +public class CmdChannelClientSocket { + + private final Gson gson; + private final CountDownLatch closeLatch; + private final CountDownLatch messageSentLatch; + private final ClientRequest request; + private WebSocketResponse resp; + + public CmdChannelClientSocket(ClientRequest request, CountDownLatch messageSentLatch) { + this.closeLatch = new CountDownLatch(1); + this.request = request; + this.messageSentLatch = messageSentLatch; + this.gson = new GsonBuilder() + .registerTypeAdapterFactory(new MessageTypeAdapterFactory()) + .serializeNulls() + .disableHtmlEscaping() + .create(); + } + + public CmdChannelClientSocket(ClientRequest request) { + this(request, new CountDownLatch(1)); + } + + public void awaitClose() throws InterruptedException { + this.closeLatch.await(); + } + + @OnWebSocketClose + public void onClose(int statusCode, String reason) { + this.closeLatch.countDown(); // trigger latch + } + + @OnWebSocketConnect + public void onConnect(Session session) { + try { + String serializedMsg = gson.toJson(request); + session.getRemote().sendString(serializedMsg); + session.getRemote().flush(); + messageSentLatch.countDown(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + @OnWebSocketError + public void onError(Throwable cause) { + Throwable realCause = cause.getCause(); + while (realCause != null) { + realCause.printStackTrace(); + realCause = realCause.getCause(); + } + } + + @OnWebSocketMessage + public void onMessage(Session session, String msg) { + Message message = gson.fromJson(msg, Message.class); + if (message.getMessageType() != MessageType.RESPONSE) { + throw new AssertionError("Illegal type. Got " + message.getMessageType()); + } + WebSocketResponse r = (WebSocketResponse)message; + if (r.getSequenceId() == request.getSequenceId()) { + this.resp = r; + } + session.close(); + } + + public WebSocketResponse getResponse() { + return resp; + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/channel/endpoints/CommandChannelEndpointHandlerTest.java Mon Sep 04 08:46:20 2017 +0200 @@ -0,0 +1,469 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.gateway.service.commands.channel.endpoints; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assume.assumeTrue; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.junit.Before; +import org.junit.Test; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.redhat.thermostat.gateway.service.commands.channel.coders.typeadapters.MessageTypeAdapterFactory; +import com.redhat.thermostat.gateway.service.commands.channel.model.AgentRequest; +import com.redhat.thermostat.gateway.service.commands.channel.model.ClientRequest; +import com.redhat.thermostat.gateway.service.commands.channel.model.Message; +import com.redhat.thermostat.gateway.service.commands.channel.model.Message.MessageType; +import com.redhat.thermostat.gateway.service.commands.channel.model.WebSocketResponse; +import com.redhat.thermostat.gateway.service.commands.channel.model.WebSocketResponse.ResponseType; + +public class CommandChannelEndpointHandlerTest extends AuthBasicCoreServerTest { + + private Gson gson; + private ClientRequest clientRequest; + + @Before + public void setUp() { + gson = new GsonBuilder() + .registerTypeAdapterFactory(new MessageTypeAdapterFactory()) + .serializeNulls() + .create(); + clientRequest = new ClientRequest(Message.UNKNOWN_SEQUENCE); + } + + @Test(timeout = 2000) + public void testHandshakeAllRoles() throws Exception { + String agentUser = "foo-agent-user"; + String clientUser = "bar-client-user"; + long clientSequence = 144L; + clientRequest.setSequenceId(clientSequence); + String agentId = "testAgent"; + URI clientUri = new URI( + baseUrl + "actions/dump_heap/systems/foo/agents/" + agentId + + "/jvms/abc/sequence/" + clientSequence); + URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); + final CountDownLatch clientHasSentMessages = new CountDownLatch(1); + CmdChannelClientSocket clientSocket = new CmdChannelClientSocket(clientRequest, clientHasSentMessages); + final CountDownLatch waitForAgentConnect = new CountDownLatch(1); + CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket( + new CmdChannelAgentSocket.OnMessageCallBack() { + @Override + public void run(Session session, Message msg) { + if (msg.getMessageType() != MessageType.AGENT_REQUEST) { + throw new AssertionError("Wrong message type. Got: " + msg.getClass().getName()); + } + AgentRequest req = (AgentRequest)msg; + WebSocketResponse resp = new WebSocketResponse(req.getSequenceId(), ResponseType.OK); + String jsonResp = gson.toJson(resp); + try { + session.getRemote().sendString(jsonResp); + } catch (IOException e) { + e.printStackTrace(); + } + } + }, waitForAgentConnect, gson); + ClientUpgradeRequest clientRequest = new ClientUpgradeRequest(); + ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); + agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(agentUser, "agent-pwd")); + clientRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(clientUser, "client-pwd")); + + // Ensure agent conneted before we actually do anything + client.connect(agentSocket, agentUri, agentRequest); + waitForAgentConnect.await(); + + // Ensure client has connected and initiated handshake by + // sending its message + client.connect(clientSocket, clientUri, clientRequest); + clientHasSentMessages.await(); + + // wait for client to close the socket + clientSocket.awaitClose(); + + // now we are ready to close the agent socket too + agentSocket.closeSession(); + agentSocket.awaitClose(); + + assertNotNull(clientSocket.getResponse()); + assertEquals(WebSocketResponse.ResponseType.OK, + clientSocket.getResponse().getResponseType()); + } + + /** + * A client which tries to communicate with an agent that has not yet + * connected should fail. + * + * @throws Exception + */ + @Test(timeout = 2000) + public void testHandshakeAuthorizedMissingAgentConnect() throws Exception { + long sequenceId = 333L; + clientRequest.setSequenceId(sequenceId); + String clientUser = "bar-client-user"; + String agentId = "testAgent"; + URI clientUri = new URI( + baseUrl + "actions/dump_heap/systems/foo/agents/" + agentId + + "/jvms/abc/sequence/" + sequenceId); + final CountDownLatch clientHasSentMessages = new CountDownLatch(1); + CmdChannelClientSocket clientSocket = new CmdChannelClientSocket(clientRequest, clientHasSentMessages); + ClientUpgradeRequest clientRequest = new ClientUpgradeRequest(); + clientRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(clientUser, "client-pwd")); + client.connect(clientSocket, clientUri, clientRequest); + + boolean clientConnected = clientHasSentMessages.await(1, TimeUnit.SECONDS); + // For some reason the client might not connect when stress-tested. + // Avoid false test failures by using assumeTrue + assumeTrue(clientConnected); + + // wait for closed socket connection. + clientSocket.awaitClose(); + assertNotNull(clientSocket.getResponse()); + assertEquals(WebSocketResponse.ResponseType.ERROR, + clientSocket.getResponse().getResponseType()); + } + + /** + * There is no guarantee in which order messages get processed on the server + * side. Multiple clients might connect before an agent responds. It's possible + * that one channel connect which happened later + * channel request is faster than another. This test simulates some of + * this behavior. + * + * @throws Exception + */ + @Test(timeout = 2000) + public void testMultipleHandshakesInterleavedAllRoles() throws Exception { + final long clientSequenceFirst = 901l; + final long clientSequenceSecond = 902l; + ClientRequest r1 = new ClientRequest(clientSequenceFirst); + ClientRequest r2 = new ClientRequest(clientSequenceSecond); + + String agentUser = "foo-agent-user"; + String clientUser = "bar-client-user"; + String clientPassword = "client-pwd"; + String agentId = "testAgent"; + URI firstClientUri = new URI( + baseUrl + "actions/dump_heap/systems/foo/agents/" + agentId + + "/jvms/abc/sequence/" + clientSequenceFirst); + URI secondClientUri = new URI( + baseUrl + "actions/dump_heap/systems/foo/agents/" + agentId + + "/jvms/abc/sequence/" + clientSequenceSecond); + URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); + CountDownLatch clientsHaveSentMessages = new CountDownLatch(2); + CmdChannelClientSocket firstClientSocket = new CmdChannelClientSocket(r1, clientsHaveSentMessages); + CmdChannelClientSocket secondClientSocket = new CmdChannelClientSocket(r2, clientsHaveSentMessages); + final CountDownLatch waitForSecondClientConnect = new CountDownLatch(1); + final CountDownLatch waitForAgentConnect = new CountDownLatch(1); + final AtomicInteger agentRespCount = new AtomicInteger(0); + final CountDownLatch allResponsesSent = new CountDownLatch(2); + CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket( + new CmdChannelAgentSocket.OnMessageCallBack() { + @Override + public void run(Session session, Message msg) { + if (agentRespCount.get() == 0) { + try { + waitForSecondClientConnect.await(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // ignore + } + } + if (msg.getMessageType() != MessageType.AGENT_REQUEST) { + throw new AssertionError("Wrong message type. Got: " + msg.getClass().getName()); + } + AgentRequest req = (AgentRequest)msg; + agentRespCount.getAndAdd(1); + try { + if (req.getSequenceId() == clientSequenceFirst) { + WebSocketResponse resp = new WebSocketResponse(req.getSequenceId(), ResponseType.OK); + session.getRemote().sendString(gson.toJson(resp)); + session.getRemote().flush(); + allResponsesSent.countDown(); + } else if (req.getSequenceId() == clientSequenceSecond) { + WebSocketResponse resp = new WebSocketResponse(req.getSequenceId(), ResponseType.ERROR); + session.getRemote().sendString(gson.toJson(resp)); + session.getRemote().flush(); + allResponsesSent.countDown(); + } else { + throw new AssertionError("Should not get here"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + }, waitForAgentConnect, gson); + ClientUpgradeRequest firstClientRequest = new ClientUpgradeRequest(); + ClientUpgradeRequest secondClientRequest = new ClientUpgradeRequest(); + ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); + agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(agentUser, "agent-pwd")); + firstClientRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(clientUser, clientPassword)); + secondClientRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(clientUser, clientPassword)); + client.connect(agentSocket, agentUri, agentRequest); + waitForAgentConnect.await(); + + client.connect(firstClientSocket, firstClientUri, firstClientRequest); + client.connect(secondClientSocket, secondClientUri, secondClientRequest); + + // There is no guarantee that the second client actually connected, but + // at least we know that we've called relevant code to do the connection. + waitForSecondClientConnect.countDown(); + + clientsHaveSentMessages.await(); + + // wait before clients have been processed before we close the agent + boolean isAllResponsesSent = allResponsesSent.await(1, TimeUnit.SECONDS); + // We might hit a window where we've got the signal that the agent is + // "ready", but actually still in progress of getting its socket added + // to the registry. In that case, the agent call-back is not going to be + // sending the response. Instead the server aborts with ERROR, since it + // cannot find the socket in the agent socket registry yet. assumeTrue + // that the latch did not expire in order to avoid false test failures. + assumeTrue(isAllResponsesSent); + + // wait for client connections to close + secondClientSocket.awaitClose(); + firstClientSocket.awaitClose(); + + agentSocket.closeSession(); + agentSocket.awaitClose(); + + assertNotNull(firstClientSocket.getResponse()); + assertEquals(WebSocketResponse.ResponseType.OK, + firstClientSocket.getResponse().getResponseType()); + assertNotNull(secondClientSocket.getResponse()); + assertEquals(WebSocketResponse.ResponseType.ERROR, secondClientSocket.getResponse().getResponseType()); + } + + @Test(timeout = 2000) + public void testHandshakeNotAuthenticatedClient() throws Exception { + long clientSequence = 324L; + TestUser clientUser = null /* no auth-creds */; + doNoAuthTestClient(clientSequence, clientUser); + } + + @Test(timeout = 2000) + public void testHandshakeNotAuthenticatedAgent() throws Exception { + TestUser agentUser = null /* no auth-creds */; + doNoAuthTestAgent(agentUser); + } + + @Test(timeout = 2000) + public void testHandshakeNotAuthorizedClient() throws Exception { + long clientSequence = 332l; + TestUser clientUser = new TestUser(); + clientUser.username = "insufficient-roles-client"; + clientUser.password = "client-pwd"; + doNoAuthTestClient(clientSequence, clientUser); + } + + @Test(timeout = 2000) + public void testHandshakeNotAuthorizedAgent() throws Exception { + TestUser agentUser = new TestUser(); + agentUser.username = "insufficient-roles-agent"; + agentUser.password = "agent-pwd"; + doNoAuthTestAgent(agentUser); + } + + /** + * Tests whether the client can ping the server end-point. A server response is + * expected. + * + * @throws Exception + */ + @Test(timeout = 2000) + public void testAgentPing() throws Exception { + String agentUser = "foo-agent-user"; + String agentId = "testAgent"; + URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); + final CountDownLatch waitForAgentConnect = new CountDownLatch(1); + final CountDownLatch pongSignal = new CountDownLatch(1); + CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket(waitForAgentConnect, pongSignal, false); + ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); + agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(agentUser, "agent-pwd")); + + // Ensure agent conneted before we actually do anything + client.connect(agentSocket, agentUri, agentRequest); + waitForAgentConnect.await(); + + // Initiate ping from client + String pingMsgPayload = "testAgent ping message"; + agentSocket.sendPingToServer(pingMsgPayload); + + // Wait for server pong to come back + pongSignal.await(); + + assertEquals(pingMsgPayload, agentSocket.getPongMsg()); + + // now we are ready to close the agent socket too + agentSocket.closeSession(); + agentSocket.awaitClose(); + } + + /** + * Tests whether a server ping is properly responded to by the client by sending a + * pong response. + * + * Note: The service sends a ping onConnect() and then every subsequent X minutes + * where X is strictly less than the currently set timeout value for the + * agent/receiver sockets. + * + * @throws Exception + */ + @Test(timeout = 2000) + public void testAgentPong() throws Exception { + String agentUser = "foo-agent-user"; + String agentId = "testAgent"; + URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); + final CountDownLatch waitForAgentConnect = new CountDownLatch(1); + final CountDownLatch pongResponseSent = new CountDownLatch(1); + CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket(waitForAgentConnect, pongResponseSent, true); + ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); + agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(agentUser, "agent-pwd")); + + // Ensure agent conneted before we actually do anything + client.connect(agentSocket, agentUri, agentRequest); + waitForAgentConnect.await(); + + String expectedPingPayload = "1|" + agentId; + pongResponseSent.await(); + assertEquals(expectedPingPayload, agentSocket.getPingMsg()); + + // now we are ready to close the agent socket too + agentSocket.closeSession(); + agentSocket.awaitClose(); + } + + private void doNoAuthTestClient(long clientSequence, TestUser clientUser) throws Exception { + ClientRequest noMatter = new ClientRequest(clientSequence); + String agentId = "testAgent"; + URI clientUri = new URI( + baseUrl + "actions/dump_heap/systems/foo/agents/" + agentId + + "/jvms/abc/sequence/" + clientSequence); + CmdChannelClientSocket clientSocket = new CmdChannelClientSocket( + noMatter /* doesn't matter */); + ClientUpgradeRequest clientRequest = new ClientUpgradeRequest(); + if (clientUser != null) { + clientRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(clientUser.username, clientUser.password)); + } + client.connect(clientSocket, clientUri, clientRequest); + + // wait for client connection to get closed (by the server) + clientSocket.awaitClose(); + + WebSocketResponse resp = clientSocket.getResponse(); + // Sometimes the client response is not sent due to EOF (short reads?) on the + // underlying socket. In order to avoid false test failures use assumeTrue. + // Symptoms: + // "Connection closed: 1006 - EOF: Broken pipe" or + // "Connection closed: 1006 - WebSocket Read EOF" + assumeTrue(resp != null); + assertNotNull(resp); + assertEquals(WebSocketResponse.ResponseType.AUTH_FAIL, + resp.getResponseType()); + assertEquals(clientSequence, resp.getSequenceId()); + } + + private void doNoAuthTestAgent(TestUser agentUser) throws Exception { + String agentId = "testAgent"; + URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); + final WebSocketResponse[] agentResponse = new WebSocketResponse[1]; + final CountDownLatch agentResponseReady = new CountDownLatch(1); + CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket( + new CmdChannelAgentSocket.OnMessageCallBack() { + + @Override + public void run(Session session, Message msg) { + if (msg.getMessageType() != MessageType.RESPONSE) { + throw new AssertionError("Wrong message type. Got: " + msg.getClass().getName()); + } + agentResponse[0] = (WebSocketResponse)msg; + agentResponseReady.countDown(); + } + }, agentResponseReady, gson); + ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); + if (agentUser != null) { + agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), + getBasicAuthHeaderValue(agentUser.username, agentUser.password)); + } + client.connect(agentSocket, agentUri, agentRequest); + + boolean isAgentResponseReady = agentResponseReady.await(1, TimeUnit.SECONDS); + // Sometimes the agent response is not sent due to EOF (short reads?) on the + // underlying socket. In order to avoid false test failures use assumeTrue. + // Symptoms: + // "Connection closed: 1006 - EOF: Broken pipe" or + // "Connection closed: 1006 - WebSocket Read EOF" + assumeTrue(isAgentResponseReady); + + // wait for the agent connection to get closed (by the server) + agentSocket.awaitClose(); + + assertEquals("There is no sequence for agent connections", + Message.UNKNOWN_SEQUENCE, agentResponse[0].getSequenceId()); + } + + private String getBasicAuthHeaderValue(String testUser, String password) { + String userpassword = testUser + ":" + password; + @SuppressWarnings("restriction") + String encodedAuthorization = new sun.misc.BASE64Encoder() + .encode(userpassword.getBytes()); + return "Basic " + encodedAuthorization; + } + + private static class TestUser { + String password; + String username; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/channel/endpoints/NoOpMsgCallback.java Mon Sep 04 08:46:20 2017 +0200 @@ -0,0 +1,50 @@ +/* + * Copyright 2012-2017 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.gateway.service.commands.channel.endpoints; + +import com.redhat.thermostat.gateway.service.commands.channel.endpoints.CmdChannelAgentSocket.OnMessageCallBack; +import com.redhat.thermostat.gateway.service.commands.channel.model.Message; +import org.eclipse.jetty.websocket.api.Session; + +class NoOpMsgCallback implements OnMessageCallBack { + + @Override + public void run(Session session, Message msg) { + // nothing + } + +}
--- a/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/http/handlers/AuthBasicCoreServerTest.java Thu Aug 31 15:34:39 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,210 +0,0 @@ -/* - * Copyright 2012-2017 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.gateway.service.commands.http.handlers; - -import static org.mockito.Mockito.mock; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.servlet.Servlet; -import javax.websocket.server.ServerContainer; - -import org.eclipse.jetty.security.ConstraintMapping; -import org.eclipse.jetty.security.ConstraintSecurityHandler; -import org.eclipse.jetty.security.authentication.BasicAuthenticator; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.util.security.Constraint; -import org.eclipse.jetty.websocket.client.WebSocketClient; -import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -import com.redhat.thermostat.gateway.common.core.config.Configuration; -import com.redhat.thermostat.gateway.common.core.config.GlobalConfiguration; -import com.redhat.thermostat.gateway.server.CoreServerBuilder; -import com.redhat.thermostat.gateway.server.auth.basic.BasicLoginService; -import com.redhat.thermostat.gateway.server.auth.basic.BasicUserStore; -import com.redhat.thermostat.gateway.server.services.CoreService; -import com.redhat.thermostat.gateway.server.services.CoreServiceBuilder; - -public class AuthBasicCoreServerTest { - - private static final String SERVICE_NAME = "commands"; - private static final String CONTEXT_NAME = "/" + SERVICE_NAME; - private static final int TEST_PORT = 32039; - private static final String TEST_ADDRESS = "127.0.0.1"; - protected WebSocketClient client; - protected final String baseUrl = "ws://" + TEST_ADDRESS + ":" + TEST_PORT + CONTEXT_NAME +"/v1/"; - - private static Thread thread; - private static Server server; - - protected static CountDownLatch serverReadyLatch = new CountDownLatch(1); - - @BeforeClass - public static void setupClass() { - CoreServerBuilder builder = new CoreServerBuilder(); - builder.setServiceBuilder(new CoreServiceBuilder() { - - @Override - public CoreServiceBuilder setConfiguration(Configuration config) { - // nothing - return null; - } - - @Override - public List<CoreService> build() { - return Arrays.<CoreService>asList(new TestCoreService(getUserConfig())); - } - }); - builder.setServerConfiguration(new Configuration() { - - @Override - public Map<String, Object> asMap() { - Map<String, Object> config = new HashMap<>(); - config.put(GlobalConfiguration.ConfigurationKey.IP.name(), TEST_ADDRESS); - config.put(GlobalConfiguration.ConfigurationKey.PORT.name(), Integer.toString(TEST_PORT)); - config.put(GlobalConfiguration.ConfigurationKey.WITH_SWAGGER_UI.name(), Boolean.FALSE.toString()); - config.put(GlobalConfiguration.ConfigurationKey.WITH_WEB_CLIENT.name(), Boolean.FALSE.toString()); - return config; - } - }); - server = builder.build(); - thread = new Thread(new Runnable() { - @Override - public void run() { - try { - server.start(); - serverReadyLatch.countDown(); - server.join(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - thread.start(); - } - - @Before - public void setup() throws Exception { - boolean expired = !serverReadyLatch.await(2, TimeUnit.SECONDS); - if (expired) { - throw new RuntimeException("Server not becoming available"); - } - client = new WebSocketClient(); - client.start(); - } - - @AfterClass - public static void teardownClass() throws Exception { - server.stop(); - thread.join(); - } - - protected static Map<String, String> getUserConfig() { - Map<String, String> userConfig = new HashMap<>(); - userConfig.put("foo-agent-user", "agent-pwd,receiver_provider-commands"); - userConfig.put("bar-client-user", "client-pwd,dump_heap-commands"); - userConfig.put("insufficient-roles-agent", "agent-pwd"); - userConfig.put("insufficient-roles-client", "client-pwd"); - return userConfig; - }; - - public static class TestCoreService implements CoreService { - - private final Map<String, String> userConfig; - - public TestCoreService(Map<String, String> userConfig) { - this.userConfig = userConfig; - } - - @Override - public ServletContextHandler createServletContextHandler(Server server) { - ServletContextHandler contextHandler = createContext(server); - addWebSocketsHandlers(server, contextHandler); - setupAuthForContext(server, contextHandler); - return contextHandler; - } - - private ServletContextHandler createContext(Server server) { - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SECURITY|ServletContextHandler.SESSIONS); - context.setServer(server); - context.setContextPath(CONTEXT_NAME); - context.addServlet(new ServletHolder(mock(Servlet.class)), "/v1/*"); - return context; - } - - private void setupAuthForContext(Server server, ServletContextHandler contextHandler) { - // FIXME: Filter for websockets does not seem to be called. - // Set up basic auth directly using jetty API - ConstraintSecurityHandler security = new ConstraintSecurityHandler(); - String realmName = "Thermostat Realm"; - Constraint cons = new Constraint(realmName, "thermostat-realm"); - cons.setAuthenticate(true); - ConstraintMapping mapping = new ConstraintMapping(); - mapping.setConstraint(cons); - mapping.setMethod("Basic"); - mapping.setPathSpec("/*"); - security.setConstraintMappings(Collections.singletonList(mapping)); - security.setAuthenticator(new BasicAuthenticator()); - security.setLoginService(new BasicLoginService(new BasicUserStore(userConfig), realmName)); - contextHandler.setSecurityHandler(security); - } - - private void addWebSocketsHandlers(Server server, ServletContextHandler contextHandler) { - // Initialize javax.websocket layer - try { - contextHandler.setServer(server); - ServerContainer container = WebSocketServerContainerInitializer.configureContext(contextHandler); - container.addEndpoint(CommandChannelClientEndpointHandler.class); - container.addEndpoint(CommandChannelAgentEndpointHandler.class); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - } -}
--- a/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/http/handlers/CmdChannelAgentSocket.java Thu Aug 31 15:34:39 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,201 +0,0 @@ -/* - * Copyright 2012-2017 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.gateway.service.commands.http.handlers; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; - -import org.eclipse.jetty.websocket.api.RemoteEndpoint; -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; -import org.eclipse.jetty.websocket.api.annotations.WebSocket; -import org.eclipse.jetty.websocket.api.extensions.Frame; - -import com.google.gson.Gson; -import com.redhat.thermostat.gateway.service.commands.channel.model.Message; - -/** - * Handles the agent connections from the Web endpoint. - * - */ -@WebSocket(maxTextMessageSize = 64 * 1024) -public class CmdChannelAgentSocket { - - private final Gson gson; - private final CountDownLatch closeLatch; - private final CountDownLatch connectLatch; - private final CountDownLatch pongReceived; - private final CountDownLatch pingReceived; - private final OnMessageCallBack onMessage; - private Session session; - private String pongMsg; - private String pingMsg; - - public CmdChannelAgentSocket(OnMessageCallBack onMessage, CountDownLatch connect, Gson gson) { - this(onMessage, connect, new CountDownLatch(1), new CountDownLatch(1), gson); - } - - public CmdChannelAgentSocket(OnMessageCallBack onMessage, Gson gson) { - this(onMessage, new CountDownLatch(1), gson); - } - - public CmdChannelAgentSocket(CountDownLatch connect, CountDownLatch pingPongSignal, boolean isPing) { - this(new NoOpMsgCallback(), connect, isPing ? new CountDownLatch(1) : pingPongSignal, isPing ? pingPongSignal : new CountDownLatch(1), null); - } - - private CmdChannelAgentSocket(OnMessageCallBack onMessage, CountDownLatch connect, CountDownLatch pongReceived, CountDownLatch pingReceived, Gson gson) { - this.closeLatch = new CountDownLatch(1); - this.onMessage = onMessage; - this.connectLatch = connect; - this.gson = gson; - this.pongReceived = pongReceived; - this.pingReceived = pingReceived; - } - - - public void awaitClose() throws InterruptedException { - this.closeLatch.await(); - } - - @OnWebSocketFrame - public void onFrame(Frame frame) { - switch (frame.getType()) { - case PONG: - handlePong(frame.getPayload()); - break; - case PING: - ByteBuffer payload = null; - if (frame.hasPayload()) { - payload = frame.getPayload(); - } - handlePing(payload); - break; - default: - // nothing to do - } - } - - private void handlePing(ByteBuffer payload) { - RemoteEndpoint endPoint = session.getRemote(); - try { - // Note: sendPong() will change the position of the payload - // so the string needs to get retrieved before sendPong() - // is called. - pingMsg = getStringFromPayload(payload); - endPoint.sendPong(payload); - pingReceived.countDown(); - } catch (IOException e) { - System.err.println("Failed to send pong response!"); - e.printStackTrace(); - } - } - - private void handlePong(ByteBuffer payload) { - pongMsg = getStringFromPayload(payload); - pongReceived.countDown(); - } - - private String getStringFromPayload(ByteBuffer payload) { - int limit = payload.limit(); - int position = payload.position(); - int length = limit - position; - byte[] buf = new byte[length]; - for (int i = 0; position < limit; position++, i++) { - buf[i] = payload.get(position); - } - return new String(buf); - } - - @OnWebSocketClose - public void onClose(int statusCode, String reason) { - this.closeLatch.countDown(); // trigger latch - } - - @OnWebSocketConnect - public void onConnect(Session session) { - this.session = session; - this.connectLatch.countDown(); - } - - @OnWebSocketError - public void onError(Throwable cause) { - Throwable realCause = cause.getCause(); - while (realCause != null) { - realCause.printStackTrace(); - realCause = realCause.getCause(); - } - } - - @OnWebSocketMessage - public void onMessage(final Session session, final String msg) { - final Message message = gson.fromJson(msg, Message.class); - onMessage.run(session, message); - } - - public void closeSession() { - if (session != null) { - this.session.close(); - } - } - - public void sendPingToServer(String msgPayload) throws IOException { - if (this.session == null) { - throw new NullPointerException("Session null. Agent not connected?"); - } - RemoteEndpoint endpoint = session.getRemote(); - ByteBuffer pingPayload = ByteBuffer.wrap(msgPayload.getBytes()); - endpoint.sendPing(pingPayload); - System.err.println("Client: Ping msg sent <<" + msgPayload + ">>"); - } - - public String getPongMsg() { - return pongMsg; - } - - public String getPingMsg() { - return pingMsg; - } - - public interface OnMessageCallBack { - public void run(Session session, Message msg); - } -}
--- a/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/http/handlers/CmdChannelClientSocket.java Thu Aug 31 15:34:39 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,131 +0,0 @@ -/* - * Copyright 2012-2017 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.gateway.service.commands.http.handlers; - -import java.util.concurrent.CountDownLatch; - -import com.redhat.thermostat.gateway.service.commands.channel.coders.typeadapters.MessageTypeAdapterFactory; -import com.redhat.thermostat.gateway.service.commands.channel.model.ClientRequest; -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; -import org.eclipse.jetty.websocket.api.annotations.WebSocket; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.redhat.thermostat.gateway.service.commands.channel.model.Message; -import com.redhat.thermostat.gateway.service.commands.channel.model.Message.MessageType; -import com.redhat.thermostat.gateway.service.commands.channel.model.WebSocketResponse; - -/** - * Handles the client initiated actions. E.g. triggering a cmd channel request - * to some agent. - * - */ -@WebSocket(maxTextMessageSize = 64 * 1024) -public class CmdChannelClientSocket { - - private final Gson gson; - private final CountDownLatch closeLatch; - private final CountDownLatch messageSentLatch; - private final ClientRequest request; - private WebSocketResponse resp; - - public CmdChannelClientSocket(ClientRequest request, CountDownLatch messageSentLatch) { - this.closeLatch = new CountDownLatch(1); - this.request = request; - this.messageSentLatch = messageSentLatch; - this.gson = new GsonBuilder() - .registerTypeAdapterFactory(new MessageTypeAdapterFactory()) - .serializeNulls() - .disableHtmlEscaping() - .create(); - } - - public CmdChannelClientSocket(ClientRequest request) { - this(request, new CountDownLatch(1)); - } - - public void awaitClose() throws InterruptedException { - this.closeLatch.await(); - } - - @OnWebSocketClose - public void onClose(int statusCode, String reason) { - this.closeLatch.countDown(); // trigger latch - } - - @OnWebSocketConnect - public void onConnect(Session session) { - try { - String serializedMsg = gson.toJson(request); - session.getRemote().sendString(serializedMsg); - session.getRemote().flush(); - messageSentLatch.countDown(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - - @OnWebSocketError - public void onError(Throwable cause) { - Throwable realCause = cause.getCause(); - while (realCause != null) { - realCause.printStackTrace(); - realCause = realCause.getCause(); - } - } - - @OnWebSocketMessage - public void onMessage(Session session, String msg) { - Message message = gson.fromJson(msg, Message.class); - if (message.getMessageType() != MessageType.RESPONSE) { - throw new AssertionError("Illegal type. Got " + message.getMessageType()); - } - WebSocketResponse r = (WebSocketResponse)message; - if (r.getSequenceId() == request.getSequenceId()) { - this.resp = r; - } - session.close(); - } - - public WebSocketResponse getResponse() { - return resp; - } -} \ No newline at end of file
--- a/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/http/handlers/CommandChannelEndpointHandlerTest.java Thu Aug 31 15:34:39 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,469 +0,0 @@ -/* - * Copyright 2012-2017 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.gateway.service.commands.http.handlers; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assume.assumeTrue; - -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.eclipse.jetty.http.HttpHeader; -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; -import org.junit.Before; -import org.junit.Test; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.redhat.thermostat.gateway.service.commands.channel.coders.typeadapters.MessageTypeAdapterFactory; -import com.redhat.thermostat.gateway.service.commands.channel.model.AgentRequest; -import com.redhat.thermostat.gateway.service.commands.channel.model.ClientRequest; -import com.redhat.thermostat.gateway.service.commands.channel.model.Message; -import com.redhat.thermostat.gateway.service.commands.channel.model.Message.MessageType; -import com.redhat.thermostat.gateway.service.commands.channel.model.WebSocketResponse; -import com.redhat.thermostat.gateway.service.commands.channel.model.WebSocketResponse.ResponseType; - -public class CommandChannelEndpointHandlerTest extends AuthBasicCoreServerTest { - - private Gson gson; - private ClientRequest clientRequest; - - @Before - public void setUp() { - gson = new GsonBuilder() - .registerTypeAdapterFactory(new MessageTypeAdapterFactory()) - .serializeNulls() - .create(); - clientRequest = new ClientRequest(Message.UNKNOWN_SEQUENCE); - } - - @Test(timeout = 2000) - public void testHandshakeAllRoles() throws Exception { - String agentUser = "foo-agent-user"; - String clientUser = "bar-client-user"; - long clientSequence = 144L; - clientRequest.setSequenceId(clientSequence); - String agentId = "testAgent"; - URI clientUri = new URI( - baseUrl + "actions/dump_heap/systems/foo/agents/" + agentId - + "/jvms/abc/sequence/" + clientSequence); - URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); - final CountDownLatch clientHasSentMessages = new CountDownLatch(1); - CmdChannelClientSocket clientSocket = new CmdChannelClientSocket(clientRequest, clientHasSentMessages); - final CountDownLatch waitForAgentConnect = new CountDownLatch(1); - CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket( - new CmdChannelAgentSocket.OnMessageCallBack() { - @Override - public void run(Session session, Message msg) { - if (msg.getMessageType() != MessageType.AGENT_REQUEST) { - throw new AssertionError("Wrong message type. Got: " + msg.getClass().getName()); - } - AgentRequest req = (AgentRequest)msg; - WebSocketResponse resp = new WebSocketResponse(req.getSequenceId(), ResponseType.OK); - String jsonResp = gson.toJson(resp); - try { - session.getRemote().sendString(jsonResp); - } catch (IOException e) { - e.printStackTrace(); - } - } - }, waitForAgentConnect, gson); - ClientUpgradeRequest clientRequest = new ClientUpgradeRequest(); - ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); - agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), - getBasicAuthHeaderValue(agentUser, "agent-pwd")); - clientRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), - getBasicAuthHeaderValue(clientUser, "client-pwd")); - - // Ensure agent conneted before we actually do anything - client.connect(agentSocket, agentUri, agentRequest); - waitForAgentConnect.await(); - - // Ensure client has connected and initiated handshake by - // sending its message - client.connect(clientSocket, clientUri, clientRequest); - clientHasSentMessages.await(); - - // wait for client to close the socket - clientSocket.awaitClose(); - - // now we are ready to close the agent socket too - agentSocket.closeSession(); - agentSocket.awaitClose(); - - assertNotNull(clientSocket.getResponse()); - assertEquals(WebSocketResponse.ResponseType.OK, - clientSocket.getResponse().getResponseType()); - } - - /** - * A client which tries to communicate with an agent that has not yet - * connected should fail. - * - * @throws Exception - */ - @Test(timeout = 2000) - public void testHandshakeAuthorizedMissingAgentConnect() throws Exception { - long sequenceId = 333L; - clientRequest.setSequenceId(sequenceId); - String clientUser = "bar-client-user"; - String agentId = "testAgent"; - URI clientUri = new URI( - baseUrl + "actions/dump_heap/systems/foo/agents/" + agentId - + "/jvms/abc/sequence/" + sequenceId); - final CountDownLatch clientHasSentMessages = new CountDownLatch(1); - CmdChannelClientSocket clientSocket = new CmdChannelClientSocket(clientRequest, clientHasSentMessages); - ClientUpgradeRequest clientRequest = new ClientUpgradeRequest(); - clientRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), - getBasicAuthHeaderValue(clientUser, "client-pwd")); - client.connect(clientSocket, clientUri, clientRequest); - - boolean clientConnected = clientHasSentMessages.await(1, TimeUnit.SECONDS); - // For some reason the client might not connect when stress-tested. - // Avoid false test failures by using assumeTrue - assumeTrue(clientConnected); - - // wait for closed socket connection. - clientSocket.awaitClose(); - assertNotNull(clientSocket.getResponse()); - assertEquals(WebSocketResponse.ResponseType.ERROR, - clientSocket.getResponse().getResponseType()); - } - - /** - * There is no guarantee in which order messages get processed on the server - * side. Multiple clients might connect before an agent responds. It's possible - * that one channel connect which happened later - * channel request is faster than another. This test simulates some of - * this behavior. - * - * @throws Exception - */ - @Test(timeout = 2000) - public void testMultipleHandshakesInterleavedAllRoles() throws Exception { - final long clientSequenceFirst = 901l; - final long clientSequenceSecond = 902l; - ClientRequest r1 = new ClientRequest(clientSequenceFirst); - ClientRequest r2 = new ClientRequest(clientSequenceSecond); - - String agentUser = "foo-agent-user"; - String clientUser = "bar-client-user"; - String clientPassword = "client-pwd"; - String agentId = "testAgent"; - URI firstClientUri = new URI( - baseUrl + "actions/dump_heap/systems/foo/agents/" + agentId - + "/jvms/abc/sequence/" + clientSequenceFirst); - URI secondClientUri = new URI( - baseUrl + "actions/dump_heap/systems/foo/agents/" + agentId - + "/jvms/abc/sequence/" + clientSequenceSecond); - URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); - CountDownLatch clientsHaveSentMessages = new CountDownLatch(2); - CmdChannelClientSocket firstClientSocket = new CmdChannelClientSocket(r1, clientsHaveSentMessages); - CmdChannelClientSocket secondClientSocket = new CmdChannelClientSocket(r2, clientsHaveSentMessages); - final CountDownLatch waitForSecondClientConnect = new CountDownLatch(1); - final CountDownLatch waitForAgentConnect = new CountDownLatch(1); - final AtomicInteger agentRespCount = new AtomicInteger(0); - final CountDownLatch allResponsesSent = new CountDownLatch(2); - CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket( - new CmdChannelAgentSocket.OnMessageCallBack() { - @Override - public void run(Session session, Message msg) { - if (agentRespCount.get() == 0) { - try { - waitForSecondClientConnect.await(2, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // ignore - } - } - if (msg.getMessageType() != MessageType.AGENT_REQUEST) { - throw new AssertionError("Wrong message type. Got: " + msg.getClass().getName()); - } - AgentRequest req = (AgentRequest)msg; - agentRespCount.getAndAdd(1); - try { - if (req.getSequenceId() == clientSequenceFirst) { - WebSocketResponse resp = new WebSocketResponse(req.getSequenceId(), ResponseType.OK); - session.getRemote().sendString(gson.toJson(resp)); - session.getRemote().flush(); - allResponsesSent.countDown(); - } else if (req.getSequenceId() == clientSequenceSecond) { - WebSocketResponse resp = new WebSocketResponse(req.getSequenceId(), ResponseType.ERROR); - session.getRemote().sendString(gson.toJson(resp)); - session.getRemote().flush(); - allResponsesSent.countDown(); - } else { - throw new AssertionError("Should not get here"); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - }, waitForAgentConnect, gson); - ClientUpgradeRequest firstClientRequest = new ClientUpgradeRequest(); - ClientUpgradeRequest secondClientRequest = new ClientUpgradeRequest(); - ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); - agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), - getBasicAuthHeaderValue(agentUser, "agent-pwd")); - firstClientRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), - getBasicAuthHeaderValue(clientUser, clientPassword)); - secondClientRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), - getBasicAuthHeaderValue(clientUser, clientPassword)); - client.connect(agentSocket, agentUri, agentRequest); - waitForAgentConnect.await(); - - client.connect(firstClientSocket, firstClientUri, firstClientRequest); - client.connect(secondClientSocket, secondClientUri, secondClientRequest); - - // There is no guarantee that the second client actually connected, but - // at least we know that we've called relevant code to do the connection. - waitForSecondClientConnect.countDown(); - - clientsHaveSentMessages.await(); - - // wait before clients have been processed before we close the agent - boolean isAllResponsesSent = allResponsesSent.await(1, TimeUnit.SECONDS); - // We might hit a window where we've got the signal that the agent is - // "ready", but actually still in progress of getting its socket added - // to the registry. In that case, the agent call-back is not going to be - // sending the response. Instead the server aborts with ERROR, since it - // cannot find the socket in the agent socket registry yet. assumeTrue - // that the latch did not expire in order to avoid false test failures. - assumeTrue(isAllResponsesSent); - - // wait for client connections to close - secondClientSocket.awaitClose(); - firstClientSocket.awaitClose(); - - agentSocket.closeSession(); - agentSocket.awaitClose(); - - assertNotNull(firstClientSocket.getResponse()); - assertEquals(WebSocketResponse.ResponseType.OK, - firstClientSocket.getResponse().getResponseType()); - assertNotNull(secondClientSocket.getResponse()); - assertEquals(WebSocketResponse.ResponseType.ERROR, secondClientSocket.getResponse().getResponseType()); - } - - @Test(timeout = 2000) - public void testHandshakeNotAuthenticatedClient() throws Exception { - long clientSequence = 324L; - TestUser clientUser = null /* no auth-creds */; - doNoAuthTestClient(clientSequence, clientUser); - } - - @Test(timeout = 2000) - public void testHandshakeNotAuthenticatedAgent() throws Exception { - TestUser agentUser = null /* no auth-creds */; - doNoAuthTestAgent(agentUser); - } - - @Test(timeout = 2000) - public void testHandshakeNotAuthorizedClient() throws Exception { - long clientSequence = 332l; - TestUser clientUser = new TestUser(); - clientUser.username = "insufficient-roles-client"; - clientUser.password = "client-pwd"; - doNoAuthTestClient(clientSequence, clientUser); - } - - @Test(timeout = 2000) - public void testHandshakeNotAuthorizedAgent() throws Exception { - TestUser agentUser = new TestUser(); - agentUser.username = "insufficient-roles-agent"; - agentUser.password = "agent-pwd"; - doNoAuthTestAgent(agentUser); - } - - /** - * Tests whether the client can ping the server end-point. A server response is - * expected. - * - * @throws Exception - */ - @Test(timeout = 2000) - public void testAgentPing() throws Exception { - String agentUser = "foo-agent-user"; - String agentId = "testAgent"; - URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); - final CountDownLatch waitForAgentConnect = new CountDownLatch(1); - final CountDownLatch pongSignal = new CountDownLatch(1); - CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket(waitForAgentConnect, pongSignal, false); - ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); - agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), - getBasicAuthHeaderValue(agentUser, "agent-pwd")); - - // Ensure agent conneted before we actually do anything - client.connect(agentSocket, agentUri, agentRequest); - waitForAgentConnect.await(); - - // Initiate ping from client - String pingMsgPayload = "testAgent ping message"; - agentSocket.sendPingToServer(pingMsgPayload); - - // Wait for server pong to come back - pongSignal.await(); - - assertEquals(pingMsgPayload, agentSocket.getPongMsg()); - - // now we are ready to close the agent socket too - agentSocket.closeSession(); - agentSocket.awaitClose(); - } - - /** - * Tests whether a server ping is properly responded to by the client by sending a - * pong response. - * - * Note: The service sends a ping onConnect() and then every subsequent X minutes - * where X is strictly less than the currently set timeout value for the - * agent/receiver sockets. - * - * @throws Exception - */ - @Test(timeout = 2000) - public void testAgentPong() throws Exception { - String agentUser = "foo-agent-user"; - String agentId = "testAgent"; - URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); - final CountDownLatch waitForAgentConnect = new CountDownLatch(1); - final CountDownLatch pongResponseSent = new CountDownLatch(1); - CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket(waitForAgentConnect, pongResponseSent, true); - ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); - agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), - getBasicAuthHeaderValue(agentUser, "agent-pwd")); - - // Ensure agent conneted before we actually do anything - client.connect(agentSocket, agentUri, agentRequest); - waitForAgentConnect.await(); - - String expectedPingPayload = "1|" + agentId; - pongResponseSent.await(); - assertEquals(expectedPingPayload, agentSocket.getPingMsg()); - - // now we are ready to close the agent socket too - agentSocket.closeSession(); - agentSocket.awaitClose(); - } - - private void doNoAuthTestClient(long clientSequence, TestUser clientUser) throws Exception { - ClientRequest noMatter = new ClientRequest(clientSequence); - String agentId = "testAgent"; - URI clientUri = new URI( - baseUrl + "actions/dump_heap/systems/foo/agents/" + agentId - + "/jvms/abc/sequence/" + clientSequence); - CmdChannelClientSocket clientSocket = new CmdChannelClientSocket( - noMatter /* doesn't matter */); - ClientUpgradeRequest clientRequest = new ClientUpgradeRequest(); - if (clientUser != null) { - clientRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), - getBasicAuthHeaderValue(clientUser.username, clientUser.password)); - } - client.connect(clientSocket, clientUri, clientRequest); - - // wait for client connection to get closed (by the server) - clientSocket.awaitClose(); - - WebSocketResponse resp = clientSocket.getResponse(); - // Sometimes the client response is not sent due to EOF (short reads?) on the - // underlying socket. In order to avoid false test failures use assumeTrue. - // Symptoms: - // "Connection closed: 1006 - EOF: Broken pipe" or - // "Connection closed: 1006 - WebSocket Read EOF" - assumeTrue(resp != null); - assertNotNull(resp); - assertEquals(WebSocketResponse.ResponseType.AUTH_FAIL, - resp.getResponseType()); - assertEquals(clientSequence, resp.getSequenceId()); - } - - private void doNoAuthTestAgent(TestUser agentUser) throws Exception { - String agentId = "testAgent"; - URI agentUri = new URI(baseUrl + "systems/foo/agents/" + agentId); - final WebSocketResponse[] agentResponse = new WebSocketResponse[1]; - final CountDownLatch agentResponseReady = new CountDownLatch(1); - CmdChannelAgentSocket agentSocket = new CmdChannelAgentSocket( - new CmdChannelAgentSocket.OnMessageCallBack() { - - @Override - public void run(Session session, Message msg) { - if (msg.getMessageType() != MessageType.RESPONSE) { - throw new AssertionError("Wrong message type. Got: " + msg.getClass().getName()); - } - agentResponse[0] = (WebSocketResponse)msg; - agentResponseReady.countDown(); - } - }, agentResponseReady, gson); - ClientUpgradeRequest agentRequest = new ClientUpgradeRequest(); - if (agentUser != null) { - agentRequest.setHeader(HttpHeader.AUTHORIZATION.asString(), - getBasicAuthHeaderValue(agentUser.username, agentUser.password)); - } - client.connect(agentSocket, agentUri, agentRequest); - - boolean isAgentResponseReady = agentResponseReady.await(1, TimeUnit.SECONDS); - // Sometimes the agent response is not sent due to EOF (short reads?) on the - // underlying socket. In order to avoid false test failures use assumeTrue. - // Symptoms: - // "Connection closed: 1006 - EOF: Broken pipe" or - // "Connection closed: 1006 - WebSocket Read EOF" - assumeTrue(isAgentResponseReady); - - // wait for the agent connection to get closed (by the server) - agentSocket.awaitClose(); - - assertEquals("There is no sequence for agent connections", - Message.UNKNOWN_SEQUENCE, agentResponse[0].getSequenceId()); - } - - private String getBasicAuthHeaderValue(String testUser, String password) { - String userpassword = testUser + ":" + password; - @SuppressWarnings("restriction") - String encodedAuthorization = new sun.misc.BASE64Encoder() - .encode(userpassword.getBytes()); - return "Basic " + encodedAuthorization; - } - - private static class TestUser { - String password; - String username; - } -}
--- a/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/http/handlers/NoOpMsgCallback.java Thu Aug 31 15:34:39 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,51 +0,0 @@ -/* - * Copyright 2012-2017 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.gateway.service.commands.http.handlers; - -import com.redhat.thermostat.gateway.service.commands.channel.model.Message; -import org.eclipse.jetty.websocket.api.Session; - -import com.redhat.thermostat.gateway.service.commands.http.handlers.CmdChannelAgentSocket.OnMessageCallBack; - -class NoOpMsgCallback implements OnMessageCallBack { - - @Override - public void run(Session session, Message msg) { - // nothing - } - -}
--- a/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/servlet/SocketRegistrationListenerTest.java Thu Aug 31 15:34:39 2017 +0200 +++ b/services/commands/src/test/java/com/redhat/thermostat/gateway/service/commands/servlet/SocketRegistrationListenerTest.java Mon Sep 04 08:46:20 2017 +0200 @@ -36,23 +36,34 @@ package com.redhat.thermostat.gateway.service.commands.servlet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.List; + import javax.servlet.ServletContext; import javax.servlet.ServletContextEvent; import javax.websocket.DeploymentException; import javax.websocket.server.ServerContainer; +import javax.websocket.server.ServerEndpointConfig; -import com.redhat.thermostat.gateway.service.commands.http.handlers.CommandChannelAgentEndpointHandler; -import com.redhat.thermostat.gateway.service.commands.http.handlers.CommandChannelClientEndpointHandler; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import com.redhat.thermostat.gateway.common.core.servlet.GlobalConstants; +import com.redhat.thermostat.gateway.service.commands.channel.endpoints.CommandChannelAgentEndpointHandler; +import com.redhat.thermostat.gateway.service.commands.channel.endpoints.CommandChannelClientEndpointHandler; +import com.redhat.thermostat.gateway.service.commands.channel.endpoints.RealmAuthorizerConfigurator; public class SocketRegistrationListenerTest { + private static final String VERSION = "1.0.1"; private ServletContextEvent event; private ServletContext context; private ServerContainer serverContainer; @@ -62,6 +73,7 @@ serverContainer = mock(ServerContainer.class); context = mock(ServletContext.class); when(context.getAttribute(eq("javax.websocket.server.ServerContainer"))).thenReturn(serverContainer); + when(context.getInitParameter(eq(GlobalConstants.SERVICE_VERSION_KEY))).thenReturn(VERSION); event = mock(ServletContextEvent.class); when(event.getServletContext()).thenReturn(context); } @@ -70,7 +82,18 @@ public void contextInitializedAddsSocketEndpoints() throws DeploymentException { SocketRegistrationListener listener = new SocketRegistrationListener(); listener.contextInitialized(event); - verify(serverContainer).addEndpoint(CommandChannelAgentEndpointHandler.class); - verify(serverContainer).addEndpoint(CommandChannelClientEndpointHandler.class); + ArgumentCaptor<ServerEndpointConfig> configCaptor = ArgumentCaptor.forClass(ServerEndpointConfig.class); + verify(serverContainer, times(2)).addEndpoint(configCaptor.capture()); + List<ServerEndpointConfig> configs = configCaptor.getAllValues(); + ServerEndpointConfig agentConfig = configs.get(0); + ServerEndpointConfig clientConfig = configs.get(1); + assertEquals(CommandChannelAgentEndpointHandler.class, agentConfig.getEndpointClass()); + assertEquals(CommandChannelClientEndpointHandler.class, clientConfig.getEndpointClass()); + assertEquals("expected config to be added", 1, agentConfig.getUserProperties().size()); + assertEquals("expected config to be added", 1, clientConfig.getUserProperties().size()); + assertEquals("/1.0.1/actions/{action}/systems/{systemId}/agents/{agentId}/jvms/{jvmId}/sequence/{seqId}", clientConfig.getPath()); + assertEquals("/1.0.1/systems/{systemId}/agents/{agentId}", agentConfig.getPath()); + assertTrue(agentConfig.getConfigurator() instanceof RealmAuthorizerConfigurator); + assertTrue(clientConfig.getConfigurator() instanceof RealmAuthorizerConfigurator); } }