changeset 874:d1641e56f1ad

Async/queued Storage, revisited. Reviewed-by: vanaltj, omajid Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2012-December/004851.html
author Roman Kennke <rkennke@redhat.com>
date Tue, 18 Dec 2012 19:25:39 +0100
parents 9132d7b911da
children 29f815f1bbed
files storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedStorage.java storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageExecutorTest.java storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageTest.java storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/MongoStorageProvider.java storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorageTest.java web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorageProvider.java web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java
diffstat 9 files changed, 694 insertions(+), 64 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedStorage.java	Tue Dec 18 19:25:39 2012 +0100
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+
+package com.redhat.thermostat.storage.core;
+
+import java.io.InputStream;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import com.redhat.thermostat.storage.model.AgentIdPojo;
+import com.redhat.thermostat.storage.model.Pojo;
+
+public final class QueuedStorage implements Storage {
+
+    private Storage delegate;
+    private Executor executor;
+    private Executor fileExecutor;
+
+    /*
+     * NOTE: We intentially use single-thread executor. All updates are put into a queue, from which
+     * a single dispatch thread calls the underlying storage. Using multiple dispatch threads
+     * could cause out-of-order issues, e.g. a VM death being reported before its VM start, which
+     * could confuse the heck out of clients.
+     */
+    public QueuedStorage(Storage delegate) {
+        this(delegate, Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
+    }
+
+    /*
+     * This is here solely for use by tests.
+     */
+    QueuedStorage(Storage delegate, Executor executor, Executor fileExecutor) {
+        this.delegate = delegate;
+        this.executor = executor;
+        this.fileExecutor = fileExecutor;
+    }
+
+    Executor getExecutor() {
+        return executor;
+    }
+
+    Executor getFileExecutor() {
+        return fileExecutor;
+    }
+
+    @Override
+    public void putPojo(final Category category, final boolean replace, final AgentIdPojo pojo) {
+
+        executor.execute(new Runnable() {
+            
+            @Override
+            public void run() {
+                delegate.putPojo(category, replace, pojo);
+            }
+
+        });
+
+    }
+
+    @Override
+    public void updatePojo(final Update update) {
+
+        executor.execute(new Runnable() {
+            
+            @Override
+            public void run() {
+                delegate.updatePojo(update);
+            }
+
+        });
+
+    }
+
+    @Override
+    public void removePojo(final Remove remove) {
+
+        executor.execute(new Runnable() {
+            
+            @Override
+            public void run() {
+                delegate.removePojo(remove);
+            }
+
+        });
+
+    }
+
+    @Override
+    public void purge() {
+
+        executor.execute(new Runnable() {
+            
+            @Override
+            public void run() {
+                delegate.purge();
+            }
+
+        });
+
+    }
+
+    @Override
+    public <T extends Pojo> Cursor<T> findAllPojos(Query query, Class<T> resultClass) {
+        return delegate.findAllPojos(query, resultClass);
+    }
+
+    @Override
+    public <T extends Pojo> T findPojo(Query query, Class<T> resultClass) {
+        return delegate.findPojo(query, resultClass);
+    }
+
+    @Override
+    public long getCount(Category category) {
+        return delegate.getCount(category);
+    }
+
+    @Override
+    public void saveFile(final String filename, final InputStream data) {
+
+        fileExecutor.execute(new Runnable() {
+            
+            @Override
+            public void run() {
+                delegate.saveFile(filename, data);
+            }
+
+        });
+
+    }
+
+    @Override
+    public InputStream loadFile(String filename) {
+        return delegate.loadFile(filename);
+    }
+
+    @Override
+    public Query createQuery() {
+        return delegate.createQuery();
+    }
+
+    @Override
+    public Update createUpdate() {
+        return delegate.createUpdate();
+    }
+
+    @Override
+    public Remove createRemove() {
+        return delegate.createRemove();
+    }
+
+    @Override
+    public void setAgentId(final UUID id) {
+
+        executor.execute(new Runnable() {
+            
+            @Override
+            public void run() {
+                delegate.setAgentId(id);
+            }
+
+        });
+
+    }
+
+    @Override
+    public String getAgentId() {
+        return delegate.getAgentId();
+    }
+
+    @Override
+    public void registerCategory(final Category category) {
+
+        executor.execute(new Runnable() {
+            
+            @Override
+            public void run() {
+                delegate.registerCategory(category);
+            }
+
+        });
+
+    }
+
+    @Override
+    public Connection getConnection() {
+        return delegate.getConnection();
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageExecutorTest.java	Tue Dec 18 19:25:39 2012 +0100
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+
+package com.redhat.thermostat.storage.core;
+
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueuedStorageExecutorTest {
+
+    private static final int NUM_THREADS = 50;
+    private static final int TASK_DURATION_MS = 50;
+
+    private QueuedStorage queuedStorage;
+
+    private volatile int activeTasks;
+
+    private volatile boolean passed;
+    private volatile Thread workerThread;
+
+    private CountDownLatch latch;
+
+    private class TestRunnable implements Runnable {
+        public void run() {
+            if (activeTasks != 0) {
+                passed = false;
+            }
+            synchronized(QueuedStorageExecutorTest.this) {
+                if (workerThread == null) {
+                    workerThread = Thread.currentThread();
+                } else {
+                    if (workerThread != Thread.currentThread()) {
+                        passed = false;
+                    }
+                }
+            }
+            activeTasks++;
+            try {
+                Thread.sleep(TASK_DURATION_MS);
+            } catch (InterruptedException e) {
+                // Get out of here ASAP.
+            }
+            activeTasks--;
+            latch.countDown();
+        }
+    }
+
+    @Before
+    public void setUp() {
+        Storage mockStorage = mock(Storage.class);
+        queuedStorage = new QueuedStorage(mockStorage);
+        activeTasks = 0;
+        passed = true;
+        workerThread = null;
+        latch = null;
+    }
+
+    @After
+    public void tearDown() {
+        queuedStorage = null;
+        activeTasks = 0;
+        passed = true;
+        workerThread = null;
+        latch = null;
+    }
+
+    @Test
+    public void testMainExecutor() {
+        testExecutor(queuedStorage.getExecutor());
+    }
+
+    @Test
+    public void testFileExecutor() {
+        testExecutor(queuedStorage.getFileExecutor());
+    }
+
+    private void testExecutor(final Executor executor) {
+        latch = new CountDownLatch(NUM_THREADS);
+        Thread[] threads = new Thread[NUM_THREADS]; 
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i] = new Thread() {
+                public void run() {
+                    executor.execute(new TestRunnable());
+                }
+            };
+        }
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].start();
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            // Get out as soon as possible.
+        }
+        assertTrue(passed);
+    }
+
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageTest.java	Tue Dec 18 19:25:39 2012 +0100
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+
+package com.redhat.thermostat.storage.core;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.InputStream;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.redhat.thermostat.storage.model.AgentIdPojo;
+import com.redhat.thermostat.storage.model.Pojo;
+
+
+public class QueuedStorageTest {
+
+    private static class TestExecutor implements Executor {
+
+        private Runnable task;
+
+        @Override
+        public void execute(Runnable task) {
+            this.task = task;
+        }
+
+        Runnable getTask() {
+            return task;
+        }
+    }
+
+    private static class TestPojo implements Pojo {
+        
+    }
+
+    private QueuedStorage queuedStorage;
+    private Storage delegateStorage;
+
+    private TestExecutor executor;
+    private TestExecutor fileExecutor;
+
+    @SuppressWarnings("rawtypes")
+    private Cursor expectedResults;
+    private TestPojo expectedResult;
+    private InputStream expectedFile;
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() {
+        executor = new TestExecutor();
+        fileExecutor = new TestExecutor();
+        delegateStorage = mock(Storage.class);
+        Update update = mock(Update.class);
+        Remove remove = mock(Remove.class);
+        Query query = mock(Query.class);
+        when(delegateStorage.createUpdate()).thenReturn(update);
+        when(delegateStorage.createRemove()).thenReturn(remove);
+        when(delegateStorage.createQuery()).thenReturn(query);
+        expectedResults = mock(Cursor.class);
+        when(delegateStorage.findAllPojos(query, TestPojo.class)).thenReturn(expectedResults);
+        expectedResult = new TestPojo();
+        when(delegateStorage.findPojo(query, TestPojo.class)).thenReturn(expectedResult);
+        when(delegateStorage.getCount(any(Category.class))).thenReturn(42l);
+        expectedFile = mock(InputStream.class);
+        when(delegateStorage.loadFile(anyString())).thenReturn(expectedFile);
+        when(delegateStorage.getAgentId()).thenReturn("huzzah");
+        queuedStorage = new QueuedStorage(delegateStorage, executor, fileExecutor);
+        
+    }
+
+    @After
+    public void tearDown() {
+        expectedFile = null;
+        expectedResult = null;
+        expectedResults = null;
+        queuedStorage = null;
+        delegateStorage = null;
+        fileExecutor = null;
+        executor = null;
+    }
+
+    @Test
+    public void testPutPojo() {
+        Category category = mock(Category.class);
+        AgentIdPojo pojo = mock(AgentIdPojo.class);
+
+        queuedStorage.putPojo(category, true, pojo);
+
+        Runnable r = executor.getTask();
+        assertNotNull(r);
+        verifyZeroInteractions(delegateStorage);
+        r.run();
+        verify(delegateStorage, times(1)).putPojo(category, true, pojo);
+        verifyNoMoreInteractions(delegateStorage);
+
+        assertNull(fileExecutor.getTask());
+    }
+
+    @Test
+    public void testUpdatePojo() {
+
+        Update update = queuedStorage.createUpdate();
+        verify(delegateStorage).createUpdate();
+        verifyNoMoreInteractions(delegateStorage);
+
+        queuedStorage.updatePojo(update);
+
+        Runnable r = executor.getTask();
+        assertNotNull(r);
+        verifyZeroInteractions(delegateStorage);
+        r.run();
+        verify(delegateStorage, times(1)).updatePojo(update);
+        verifyNoMoreInteractions(delegateStorage);
+
+        assertNull(fileExecutor.getTask());
+    }
+
+    @Test
+    public void testRemovePojo() {
+
+        Remove remove = queuedStorage.createRemove();
+        verify(delegateStorage).createRemove();
+        verifyNoMoreInteractions(delegateStorage);
+
+        queuedStorage.removePojo(remove);
+
+        Runnable r = executor.getTask();
+        assertNotNull(r);
+        verifyZeroInteractions(delegateStorage);
+        r.run();
+        verify(delegateStorage, times(1)).removePojo(remove);
+        verifyNoMoreInteractions(delegateStorage);
+
+        assertNull(fileExecutor.getTask());
+    }
+
+    @Test
+    public void testPurge() {
+
+        queuedStorage.purge();
+
+        Runnable r = executor.getTask();
+        assertNotNull(r);
+        verifyZeroInteractions(delegateStorage);
+        r.run();
+        verify(delegateStorage, times(1)).purge();
+        verifyNoMoreInteractions(delegateStorage);
+
+        assertNull(fileExecutor.getTask());
+    }
+
+    @Test
+    public void testFindAllPojos() {
+        Query query = queuedStorage.createQuery();
+        verify(delegateStorage).createQuery();
+        verifyNoMoreInteractions(delegateStorage);
+
+        Cursor<TestPojo> result = queuedStorage.findAllPojos(query, TestPojo.class);
+        verify(delegateStorage).findAllPojos(query, TestPojo.class);
+        assertSame(expectedResults, result);
+
+        assertNull(executor.getTask());
+        assertNull(fileExecutor.getTask());
+    }
+
+    @Test
+    public void testFindPojo() {
+        Query query = queuedStorage.createQuery();
+        verify(delegateStorage).createQuery();
+        verifyNoMoreInteractions(delegateStorage);
+
+        TestPojo result = queuedStorage.findPojo(query, TestPojo.class);
+        verify(delegateStorage).findPojo(query, TestPojo.class);
+        assertSame(expectedResult, result);
+
+        assertNull(executor.getTask());
+        assertNull(fileExecutor.getTask());
+    }
+
+    @Test
+    public void testGetCount() {
+        Category category = mock(Category.class);
+
+        long result = queuedStorage.getCount(category);
+        assertEquals(42, result);
+
+        assertNull(executor.getTask());
+        assertNull(fileExecutor.getTask());
+    }
+
+    @Test
+    public void testSaveFile() {
+        InputStream stream = mock(InputStream.class);
+
+        queuedStorage.saveFile("fluff", stream);
+
+        Runnable task = fileExecutor.getTask();
+        assertNotNull(task);
+        verifyZeroInteractions(delegateStorage);
+        task.run();
+        verify(delegateStorage).saveFile(eq("fluff"), same(stream));
+
+        assertNull(executor.getTask());
+    }
+
+    @Test
+    public void testLoadFile() {
+
+        InputStream stream = queuedStorage.loadFile("fluff");
+
+        assertSame(expectedFile, stream);
+
+        assertNull(executor.getTask());
+        assertNull(fileExecutor.getTask());
+    }
+
+    @Test
+    public void testSetAgentId() {
+        UUID id = new UUID(123, 456);
+
+        queuedStorage.setAgentId(id);
+
+        verifyZeroInteractions(delegateStorage);
+        Runnable task = executor.getTask();
+        task.run();
+        verify(delegateStorage).setAgentId(id);
+        
+        assertNull(fileExecutor.getTask());
+    }
+
+    @Test
+    public void testGetAgentId() {
+        String agentId = queuedStorage.getAgentId();
+
+        verify(delegateStorage).getAgentId();
+        assertEquals("huzzah", agentId);
+
+        assertNull(executor.getTask());
+        assertNull(fileExecutor.getTask());
+    }
+
+    @Test
+    public void testRegisterCategory() {
+
+        Category category = mock(Category.class);
+
+        queuedStorage.registerCategory(category);
+
+        Runnable task = executor.getTask();
+        verifyZeroInteractions(delegateStorage);
+        task.run();
+        verify(delegateStorage).registerCategory(category);
+
+        assertNull(fileExecutor.getTask());
+    }
+
+    @Test
+    public void testGetConnection() {
+        Connection connection = mock(Connection.class);
+        when(delegateStorage.getConnection()).thenReturn(connection);
+
+        Connection conn = queuedStorage.getConnection();
+
+        verify(delegateStorage).getConnection();
+
+        assertSame(conn, connection);
+
+        assertNull(executor.getTask());
+        assertNull(fileExecutor.getTask());
+    }
+}
--- a/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/MongoStorageProvider.java	Fri Dec 14 18:07:19 2012 -0500
+++ b/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/MongoStorageProvider.java	Tue Dec 18 19:25:39 2012 +0100
@@ -37,6 +37,7 @@
 package com.redhat.thermostat.storage.mongodb;
 
 import com.redhat.thermostat.storage.config.StartupConfiguration;
