changeset 1035:e7725b744e57

Make DbService.connect/disconnect synchronous. Reviewed-by: vanaltj Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2013-March/006005.html
author Severin Gehwolf <sgehwolf@redhat.com>
date Thu, 07 Mar 2013 15:37:35 +0100
parents b7d5c70e0251
children 361b7725dfce
files storage/core/src/main/java/com/redhat/thermostat/storage/internal/DbServiceImpl.java storage/core/src/test/java/com/redhat/thermostat/storage/internal/DbServiceImplTest.java storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnection.java storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnectionTest.java web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java
diffstat 6 files changed, 464 insertions(+), 29 deletions(-) [+]
line wrap: on
line diff
--- a/storage/core/src/main/java/com/redhat/thermostat/storage/internal/DbServiceImpl.java	Mon Mar 18 15:58:38 2013 +0100
+++ b/storage/core/src/main/java/com/redhat/thermostat/storage/internal/DbServiceImpl.java	Thu Mar 07 15:37:35 2013 +0100
@@ -36,15 +36,21 @@
 
 package com.redhat.thermostat.storage.internal;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
 import org.osgi.framework.InvalidSyntaxException;
 import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
 
+import com.redhat.thermostat.common.utils.LoggingUtils;
 import com.redhat.thermostat.storage.config.ConnectionConfiguration;
 import com.redhat.thermostat.storage.config.StartupConfiguration;
 import com.redhat.thermostat.storage.core.Connection.ConnectionListener;
+import com.redhat.thermostat.storage.core.Connection.ConnectionStatus;
 import com.redhat.thermostat.storage.core.ConnectionException;
 import com.redhat.thermostat.storage.core.DbService;
 import com.redhat.thermostat.storage.core.Storage;
@@ -53,6 +59,7 @@
 
 public class DbServiceImpl implements DbService {
     
+    private static Logger logger = LoggingUtils.getLogger(DbServiceImpl.class);
     @SuppressWarnings("rawtypes")
     private ServiceRegistration dbServiceReg;
     @SuppressWarnings("rawtypes")
@@ -72,6 +79,12 @@
         init(context, username, password, dbUrl);
     }
     
+    // For testing. Injects custom storage.
+    DbServiceImpl(BundleContext context, Storage storage) {
+        this.context = context;
+        this.storage = storage;
+    }
+    
     private void init(BundleContext context, String username, String password, String dbUrl) {
         Storage storage = createStorage(context, username, password, dbUrl);
 
@@ -85,26 +98,71 @@
         // as service
         ensureConnectPreCondition();
         try {
-            this.storage.getConnection().connect();
-            dbServiceReg = context.registerService(DbService.class, this, null);
-            storageReg = context.registerService(Storage.class.getName(), this.storage, null);
+            // connection needs to be synchronous, otherwise there is no
+            // way to guarantee the postcondition if there's a delayed exception
+            // during connection handling.
+            doSynchronousConnect();
         } catch (Exception cause) {
             throw new ConnectionException(cause);
         }
+        // Connection didn't throw an exception. Now it is safe to register
+        // services.
+        dbServiceReg = context.registerService(DbService.class, this, null);
+        storageReg = context.registerService(Storage.class.getName(), this.storage, null);
     }
     
+    private void doSynchronousConnect() throws ConnectionException {
+        CountDownLatch latch = new CountDownLatch(1);
+        SynchronousConnectionListener listener = new SynchronousConnectionListener(
+                latch, ConnectionStatus.CONNECTED);
+        // Install listener in order to ensure connection is synchronous.
+        addConnectionListener(listener);
+        this.storage.getConnection().connect();
+        try {
+            // Wait for connection to finish.
+            // The synchronous connection listener gets removed once connection
+            // has finished.
+            latch.await();
+        } catch (InterruptedException e) {
+            logger.log(Level.WARNING, e.getMessage(), e);
+        }
+        if (!listener.successful) {
+            throw new ConnectionException();
+        }
+    }
+
     public void disconnect() throws ConnectionException {
         // DbService and Storage must be registered as service at this point
         ensureDisconnectPrecondition();
         try {
-            storage.getConnection().disconnect();
-            storageReg.unregister();
-            dbServiceReg.unregister();
+            doSyncronousDisconnect();
         } catch (Exception cause) {
             throw new ConnectionException(cause);
         }
+        storageReg.unregister();
+        dbServiceReg.unregister();
     }
     
