changeset 1607:605f0c269f1c

Improve recovery mechanism for WebStorage. Reviewed-by: omajid Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2014-December/011936.html
author Severin Gehwolf <sgehwolf@redhat.com>
date Mon, 01 Dec 2014 10:43:12 +0100
parents c053ee51a8c0
children de525fbb26cf
files storage/core/src/main/java/com/redhat/thermostat/storage/core/RetryableDescriptorParsingException.java storage/core/src/main/java/com/redhat/thermostat/storage/core/RetryableStatementExecutionException.java web/client/src/main/java/com/redhat/thermostat/web/client/internal/ExpirableWebPreparedStatementCache.java web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebPreparedStatementCache.java web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebPreparedStatementHolder.java web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java web/client/src/test/java/com/redhat/thermostat/web/client/internal/ExpirableWebPreparedStatementCacheTest.java web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebPreparedStatementCacheTest.java web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java web/server/src/main/java/com/redhat/thermostat/web/server/WebStorageEndPoint.java
diffstat 10 files changed, 930 insertions(+), 167 deletions(-) [+]
line wrap: on
line diff
--- a/storage/core/src/main/java/com/redhat/thermostat/storage/core/RetryableDescriptorParsingException.java	Sun Dec 07 22:26:27 2014 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,52 +0,0 @@
-/*
- * Copyright 2012-2014 Red Hat, Inc.
- *
- * This file is part of Thermostat.
- *
- * Thermostat is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published
- * by the Free Software Foundation; either version 2, or (at your
- * option) any later version.
- *
- * Thermostat is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Thermostat; see the file COPYING.  If not see
- * <http://www.gnu.org/licenses/>.
- *
- * Linking this code with other modules is making a combined work
- * based on this code.  Thus, the terms and conditions of the GNU
- * General Public License cover the whole combination.
- *
- * As a special exception, the copyright holders of this code give
- * you permission to link this code with independent modules to
- * produce an executable, regardless of the license terms of these
- * independent modules, and to copy and distribute the resulting
- * executable under terms of your choice, provided that you also
- * meet, for each linked independent module, the terms and conditions
- * of the license of that module.  An independent module is a module
- * which is not derived from or based on this code.  If you modify
- * this code, you may extend this exception to your version of the
- * library, but you are not obligated to do so.  If you do not wish
- * to do so, delete this exception statement from your version.
- */
-
-package com.redhat.thermostat.storage.core;
-
-/**
- * Exception thrown if parsing of a prepared statement descriptor
- * failed but retrying to prepare the same descriptor may succeed.
- *
- */
-@SuppressWarnings("serial")
-public class RetryableDescriptorParsingException extends
-        DescriptorParsingException {
-
-    public RetryableDescriptorParsingException(String msg) {
-        super(msg);
-    }
-
-}
--- a/storage/core/src/main/java/com/redhat/thermostat/storage/core/RetryableStatementExecutionException.java	Sun Dec 07 22:26:27 2014 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,53 +0,0 @@
-/*
- * Copyright 2012-2014 Red Hat, Inc.
- *
- * This file is part of Thermostat.
- *
- * Thermostat is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published
- * by the Free Software Foundation; either version 2, or (at your
- * option) any later version.
- *
- * Thermostat is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Thermostat; see the file COPYING.  If not see
- * <http://www.gnu.org/licenses/>.
- *
- * Linking this code with other modules is making a combined work
- * based on this code.  Thus, the terms and conditions of the GNU
- * General Public License cover the whole combination.
- *
- * As a special exception, the copyright holders of this code give
- * you permission to link this code with independent modules to
- * produce an executable, regardless of the license terms of these
- * independent modules, and to copy and distribute the resulting
- * executable under terms of your choice, provided that you also
- * meet, for each linked independent module, the terms and conditions
- * of the license of that module.  An independent module is a module
- * which is not derived from or based on this code.  If you modify
- * this code, you may extend this exception to your version of the
- * library, but you are not obligated to do so.  If you do not wish
- * to do so, delete this exception statement from your version.
- */
-
-package com.redhat.thermostat.storage.core;
-
-/**
- * 
- * Exception thrown if execution of a {@link PreparedStatement} failed, but
- * may succeed if the same statement gets executed a second time.
- *
- */
-@SuppressWarnings("serial")
-public class RetryableStatementExecutionException extends
-        StatementExecutionException {
-
-    public RetryableStatementExecutionException(Throwable cause) {
-        super(cause);
-    }
-    
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/ExpirableWebPreparedStatementCache.java	Mon Dec 01 10:43:12 2014 +0100
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2012-2014 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.web.client.internal;
+
+import com.redhat.thermostat.storage.core.StatementDescriptor;
+import com.redhat.thermostat.storage.model.Pojo;
+import com.redhat.thermostat.web.common.SharedStateId;
+
+/**
+ * A {@link WebPreparedStatementCache} with an expiration time. When that time
+ * has passed it will return null for all cache entries. Used as a read-only
+ * transition cache in {@link WebStorage} in order to recover from server
+ * component reloads.
+ *
+ */
+class ExpirableWebPreparedStatementCache extends WebPreparedStatementCache {
+
+    private final long timeExpires;
+    private final WebPreparedStatementCache cache;
+    
+    ExpirableWebPreparedStatementCache(WebPreparedStatementCache cache, long timeExpires) {
+        this.timeExpires = timeExpires;
+        this.cache = cache;
+    }
+    
+    @Override
+    synchronized <T extends Pojo> WebPreparedStatementHolder get(StatementDescriptor<T> desc) {
+        WebPreparedStatementHolder holder = cache.get(desc);
+        if (holder == null) {
+            return null;
+        }
+        // check if corresponding cache entry has expired
+        long now = System.nanoTime();
+        if (now > timeExpires) {
+            // remove cache entry and return null
+            SharedStateId id = holder.getStatementId();
+            cache.remove(id);
+            return null;
+        }
+        return holder;
+    }
+    
+    @Override
+    synchronized <T extends Pojo> StatementDescriptor<T> get(SharedStateId id) {
+        StatementDescriptor<T> desc = cache.get(id);
+        if (desc == null) {
+            return null;
+        }
+        long now = System.nanoTime();
+        if (now > timeExpires) {
+            // remove cache entry and return null
+            cache.remove(id);
+            return null;
+        }
+        return desc;
+    }
+    
+    boolean isExpired() {
+        return System.nanoTime() > timeExpires;
+    }
+    
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebPreparedStatementCache.java	Mon Dec 01 10:43:12 2014 +0100
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2012-2014 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.web.client.internal;
+
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import com.redhat.thermostat.storage.core.StatementDescriptor;
+import com.redhat.thermostat.storage.model.Pojo;
+import com.redhat.thermostat.web.common.SharedStateId;
+import com.redhat.thermostat.web.common.WebPreparedStatement;
+
+/**
+ * A simple implementation of a cache for {@link WebPreparedStatement}
+ * in order to avoid unnecessary network round-trips if a statement
+ * has already been prepared and it's not out-of-sync with the server.
+ */
+class WebPreparedStatementCache {
+    
+    private final Map<StatementDescriptor<?>, WebPreparedStatementHolder> stmtCache;
+    private final Map<SharedStateId, StatementDescriptor<?>> reverseLookup;
+    
+    WebPreparedStatementCache() {
+        stmtCache = new WeakHashMap<>();
+        reverseLookup = new WeakHashMap<>();
+    }
+
+    synchronized <T extends Pojo> void put(StatementDescriptor<T> desc, WebPreparedStatementHolder holder) {
+        SharedStateId id = holder.getStatementId();
+        stmtCache.put(desc, holder);
+        reverseLookup.put(id, desc);
+    }
+    
+    synchronized <T extends Pojo> WebPreparedStatementHolder get(StatementDescriptor<T> desc) {
+        return stmtCache.get(desc);
+    }
+    
+    @SuppressWarnings("unchecked")
+    synchronized <T extends Pojo> StatementDescriptor<T> get(SharedStateId id) {
+        return (StatementDescriptor<T>)reverseLookup.get(id);
+    }
+    
+    synchronized void remove(SharedStateId id) {
+        StatementDescriptor<?> desc = reverseLookup.get(id);
+        if (desc != null) {
+            stmtCache.remove(desc);
+        }
+        reverseLookup.remove(id);
+    }
+    
+    /**
+     * Creates a snapshot of the current state of this cache.
+     * 
+     * @return A copied snapshot of this cache.
+     */
+    synchronized WebPreparedStatementCache createSnapshot() {
+        WebPreparedStatementCache copy = new WebPreparedStatementCache();
+        for (StatementDescriptor<?> desc: stmtCache.keySet()) {
+            copy.put(desc, stmtCache.get(desc));
+        }
+        return copy;
+    }
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebPreparedStatementHolder.java	Mon Dec 01 10:43:12 2014 +0100
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2012-2014 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.web.client.internal;
+
+import java.lang.reflect.Type;
+
+import com.redhat.thermostat.web.common.SharedStateId;
+
+/**
+ * Container used for parameter caching in order to avoid unneccessary
+ * network overhead.
+ *
+ * @see {@link WebStorage}
+ * @see {@link WebPreparedStatementCache}
+ *
+ */
+class WebPreparedStatementHolder {
+    
+    private final Type typeToken;
+    private final int numParams;
+    private final SharedStateId statementId;
+    
+    WebPreparedStatementHolder(Type typeToken, int numParams, SharedStateId statementId) {
+        this.typeToken = typeToken;
+        this.numParams = numParams;
+        this.statementId = statementId;
+    }
+
+    Type getTypeToken() {
+        return typeToken;
+    }
+
+    int getNumParams() {
+        return numParams;
+    }
+
+    SharedStateId getStatementId() {
+        return statementId;
+    }
+}
\ No newline at end of file
--- a/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java	Sun Dec 07 22:26:27 2014 -0700
+++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java	Mon Dec 01 10:43:12 2014 +0100
@@ -51,7 +51,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.WeakHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -104,8 +104,6 @@
 import com.redhat.thermostat.storage.core.IllegalDescriptorException;
 import com.redhat.thermostat.storage.core.IllegalPatchException;
 import com.redhat.thermostat.storage.core.PreparedStatement;
-import com.redhat.thermostat.storage.core.RetryableDescriptorParsingException;
-import com.redhat.thermostat.storage.core.RetryableStatementExecutionException;
 import com.redhat.thermostat.storage.core.SecureStorage;
 import com.redhat.thermostat.storage.core.StatementDescriptor;
 import com.redhat.thermostat.storage.core.StatementExecutionException;
@@ -134,6 +132,10 @@
 
     private static final String HTTP_PREFIX = "http";
     private static final String HTTPS_PREFIX = "https";
+    
+    // Transition cache is valid for 30 seconds starting from the current time.
+    private static final long TRANSITION_CACHE_OFFSET = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+    
     static final Logger logger = LoggingUtils.getLogger(WebStorage.class);
     
     private static class CloseableHttpEntity implements Closeable, HttpEntity {
@@ -338,13 +340,13 @@
         
         @Override
         public int execute() throws StatementExecutionException {
-            return doWriteExecute(this);
+            return doWriteExecute(this, 0);
         }
 
         @Override
         public Cursor<T> executeQuery()
                 throws StatementExecutionException {
-            return doExecuteQuery(this, parametrizedTypeToken);
+            return doExecuteQuery(this, parametrizedTypeToken, 0);
         }
         
     }
@@ -361,12 +363,26 @@
     private StorageCredentials creds;
     private SecureRandom random;
     private WebConnection conn;
-    private Map<StatementDescriptor<?>, WebPreparedStatementHolder> stmtCache;
+    private WebPreparedStatementCache stmtCache;
+    // Temporary cache used for recovering after a server endpoint re-deployment.
+    // Will only be valid for 30 seconds for any server endpoint re-deployment.
+    private ExpirableWebPreparedStatementCache transitionStmtCache;
     
     // for testing
     WebStorage(String url, StorageCredentials creds, HttpClient client) {
         init(url, creds, client);
     }
+    
+    // for testing
+    WebStorage(WebPreparedStatementCache stmtCache, ExpirableWebPreparedStatementCache transitionCache) {
+        this.stmtCache = stmtCache;
+        this.transitionStmtCache = transitionCache;
+    }
+    
+    // for testing
+    WebStorage(Map<Category<?>, SharedStateId> categoryIds) {
+        this.categoryIds = categoryIds;
+    }
 
     public WebStorage(String url, StorageCredentials creds, SSLConfiguration sslConf) throws StorageException {
         PoolingHttpClientConnectionManager connManager = getPoolingHttpClientConnManager(sslConf, url);
@@ -402,10 +418,7 @@
         
         this.endpoint = url;
         this.creds = creds;
-        // Use weak map in order for it to not eat up too much memory. The sole
-        // purpose is to cache values and prevent some unnecessary network
-        // overhead.
-        this.stmtCache = new WeakHashMap<>();
+        this.stmtCache = new WebPreparedStatementCache();
     }
 
     // package private for testing
@@ -568,11 +581,15 @@
      *            Type parametrizedTypeToken = new
      *            TypeToken&lt;WebQueryResponse&lt;AgentInformation&gt;&gt;().getType();
      *            </pre>
+     * @param invocationCount The number of recursive invocations performed so far.
      * @return A cursor for the generic type.
      * @throws StatementExecutionException
-     *             If execution of the statement failed.
+     *             If execution of the statement failed. In particular, if
+     *             the state got out of sync, it tried to recover and then
+     *             failed again.
      */
-    private <T extends Pojo> Cursor<T> doExecuteQuery(WebPreparedStatement<T> stmt, Type parametrizedTypeToken) throws StatementExecutionException {
+    <T extends Pojo> Cursor<T> doExecuteQuery(final WebPreparedStatement<T> stmt, Type parametrizedTypeToken, final int invocationCount) throws StatementExecutionException {
+        checkRecursiveInvocationCount(invocationCount);
         NameValuePair queryParam = new BasicNameValuePair("prepared-stmt", gson.toJson(stmt, WebPreparedStatement.class));
         List<NameValuePair> formparams = Arrays.asList(queryParam);
         WebQueryResponse<T> qResp = null;
@@ -594,10 +611,14 @@
             throw new StatementExecutionException(e);
         }
         case PreparedStatementResponseCode.PREP_STMT_BAD_STOKEN: {
-            String msg = "Query failed to execute. Server changed token. Clearing prepared stmt cache!";
-            logger.log(Level.WARNING, msg);
-            clearPreparedStmtCache();
-            throw new RetryableStatementExecutionException(new RuntimeException(msg));
+            // Try to recover from this situation. If this path is
+            // entered more than once than we'll fail on method entry.
+            try {
+                WebPreparedStatement<T> newStmt = handlePreparedStmtStateOutOfSync(stmt);
+                return doExecuteQuery(newStmt, parametrizedTypeToken, invocationCount + 1);
+            } catch (DescriptorParsingException e) {
+                throw new StatementExecutionException(e);
+            }
         }
         default: {
             String msg = "[query-execute] Unknown response from storage endpoint!";
@@ -607,6 +628,15 @@
         }
     }
     
+    private void checkRecursiveInvocationCount(int invocationCount) throws StatementExecutionException {
+        if (invocationCount > 1) {
+            // Initial invokation == 0, potential recovery-invocation == 1
+            String msg = "Failed to recover from out-of-sync state with server";
+            logger.log(Level.WARNING, msg);
+            throw new StatementExecutionException(new IllegalStateException(msg));
+        }
+    }
+    
     /**
      * This method gets called from WebCursor in order to fetch more results
      * or refresh the result set since parameters like limit or skip have
@@ -646,6 +676,9 @@
      * 
      * @param stmt
      *            The prepared statement to execute
+     * @param invocationCount
+     *            The number of times this method has been recursively called,
+     *            starting at 0.
      * @return The response code of executing the underlying data modifying
      *         statement.
      * @throws StatementExecutionException
@@ -653,8 +686,9 @@
      *             values set as prepared parameters did not work or were
      *             partially missing for the prepared statement.
      */
-    private <T extends Pojo> int doWriteExecute(WebPreparedStatement<T> stmt)
+    <T extends Pojo> int doWriteExecute(final WebPreparedStatement<T> stmt, final int invocationCount)
             throws StatementExecutionException {
+        checkRecursiveInvocationCount(invocationCount);
         NameValuePair queryParam = new BasicNameValuePair("prepared-stmt", gson.toJson(stmt, WebPreparedStatement.class));
         List<NameValuePair> formparams = Arrays.asList(queryParam);
         int responseCode = PreparedStatementResponseCode.WRITE_GENERIC_FAILURE;
@@ -665,15 +699,19 @@
             throw new StatementExecutionException(e);
         }
         if (responseCode == PreparedStatementResponseCode.ILLEGAL_PATCH) {
-            String msg = "Illegal statement argument. See server logs for details.";
+            String msg = "Illegal statement argument. See server logs for details. Invokation count: " + invocationCount;
             IllegalArgumentException iae = new IllegalArgumentException(msg);
             IllegalPatchException e = new IllegalPatchException(iae);
             throw new StatementExecutionException(e);
         } else if (responseCode == PreparedStatementResponseCode.PREP_STMT_BAD_STOKEN) {
-            String msg = "Write failed to execute. Server changed token. Clearing prepared stmt cache!";
-            logger.log(Level.WARNING, msg);
-            clearPreparedStmtCache();
-            throw new RetryableStatementExecutionException(new RuntimeException(msg));
+            // Try to recover from this situation. If this path is
+            // entered more than once than we'll fail on method entry.
+            try {
+                WebPreparedStatement<T> newStmt = handlePreparedStmtStateOutOfSync(stmt);
+                return doWriteExecute(newStmt, invocationCount + 1);
+            } catch (DescriptorParsingException e) {
+                throw new StatementExecutionException(e);
+            }
         }
         return responseCode;
     }
@@ -773,10 +811,101 @@
         return categoryIds.get(category);
     }
     
-    private void clearPreparedStmtCache() {
-        synchronized(this.stmtCache) {
-            stmtCache.clear();
+    /**
+     * Package private for testing
+     * 
+     * This method handles the recovery mechanism which needs to be done before
+     * an already failed {@link WebPreparedStatement} can be re-submitted
+     * because some state maintained in the client (here) and on the server
+     * need to be in agreement.
+     * 
+     * Here is how the recovery mechanism works:
+     * 
+     * Pre: client and server agree on an ID for every statement. Any single
+     *      statement is uniquely identifiable via the (server-token, int-id)
+     *      pair. When this method is called we already know that when we first
+     *      tried to execute the statement we had an out-dated server-token in
+     *      record. Thus, we need to refresh the local cache with updated
+     *      statement IDs.
+     * 
+     * Getting the local cache back in sync can be done by:
+     * 1. Removing the old values from the current statement cache and 
+     * 2. Re-registering the underlying category and re-preparing the statement
+     *  
+     * The above two steps will be done once per statement descriptor. This will
+     * update the statement cache accordingly. However, since there may be other
+     * statements in the local queue waiting to be executed. Those pending
+     * statements still have old statement IDs in record. This is where the
+     * transition cache comes into play. There is no need to re-register categories
+     * and re-prepare statements for the same descriptor. It was already done
+     * once and the main statement cache updated accordingly. The transition cache is then used
+     * to get the descriptor from an old statement ID. I.e. whenever the transition
+     * cache is used it is no longer equal to the main statement cache. In a way
+     * the transition cache is a tool to get a descriptor for an now out-dated
+     * statement ID. Once we have the descriptor again we can look it up in the
+     * regular statement cache in (which has been updated previously) in order
+     * to get the updated values for the statement ID.
+     *  
+     * @param origStmt The original statement that failed to execute.
+     * @return A fixed-up statement which should succeed to execute if tried
+     *         again.
+     * @throws DescriptorParsingException If re-preparing a statement failed.
+     */
+    synchronized <T extends Pojo> WebPreparedStatement<T> handlePreparedStmtStateOutOfSync(final WebPreparedStatement<T> origStmt) throws DescriptorParsingException {
+        SharedStateId id = origStmt.getStatementId();
+        String msg = "Prepared statement failed to execute. Server changed token. Trying to recover stmt with id: " + id;
+        logger.log(Level.FINE, msg);
+        // Transition stmt cache needs to be created in 2 cases:
+        // 1. It might be null if it was the first time the server
+        //    re-deployed.
+        // 2. The server did re-deploy at least once and the time it happened
+        //    is more than TRANSITION_CACHE_OFFSET in the past. Case for
+        //    multiple re-deployments of the server parts.
+        if (transitionStmtCache == null || transitionStmtCache.isExpired()) {
+            // Create a transition cache which expires soon in the future
+            // in order to allow successful executions of queued statements which
+            // did not yet run and have old statement IDs in record.
+            logger.log(Level.FINE, "Re-creating transition cache");
+            WebPreparedStatementCache cacheSnapshot = stmtCache.createSnapshot();
+            long timeExpires = System.nanoTime() + TRANSITION_CACHE_OFFSET;
+            transitionStmtCache = new ExpirableWebPreparedStatementCache(cacheSnapshot, timeExpires);
         }
+        StatementDescriptor<T> desc = stmtCache.get(id);
+        // If the above returned null we most likely tried to execute a statement
+        // with an old server token. Attempt to use the transition cache in order
+        // to still be able to execute it successfully.
+        if (desc == null) {
+            desc = transitionStmtCache.get(id);
+            if (desc == null) {
+                throw new IllegalStateException("Irrecoverable error. GC happened or transition cache expired.");
+            }
+            WebPreparedStatementHolder transCacheHolder = transitionStmtCache.get(desc);
+            WebPreparedStatementHolder cacheHolder = stmtCache.get(desc);
+            if (transCacheHolder.getStatementId().equals(cacheHolder.getStatementId())) {
+                throw new IllegalStateException("Should not happen!");
+            }
+            // Transition case:
+            //
+            // Fetch the new mapping from the stmt cache since the statement id
+            // must have changed but category-registration and preparing the
+            // updated statement was done already.
+            SharedStateId stmtId = cacheHolder.getStatementId();
+            logger.log(Level.FINE, "Returning fixed-up statement using updated statement id: " + stmtId);
+            origStmt.setStatementId(stmtId);
+            return origStmt;
+        }
+        // Base case: re-register category and re-prepare statement. This will
+        //            be done *once* for every statement.
+        logger.log(Level.FINE, "Re-register category + prepareStatement + setting params: " + desc);
+        sendCategoryReRegistrationRequest(desc.getCategory());
+        stmtCache.remove(id);
+        // prepareStatement() will return a raw statement (no parameters will be
+        // set in this new datastructure). In order to make it executable right
+        // away we need to set the params via the params we have in record in the
+        // original stmt.
+        WebPreparedStatement<T> newStmt = (WebPreparedStatement<T>)prepareStatement(desc);
+        newStmt.setParams(origStmt.getParams());
+        return newStmt;
     }
 
     @Override
@@ -786,30 +915,29 @@
          * Avoid two network round-trips for statements which have already
          * been prepared. Note that this makes preparing statements not entirely
          * stateless, since the prepared statement ID might change if the
-         * web endpoint reboots, but the agent/client does not and keeps old IDs
-         * in the cache. This will likely produce statement execution errors.
-         * So if that happens you've just found the likely cause :)
+         * web endpoint reloads. If those IDs get out-of-sync we do our best
+         * to correct this situation by clearing the relevant cache entry and
+         * preparing the statement again.
          */
-        WebPreparedStatementHolder holder = null;
-        synchronized(this.stmtCache) {
-            if (this.stmtCache.containsKey(desc)) {
-                // note this is a WeakHashMap and may return null here
-                holder = stmtCache.get(desc);
-            }
-        }
+        WebPreparedStatementHolder holder = stmtCache.get(desc);
+        // note this is a WeakHashMap-backed cache and may return null
         if (holder == null) {
             // Cache-miss, send request over the wire and cache result.
-            holder = sendPrepareStmtRequest(desc);
-            synchronized(this.stmtCache) {
-                this.stmtCache.put(desc, holder);
-            }
+            holder = sendPrepareStmtRequest(desc, 0);
+            stmtCache.put(desc, holder);
         }
-        return new WebPreparedStatementImpl<>(holder.typeToken, holder.numParams, holder.statementId);
+        return new WebPreparedStatementImpl<>(holder.getTypeToken(), holder.getNumParams(), holder.getStatementId());
     }
     
     // package-private for testing
-    <T extends Pojo> WebPreparedStatementHolder sendPrepareStmtRequest(StatementDescriptor<T> desc)
+    <T extends Pojo> WebPreparedStatementHolder sendPrepareStmtRequest(StatementDescriptor<T> desc, final int invokationCount)
             throws DescriptorParsingException {
+        if (invokationCount > 1) {
+            // Initial invokation == 0, potential recovery-invocation == 1
+            String msg = "Failed to recover from out-of-sync state with server";
+            logger.log(Level.WARNING, msg);
+            throw new DescriptorParsingException(msg);
+        }
         String strDesc = desc.getDescriptor();
         SharedStateId categoryId = getCategoryId(desc.getCategory());
         NameValuePair nameParam = new BasicNameValuePair("query-descriptor",
@@ -842,10 +970,10 @@
                     // representation of category IDs changed. Thus, be sure to
                     // clear the category state and get their new IDs.
                     String msg = "Preparing statement failed. Server changed category state. Clearing category ID for statement: " +
-                                    desc.getDescriptor();
-                    logger.log(Level.WARNING, msg);
+                                    desc.getDescriptor() + " and trying to recover.";
+                    logger.log(Level.FINE, msg);
                     sendCategoryReRegistrationRequest(desc.getCategory());
-                    throw new RetryableDescriptorParsingException(msg);
+                    return sendPrepareStmtRequest(desc, invokationCount + 1);
                 }
                 default: {
                     // Common case where stmtId is the actual ID of the statement
@@ -862,7 +990,8 @@
         }
     }
     
-    private synchronized <T extends Pojo> void sendCategoryReRegistrationRequest(Category<T> category) {
+    // package private for testing
+    synchronized <T extends Pojo> void sendCategoryReRegistrationRequest(Category<T> category) {
         // There are two possible cases. Category is an aggregate category or
         // it is not. For aggregate categories we need to re-register the
         // original first and then the aggregate category.
@@ -875,21 +1004,6 @@
         categoryIds.remove(category);
         registerCategory(category);
     }
-    
-    // Container used for parameter caching in order to avoid unneccessary
-    // network overhead.
-    static class WebPreparedStatementHolder {
-        
-        private final Type typeToken;
-        private final int numParams;
-        private final SharedStateId statementId;
-        
-        WebPreparedStatementHolder(Type typeToken, int numParams, SharedStateId statementId) {
-            this.typeToken = typeToken;
-            this.numParams = numParams;
-            this.statementId = statementId;
-        }
-    }
 
 
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/ExpirableWebPreparedStatementCacheTest.java	Mon Dec 01 10:43:12 2014 +0100
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2012-2014 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.web.client.internal;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.redhat.thermostat.storage.core.Category;
+import com.redhat.thermostat.storage.core.StatementDescriptor;
+import com.redhat.thermostat.web.common.SharedStateId;
+
+public class ExpirableWebPreparedStatementCacheTest {
+    
+    private static final UUID SERVER_TOKEN = UUID.randomUUID();
+    private static final SharedStateId STMT_ID = new SharedStateId(3, SERVER_TOKEN);
+    private WebPreparedStatementCache cache;
+    private StatementDescriptor<?> desc;
+    
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setup() {
+        cache = new WebPreparedStatementCache();
+        WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class);
+        when(holder.getStatementId()).thenReturn(STMT_ID);
+        desc = new StatementDescriptor<>(mock(Category.class), "test-desc");
+        cache.put(desc, holder);
+    }
+
+    @Test
+    public void testGetByDesc() {
+        long expires = System.nanoTime() + TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS);
+        ExpirableWebPreparedStatementCache expCache = new ExpirableWebPreparedStatementCache(cache, expires);
+        assertNotNull("should not yet be expired", expCache.get(desc));
+        @SuppressWarnings("unchecked")
+        StatementDescriptor<?> notExisting = new StatementDescriptor<>(mock(Category.class), "testing");
+        assertNull("entry does not exist", expCache.get(notExisting));
+        
+        expires = System.nanoTime() - TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS);
+        expCache = new ExpirableWebPreparedStatementCache(cache, expires);
+        assertNull("should have expired", expCache.get(desc));
+    }
+    
+    @Test
+    public void testGetById() {
+        long expires = System.nanoTime() + TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS);
+        ExpirableWebPreparedStatementCache expCache = new ExpirableWebPreparedStatementCache(cache, expires);
+        assertNotNull("should not yet be expired", expCache.get(STMT_ID));
+        assertNull("entry does not exist", expCache.get(new SharedStateId(3, UUID.randomUUID())));
+        
+        expires = System.nanoTime() - TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS);
+        expCache = new ExpirableWebPreparedStatementCache(cache, expires);
+        assertNull("should have expired", expCache.get(STMT_ID));
+    }
+    
+    @Test
+    public void testIsExpired() {
+        long expires = System.nanoTime() - TimeUnit.NANOSECONDS.convert(20, TimeUnit.MILLISECONDS);
+        ExpirableWebPreparedStatementCache cache = new ExpirableWebPreparedStatementCache(null, expires);
+        assertTrue(cache.isExpired());
+        
+        expires = System.nanoTime() + TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS);
+        cache = new ExpirableWebPreparedStatementCache(null, expires);
+        assertFalse(cache.isExpired());
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebPreparedStatementCacheTest.java	Mon Dec 01 10:43:12 2014 +0100
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2012-2014 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.web.client.internal;
+
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.redhat.thermostat.storage.core.Category;
+import com.redhat.thermostat.storage.core.StatementDescriptor;
+import com.redhat.thermostat.storage.model.Pojo;
+import com.redhat.thermostat.web.common.SharedStateId;
+
+public class WebPreparedStatementCacheTest {
+
+    private WebPreparedStatementCache cache;
+
+    @Before
+    public void setup() {
+        cache = new WebPreparedStatementCache();
+    }
+
+    @Test
+    public void testPut() {
+        SharedStateId id = new SharedStateId(3, UUID.randomUUID());
+        WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class);
+        when(holder.getStatementId()).thenReturn(id);
+        @SuppressWarnings("unchecked")
+        Category<TestPojo> cat = mock(Category.class);
+        StatementDescriptor<TestPojo> foo = new StatementDescriptor<>(cat,
+                "something");
+        cache.put(foo, holder);
+        assertSame(holder, cache.get(foo));
+        assertSame(foo, cache.get(id));
+        WebPreparedStatementHolder holder2 = mock(WebPreparedStatementHolder.class);
+        when(holder2.getStatementId()).thenReturn(id);
+        cache.put(foo, holder2);
+        assertNotSame(holder, cache.get(foo));
+        assertSame(holder2, cache.get(foo));
+        assertSame(foo, cache.get(id));
+        
+        // Different server token should not result in an look-up-by-id clash.
+        SharedStateId id2 = new SharedStateId(3, UUID.randomUUID());
+        assertNull(cache.get(id2));
+    }
+
+    @Test
+    public void testGetByDesc() {
+        WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class);
+        SharedStateId id = mock(SharedStateId.class);
+        when(holder.getStatementId()).thenReturn(id);
+        @SuppressWarnings("unchecked")
+        Category<TestPojo> cat = mock(Category.class);
+        StatementDescriptor<TestPojo> foo = new StatementDescriptor<>(cat,
+                "something");
+        cache.put(foo, holder);
+        assertEquals(holder, cache.get(foo));
+        StatementDescriptor<TestPojo> other = new StatementDescriptor<>(cat,
+                "something-else");
+        assertNull(cache.get(other));
+    }
+
+    @Test
+    public void testGetById() {
+        WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class);
+        SharedStateId id = mock(SharedStateId.class);
+        when(holder.getStatementId()).thenReturn(id);
+        @SuppressWarnings("unchecked")
+        Category<TestPojo> cat = mock(Category.class);
+        StatementDescriptor<TestPojo> foo = new StatementDescriptor<>(cat,
+                "something");
+        cache.put(foo, holder);
+        assertEquals(foo, cache.get(id));
+        assertNull(cache.get(mock(SharedStateId.class)));
+    }
+
+    @Test
+    public void testRemove() {
+        WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class);
+        SharedStateId id = mock(SharedStateId.class);
+        when(holder.getStatementId()).thenReturn(id);
+        @SuppressWarnings("unchecked")
+        Category<TestPojo> cat = mock(Category.class);
+        StatementDescriptor<TestPojo> foo = new StatementDescriptor<>(cat,
+                "something");
+        cache.put(foo, holder);
+        assertEquals(holder, cache.get(foo));
+
+        cache.remove(id);
+        assertNull(cache.get(foo));
+        assertNull(cache.get(id));
+    }
+
+    /**
+     * Verify that we are able to create a snapshot of a cache in order to be
+     * able to look up old ID mappings.
+     */
+    @Test
+    public void verifyCreateSnapshot() {
+        WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class);
+        SharedStateId id = mock(SharedStateId.class);
+        when(holder.getStatementId()).thenReturn(id);
+        @SuppressWarnings("unchecked")
+        Category<TestPojo> cat = mock(Category.class);
+        StatementDescriptor<TestPojo> foo = new StatementDescriptor<>(cat,
+                "something");
+        cache.put(foo, holder);
+        assertEquals(holder, cache.get(foo));
+        assertNotNull(cache.get(id));
+        
+        WebPreparedStatementCache copy = cache.createSnapshot();
+
+        cache.remove(id);
+        
+        assertNull(cache.get(foo));
+        assertNull(cache.get(id));
+        
+        assertNotNull(copy.get(foo));
+        assertNotNull(copy.get(id));
+    }
+
+    private static class TestPojo implements Pojo {
+        // nothing
+    }
+}
--- a/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java	Sun Dec 07 22:26:27 2014 -0700
+++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java	Mon Dec 01 10:43:12 2014 +0100
@@ -41,12 +41,15 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
@@ -58,9 +61,11 @@
 import java.net.URLDecoder;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -90,6 +95,7 @@
 import com.redhat.thermostat.storage.core.BackingStorage;
 import com.redhat.thermostat.storage.core.Categories;
 import com.redhat.thermostat.storage.core.Category;