+import com.redhat.thermostat.storage.core.QueuedStorage;
 import com.redhat.thermostat.storage.core.Storage;
 import com.redhat.thermostat.storage.core.StorageProvider;
 import com.redhat.thermostat.storage.mongodb.internal.MongoStorage;
@@ -51,7 +52,8 @@
 
     @Override
     public Storage createStorage() {
-        return new MongoStorage(configuration);
+        MongoStorage storage = new MongoStorage(configuration);
+        return new QueuedStorage(storage);
     }
 
     @Override
--- a/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java	Fri Dec 14 18:07:19 2012 -0500
+++ b/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java	Tue Dec 18 19:25:39 2012 +0100
@@ -42,8 +42,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 
 import com.mongodb.BasicDBObject;
 import com.mongodb.DB;
@@ -83,13 +81,7 @@
 
     private UUID agentId;
 
-    private Executor threadpool;
-
     public MongoStorage(StartupConfiguration conf) {
-        this(conf, Executors.newCachedThreadPool());
-    }
-
-    MongoStorage(StartupConfiguration conf, Executor threadpool) {
         conn = new MongoConnection(conf);
         conn.addListener(new ConnectionListener() {
             @Override
@@ -104,8 +96,6 @@
                 }
             }
         });
-
-        this.threadpool = threadpool;
     }
 
     @Override
