# HG changeset patch # User Severin Gehwolf # Date 1362667055 -3600 # Node ID e7725b744e57a133c230cc8309c5c0fe1eb618ff # Parent b7d5c70e0251d969ba5a343f702ef0d6278ef320 Make DbService.connect/disconnect synchronous. Reviewed-by: vanaltj Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2013-March/006005.html diff -r b7d5c70e0251 -r e7725b744e57 storage/core/src/main/java/com/redhat/thermostat/storage/internal/DbServiceImpl.java --- a/storage/core/src/main/java/com/redhat/thermostat/storage/internal/DbServiceImpl.java Mon Mar 18 15:58:38 2013 +0100 +++ b/storage/core/src/main/java/com/redhat/thermostat/storage/internal/DbServiceImpl.java Thu Mar 07 15:37:35 2013 +0100 @@ -36,15 +36,21 @@ package com.redhat.thermostat.storage.internal; +import java.util.concurrent.CountDownLatch; +import java.util.logging.Level; +import java.util.logging.Logger; + import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; import org.osgi.framework.InvalidSyntaxException; import org.osgi.framework.ServiceReference; import org.osgi.framework.ServiceRegistration; +import com.redhat.thermostat.common.utils.LoggingUtils; import com.redhat.thermostat.storage.config.ConnectionConfiguration; import com.redhat.thermostat.storage.config.StartupConfiguration; import com.redhat.thermostat.storage.core.Connection.ConnectionListener; +import com.redhat.thermostat.storage.core.Connection.ConnectionStatus; import com.redhat.thermostat.storage.core.ConnectionException; import com.redhat.thermostat.storage.core.DbService; import com.redhat.thermostat.storage.core.Storage; @@ -53,6 +59,7 @@ public class DbServiceImpl implements DbService { + private static Logger logger = LoggingUtils.getLogger(DbServiceImpl.class); @SuppressWarnings("rawtypes") private ServiceRegistration dbServiceReg; @SuppressWarnings("rawtypes") @@ -72,6 +79,12 @@ init(context, username, password, dbUrl); } + // For testing. Injects custom storage. + DbServiceImpl(BundleContext context, Storage storage) { + this.context = context; + this.storage = storage; + } + private void init(BundleContext context, String username, String password, String dbUrl) { Storage storage = createStorage(context, username, password, dbUrl); @@ -85,26 +98,71 @@ // as service ensureConnectPreCondition(); try { - this.storage.getConnection().connect(); - dbServiceReg = context.registerService(DbService.class, this, null); - storageReg = context.registerService(Storage.class.getName(), this.storage, null); + // connection needs to be synchronous, otherwise there is no + // way to guarantee the postcondition if there's a delayed exception + // during connection handling. + doSynchronousConnect(); } catch (Exception cause) { throw new ConnectionException(cause); } + // Connection didn't throw an exception. Now it is safe to register + // services. + dbServiceReg = context.registerService(DbService.class, this, null); + storageReg = context.registerService(Storage.class.getName(), this.storage, null); } + private void doSynchronousConnect() throws ConnectionException { + CountDownLatch latch = new CountDownLatch(1); + SynchronousConnectionListener listener = new SynchronousConnectionListener( + latch, ConnectionStatus.CONNECTED); + // Install listener in order to ensure connection is synchronous. + addConnectionListener(listener); + this.storage.getConnection().connect(); + try { + // Wait for connection to finish. + // The synchronous connection listener gets removed once connection + // has finished. + latch.await(); + } catch (InterruptedException e) { + logger.log(Level.WARNING, e.getMessage(), e); + } + if (!listener.successful) { + throw new ConnectionException(); + } + } + public void disconnect() throws ConnectionException { // DbService and Storage must be registered as service at this point ensureDisconnectPrecondition(); try { - storage.getConnection().disconnect(); - storageReg.unregister(); - dbServiceReg.unregister(); + doSyncronousDisconnect(); } catch (Exception cause) { throw new ConnectionException(cause); } + storageReg.unregister(); + dbServiceReg.unregister(); } + private void doSyncronousDisconnect() { + CountDownLatch latch = new CountDownLatch(1); + SynchronousConnectionListener listener = new SynchronousConnectionListener( + latch, ConnectionStatus.DISCONNECTED); + // Install listener in order to ensure connection is synchronous. + addConnectionListener(listener); + this.storage.getConnection().disconnect(); + try { + // Wait for disconnect to finish. + // The synchronous connection listener gets removed once connection + // has finished. + latch.await(); + } catch (InterruptedException e) { + logger.log(Level.WARNING, e.getMessage(), e); + } + if (!listener.successful) { + throw new ConnectionException(); + } + } + @Override public String getConnectionUrl() { return dbUrl; @@ -192,4 +250,41 @@ storage.getConnection().removeListener(listener); } + class SynchronousConnectionListener implements ConnectionListener { + + CountDownLatch latch; + boolean successful = false; + ConnectionStatus expectedType; + + public SynchronousConnectionListener(CountDownLatch latch, ConnectionStatus expectedType) { + this.latch = latch; + this.expectedType = expectedType; + } + + @Override + public void changed(ConnectionStatus newStatus) { + switch (newStatus) { + case CONNECTED: { + successful = (expectedType == ConnectionStatus.CONNECTED); + latch.countDown(); + removeConnectionListener(this); + break; + } + case FAILED_TO_CONNECT: { + latch.countDown(); + removeConnectionListener(this); + break; + } + case DISCONNECTED: { + successful = (expectedType == ConnectionStatus.DISCONNECTED); + latch.countDown(); + removeConnectionListener(this); + } + default: { + // nothing + } + } + } + + } } diff -r b7d5c70e0251 -r e7725b744e57 storage/core/src/test/java/com/redhat/thermostat/storage/internal/DbServiceImplTest.java --- a/storage/core/src/test/java/com/redhat/thermostat/storage/internal/DbServiceImplTest.java Mon Mar 18 15:58:38 2013 +0100 +++ b/storage/core/src/test/java/com/redhat/thermostat/storage/internal/DbServiceImplTest.java Thu Mar 07 15:37:35 2013 +0100 @@ -37,6 +37,7 @@ package com.redhat.thermostat.storage.internal; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -52,6 +53,8 @@ import com.redhat.thermostat.storage.core.Connection; import com.redhat.thermostat.storage.core.Connection.ConnectionListener; +import com.redhat.thermostat.storage.core.Connection.ConnectionStatus; +import com.redhat.thermostat.storage.core.ConnectionException; import com.redhat.thermostat.storage.core.DbService; import com.redhat.thermostat.storage.core.Storage; import com.redhat.thermostat.storage.core.StorageException; @@ -60,7 +63,9 @@ public class DbServiceImplTest { - private Connection connection; + // Stub connection which is always successful and + // firesChanged immediately. + private ImmediateConnection connection = new ImmediateConnection(); private StorageProvider storageProvider; private Storage storage; private StubBundleContext context; @@ -68,7 +73,6 @@ @Before public void setup() { context = new StubBundleContext(); - connection = mock(Connection.class); storage = mock(Storage.class); when(storage.getConnection()).thenReturn(connection); @@ -105,20 +109,84 @@ @Test public void testConnect() { - DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com"); - - dbService.connect(); - - verify(connection).connect(); + DbService dbService = new DbServiceImpl(context, storage); + try { + dbService.connect(); + // pass + } catch (ConnectionException e) { + fail(); + } + assertTrue(connection.connectCalled); + } + + @Test + public void connectEnsuresPostConditionOnDelayedException() { + Storage mockStorage = mock(Storage.class); + DelayedConnection connection = new DelayedConnection(); + when(mockStorage.getConnection()).thenReturn(connection); + DbService dbService = new DbServiceImpl(context, mockStorage); + + try { + dbService.connect(); + fail("Should have thrown ConnectionException!"); + } catch (ConnectionException e) { + // pass + } + assertTrue(connection.connectCalled); + assertTrue(connection.delayAwaited); + assertFalse(context.isServiceRegistered(Storage.class.getName(), mockStorage.getClass())); + assertFalse(context.isServiceRegistered(DbService.class.getName(), DbServiceImpl.class)); } @Test + public void disconnectAwaitsConnectionDisconnect() { + Storage mockStorage = mock(Storage.class); + DelayedConnection connection = new DelayedConnection() { + @Override + public void connect() { + fireChanged(ConnectionStatus.CONNECTED); + connectCalled = true; + } + }; + when(mockStorage.getConnection()).thenReturn(connection); + DbService dbService = new DbServiceImpl(context, mockStorage); + + try { + dbService.connect(); + // pass + } catch (ConnectionException e) { + fail(); + } + assertTrue(connection.connectCalled); + connection.connectCalled = false; + assertFalse(connection.delayAwaited); + assertFalse(connection.disconnectCalled); + assertNotNull(context.getServiceReference(DbService.class)); + assertNotNull(context.getServiceReference(Storage.class)); + + try { + dbService.disconnect(); + } catch (ConnectionException e) { + fail(); + } + assertTrue(connection.disconnectCalled); + assertFalse(connection.connectCalled); + assertTrue(connection.delayAwaited); + assertNull(context.getServiceReference(DbService.class)); + assertNull(context.getServiceReference(Storage.class)); + } + + @Test public void testConnectRegistersDbService() { DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com"); - dbService.connect(); - - verify(connection).connect(); + try { + dbService.connect(); + // pass + } catch (ConnectionException e) { + fail(); + } + assertTrue(connection.connectCalled); @SuppressWarnings("rawtypes") ServiceReference dbServiceRef = context.getServiceReference(DbService.class); // connect registers DbService @@ -131,9 +199,13 @@ public void testConnectRegistersStorage() { DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com"); - dbService.connect(); - - verify(connection).connect(); + try { + dbService.connect(); + // pass + } catch (ConnectionException e) { + fail(); + } + assertTrue(connection.connectCalled); @SuppressWarnings("rawtypes") ServiceReference storageRef = context.getServiceReference(Storage.class); // connect registers DbService @@ -194,24 +266,36 @@ public void testDisconnect() { DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com"); - dbService.connect(); + try { + dbService.connect(); + // pass + } catch (ConnectionException e) { + fail(); + } + assertTrue(connection.connectCalled); assertNotNull(context.getServiceReference(DbService.class)); dbService.disconnect(); - verify(connection).disconnect(); + assertTrue(connection.disconnectCalled); } @Test public void testDisconnectUnregistersDbService() { DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com"); - dbService.connect(); + try { + dbService.connect(); + // pass + } catch (ConnectionException e) { + fail(); + } + assertTrue(connection.connectCalled); assertNotNull(context.getServiceReference(DbService.class)); dbService.disconnect(); - verify(connection).disconnect(); + assertTrue(connection.disconnectCalled); // disconnect unregisters DbService assertNull(context.getServiceReference(DbService.class)); } @@ -220,12 +304,19 @@ public void testDisconnectUnregistersStorage() { DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com"); - dbService.connect(); + try { + dbService.connect(); + // pass + } catch (ConnectionException e) { + fail(); + } + assertTrue(connection.connectCalled); assertNotNull(context.getServiceReference(Storage.class)); dbService.disconnect(); - verify(connection).disconnect(); + assertTrue(connection.disconnectCalled); + // disconnect unregisters Storage assertNull(context.getServiceReference(Storage.class)); } @@ -241,6 +332,8 @@ @Test public void testAddListener() { ConnectionListener listener = mock(ConnectionListener.class); + Connection connection = mock(Connection.class); + when(storage.getConnection()).thenReturn(connection); DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com"); dbService.addConnectionListener(listener); @@ -248,12 +341,132 @@ } @Test + public void testListenerGetsEvent() { + ConnectingConnectionListener listener = new ConnectingConnectionListener(); + DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com"); + + ConnectingConnection connection = new ConnectingConnection(); + when(storage.getConnection()).thenReturn(connection); + dbService.addConnectionListener(listener); + assertFalse(listener.eventReceived); + try { + dbService.connect(); + } catch (ConnectionException e) { + fail(); + } + assertTrue(connection.connectCalled); + assertTrue(listener.eventReceived); + listener.eventReceived = false; + dbService.removeConnectionListener(listener); + try { + dbService.disconnect(); + dbService.connect(); + } catch (ConnectionException e) { + fail(); + } + assertFalse(listener.eventReceived); + } + + @Test public void testRemoveListener() { // Remove called regardless of listener actually being added ConnectionListener listener = mock(ConnectionListener.class); + Connection connection = mock(Connection.class); + when(storage.getConnection()).thenReturn(connection); DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com"); dbService.removeConnectionListener(listener); verify(connection).removeListener(listener); } + + static class DelayedConnection extends Connection { + + boolean connectCalled = false; + boolean disconnectCalled = false; + private Thread thread; + boolean delayAwaited = false; + + @Override + public void connect() { + // delay connection and then fail + Runnable runnable = new Runnable() { + + @Override + public void run() { + try { + Thread.sleep(100); + // This makes the connection fail (although delayed). + delayAwaited = true; + fireChanged(ConnectionStatus.FAILED_TO_CONNECT); + } catch (InterruptedException e) { + // ignore + } + } + }; + thread = new Thread(runnable); + thread.start(); + connectCalled = true; + } + + @Override + public void disconnect() { + Runnable runnable = new Runnable() { + + @Override + public void run() { + try { + Thread.sleep(100); + delayAwaited = true; + fireChanged(ConnectionStatus.DISCONNECTED); + } catch (InterruptedException e) { + // ignore + } + } + }; + thread = new Thread(runnable); + thread.start(); + disconnectCalled = true; + } + + } + + static class ImmediateConnection extends Connection { + + boolean connectCalled = false; + boolean disconnectCalled = false; + + @Override + public void connect() { + connectCalled = true; + fireChanged(ConnectionStatus.CONNECTED); + } + + @Override + public void disconnect() { + disconnectCalled = true; + fireChanged(ConnectionStatus.DISCONNECTED); + } + + } + + static class ConnectingConnectionListener implements ConnectionListener { + + boolean eventReceived = false; + + @Override + public void changed(ConnectionStatus newStatus) { + if (newStatus == ConnectionStatus.CONNECTING) { + eventReceived = true; + } + } + + } + + static class ConnectingConnection extends ImmediateConnection { + @Override + public void connect() { + fireChanged(ConnectionStatus.CONNECTING); + super.connect(); + } + } } diff -r b7d5c70e0251 -r e7725b744e57 storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnection.java --- a/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnection.java Mon Mar 18 15:58:38 2013 +0100 +++ b/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnection.java Thu Mar 07 15:37:35 2013 +0100 @@ -112,6 +112,7 @@ if (m != null) { m.close(); } + fireChanged(ConnectionStatus.DISCONNECTED); } public DB getDB() { diff -r b7d5c70e0251 -r e7725b744e57 storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnectionTest.java --- a/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnectionTest.java Mon Mar 18 15:58:38 2013 +0100 +++ b/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnectionTest.java Thu Mar 07 15:37:35 2013 +0100 @@ -116,6 +116,24 @@ verify(listener).changed(ConnectionStatus.CONNECTED); } + + @PrepareForTest({ MongoConnection.class }) + @Test + public void testDisconnect() throws Exception { + DBCollection collection = mock(DBCollection.class); + DB db = mock(DB.class); + when(db.getCollection("agent-config")).thenReturn(collection); + Mongo m = mock(Mongo.class); + when(m.getDB(MongoConnection.THERMOSTAT_DB_NAME)).thenReturn(db); + PowerMockito.whenNew(Mongo.class).withParameterTypes(MongoURI.class).withArguments(any(MongoURI.class)).thenReturn(m); + conn.connect(); + + verify(listener).changed(ConnectionStatus.CONNECTED); + + conn.disconnect(); + verify(m).close(); + verify(listener).changed(ConnectionStatus.DISCONNECTED); + } @PrepareForTest({ MongoConnection.class }) @Test @@ -222,5 +240,6 @@ // pass } } + } diff -r b7d5c70e0251 -r e7725b744e57 web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java --- a/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java Mon Mar 18 15:58:38 2013 +0100 +++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java Thu Mar 07 15:37:35 2013 +0100 @@ -331,18 +331,27 @@ private String password; private SecureRandom random; private WebConnection conn; + + // for testing + WebStorage(StartupConfiguration config, DefaultHttpClient client, ClientConnectionManager connManager) { + init(config, client, connManager); + } public WebStorage(StartupConfiguration config) throws StorageException { + ClientConnectionManager connManager = new ThreadSafeClientConnManager(); + DefaultHttpClient client = new DefaultHttpClient(connManager); + client.getParams().setParameter("http.protocol.expect-continue", Boolean.TRUE); + init(config, client, connManager); + } + + private void init(StartupConfiguration config, DefaultHttpClient client, ClientConnectionManager connManager) { categoryIds = new HashMap<>(); gson = new GsonBuilder().registerTypeHierarchyAdapter(Pojo.class, new ThermostatGSONConverter()).create(); - ClientConnectionManager connManager = new ThreadSafeClientConnManager(); - DefaultHttpClient client = new DefaultHttpClient(connManager); - client.getParams().setParameter("http.protocol.expect-continue", Boolean.TRUE); httpClient = client; random = new SecureRandom(); conn = new WebConnection(); - + setEndpoint(config.getDBConnectionString()); if (config instanceof AuthenticationConfiguration) { AuthenticationConfiguration authConfig = (AuthenticationConfiguration) config; diff -r b7d5c70e0251 -r e7725b744e57 web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java --- a/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java Mon Mar 18 15:58:38 2013 +0100 +++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java Thu Mar 07 15:37:35 2013 +0100 @@ -43,6 +43,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import java.io.BufferedReader; import java.io.ByteArrayInputStream; @@ -59,6 +60,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -66,8 +68,10 @@ import org.apache.commons.codec.binary.Base64; import org.apache.http.client.HttpClient; +import org.apache.http.conn.ClientConnectionManager; import org.apache.http.conn.scheme.Scheme; import org.apache.http.conn.scheme.SchemeRegistry; +import org.apache.http.impl.client.DefaultHttpClient; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.AbstractHandler; @@ -76,6 +80,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import com.google.gson.Gson; import com.google.gson.JsonArray; @@ -85,6 +90,8 @@ import com.redhat.thermostat.storage.core.AuthToken; import com.redhat.thermostat.storage.core.Categories; import com.redhat.thermostat.storage.core.Category; +import com.redhat.thermostat.storage.core.Connection.ConnectionListener; +import com.redhat.thermostat.storage.core.Connection.ConnectionStatus; import com.redhat.thermostat.storage.core.Cursor; import com.redhat.thermostat.storage.core.Key; import com.redhat.thermostat.storage.core.Put; @@ -562,5 +569,96 @@ assertFalse(ok); } + + @Test + public void verifyConnectFiresEventOnConnectionFailure() { + DefaultHttpClient client = mock(DefaultHttpClient.class); + ClientConnectionManager connManager = mock(ClientConnectionManager.class); + // this should make connect fail + Mockito.doThrow(RuntimeException.class).when(client).getCredentialsProvider(); + StartupConfiguration config = new StartupConfiguration() { + + @Override + public String getDBConnectionString() { + return "http://fluff.example.org"; + } + }; + storage = new WebStorage(config, client, connManager); + storage.setEndpoint("http://localhost:" + port + "/"); + storage.setAgentId(new UUID(123, 456)); + + CountDownLatch latch = new CountDownLatch(1); + MyListener listener = new MyListener(latch); + storage.getConnection().addListener(listener); + storage.getConnection().connect(); + // wait for connection to fail + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + assertFalse(listener.connectEvent); + assertTrue(listener.failedToConnectEvent); + } + + @Test + public void verifyConnectFiresEventOnSuccessfulConnect() { + CountDownLatch latch = new CountDownLatch(1); + MyListener listener = new MyListener(latch); + storage.getConnection().addListener(listener); + storage.getConnection().connect(); + // wait for connection to happen + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + assertTrue(listener.connectEvent); + assertFalse(listener.failedToConnectEvent); + } + + @Test + public void verifyDisconnectFiresDisconnectEvent() { + CountDownLatch latch = new CountDownLatch(1); + MyListener listener = new MyListener(latch); + storage.getConnection().addListener(listener); + storage.getConnection().disconnect(); + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + assertFalse(listener.connectEvent); + assertFalse(listener.failedToConnectEvent); + assertTrue(listener.disconnectEvent); + } + + static class MyListener implements ConnectionListener { + + private CountDownLatch latch; + boolean failedToConnectEvent = false; + boolean connectEvent = false; + boolean disconnectEvent = false; + + MyListener(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void changed(ConnectionStatus newStatus) { + if (newStatus == ConnectionStatus.CONNECTED) { + connectEvent = true; + latch.countDown(); + } + if (newStatus == ConnectionStatus.FAILED_TO_CONNECT) { + failedToConnectEvent = true; + latch.countDown(); + } + if (newStatus == ConnectionStatus.DISCONNECTED) { + disconnectEvent = true; + latch.countDown(); + } + } + } }