+import com.redhat.thermostat.storage.core.CategoryAdapter;
 import com.redhat.thermostat.storage.core.Connection.ConnectionListener;
 import com.redhat.thermostat.storage.core.Connection.ConnectionStatus;
 import com.redhat.thermostat.storage.core.Cursor;
@@ -103,6 +109,7 @@
 import com.redhat.thermostat.storage.core.StorageCredentials;
 import com.redhat.thermostat.storage.core.StorageException;
 import com.redhat.thermostat.storage.core.experimental.BatchCursor;
+import com.redhat.thermostat.storage.model.AggregateResult;
 import com.redhat.thermostat.storage.model.Pojo;
 import com.redhat.thermostat.test.FreePortFinder;
 import com.redhat.thermostat.test.FreePortFinder.TryPort;
@@ -404,6 +411,57 @@
                      3, stmtId.getId());
     }
     
+    @Test
+    public void prepareStatementCachesStatements2() throws DescriptorParsingException {
+        WebPreparedStatementCache cache = mock(WebPreparedStatementCache.class);
+        final WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class);
+        WebStorage testStorage = new WebStorage(cache, null) {
+            <T extends Pojo> WebPreparedStatementHolder sendPrepareStmtRequest(StatementDescriptor<T> desc, final int invokationCount)
+                    throws DescriptorParsingException {
+                if (invokationCount != 0) {
+                    throw new AssertionError("expected invokation count 0 but was: " + invokationCount);
+                }
+                return holder;
+            }
+        };
+        String strDesc = "QUERY test WHERE 'property1' = ?s";
+        StatementDescriptor<TestObj> desc = new StatementDescriptor<>(category, strDesc);
+        testStorage.prepareStatement(desc);
+        verify(cache).get(desc);
+        verify(cache).put(desc, holder);
+    }
+    
+    @Test
+    public void verifySendCategoryReRegistrationRequest() {
+        @SuppressWarnings("unchecked")
+        Map<Category<?>, SharedStateId> categoryMap = mock(Map.class);
+        final List<Category<?>> interceptedCategories = new ArrayList<>();
+        WebStorage testStorage = new WebStorage(categoryMap) {
+            @Override
+            public void registerCategory(Category<?> category) throws StorageException {
+                interceptedCategories.add(category);
+            }
+        };
+        // verify aggregate categories
+        Category<TestAggregate> aggregateCategory = new CategoryAdapter<TestObj, TestAggregate>(category).getAdapted(TestAggregate.class);
+        testStorage.sendCategoryReRegistrationRequest(aggregateCategory);
+        verify(categoryMap).remove(aggregateCategory);
+        verify(categoryMap).remove(category);
+        assertEquals(2, interceptedCategories.size());
+        assertEquals("Expected actual category to be re-registered first", category, interceptedCategories.get(0));
+        assertEquals("Expected aggregate category to be re-registered second", aggregateCategory, interceptedCategories.get(1));
+        
+        interceptedCategories.clear();
+        
+        // verify regular categories
+        testStorage.sendCategoryReRegistrationRequest(category);
+        // earlier test above did invoke it already once
+        verify(categoryMap, times(2)).remove(category);
+        assertEquals(1, interceptedCategories.size());
+        assertEquals(category, interceptedCategories.get(0));
+        verifyNoMoreInteractions(categoryMap);
+    }
+    
     /**
      * Tests a query which returns results in a single batch.
      * 
@@ -945,6 +1003,155 @@
         assertTrue(listener.disconnectEvent);
     }
     
+    @Test
+    public void verifyDoWriteExecuteMorethanOne() {
+        try {
+            storage.doWriteExecute(null, 2);
+            fail("Expected excecution exception since invoked count > 1");
+        } catch (StatementExecutionException e) {
+            Throwable cause = e.getCause();
+            // pass
+            assertEquals("Failed to recover from out-of-sync state with server", cause.getMessage());
+        }
+    }
+    
+    @Test
+    public void verifyDoExecuteQueryMorethanOne() {
+        try {
+            storage.doExecuteQuery(null, null, 2);
+            fail("Expected descriptor parsing exception since invoked count > 1");
+        } catch (StatementExecutionException e) {
+            Throwable cause = e.getCause();
+            // pass
+            assertEquals("Failed to recover from out-of-sync state with server", cause.getMessage());
+        }
+    }
+    
+    @Test
+    public void verifySendPreparedStatementRequestMoreThanOne() {
+        try {
+            storage.sendPrepareStmtRequest(null, 2);
+            fail("Expected descriptor parsing exception since invoked count > 1");
+        } catch (DescriptorParsingException e) {
+            // pass
+            assertEquals("Failed to recover from out-of-sync state with server", e.getMessage());
+        }
+    }
+    
+    /**
+     * Test the base case in {@link WebStorage#handlePreparedStmtStateOutOfSync(WebPreparedStatement)}.
+     * with null transition cache and non-null statement cache. This simulates the
+     * case where state got out of sync and handlePreparedStmtStateOutOfSync() is
+     * called the first time after that out-of-sync-event happened. In that case
+     * it is expected for prepareStatement() and sendCategoryReRegistrationRequest()
+     * to be called.
+     * 
+     * @throws DescriptorParsingException 
+     * 
+     */
+    @Test
+    public void testHandleStatementStateOutOfSyncBaseCase() throws DescriptorParsingException {
+        WebPreparedStatementHolder mockHolder = mock(WebPreparedStatementHolder.class);
+        SharedStateId id = new SharedStateId(300, UUID.randomUUID());
+        when(mockHolder.getStatementId()).thenReturn(id);
+        @SuppressWarnings("unchecked")
+        Category<Pojo> foo = mock(Category.class);
+        StatementDescriptor<Pojo> desc = new StatementDescriptor<>(foo, "testing");
+        WebPreparedStatementCache stmtCache = mock(WebPreparedStatementCache.class);
+        // this is called twice. Once for creating the snapshot and another time
+        // for getting the descriptor.
+        when(stmtCache.get(id)).thenReturn(desc).thenReturn(desc);
+        final boolean[] sendCategoryReRegistrationRequest = new boolean[1];
+        final boolean[] prepareStatement = new boolean[1];
+        final WebPreparedStatement<?> newStmt = mock(WebPreparedStatement.class);
+        WebStorage webStorage = new WebStorage(stmtCache, null) {
+            
+            @Override
+            protected synchronized <T extends Pojo> void sendCategoryReRegistrationRequest(Category<T> category) {
+                sendCategoryReRegistrationRequest[0] = true;
+            }
+            
+            @SuppressWarnings("unchecked")
+            @Override
+            public <T extends Pojo> PreparedStatement<T> prepareStatement(StatementDescriptor<T> desc)
+                    throws DescriptorParsingException {
+                prepareStatement[0] = true;
+                return (PreparedStatement<T>)newStmt;
+            }
+        };
+        @SuppressWarnings("unchecked")
+        WebPreparedStatement<Pojo> mockStmt = mock(WebPreparedStatement.class);
+        PreparedParameters mockParams = new PreparedParameters(3);
+        when(mockStmt.getParams()).thenReturn(mockParams);
+        when(mockStmt.getStatementId()).thenReturn(id);
+        
+        WebPreparedStatementCache stmtCacheSnapshot = mock(WebPreparedStatementCache.class);
+        when(stmtCache.createSnapshot()).thenReturn(stmtCacheSnapshot);
+        webStorage.handlePreparedStmtStateOutOfSync(mockStmt);
+        verify(stmtCache).createSnapshot();
+        verify(stmtCache).get(id);
+        verify(stmtCache).remove(id);
+        assertTrue("expected sendCategoryReRegistrationRequest() to be called", sendCategoryReRegistrationRequest[0]);
+        assertTrue("expected prepareStatement() to be called", prepareStatement[0]);
+        verify(newStmt).setParams(mockParams);
+        verifyNoMoreInteractions(stmtCache);
+    }
+    
+    /**
+     * Test the transition case in {@link WebStorage#handlePreparedStmtStateOutOfSync(WebPreparedStatement)}.
+     * with non-null transition cache and non-null statement cache.
+     * 
+     * This simulates the case where state got out of sync and handlePreparedStmtStateOutOfSync() is
+     * called <strong>not</strong> the first time after an out-of-sync-event happened. I.e.
+     * the base-case path has been entered first when a similar statement tried
+     * to execute, in turn, getting the statement id removed from the main
+     * statement cache.
+     * 
+     * In that case it is expected for the transition cache to become active
+     * allowing the statement to execute successfully. 
+     * 
+     * @throws DescriptorParsingException 
+     * 
+     */
+    @Test
+    public void testHandleStatementStateOutOfSyncTransitionCase() throws DescriptorParsingException {
+        WebPreparedStatementHolder mockHolder = mock(WebPreparedStatementHolder.class);
+        SharedStateId id = new SharedStateId(300, UUID.randomUUID());
+        when(mockHolder.getStatementId()).thenReturn(id);
+        @SuppressWarnings("unchecked")
+        Category<Pojo> foo = mock(Category.class);
+        StatementDescriptor<Pojo> desc = new StatementDescriptor<>(foo, "testing");
+        WebPreparedStatementCache stmtCache = mock(WebPreparedStatementCache.class);
+        // no setup for the id in stmtCache, however the transitionCache,
+        // a snapshot cache - created when the first call to handlePreparedStatementStateOutOfSync()
+        // came in - still "knows" about this record.
+        ExpirableWebPreparedStatementCache transitionCache = mock(ExpirableWebPreparedStatementCache.class);
+        when(transitionCache.isExpired()).thenReturn(false);
+        when(transitionCache.get(id)).thenReturn(desc);
+        when(transitionCache.get(desc)).thenReturn(mockHolder);
+        SharedStateId updatedId = new SharedStateId(301, UUID.randomUUID());
+        WebPreparedStatementHolder newHolder = mock(WebPreparedStatementHolder.class);
+        when(mockHolder.getStatementId()).thenReturn(id);
+        // called twice. once for equality check. once for getting the id and
+        // using it to update the prepared statement id.
+        when(newHolder.getStatementId()).thenReturn(updatedId).thenReturn(updatedId);
+        when(stmtCache.get(desc)).thenReturn(newHolder);
+        WebStorage webStorage = new WebStorage(stmtCache, transitionCache);
+        @SuppressWarnings("unchecked")
+        WebPreparedStatement<Pojo> mockStmt = mock(WebPreparedStatement.class);
+        when(mockStmt.getStatementId()).thenReturn(id);
+        WebPreparedStatement<Pojo> result = webStorage.handlePreparedStmtStateOutOfSync(mockStmt);
+        verify(mockStmt).setStatementId(updatedId);
+        assertSame(mockStmt, result);
+        verify(stmtCache).get(id);
+        verify(stmtCache).get(desc);
+        verify(transitionCache).isExpired();
+        verify(transitionCache).get(id);
+        verify(transitionCache).get(desc);
+        verifyNoMoreInteractions(stmtCache);
+        verifyNoMoreInteractions(transitionCache);
+    }
+    
     static class MyListener implements ConnectionListener {
 
         private CountDownLatch latch;
@@ -1000,7 +1207,7 @@
         }
         
         @Override