+    private void doSyncronousDisconnect() {
+        CountDownLatch latch = new CountDownLatch(1);
+        SynchronousConnectionListener listener = new SynchronousConnectionListener(
+                latch, ConnectionStatus.DISCONNECTED);
+        // Install listener in order to ensure connection is synchronous.
+        addConnectionListener(listener);
+        this.storage.getConnection().disconnect();
+        try {
+            // Wait for disconnect to finish.
+            // The synchronous connection listener gets removed once connection
+            // has finished.
+            latch.await();
+        } catch (InterruptedException e) {
+            logger.log(Level.WARNING, e.getMessage(), e);
+        }
+        if (!listener.successful) {
+            throw new ConnectionException();
+        }
+    }
+
     @Override
     public String getConnectionUrl() {
         return dbUrl;
@@ -192,4 +250,41 @@
         storage.getConnection().removeListener(listener);
     }
     
+    class SynchronousConnectionListener implements ConnectionListener {
+
+        CountDownLatch latch;
+        boolean successful = false;
+        ConnectionStatus expectedType;
+        
+        public SynchronousConnectionListener(CountDownLatch latch, ConnectionStatus expectedType) {
+            this.latch = latch;
+            this.expectedType = expectedType;
+        }
+        
+        @Override
+        public void changed(ConnectionStatus newStatus) {
+            switch (newStatus) {
+            case CONNECTED: {
+                successful = (expectedType == ConnectionStatus.CONNECTED);
+                latch.countDown();
+                removeConnectionListener(this);
+                break;
+            }
+            case FAILED_TO_CONNECT: {
+                latch.countDown();
+                removeConnectionListener(this);
+                break;
+            }
+            case DISCONNECTED: {
+                successful = (expectedType == ConnectionStatus.DISCONNECTED);
+                latch.countDown();
+                removeConnectionListener(this);
+            }
+            default: {
+                // nothing
+            }
+            }
+        }
+        
+    }
 }
--- a/storage/core/src/test/java/com/redhat/thermostat/storage/internal/DbServiceImplTest.java	Mon Mar 18 15:58:38 2013 +0100
+++ b/storage/core/src/test/java/com/redhat/thermostat/storage/internal/DbServiceImplTest.java	Thu Mar 07 15:37:35 2013 +0100
@@ -37,6 +37,7 @@
 package com.redhat.thermostat.storage.internal;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -52,6 +53,8 @@
 
 import com.redhat.thermostat.storage.core.Connection;
 import com.redhat.thermostat.storage.core.Connection.ConnectionListener;
+import com.redhat.thermostat.storage.core.Connection.ConnectionStatus;
+import com.redhat.thermostat.storage.core.ConnectionException;
 import com.redhat.thermostat.storage.core.DbService;
 import com.redhat.thermostat.storage.core.Storage;
 import com.redhat.thermostat.storage.core.StorageException;