@@ -142,16 +132,6 @@
 
     @Override
     public void putPojo(final Category cat, final boolean replace, final AgentIdPojo pojo) {
-        threadpool.execute(new Runnable() {
-            
-            @Override
-            public void run() {
-                putImpl(cat, replace, pojo);
-            }
-        });
-    }
-
-    private void putImpl(Category cat, boolean replace, AgentIdPojo pojo) {
         DBCollection coll = getCachedCollection(cat);
         MongoPojoConverter converter = new MongoPojoConverter();
         DBObject toInsert = converter.convertPojoToMongo(pojo);
--- a/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorageTest.java	Fri Dec 14 18:07:19 2012 -0500
+++ b/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorageTest.java	Tue Dec 18 19:25:39 2012 +0100
@@ -53,7 +53,6 @@
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 import org.junit.After;
 import org.junit.Before;
@@ -81,13 +80,9 @@
 import com.redhat.thermostat.storage.core.Key;
 import com.redhat.thermostat.storage.core.Persist;
 import com.redhat.thermostat.storage.core.Query;
+import com.redhat.thermostat.storage.core.Query.Criteria;
 import com.redhat.thermostat.storage.core.Update;
-import com.redhat.thermostat.storage.core.Query.Criteria;
-import com.redhat.thermostat.storage.core.Query.SortDirection;
 import com.redhat.thermostat.storage.model.BasePojo;
-import com.redhat.thermostat.storage.mongodb.internal.MongoConnection;
-import com.redhat.thermostat.storage.mongodb.internal.MongoQuery;
-import com.redhat.thermostat.storage.mongodb.internal.MongoStorage;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ DBCollection.class, DB.class, Mongo.class, MongoStorage.class, MongoConnection.class })
@@ -157,13 +152,7 @@
     private DBCursor cursor;
 
     private MongoStorage makeStorage() {
-        Executor exec = new Executor() {
-            @Override
-            public void execute(Runnable command) {
-                command.run();
-            }
-        };
-        MongoStorage storage = new MongoStorage(conf, exec);
+        MongoStorage storage = new MongoStorage(conf);
         storage.mapCategoryToDBCollection(testCategory, testCollection);
         storage.mapCategoryToDBCollection(emptyTestCategory, emptyTestCollection);
         return storage;
--- a/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java	Fri Dec 14 18:07:19 2012 -0500
+++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java	Tue Dec 18 19:25:39 2012 +0100
@@ -52,8 +52,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -284,14 +282,7 @@
     private String password;
     private SecureRandom random;
 
-    private Executor threadpool;
-
     public WebStorage(StartupConfiguration config) throws StorageException {
-        this(config, Executors.newCachedThreadPool());
-    }
-
-    // Used for testing.
-    WebStorage(StartupConfiguration config, Executor threadpool) {
         categoryIds = new HashMap<>();
         gson = new GsonBuilder().registerTypeHierarchyAdapter(Pojo.class,
                 new ThermostatGSONConverter()).create();
@@ -304,8 +295,6 @@
         if (config.getDBConnectionString().startsWith(HTTPS_PREFIX)) {
             registerSSLScheme(connManager);
         }
-
-        this.threadpool = threadpool;
     }
 
     private void registerSSLScheme(ClientConnectionManager conManager)
@@ -497,16 +486,6 @@
     public void putPojo(final Category category, final boolean replace, final AgentIdPojo pojo)
             throws StorageException {
 
-        threadpool.execute(new Runnable() {
-            
-            @Override
-            public void run() {
-                putImpl(category, replace, pojo);
-            }
-        });
-    }
-
-    private void putImpl(Category category, boolean replace, AgentIdPojo pojo) {
         // TODO: This logic should probably be moved elsewhere. I.e. out of the
         // Storage API.
         if (pojo.getAgentId() == null) {
@@ -522,6 +501,7 @@
                 gson.toJson(pojo));
         List<NameValuePair> formparams = Arrays.asList(insertParam, pojoParam);
         post(endpoint + "/put-pojo", formparams).close();
+
     }
 
     @Override
--- a/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorageProvider.java	Fri Dec 14 18:07:19 2012 -0500
+++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorageProvider.java	Tue Dec 18 19:25:39 2012 +0100
@@ -2,6 +2,7 @@
 
 import com.redhat.thermostat.storage.config.AuthenticationConfiguration;
 import com.redhat.thermostat.storage.config.StartupConfiguration;
+import com.redhat.thermostat.storage.core.QueuedStorage;
 import com.redhat.thermostat.storage.core.Storage;
 import com.redhat.thermostat.storage.core.StorageProvider;
 
@@ -17,7 +18,7 @@
             AuthenticationConfiguration authConf = (AuthenticationConfiguration) config;
             storage.setAuthConfig(authConf.getUsername(), authConf.getPassword());
         }
-        return storage;
+        return new QueuedStorage(storage);
     }
 
     @Override
--- a/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java	Fri Dec 14 18:07:19 2012 -0500
+++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java	Tue Dec 18 19:25:39 2012 +0100
@@ -58,7 +58,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -149,13 +148,7 @@
                 return "http://fluff.example.org";
             }
         };
-        Executor exec = new Executor() {
-            @Override
-            public void execute(Runnable command) {
-                command.run();
-            }
-        };
-        storage = new WebStorage(config, exec);
+        storage = new WebStorage(config);
         storage.setEndpoint("http://localhost:" + port + "/");
         storage.setAgentId(new UUID(123, 456));
         headers = new HashMap<>();