changeset 1281:7a4e7c3c4a1d

PR1529: Revisit implementation of asynchronous Storage. Reviewed-by: neugens Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2013-October/008376.html
author Severin Gehwolf <sgehwolf@redhat.com>
date Wed, 02 Oct 2013 18:14:39 +0200
parents e88531d6fdc6
children a4c836ada099
files storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedBackingStorage.java storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedStorage.java storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedBackingStorageTest.java storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageTest.java storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/MongoStorageProvider.java storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorageProviderTest.java storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorageTest.java web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java
diffstat 9 files changed, 232 insertions(+), 465 deletions(-) [+]
line wrap: on
line diff
--- a/storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedBackingStorage.java	Fri Apr 05 18:57:56 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,231 +0,0 @@
-/*
- * Copyright 2012, 2013 Red Hat, Inc.
- *
- * This file is part of Thermostat.
- *
- * Thermostat is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published
- * by the Free Software Foundation; either version 2, or (at your
- * option) any later version.
- *
- * Thermostat is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Thermostat; see the file COPYING.  If not see
- * <http://www.gnu.org/licenses/>.
- *
- * Linking this code with other modules is making a combined work
- * based on this code.  Thus, the terms and conditions of the GNU
- * General Public License cover the whole combination.
- *
- * As a special exception, the copyright holders of this code give
- * you permission to link this code with independent modules to
- * produce an executable, regardless of the license terms of these
- * independent modules, and to copy and distribute the resulting
- * executable under terms of your choice, provided that you also
- * meet, for each linked independent module, the terms and conditions
- * of the license of that module.  An independent module is a module
- * which is not derived from or based on this code.  If you modify
- * this code, you may extend this exception to your version of the
- * library, but you are not obligated to do so.  If you do not wish
- * to do so, delete this exception statement from your version.
- */
-
-package com.redhat.thermostat.storage.core;
-
-import java.util.concurrent.ExecutorService;
-
-import com.redhat.thermostat.storage.core.AggregateQuery.AggregateFunction;
-import com.redhat.thermostat.storage.model.Pojo;
-import com.redhat.thermostat.storage.query.Expression;
-
-public class QueuedBackingStorage extends QueuedStorage implements
-        BackingStorage {
-    
-    private class QueuedReplace<T extends Pojo> implements Replace<T> {
-
-        private final Replace<T> delegateReplace;
-        
-        private QueuedReplace(Replace<T> delegateReplace) {
-            this.delegateReplace = delegateReplace;
-        }
-        
-        @Override
-        public int apply() {
-            executor.execute(new Runnable() {
-                
-                @Override
-                public void run() {
-                    delegateReplace.apply();
-                }
-
-            });
-            return DataModifyingStatement.DEFAULT_STATUS_SUCCESS;
-        }
-
-        @Override
-        public void where(Expression expression) {
-            delegateReplace.where(expression);
-        }
-
-        @Override
-        public void set(String key, Object value) {
-            delegateReplace.set(key, value);
-        }
-        
-    }
-
-    private class QueuedAdd<T extends Pojo> implements Add<T> {
-        
-        private final Add<T> delegateAdd;
-        
-        private QueuedAdd(Add<T> delegateAdd) {
-            this.delegateAdd = delegateAdd;
-        }
-        
-        @Override
-        public int apply() {
-            executor.execute(new Runnable() {
-                
-                @Override
-                public void run() {
-                    delegateAdd.apply();
-                }
-
-            });
-            return DataModifyingStatement.DEFAULT_STATUS_SUCCESS;
-        }
-
-        @Override
-        public void set(String key, Object value) {
-            delegateAdd.set(key, value);
-        }
-        
-    }
-
-    private class QueuedUpdate<T extends Pojo> implements Update<T> {
-        
-        private final Update<T> delegateUpdate;
-
-        private QueuedUpdate(Update<T> delegateUpdate) {
-            this.delegateUpdate = delegateUpdate;
-        }
-
-        @Override
-        public void where(Expression expr) {
-            delegateUpdate.where(expr);
-            
-        }
-
-        @Override
-        public void set(String key, Object value) {
-            delegateUpdate.set(key, value);
-        }
-
-        @Override
-        public int apply() {
-            executor.execute(new Runnable() {
-                
-                @Override
-                public void run() {
-                    delegateUpdate.apply();
-                }
-
-            });
-            return DataModifyingStatement.DEFAULT_STATUS_SUCCESS;
-        }
-
-    }
-    
-    private class QueuedRemove<T extends Pojo> implements Remove<T> {
-        
-        private final Remove<T> delegateRemove;
-        
-        private QueuedRemove(Remove<T> delegateRemove) {
-            this.delegateRemove = delegateRemove;
-        }
-
-        @Override
-        public void where(Expression where) {
-            delegateRemove.where(where);
-        }
-
-        @Override
-        public int apply() {
-            executor.execute(new Runnable() {
-                
-                @Override
-                public void run() {
-                    delegateRemove.apply();
-                }
-
-            });
-            return DataModifyingStatement.DEFAULT_STATUS_SUCCESS;
-        }
-    }
-    
-    public QueuedBackingStorage(BackingStorage delegate) {
-        super(delegate);
-    }
-
-    QueuedBackingStorage(BackingStorage delegate, ExecutorService executor,
-            ExecutorService fileExecutor) {
-        super(delegate, executor, fileExecutor);
-    }
-
-    @Override
-    public <T extends Pojo> Query<T> createQuery(Category<T> category) {
-        return ((BackingStorage) delegate).createQuery(category);
-    }
-    
-    @Override
-    public <T extends Pojo> PreparedStatement<T> prepareStatement(
-            StatementDescriptor<T> desc) throws DescriptorParsingException {
-        // FIXME: Use some kind of cache in order to avoid parsing of
-        // descriptors each time this is called. At least if the descriptor
-        // class is the same we should be able to do something here.
-        
-        // Don't just defer to the delegate, since we want statements
-        // prepared by this method to create queries using the
-        // createQuery method in this class.
-        return PreparedStatementFactory.getInstance(this, desc);
-    }
-
-    @Override
-    public <T extends Pojo> Query<T> createAggregateQuery(
-            AggregateFunction function, Category<T> category) {
-        return ((BackingStorage) delegate).createAggregateQuery(function, category);
-    }
-
-    @Override
-    public <T extends Pojo> Add<T> createAdd(Category<T> category) {
-        Add<T> delegateAdd = ((BackingStorage)delegate).createAdd(category);
-        QueuedAdd<T> add = new QueuedAdd<>(delegateAdd);
-        return add;
-    }
-
-    @Override
-    public <T extends Pojo> Replace<T> createReplace(Category<T> category) {
-        Replace<T> delegateReplace = ((BackingStorage)delegate).createReplace(category);
-        QueuedReplace<T> replace = new QueuedReplace<>(delegateReplace);
-        return replace;
-    }
-
-    @Override
-    public <T extends Pojo> Update<T> createUpdate(Category<T> category) {
-        Update<T> delegateUpdate = ((BackingStorage)delegate).createUpdate(category);
-        QueuedUpdate<T> update = new QueuedUpdate<>(delegateUpdate);
-        return update;
-    }
-
-    @Override
-    public <T extends Pojo> Remove<T> createRemove(Category<T> category) {
-        Remove<T> delegateRemove = ((BackingStorage) delegate).createRemove(category);
-        QueuedRemove<T> remove = new QueuedRemove<>(delegateRemove);
-        return remove;
-    }
-
-}
--- a/storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedStorage.java	Fri Apr 05 18:57:56 2013 +0200
+++ b/storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedStorage.java	Wed Oct 02 18:14:39 2013 +0200
@@ -38,14 +38,19 @@
 package com.redhat.thermostat.storage.core;
 
 import java.io.InputStream;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import com.redhat.thermostat.common.utils.LoggingUtils;
 import com.redhat.thermostat.storage.model.Pojo;
 
 public class QueuedStorage implements Storage {
-
+    
+    private static final Logger logger = LoggingUtils.getLogger(QueuedStorage.class);
     private static final int SHUTDOWN_TIMEOUT_SECONDS = 3;
 
     protected final Storage delegate;
@@ -53,10 +58,115 @@
     protected final ExecutorService fileExecutor;
 
     /*
-     * NOTE: We intentially use single-thread executor. All updates are put into a queue, from which
-     * a single dispatch thread calls the underlying storage. Using multiple dispatch threads
-     * could cause out-of-order issues, e.g. a VM death being reported before its VM start, which
-     * could confuse the heck out of clients.
+     * Decorates PreparedStatement.execute() so that executions of writes
+     * are queued.
+     */
+    class QueuedPreparedStatement<T extends Pojo> implements PreparedStatement<T> {
+        
+        private final PreparedStatement<T> delegate;
+        
+        private QueuedPreparedStatement(PreparedStatement<T> delegate) {
+            this.delegate = Objects.requireNonNull(delegate);
+        }
+
+        @Override
+        public void setBooleanList(int paramIndex, boolean[] paramValue) {
+            delegate.setBooleanList(paramIndex, paramValue);
+        }
+
+        @Override
+        public void setLongList(int paramIndex, long[] paramValue) {
+            delegate.setLongList(paramIndex, paramValue);
+        }
+
+        @Override
+        public void setIntList(int paramIndex, int[] paramValue) {
+            delegate.setIntList(paramIndex, paramValue);
+        }
+
+        @Override
+        public void setDouble(int paramIndex, double paramValue) {
+            delegate.setDouble(paramIndex, paramValue);
+        }
+
+        @Override
+        public void setDoubleList(int paramIndex, double[] paramValue) {
+            delegate.setDoubleList(paramIndex, paramValue);
+        }
+
+        @Override
+        public void setPojo(int paramIndex, Pojo paramValue) {
+            delegate.setPojo(paramIndex, paramValue);
+        }
+
+        @Override
+        public void setPojoList(int paramIndex, Pojo[] paramValue) {
+            delegate.setPojoList(paramIndex, paramValue);
+        }
+
+        @Override
+        public void setBoolean(int paramIndex, boolean paramValue) {
+            delegate.setBoolean(paramIndex, paramValue);
+        }
+
+        @Override
+        public void setLong(int paramIndex, long paramValue) {
+            delegate.setLong(paramIndex, paramValue);
+        }
+
+        @Override
+        public void setInt(int paramIndex, int paramValue) {
+            delegate.setInt(paramIndex, paramValue);
+        }
+
+        @Override
+        public void setString(int paramIndex, String paramValue) {
+            delegate.setString(paramIndex, paramValue);
+        }
+
+        @Override
+        public void setStringList(int paramIndex, String[] paramValue) {
+            delegate.setStringList(paramIndex, paramValue);
+        }
+
+        @Override
+        public int execute() throws StatementExecutionException {
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        // TODO log return code of delegate, time execution?
+                        delegate.execute();
+                    } catch (StatementExecutionException e) {
+                        // There isn't much we can do in case of invalid
+                        // patch or the likes. Log it and move on.
+                        logger.log(Level.WARNING, "Failed to execute statement", e);
+                    }
+                }
+                
+            });
+            return DataModifyingStatement.DEFAULT_STATUS_SUCCESS;
+        }
+
+        @Override
+        public Cursor<T> executeQuery() throws StatementExecutionException {
+            return delegate.executeQuery();
+        }
+
+        @Override
+        public ParsedStatement<T> getParsedStatement() {
+            return delegate.getParsedStatement();
+        }
+        
+    }
+    
+    /*
+     * NOTE: We intentially use single-thread executor. All updates are put into
+     * a queue, from which a single dispatch thread calls the underlying
+     * storage. Using multiple dispatch threads could cause out-of-order issues,
+     * e.g. a VM death being reported before its VM start, which could confuse
+     * the heck out of clients.
      */
     public QueuedStorage(Storage delegate) {
         this(delegate, Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
@@ -120,7 +230,8 @@
     @Override
     public <T extends Pojo> PreparedStatement<T> prepareStatement(final StatementDescriptor<T> desc)
             throws DescriptorParsingException {
-        return delegate.prepareStatement(desc);
+        PreparedStatement<T> decoratee = delegate.prepareStatement(desc);
+        return new QueuedPreparedStatement<>(decoratee);
     }
 
     @Override
--- a/storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedBackingStorageTest.java	Fri Apr 05 18:57:56 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,217 +0,0 @@
-/*
- * Copyright 2012, 2013 Red Hat, Inc.
- *
- * This file is part of Thermostat.
- *
- * Thermostat is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published
- * by the Free Software Foundation; either version 2, or (at your
- * option) any later version.
- *
- * Thermostat is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Thermostat; see the file COPYING.  If not see
- * <http://www.gnu.org/licenses/>.
- *
- * Linking this code with other modules is making a combined work
- * based on this code.  Thus, the terms and conditions of the GNU
- * General Public License cover the whole combination.
- *
- * As a special exception, the copyright holders of this code give
- * you permission to link this code with independent modules to
- * produce an executable, regardless of the license terms of these
- * independent modules, and to copy and distribute the resulting
- * executable under terms of your choice, provided that you also
- * meet, for each linked independent module, the terms and conditions
- * of the license of that module.  An independent module is a module
- * which is not derived from or based on this code.  If you modify
- * this code, you may extend this exception to your version of the
- * library, but you are not obligated to do so.  If you do not wish
- * to do so, delete this exception statement from your version.
- */
-
-
-package com.redhat.thermostat.storage.core;
-
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.redhat.thermostat.storage.model.Pojo;
-
-
-public class QueuedBackingStorageTest {
-
-    private static class TestExecutor implements ExecutorService {
-
-        private Runnable task;
-        private boolean shutdown;
-
-        @Override
-        public void execute(Runnable task) {
-            this.task = task;
-        }
-
-        Runnable getTask() {
-            return task;
-        }
-
-        @Override
-        public void shutdown() {
-            shutdown = true;
-        }
-
-        @Override
-        public List<Runnable> shutdownNow() {
-            // Not used.
-            shutdown = true;
-            return null;
-        }
-
-        @Override
-        public boolean isShutdown() {
-            return shutdown;
-        }
-
-        @Override
-        public boolean isTerminated() {
-            // Not used.
-            return shutdown;
-        }
-
-        @Override
-        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-            return true;
-        }
-
-        @Override
-        public <T> Future<T> submit(Callable<T> task) {
-            // Not used.
-            return null;
-        }
-
-        @Override
-        public <T> Future<T> submit(Runnable task, T result) {
-            // Not used.
-            return null;
-        }
-
-        @Override
-        public Future<?> submit(Runnable task) {
-            // Not used.
-            return null;
-        }
-
-        @Override
-        public <T> List<Future<T>> invokeAll(
-                Collection<? extends Callable<T>> tasks)
-                throws InterruptedException {
-            // Not used.
-            return null;
-        }
-
-        @Override
-        public <T> List<Future<T>> invokeAll(
-                Collection<? extends Callable<T>> tasks, long timeout,
-                TimeUnit unit) throws InterruptedException {
-            // Not used.
-            return null;
-        }
-
-        @Override
-        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-                throws InterruptedException, ExecutionException {
-            // Not used.
-            return null;
-        }
-
-        @Override
-        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
-                long timeout, TimeUnit unit) throws InterruptedException,
-                ExecutionException, TimeoutException {
-            // Not used.
-            return null;
-        }
-
-    }
-    
-    private QueuedBackingStorage queuedStorage;
-    private BackingStorage delegateStorage;
-    private Query<TestPojo> delegateQuery;
-    private Replace<?> delegateReplace;
-
-    private TestExecutor executor;
-    private TestExecutor fileExecutor;
-
-    private Cursor<TestPojo> expectedResults;
-
-    @SuppressWarnings("unchecked")
-    @Before
-    public void setUp() {
-        executor = new TestExecutor();
-        fileExecutor = new TestExecutor();
-        delegateStorage = mock(BackingStorage.class);
-
-        delegateReplace = mock(Replace.class);
-        delegateQuery = (Query<TestPojo>) mock(Query.class);
-        expectedResults = (Cursor<TestPojo>) mock(Cursor.class);
-        when(delegateStorage.createQuery(any(Category.class))).thenReturn(delegateQuery);
-        when(delegateQuery.execute()).thenReturn(expectedResults);
-        when(delegateStorage.createReplace(any(Category.class))).thenReturn(delegateReplace);
-        
-        queuedStorage = new QueuedBackingStorage(delegateStorage, executor, fileExecutor);
-    }
-
-    @After
-    public void tearDown() {
-        expectedResults = null;
-        queuedStorage = null;
-        delegateStorage = null;
-        fileExecutor = null;
-        executor = null;
-        delegateQuery = null;
-    }
-    
-    @Test
-    public void testCreateQuery() {
-        @SuppressWarnings("unchecked")
-        Category<TestPojo> category = (Category<TestPojo>) mock(Category.class);
-        Query<TestPojo> query = queuedStorage.createQuery(category);
-        verify(delegateStorage).createQuery(category);
-        verifyNoMoreInteractions(delegateStorage);
-
-        Cursor<TestPojo> result = query.execute();
-        verify(delegateQuery).execute();
-        assertSame(expectedResults, result);
-
-        assertNull(executor.getTask());
-        assertNull(fileExecutor.getTask());
-    }
-    
-    private static class TestPojo implements Pojo {
-        
-    }
-
-}
-
--- a/storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageTest.java	Fri Apr 05 18:57:56 2013 +0200
+++ b/storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageTest.java	Wed Oct 02 18:14:39 2013 +0200
@@ -39,12 +39,15 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -65,7 +68,9 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import com.redhat.thermostat.storage.core.QueuedStorage.QueuedPreparedStatement;
 import com.redhat.thermostat.storage.model.Pojo;
 
 