@@ -60,7 +63,9 @@
 
 public class DbServiceImplTest {
     
-    private Connection connection;
+    // Stub connection which is always successful and
+    // firesChanged immediately.
+    private ImmediateConnection connection = new ImmediateConnection();
     private StorageProvider storageProvider;
     private Storage storage;
     private StubBundleContext context;
@@ -68,7 +73,6 @@
     @Before
     public void setup() {
         context = new StubBundleContext();
-        connection = mock(Connection.class);
 
         storage = mock(Storage.class);
         when(storage.getConnection()).thenReturn(connection);
@@ -105,20 +109,84 @@
 
     @Test
     public void testConnect() {
-        DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com");
-
-        dbService.connect();
-
-        verify(connection).connect();
+        DbService dbService = new DbServiceImpl(context, storage);
+        try {
+            dbService.connect();
+            // pass
+        } catch (ConnectionException e) {
+            fail();
+        }
+        assertTrue(connection.connectCalled);
+    }
+    
+    @Test
+    public void connectEnsuresPostConditionOnDelayedException() {
+        Storage mockStorage = mock(Storage.class);
+        DelayedConnection connection = new DelayedConnection();
+        when(mockStorage.getConnection()).thenReturn(connection);
+        DbService dbService = new DbServiceImpl(context, mockStorage);
+        
+        try {
+            dbService.connect();
+            fail("Should have thrown ConnectionException!");
+        } catch (ConnectionException e) {
+            // pass
+        }
+        assertTrue(connection.connectCalled);
+        assertTrue(connection.delayAwaited);
+        assertFalse(context.isServiceRegistered(Storage.class.getName(), mockStorage.getClass()));
+        assertFalse(context.isServiceRegistered(DbService.class.getName(), DbServiceImpl.class));
     }
     
     @Test
+    public void disconnectAwaitsConnectionDisconnect() {
+        Storage mockStorage = mock(Storage.class);
+        DelayedConnection connection = new DelayedConnection() {
+            @Override
+            public void connect() {
+                fireChanged(ConnectionStatus.CONNECTED);
+                connectCalled = true;
+            }
+        };
+        when(mockStorage.getConnection()).thenReturn(connection);
+        DbService dbService = new DbServiceImpl(context, mockStorage);
+        
+        try {
+            dbService.connect();
+            // pass
+        } catch (ConnectionException e) {
+            fail();
+        }
+        assertTrue(connection.connectCalled);
+        connection.connectCalled = false;
+        assertFalse(connection.delayAwaited);
+        assertFalse(connection.disconnectCalled);
+        assertNotNull(context.getServiceReference(DbService.class));
+        assertNotNull(context.getServiceReference(Storage.class));
+        
+        try {
+            dbService.disconnect();
+        } catch (ConnectionException e) {
+            fail();
+        }
+        assertTrue(connection.disconnectCalled);
+        assertFalse(connection.connectCalled);
+        assertTrue(connection.delayAwaited);
+        assertNull(context.getServiceReference(DbService.class));
+        assertNull(context.getServiceReference(Storage.class));
+    }
+
+    @Test
     public void testConnectRegistersDbService() {
         DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com");
 
-        dbService.connect();
-
-        verify(connection).connect();
+        try {
+            dbService.connect();
+            // pass
+        } catch (ConnectionException e) {
+            fail();
+        }
+        assertTrue(connection.connectCalled);
         @SuppressWarnings("rawtypes")
         ServiceReference dbServiceRef = context.getServiceReference(DbService.class);
         // connect registers DbService
@@ -131,9 +199,13 @@
     public void testConnectRegistersStorage() {
         DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com");
 
-        dbService.connect();
-
-        verify(connection).connect();
+        try {
+            dbService.connect();
+            // pass
+        } catch (ConnectionException e) {
+            fail();
+        }
+        assertTrue(connection.connectCalled);
         @SuppressWarnings("rawtypes")
         ServiceReference storageRef = context.getServiceReference(Storage.class);
         // connect registers DbService
@@ -194,24 +266,36 @@
     public void testDisconnect() {
         DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com");
 
-        dbService.connect();
+        try {
+            dbService.connect();
+            // pass
+        } catch (ConnectionException e) {
+            fail();
+        }
+        assertTrue(connection.connectCalled);
         assertNotNull(context.getServiceReference(DbService.class));
         
         dbService.disconnect();
 
-        verify(connection).disconnect();
+        assertTrue(connection.disconnectCalled);
     }
 
     @Test
     public void testDisconnectUnregistersDbService() {
         DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com");
 
-        dbService.connect();
+        try {
+            dbService.connect();
+            // pass
+        } catch (ConnectionException e) {
+            fail();
+        }
+        assertTrue(connection.connectCalled);
         assertNotNull(context.getServiceReference(DbService.class));
         
         dbService.disconnect();
 
-        verify(connection).disconnect();
+        assertTrue(connection.disconnectCalled);
         // disconnect unregisters DbService
         assertNull(context.getServiceReference(DbService.class));
     }
@@ -220,12 +304,19 @@
     public void testDisconnectUnregistersStorage() {
         DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com");
 
-        dbService.connect();
+        try {
+            dbService.connect();
+            // pass
+        } catch (ConnectionException e) {
+            fail();
+        }
+        assertTrue(connection.connectCalled);
         assertNotNull(context.getServiceReference(Storage.class));
         
         dbService.disconnect();
 
-        verify(connection).disconnect();
+        assertTrue(connection.disconnectCalled);
+        
         // disconnect unregisters Storage
         assertNull(context.getServiceReference(Storage.class));
     }
@@ -241,6 +332,8 @@
     @Test
     public void testAddListener() {
         ConnectionListener listener = mock(ConnectionListener.class);
+        Connection connection = mock(Connection.class);
+        when(storage.getConnection()).thenReturn(connection);
         DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com");
 
         dbService.addConnectionListener(listener);
@@ -248,12 +341,132 @@
     }
     
     @Test
+    public void testListenerGetsEvent() {
+        ConnectingConnectionListener listener = new ConnectingConnectionListener();
+        DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com");
+
+        ConnectingConnection connection = new ConnectingConnection();
+        when(storage.getConnection()).thenReturn(connection);
+        dbService.addConnectionListener(listener);
+        assertFalse(listener.eventReceived);
+        try {
+            dbService.connect();
+        } catch (ConnectionException e) {
+            fail();
+        }
+        assertTrue(connection.connectCalled);
+        assertTrue(listener.eventReceived);
+        listener.eventReceived = false;
+        dbService.removeConnectionListener(listener);
+        try {
+            dbService.disconnect();
+            dbService.connect();
+        } catch (ConnectionException e) {
+            fail();
+        }
+        assertFalse(listener.eventReceived);
+    }
+    
+    @Test
     public void testRemoveListener() {
         // Remove called regardless of listener actually being added
         ConnectionListener listener = mock(ConnectionListener.class);
+        Connection connection = mock(Connection.class);
+        when(storage.getConnection()).thenReturn(connection);
         DbService dbService = new DbServiceImpl(context, "ignore", "ignore", "http://ignored.example.com");
         dbService.removeConnectionListener(listener);
         verify(connection).removeListener(listener);
     }
+    
+    static class DelayedConnection extends Connection {
+
+        boolean connectCalled = false;
+        boolean disconnectCalled = false;
+        private Thread thread;
+        boolean delayAwaited = false;
+        
+        @Override
+        public void connect() {
+            // delay connection and then fail
+            Runnable runnable = new Runnable() {
+                
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(100);
+                        // This makes the connection fail (although delayed).
+                        delayAwaited = true;
+                        fireChanged(ConnectionStatus.FAILED_TO_CONNECT);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                }
+            };
+            thread = new Thread(runnable);
+            thread.start();
+            connectCalled = true;
+        }
+
+        @Override
+        public void disconnect() {
+            Runnable runnable = new Runnable() {
+                
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(100);
+                        delayAwaited = true;
+                        fireChanged(ConnectionStatus.DISCONNECTED);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                }
+            };
+            thread = new Thread(runnable);
+            thread.start();
+            disconnectCalled = true;
+        }
+
+    }
+    
+    static class ImmediateConnection extends Connection {
+
+        boolean connectCalled = false;
+        boolean disconnectCalled = false;
+
+        @Override
+        public void connect() {
+            connectCalled = true;
+            fireChanged(ConnectionStatus.CONNECTED);
+        }
+
+        @Override
+        public void disconnect() {
+            disconnectCalled = true;
+            fireChanged(ConnectionStatus.DISCONNECTED);
+        }
+
+    }
+    
+    static class ConnectingConnectionListener implements ConnectionListener {
+        
+        boolean eventReceived = false;
+
+        @Override
+        public void changed(ConnectionStatus newStatus) {
+            if (newStatus == ConnectionStatus.CONNECTING) {
+                eventReceived = true;
+            }
+        }
+        
+    }
+    
+    static class ConnectingConnection extends ImmediateConnection {
+        @Override
+        public void connect() {
+            fireChanged(ConnectionStatus.CONNECTING);
+            super.connect();
+        }
+    }
 }
 
--- a/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnection.java	Mon Mar 18 15:58:38 2013 +0100
+++ b/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnection.java	Thu Mar 07 15:37:35 2013 +0100
@@ -112,6 +112,7 @@
         if (m != null) {
             m.close();
         }
+        fireChanged(ConnectionStatus.DISCONNECTED);
     }
 
     public DB getDB() {
--- a/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnectionTest.java	Mon Mar 18 15:58:38 2013 +0100
+++ b/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoConnectionTest.java	Thu Mar 07 15:37:35 2013 +0100
@@ -116,6 +116,24 @@
 
         verify(listener).changed(ConnectionStatus.CONNECTED);
     }