-        <T extends Pojo> WebPreparedStatementHolder sendPrepareStmtRequest(StatementDescriptor<T> desc)
+        <T extends Pojo> WebPreparedStatementHolder sendPrepareStmtRequest(StatementDescriptor<T> desc, int invokationCounter)
                 throws DescriptorParsingException {
             int numParams = counter++;
             int stmtId = counter++;
@@ -1008,5 +1215,9 @@
             return new WebPreparedStatementHolder(TestObj.class, numParams, id); 
         }
     }
+    
+    private static class TestAggregate implements AggregateResult {
+        // nothing
+    }
 }
 
--- a/web/server/src/main/java/com/redhat/thermostat/web/server/WebStorageEndPoint.java	Sun Dec 07 22:26:27 2014 -0700
+++ b/web/server/src/main/java/com/redhat/thermostat/web/server/WebStorageEndPoint.java	Mon Dec 01 10:43:12 2014 +0100
@@ -395,6 +395,7 @@
                         .getParsedStatement();
                 response.setNumFreeVariables(parsed.getNumParams());
                 response.setStatementId(stmtId);
+                logger.log(Level.INFO, "Server: prepare-statement: stmt: " + desc + " got assigned id: " + stmtId.getId());
                 writeResponse(resp, response, WebPreparedStatementResponse.class);
             }
         }
@@ -835,7 +836,7 @@
             // perform the patching of the target statement.
             targetStatement = (DataModifyingStatement<T>)parsed.patchStatement(params);
         } catch (IllegalPatchException e) {
-            logger.log(Level.INFO, "Failed to execute write", e);
+            logger.log(Level.INFO, "Failed to execute write. Stmt id was: " + stmtId, e);
             writeResponse(resp, PreparedStatementResponseCode.ILLEGAL_PATCH, int.class);
             return;
         }