@@ -193,8 +198,8 @@
     }
 
     private QueuedStorage queuedStorage;
-    private BackingStorage delegateStorage;
-
+    private Storage delegateStorage;
+    
     private TestExecutor executor;
     private TestExecutor fileExecutor;
 
@@ -204,7 +209,7 @@
     public void setUp() {
         executor = new TestExecutor();
         fileExecutor = new TestExecutor();
-        delegateStorage = mock(BackingStorage.class);
+        delegateStorage = mock(Storage.class);
 
         expectedFile = mock(InputStream.class);
         when(delegateStorage.loadFile(anyString())).thenReturn(expectedFile);
@@ -219,7 +224,7 @@
         fileExecutor = null;
         executor = null;
     }
-
+    
     @Test
     public void testPurge() {
 
@@ -260,6 +265,87 @@
         assertNull(executor.getTask());
         assertNull(fileExecutor.getTask());
     }
+    
+    @Test
+    public void testPrepareStatement() throws DescriptorParsingException, StatementExecutionException {
+        @SuppressWarnings("unchecked")
+        PreparedStatement<Pojo> statement = (PreparedStatement<Pojo>)mock(PreparedStatement.class);
+        when(delegateStorage.prepareStatement(anyStatementDescriptor())).thenReturn(statement);
+        
+        @SuppressWarnings("unchecked")
+        StatementDescriptor<Pojo> desc = mock(StatementDescriptor.class);
+        PreparedStatement<Pojo> decorated = queuedStorage.prepareStatement(desc);
+        assertNotNull(decorated);
+        assertTrue(decorated instanceof QueuedPreparedStatement);
+        assertNotSame(decorated, statement);
+        assertNull(executor.getTask());
+        
+        // make sure execution queues a runnable
+        decorated.execute();
+        assertNotNull(executor.getTask());
+    }
+    
+    /*
+     * QueuedStorage decorates PreparedStatement, which may throw a
+     * StatementExecution exception on stmt.execute(). All the decorator can do
+     * is to log the exception and continue. As such, the decorated
+     * PreparedStatement should never throw that exception. It can't since it
+     * solely submits a runnable for execution.
+     */
+    @Test
+    public void testExecutePreparedStatementFails()
+            throws DescriptorParsingException, StatementExecutionException {
+        @SuppressWarnings("unchecked")
+        PreparedStatement<Pojo> statement = (PreparedStatement<Pojo>)mock(PreparedStatement.class);
+        
+        // make sure the delegate throws a StatementExecutionException
+        Mockito.doThrow(StatementExecutionException.class).when(statement).execute();
+        when(delegateStorage.prepareStatement(anyStatementDescriptor())).thenReturn(statement);
+        
+        @SuppressWarnings("unchecked")
+        StatementDescriptor<Pojo> desc = mock(StatementDescriptor.class);
+        PreparedStatement<Pojo> decorated = queuedStorage.prepareStatement(desc);
+        assertNotNull(decorated);
+        assertTrue(decorated instanceof QueuedPreparedStatement);
+        assertNotSame(decorated, statement);
+        assertNull(executor.getTask());
+        
+        // this should not throw the exception the decoratee throws.
+        try {
+            decorated.execute();
+            // pass
+        } catch (StatementExecutionException e) {
+            fail("decorator should only submit task and continue");
+        }
+        assertNotNull(executor.getTask());
+        try {
+            executor.getTask().run();
+            // pass
+        } catch (Exception e) {
+            fail("delegate's StatementExecutionException should have been caught");
+        }
+    }
+    
+    @Test
+    public void testPrepareStatementDescriptorParseFailure() throws DescriptorParsingException, StatementExecutionException {
+        
+        Mockito.doThrow(DescriptorParsingException.class).when(delegateStorage).prepareStatement(anyStatementDescriptor());
+        
+        @SuppressWarnings("unchecked")
+        StatementDescriptor<Pojo> desc = mock(StatementDescriptor.class);
+        
+        try {
+            queuedStorage.prepareStatement(desc);
+            fail("should have thrown descptor parsing exception!");
+        } catch (DescriptorParsingException e) {
+            // pass
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    private <T extends Pojo> StatementDescriptor<T> anyStatementDescriptor() {
+        return any(StatementDescriptor.class);
+    }
 
     @Test
     public void testRegisterCategory() {
--- a/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/MongoStorageProvider.java	Fri Apr 05 18:57:56 2013 +0200
+++ b/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/MongoStorageProvider.java	Wed Oct 02 18:14:39 2013 +0200
@@ -37,7 +37,7 @@
 package com.redhat.thermostat.storage.mongodb;
 
 import com.redhat.thermostat.storage.config.StartupConfiguration;
-import com.redhat.thermostat.storage.core.QueuedBackingStorage;
+import com.redhat.thermostat.storage.core.QueuedStorage;
 import com.redhat.thermostat.storage.core.Storage;
 import com.redhat.thermostat.storage.core.StorageProvider;
 import com.redhat.thermostat.storage.mongodb.internal.MongoStorage;
@@ -53,7 +53,7 @@
     @Override
     public Storage createStorage() {
         MongoStorage storage = new MongoStorage(configuration);
-        return new QueuedBackingStorage(storage);
+        return new QueuedStorage(storage);
     }
 
     @Override
--- a/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java	Fri Apr 05 18:57:56 2013 +0200
+++ b/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java	Wed Oct 02 18:14:39 2013 +0200
@@ -67,6 +67,7 @@
 import com.redhat.thermostat.storage.core.DescriptorParsingException;
 import com.redhat.thermostat.storage.core.Key;
 import com.redhat.thermostat.storage.core.PreparedStatement;
+import com.redhat.thermostat.storage.core.PreparedStatementFactory;
 import com.redhat.thermostat.storage.core.Query;
 import com.redhat.thermostat.storage.core.Remove;
 import com.redhat.thermostat.storage.core.Replace;
@@ -474,12 +475,17 @@
         }
     }
 
+    /*
+     *  QueuedStorage decorator uses this method and "wraps" the returned
+     *  PreparedStatement so that it executes in a queued fashion.
+     */
     @Override
     public <T extends Pojo> PreparedStatement<T> prepareStatement(StatementDescriptor<T> statementDesc)
             throws DescriptorParsingException {
-        // Queued storage decorator should override this. This should never
-        // be called.
-        throw new IllegalStateException();
+        // FIXME: Use some kind of cache in order to avoid parsing of
+        // descriptors each time this is called. At least if the descriptor
+        // class is the same we should be able to do something here.
+        return PreparedStatementFactory.getInstance(this, statementDesc);
     }
 
     @Override
--- a/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorageProviderTest.java	Fri Apr 05 18:57:56 2013 +0200
+++ b/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorageProviderTest.java	Wed Oct 02 18:14:39 2013 +0200
@@ -44,7 +44,6 @@
 import org.junit.Test;
 
 import com.redhat.thermostat.storage.config.StartupConfiguration;
-import com.redhat.thermostat.storage.core.BackingStorage;
 import com.redhat.thermostat.storage.core.QueuedStorage;
 import com.redhat.thermostat.storage.core.SecureStorage;
 import com.redhat.thermostat.storage.core.Storage;
@@ -60,7 +59,6 @@
         provider.setConfig(config);
         Storage result = provider.createStorage();
         assertTrue(result instanceof QueuedStorage);
-        assertTrue(result instanceof BackingStorage);
         assertFalse(result instanceof SecureStorage);
     }
 }