+    
+    @PrepareForTest({ MongoConnection.class })
+    @Test
+    public void testDisconnect() throws Exception {
+        DBCollection collection = mock(DBCollection.class);
+        DB db = mock(DB.class);
+        when(db.getCollection("agent-config")).thenReturn(collection);
+        Mongo m = mock(Mongo.class);
+        when(m.getDB(MongoConnection.THERMOSTAT_DB_NAME)).thenReturn(db);
+        PowerMockito.whenNew(Mongo.class).withParameterTypes(MongoURI.class).withArguments(any(MongoURI.class)).thenReturn(m);
+        conn.connect();
+
+        verify(listener).changed(ConnectionStatus.CONNECTED);
+        
+        conn.disconnect();
+        verify(m).close();
+        verify(listener).changed(ConnectionStatus.DISCONNECTED);
+    }
 
     @PrepareForTest({ MongoConnection.class })
     @Test
@@ -222,5 +240,6 @@
             // pass
         }
     }
+    
 }
 
--- a/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java	Mon Mar 18 15:58:38 2013 +0100
+++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java	Thu Mar 07 15:37:35 2013 +0100
@@ -331,18 +331,27 @@
     private String password;
     private SecureRandom random;
     private WebConnection conn;
+    
+    // for testing
+    WebStorage(StartupConfiguration config, DefaultHttpClient client, ClientConnectionManager connManager) {
+        init(config, client, connManager);
+    }
 
     public WebStorage(StartupConfiguration config) throws StorageException {
+        ClientConnectionManager connManager = new ThreadSafeClientConnManager();
+        DefaultHttpClient client = new DefaultHttpClient(connManager);
+        client.getParams().setParameter("http.protocol.expect-continue", Boolean.TRUE);
+        init(config, client, connManager);
+    }
+    
+    private void init(StartupConfiguration config, DefaultHttpClient client, ClientConnectionManager connManager) {
         categoryIds = new HashMap<>();
         gson = new GsonBuilder().registerTypeHierarchyAdapter(Pojo.class,
                 new ThermostatGSONConverter()).create();
-        ClientConnectionManager connManager = new ThreadSafeClientConnManager();
-        DefaultHttpClient client = new DefaultHttpClient(connManager);
-        client.getParams().setParameter("http.protocol.expect-continue", Boolean.TRUE);
         httpClient = client;
         random = new SecureRandom();
         conn = new WebConnection();
-
+        
         setEndpoint(config.getDBConnectionString());
         if (config instanceof AuthenticationConfiguration) {
             AuthenticationConfiguration authConfig = (AuthenticationConfiguration) config;
--- a/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java	Mon Mar 18 15:58:38 2013 +0100
+++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java	Thu Mar 07 15:37:35 2013 +0100
@@ -43,6 +43,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
@@ -59,6 +60,7 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -66,8 +68,10 @@
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ClientConnectionManager;
 import org.apache.http.conn.scheme.Scheme;
 import org.apache.http.conn.scheme.SchemeRegistry;
+import org.apache.http.impl.client.DefaultHttpClient;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.AbstractHandler;
@@ -76,6 +80,7 @@
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
@@ -85,6 +90,8 @@
 import com.redhat.thermostat.storage.core.AuthToken;
 import com.redhat.thermostat.storage.core.Categories;
 import com.redhat.thermostat.storage.core.Category;
+import com.redhat.thermostat.storage.core.Connection.ConnectionListener;
+import com.redhat.thermostat.storage.core.Connection.ConnectionStatus;
 import com.redhat.thermostat.storage.core.Cursor;
 import com.redhat.thermostat.storage.core.Key;
 import com.redhat.thermostat.storage.core.Put;
@@ -562,5 +569,96 @@
         assertFalse(ok);
         
     }
+    
+    @Test
+    public void verifyConnectFiresEventOnConnectionFailure() {
+        DefaultHttpClient client = mock(DefaultHttpClient.class);
+        ClientConnectionManager connManager = mock(ClientConnectionManager.class);
+        // this should make connect fail
+        Mockito.doThrow(RuntimeException.class).when(client).getCredentialsProvider();
+        StartupConfiguration config = new StartupConfiguration() {
+            
+            @Override
+            public String getDBConnectionString() {
+                return "http://fluff.example.org";
+            }
+        };
+        storage = new WebStorage(config, client, connManager);
+        storage.setEndpoint("http://localhost:" + port + "/");
+        storage.setAgentId(new UUID(123, 456));
+        
+        CountDownLatch latch = new CountDownLatch(1);
+        MyListener listener = new MyListener(latch);
+        storage.getConnection().addListener(listener);
+        storage.getConnection().connect();
+        // wait for connection to fail
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        assertFalse(listener.connectEvent);
+        assertTrue(listener.failedToConnectEvent);
+    }
+    
+    @Test
+    public void verifyConnectFiresEventOnSuccessfulConnect() {
+        CountDownLatch latch = new CountDownLatch(1);
+        MyListener listener = new MyListener(latch);
+        storage.getConnection().addListener(listener);
+        storage.getConnection().connect();
+        // wait for connection to happen
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        assertTrue(listener.connectEvent);
+        assertFalse(listener.failedToConnectEvent);
+    }
+    
+    @Test
+    public void verifyDisconnectFiresDisconnectEvent() {
+        CountDownLatch latch = new CountDownLatch(1);
+        MyListener listener = new MyListener(latch);
+        storage.getConnection().addListener(listener);
+        storage.getConnection().disconnect();
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        assertFalse(listener.connectEvent);
+        assertFalse(listener.failedToConnectEvent);
+        assertTrue(listener.disconnectEvent);
+    }
+    
+    static class MyListener implements ConnectionListener {
+
+        private CountDownLatch latch;
+        boolean failedToConnectEvent = false;
+        boolean connectEvent = false;
+        boolean disconnectEvent = false;
+        
+        MyListener(CountDownLatch latch) {
+            this.latch = latch;
+        }
+        
+        @Override
+        public void changed(ConnectionStatus newStatus) {
+            if (newStatus == ConnectionStatus.CONNECTED) {
+                connectEvent = true;
+                latch.countDown();
+            }
+            if (newStatus == ConnectionStatus.FAILED_TO_CONNECT) {
+                failedToConnectEvent = true;
+                latch.countDown();
+            }
+            if (newStatus == ConnectionStatus.DISCONNECTED) {
+                disconnectEvent = true;
+                latch.countDown();
+            }
+        }
+    }
 }