--- a/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorageTest.java	Fri Apr 05 18:57:56 2013 +0200
+++ b/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorageTest.java	Wed Oct 02 18:14:39 2013 +0200
@@ -84,6 +84,7 @@
 import com.mongodb.gridfs.GridFSInputFile;
 import com.redhat.thermostat.storage.config.StartupConfiguration;
 import com.redhat.thermostat.storage.core.Add;
+import com.redhat.thermostat.storage.core.BackingStorage;
 import com.redhat.thermostat.storage.core.Category;
 import com.redhat.thermostat.storage.core.CategoryAdapter;
 import com.redhat.thermostat.storage.core.Cursor;
@@ -251,6 +252,12 @@
     }
     
     @Test
+    public void isBackingStorage() {
+        MongoStorage storage = new MongoStorage(conf);
+        assertTrue(storage instanceof BackingStorage);
+    }
+    
+    @Test
     public void testRegisterCategory() throws Exception {
         DB db = PowerMockito.mock(DB.class);
         CountDownLatch latch = new CountDownLatch(1);
--- a/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java	Fri Apr 05 18:57:56 2013 +0200
+++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java	Wed Oct 02 18:14:39 2013 +0200
@@ -90,6 +90,7 @@
 import com.redhat.thermostat.storage.core.Category;
 import com.redhat.thermostat.storage.core.Connection.ConnectionListener;
 import com.redhat.thermostat.storage.core.Connection.ConnectionStatus;
+import com.redhat.thermostat.storage.core.BackingStorage;
 import com.redhat.thermostat.storage.core.Cursor;
 import com.redhat.thermostat.storage.core.DescriptorParsingException;
 import com.redhat.thermostat.storage.core.IllegalDescriptorException;
@@ -256,6 +257,12 @@
         storage.registerCategory(category);
     }
     
+    // WebStorage is a proxy storage, no backing storage
+    @Test
+    public void isNoBackingStorage() {
+        assertFalse(storage instanceof BackingStorage);
+    }
+    
     @Test
     public void preparingFaultyDescriptorThrowsException() throws UnsupportedEncodingException, IOException {
         Gson gson = new GsonBuilder()