Mercurial > hg > release > thermostat-1.2
changeset 1607:605f0c269f1c
Improve recovery mechanism for WebStorage.
Reviewed-by: omajid
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2014-December/011936.html
line wrap: on
line diff
--- a/storage/core/src/main/java/com/redhat/thermostat/storage/core/RetryableDescriptorParsingException.java Sun Dec 07 22:26:27 2014 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,52 +0,0 @@ -/* - * Copyright 2012-2014 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.storage.core; - -/** - * Exception thrown if parsing of a prepared statement descriptor - * failed but retrying to prepare the same descriptor may succeed. - * - */ -@SuppressWarnings("serial") -public class RetryableDescriptorParsingException extends - DescriptorParsingException { - - public RetryableDescriptorParsingException(String msg) { - super(msg); - } - -}
--- a/storage/core/src/main/java/com/redhat/thermostat/storage/core/RetryableStatementExecutionException.java Sun Dec 07 22:26:27 2014 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,53 +0,0 @@ -/* - * Copyright 2012-2014 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.storage.core; - -/** - * - * Exception thrown if execution of a {@link PreparedStatement} failed, but - * may succeed if the same statement gets executed a second time. - * - */ -@SuppressWarnings("serial") -public class RetryableStatementExecutionException extends - StatementExecutionException { - - public RetryableStatementExecutionException(Throwable cause) { - super(cause); - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/ExpirableWebPreparedStatementCache.java Mon Dec 01 10:43:12 2014 +0100 @@ -0,0 +1,96 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.web.client.internal; + +import com.redhat.thermostat.storage.core.StatementDescriptor; +import com.redhat.thermostat.storage.model.Pojo; +import com.redhat.thermostat.web.common.SharedStateId; + +/** + * A {@link WebPreparedStatementCache} with an expiration time. When that time + * has passed it will return null for all cache entries. Used as a read-only + * transition cache in {@link WebStorage} in order to recover from server + * component reloads. + * + */ +class ExpirableWebPreparedStatementCache extends WebPreparedStatementCache { + + private final long timeExpires; + private final WebPreparedStatementCache cache; + + ExpirableWebPreparedStatementCache(WebPreparedStatementCache cache, long timeExpires) { + this.timeExpires = timeExpires; + this.cache = cache; + } + + @Override + synchronized <T extends Pojo> WebPreparedStatementHolder get(StatementDescriptor<T> desc) { + WebPreparedStatementHolder holder = cache.get(desc); + if (holder == null) { + return null; + } + // check if corresponding cache entry has expired + long now = System.nanoTime(); + if (now > timeExpires) { + // remove cache entry and return null + SharedStateId id = holder.getStatementId(); + cache.remove(id); + return null; + } + return holder; + } + + @Override + synchronized <T extends Pojo> StatementDescriptor<T> get(SharedStateId id) { + StatementDescriptor<T> desc = cache.get(id); + if (desc == null) { + return null; + } + long now = System.nanoTime(); + if (now > timeExpires) { + // remove cache entry and return null + cache.remove(id); + return null; + } + return desc; + } + + boolean isExpired() { + return System.nanoTime() > timeExpires; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebPreparedStatementCache.java Mon Dec 01 10:43:12 2014 +0100 @@ -0,0 +1,98 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.web.client.internal; + +import java.util.Map; +import java.util.WeakHashMap; + +import com.redhat.thermostat.storage.core.StatementDescriptor; +import com.redhat.thermostat.storage.model.Pojo; +import com.redhat.thermostat.web.common.SharedStateId; +import com.redhat.thermostat.web.common.WebPreparedStatement; + +/** + * A simple implementation of a cache for {@link WebPreparedStatement} + * in order to avoid unnecessary network round-trips if a statement + * has already been prepared and it's not out-of-sync with the server. + */ +class WebPreparedStatementCache { + + private final Map<StatementDescriptor<?>, WebPreparedStatementHolder> stmtCache; + private final Map<SharedStateId, StatementDescriptor<?>> reverseLookup; + + WebPreparedStatementCache() { + stmtCache = new WeakHashMap<>(); + reverseLookup = new WeakHashMap<>(); + } + + synchronized <T extends Pojo> void put(StatementDescriptor<T> desc, WebPreparedStatementHolder holder) { + SharedStateId id = holder.getStatementId(); + stmtCache.put(desc, holder); + reverseLookup.put(id, desc); + } + + synchronized <T extends Pojo> WebPreparedStatementHolder get(StatementDescriptor<T> desc) { + return stmtCache.get(desc); + } + + @SuppressWarnings("unchecked") + synchronized <T extends Pojo> StatementDescriptor<T> get(SharedStateId id) { + return (StatementDescriptor<T>)reverseLookup.get(id); + } + + synchronized void remove(SharedStateId id) { + StatementDescriptor<?> desc = reverseLookup.get(id); + if (desc != null) { + stmtCache.remove(desc); + } + reverseLookup.remove(id); + } + + /** + * Creates a snapshot of the current state of this cache. + * + * @return A copied snapshot of this cache. + */ + synchronized WebPreparedStatementCache createSnapshot() { + WebPreparedStatementCache copy = new WebPreparedStatementCache(); + for (StatementDescriptor<?> desc: stmtCache.keySet()) { + copy.put(desc, stmtCache.get(desc)); + } + return copy; + } + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebPreparedStatementHolder.java Mon Dec 01 10:43:12 2014 +0100 @@ -0,0 +1,74 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.web.client.internal; + +import java.lang.reflect.Type; + +import com.redhat.thermostat.web.common.SharedStateId; + +/** + * Container used for parameter caching in order to avoid unneccessary + * network overhead. + * + * @see {@link WebStorage} + * @see {@link WebPreparedStatementCache} + * + */ +class WebPreparedStatementHolder { + + private final Type typeToken; + private final int numParams; + private final SharedStateId statementId; + + WebPreparedStatementHolder(Type typeToken, int numParams, SharedStateId statementId) { + this.typeToken = typeToken; + this.numParams = numParams; + this.statementId = statementId; + } + + Type getTypeToken() { + return typeToken; + } + + int getNumParams() { + return numParams; + } + + SharedStateId getStatementId() { + return statementId; + } +} \ No newline at end of file
--- a/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java Sun Dec 07 22:26:27 2014 -0700 +++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java Mon Dec 01 10:43:12 2014 +0100 @@ -51,7 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.WeakHashMap; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -104,8 +104,6 @@ import com.redhat.thermostat.storage.core.IllegalDescriptorException; import com.redhat.thermostat.storage.core.IllegalPatchException; import com.redhat.thermostat.storage.core.PreparedStatement; -import com.redhat.thermostat.storage.core.RetryableDescriptorParsingException; -import com.redhat.thermostat.storage.core.RetryableStatementExecutionException; import com.redhat.thermostat.storage.core.SecureStorage; import com.redhat.thermostat.storage.core.StatementDescriptor; import com.redhat.thermostat.storage.core.StatementExecutionException; @@ -134,6 +132,10 @@ private static final String HTTP_PREFIX = "http"; private static final String HTTPS_PREFIX = "https"; + + // Transition cache is valid for 30 seconds starting from the current time. + private static final long TRANSITION_CACHE_OFFSET = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); + static final Logger logger = LoggingUtils.getLogger(WebStorage.class); private static class CloseableHttpEntity implements Closeable, HttpEntity { @@ -338,13 +340,13 @@ @Override public int execute() throws StatementExecutionException { - return doWriteExecute(this); + return doWriteExecute(this, 0); } @Override public Cursor<T> executeQuery() throws StatementExecutionException { - return doExecuteQuery(this, parametrizedTypeToken); + return doExecuteQuery(this, parametrizedTypeToken, 0); } } @@ -361,12 +363,26 @@ private StorageCredentials creds; private SecureRandom random; private WebConnection conn; - private Map<StatementDescriptor<?>, WebPreparedStatementHolder> stmtCache; + private WebPreparedStatementCache stmtCache; + // Temporary cache used for recovering after a server endpoint re-deployment. + // Will only be valid for 30 seconds for any server endpoint re-deployment. + private ExpirableWebPreparedStatementCache transitionStmtCache; // for testing WebStorage(String url, StorageCredentials creds, HttpClient client) { init(url, creds, client); } + + // for testing + WebStorage(WebPreparedStatementCache stmtCache, ExpirableWebPreparedStatementCache transitionCache) { + this.stmtCache = stmtCache; + this.transitionStmtCache = transitionCache; + } + + // for testing + WebStorage(Map<Category<?>, SharedStateId> categoryIds) { + this.categoryIds = categoryIds; + } public WebStorage(String url, StorageCredentials creds, SSLConfiguration sslConf) throws StorageException { PoolingHttpClientConnectionManager connManager = getPoolingHttpClientConnManager(sslConf, url); @@ -402,10 +418,7 @@ this.endpoint = url; this.creds = creds; - // Use weak map in order for it to not eat up too much memory. The sole - // purpose is to cache values and prevent some unnecessary network - // overhead. - this.stmtCache = new WeakHashMap<>(); + this.stmtCache = new WebPreparedStatementCache(); } // package private for testing @@ -568,11 +581,15 @@ * Type parametrizedTypeToken = new * TypeToken<WebQueryResponse<AgentInformation>>().getType(); * </pre> + * @param invocationCount The number of recursive invocations performed so far. * @return A cursor for the generic type. * @throws StatementExecutionException - * If execution of the statement failed. + * If execution of the statement failed. In particular, if + * the state got out of sync, it tried to recover and then + * failed again. */ - private <T extends Pojo> Cursor<T> doExecuteQuery(WebPreparedStatement<T> stmt, Type parametrizedTypeToken) throws StatementExecutionException { + <T extends Pojo> Cursor<T> doExecuteQuery(final WebPreparedStatement<T> stmt, Type parametrizedTypeToken, final int invocationCount) throws StatementExecutionException { + checkRecursiveInvocationCount(invocationCount); NameValuePair queryParam = new BasicNameValuePair("prepared-stmt", gson.toJson(stmt, WebPreparedStatement.class)); List<NameValuePair> formparams = Arrays.asList(queryParam); WebQueryResponse<T> qResp = null; @@ -594,10 +611,14 @@ throw new StatementExecutionException(e); } case PreparedStatementResponseCode.PREP_STMT_BAD_STOKEN: { - String msg = "Query failed to execute. Server changed token. Clearing prepared stmt cache!"; - logger.log(Level.WARNING, msg); - clearPreparedStmtCache(); - throw new RetryableStatementExecutionException(new RuntimeException(msg)); + // Try to recover from this situation. If this path is + // entered more than once than we'll fail on method entry. + try { + WebPreparedStatement<T> newStmt = handlePreparedStmtStateOutOfSync(stmt); + return doExecuteQuery(newStmt, parametrizedTypeToken, invocationCount + 1); + } catch (DescriptorParsingException e) { + throw new StatementExecutionException(e); + } } default: { String msg = "[query-execute] Unknown response from storage endpoint!"; @@ -607,6 +628,15 @@ } } + private void checkRecursiveInvocationCount(int invocationCount) throws StatementExecutionException { + if (invocationCount > 1) { + // Initial invokation == 0, potential recovery-invocation == 1 + String msg = "Failed to recover from out-of-sync state with server"; + logger.log(Level.WARNING, msg); + throw new StatementExecutionException(new IllegalStateException(msg)); + } + } + /** * This method gets called from WebCursor in order to fetch more results * or refresh the result set since parameters like limit or skip have @@ -646,6 +676,9 @@ * * @param stmt * The prepared statement to execute + * @param invocationCount + * The number of times this method has been recursively called, + * starting at 0. * @return The response code of executing the underlying data modifying * statement. * @throws StatementExecutionException @@ -653,8 +686,9 @@ * values set as prepared parameters did not work or were * partially missing for the prepared statement. */ - private <T extends Pojo> int doWriteExecute(WebPreparedStatement<T> stmt) + <T extends Pojo> int doWriteExecute(final WebPreparedStatement<T> stmt, final int invocationCount) throws StatementExecutionException { + checkRecursiveInvocationCount(invocationCount); NameValuePair queryParam = new BasicNameValuePair("prepared-stmt", gson.toJson(stmt, WebPreparedStatement.class)); List<NameValuePair> formparams = Arrays.asList(queryParam); int responseCode = PreparedStatementResponseCode.WRITE_GENERIC_FAILURE; @@ -665,15 +699,19 @@ throw new StatementExecutionException(e); } if (responseCode == PreparedStatementResponseCode.ILLEGAL_PATCH) { - String msg = "Illegal statement argument. See server logs for details."; + String msg = "Illegal statement argument. See server logs for details. Invokation count: " + invocationCount; IllegalArgumentException iae = new IllegalArgumentException(msg); IllegalPatchException e = new IllegalPatchException(iae); throw new StatementExecutionException(e); } else if (responseCode == PreparedStatementResponseCode.PREP_STMT_BAD_STOKEN) { - String msg = "Write failed to execute. Server changed token. Clearing prepared stmt cache!"; - logger.log(Level.WARNING, msg); - clearPreparedStmtCache(); - throw new RetryableStatementExecutionException(new RuntimeException(msg)); + // Try to recover from this situation. If this path is + // entered more than once than we'll fail on method entry. + try { + WebPreparedStatement<T> newStmt = handlePreparedStmtStateOutOfSync(stmt); + return doWriteExecute(newStmt, invocationCount + 1); + } catch (DescriptorParsingException e) { + throw new StatementExecutionException(e); + } } return responseCode; } @@ -773,10 +811,101 @@ return categoryIds.get(category); } - private void clearPreparedStmtCache() { - synchronized(this.stmtCache) { - stmtCache.clear(); + /** + * Package private for testing + * + * This method handles the recovery mechanism which needs to be done before + * an already failed {@link WebPreparedStatement} can be re-submitted + * because some state maintained in the client (here) and on the server + * need to be in agreement. + * + * Here is how the recovery mechanism works: + * + * Pre: client and server agree on an ID for every statement. Any single + * statement is uniquely identifiable via the (server-token, int-id) + * pair. When this method is called we already know that when we first + * tried to execute the statement we had an out-dated server-token in + * record. Thus, we need to refresh the local cache with updated + * statement IDs. + * + * Getting the local cache back in sync can be done by: + * 1. Removing the old values from the current statement cache and + * 2. Re-registering the underlying category and re-preparing the statement + * + * The above two steps will be done once per statement descriptor. This will + * update the statement cache accordingly. However, since there may be other + * statements in the local queue waiting to be executed. Those pending + * statements still have old statement IDs in record. This is where the + * transition cache comes into play. There is no need to re-register categories + * and re-prepare statements for the same descriptor. It was already done + * once and the main statement cache updated accordingly. The transition cache is then used + * to get the descriptor from an old statement ID. I.e. whenever the transition + * cache is used it is no longer equal to the main statement cache. In a way + * the transition cache is a tool to get a descriptor for an now out-dated + * statement ID. Once we have the descriptor again we can look it up in the + * regular statement cache in (which has been updated previously) in order + * to get the updated values for the statement ID. + * + * @param origStmt The original statement that failed to execute. + * @return A fixed-up statement which should succeed to execute if tried + * again. + * @throws DescriptorParsingException If re-preparing a statement failed. + */ + synchronized <T extends Pojo> WebPreparedStatement<T> handlePreparedStmtStateOutOfSync(final WebPreparedStatement<T> origStmt) throws DescriptorParsingException { + SharedStateId id = origStmt.getStatementId(); + String msg = "Prepared statement failed to execute. Server changed token. Trying to recover stmt with id: " + id; + logger.log(Level.FINE, msg); + // Transition stmt cache needs to be created in 2 cases: + // 1. It might be null if it was the first time the server + // re-deployed. + // 2. The server did re-deploy at least once and the time it happened + // is more than TRANSITION_CACHE_OFFSET in the past. Case for + // multiple re-deployments of the server parts. + if (transitionStmtCache == null || transitionStmtCache.isExpired()) { + // Create a transition cache which expires soon in the future + // in order to allow successful executions of queued statements which + // did not yet run and have old statement IDs in record. + logger.log(Level.FINE, "Re-creating transition cache"); + WebPreparedStatementCache cacheSnapshot = stmtCache.createSnapshot(); + long timeExpires = System.nanoTime() + TRANSITION_CACHE_OFFSET; + transitionStmtCache = new ExpirableWebPreparedStatementCache(cacheSnapshot, timeExpires); } + StatementDescriptor<T> desc = stmtCache.get(id); + // If the above returned null we most likely tried to execute a statement + // with an old server token. Attempt to use the transition cache in order + // to still be able to execute it successfully. + if (desc == null) { + desc = transitionStmtCache.get(id); + if (desc == null) { + throw new IllegalStateException("Irrecoverable error. GC happened or transition cache expired."); + } + WebPreparedStatementHolder transCacheHolder = transitionStmtCache.get(desc); + WebPreparedStatementHolder cacheHolder = stmtCache.get(desc); + if (transCacheHolder.getStatementId().equals(cacheHolder.getStatementId())) { + throw new IllegalStateException("Should not happen!"); + } + // Transition case: + // + // Fetch the new mapping from the stmt cache since the statement id + // must have changed but category-registration and preparing the + // updated statement was done already. + SharedStateId stmtId = cacheHolder.getStatementId(); + logger.log(Level.FINE, "Returning fixed-up statement using updated statement id: " + stmtId); + origStmt.setStatementId(stmtId); + return origStmt; + } + // Base case: re-register category and re-prepare statement. This will + // be done *once* for every statement. + logger.log(Level.FINE, "Re-register category + prepareStatement + setting params: " + desc); + sendCategoryReRegistrationRequest(desc.getCategory()); + stmtCache.remove(id); + // prepareStatement() will return a raw statement (no parameters will be + // set in this new datastructure). In order to make it executable right + // away we need to set the params via the params we have in record in the + // original stmt. + WebPreparedStatement<T> newStmt = (WebPreparedStatement<T>)prepareStatement(desc); + newStmt.setParams(origStmt.getParams()); + return newStmt; } @Override @@ -786,30 +915,29 @@ * Avoid two network round-trips for statements which have already * been prepared. Note that this makes preparing statements not entirely * stateless, since the prepared statement ID might change if the - * web endpoint reboots, but the agent/client does not and keeps old IDs - * in the cache. This will likely produce statement execution errors. - * So if that happens you've just found the likely cause :) + * web endpoint reloads. If those IDs get out-of-sync we do our best + * to correct this situation by clearing the relevant cache entry and + * preparing the statement again. */ - WebPreparedStatementHolder holder = null; - synchronized(this.stmtCache) { - if (this.stmtCache.containsKey(desc)) { - // note this is a WeakHashMap and may return null here - holder = stmtCache.get(desc); - } - } + WebPreparedStatementHolder holder = stmtCache.get(desc); + // note this is a WeakHashMap-backed cache and may return null if (holder == null) { // Cache-miss, send request over the wire and cache result. - holder = sendPrepareStmtRequest(desc); - synchronized(this.stmtCache) { - this.stmtCache.put(desc, holder); - } + holder = sendPrepareStmtRequest(desc, 0); + stmtCache.put(desc, holder); } - return new WebPreparedStatementImpl<>(holder.typeToken, holder.numParams, holder.statementId); + return new WebPreparedStatementImpl<>(holder.getTypeToken(), holder.getNumParams(), holder.getStatementId()); } // package-private for testing - <T extends Pojo> WebPreparedStatementHolder sendPrepareStmtRequest(StatementDescriptor<T> desc) + <T extends Pojo> WebPreparedStatementHolder sendPrepareStmtRequest(StatementDescriptor<T> desc, final int invokationCount) throws DescriptorParsingException { + if (invokationCount > 1) { + // Initial invokation == 0, potential recovery-invocation == 1 + String msg = "Failed to recover from out-of-sync state with server"; + logger.log(Level.WARNING, msg); + throw new DescriptorParsingException(msg); + } String strDesc = desc.getDescriptor(); SharedStateId categoryId = getCategoryId(desc.getCategory()); NameValuePair nameParam = new BasicNameValuePair("query-descriptor", @@ -842,10 +970,10 @@ // representation of category IDs changed. Thus, be sure to // clear the category state and get their new IDs. String msg = "Preparing statement failed. Server changed category state. Clearing category ID for statement: " + - desc.getDescriptor(); - logger.log(Level.WARNING, msg); + desc.getDescriptor() + " and trying to recover."; + logger.log(Level.FINE, msg); sendCategoryReRegistrationRequest(desc.getCategory()); - throw new RetryableDescriptorParsingException(msg); + return sendPrepareStmtRequest(desc, invokationCount + 1); } default: { // Common case where stmtId is the actual ID of the statement @@ -862,7 +990,8 @@ } } - private synchronized <T extends Pojo> void sendCategoryReRegistrationRequest(Category<T> category) { + // package private for testing + synchronized <T extends Pojo> void sendCategoryReRegistrationRequest(Category<T> category) { // There are two possible cases. Category is an aggregate category or // it is not. For aggregate categories we need to re-register the // original first and then the aggregate category. @@ -875,21 +1004,6 @@ categoryIds.remove(category); registerCategory(category); } - - // Container used for parameter caching in order to avoid unneccessary - // network overhead. - static class WebPreparedStatementHolder { - - private final Type typeToken; - private final int numParams; - private final SharedStateId statementId; - - WebPreparedStatementHolder(Type typeToken, int numParams, SharedStateId statementId) { - this.typeToken = typeToken; - this.numParams = numParams; - this.statementId = statementId; - } - } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/ExpirableWebPreparedStatementCacheTest.java Mon Dec 01 10:43:12 2014 +0100 @@ -0,0 +1,110 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.web.client.internal; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; + +import com.redhat.thermostat.storage.core.Category; +import com.redhat.thermostat.storage.core.StatementDescriptor; +import com.redhat.thermostat.web.common.SharedStateId; + +public class ExpirableWebPreparedStatementCacheTest { + + private static final UUID SERVER_TOKEN = UUID.randomUUID(); + private static final SharedStateId STMT_ID = new SharedStateId(3, SERVER_TOKEN); + private WebPreparedStatementCache cache; + private StatementDescriptor<?> desc; + + @SuppressWarnings("unchecked") + @Before + public void setup() { + cache = new WebPreparedStatementCache(); + WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class); + when(holder.getStatementId()).thenReturn(STMT_ID); + desc = new StatementDescriptor<>(mock(Category.class), "test-desc"); + cache.put(desc, holder); + } + + @Test + public void testGetByDesc() { + long expires = System.nanoTime() + TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS); + ExpirableWebPreparedStatementCache expCache = new ExpirableWebPreparedStatementCache(cache, expires); + assertNotNull("should not yet be expired", expCache.get(desc)); + @SuppressWarnings("unchecked") + StatementDescriptor<?> notExisting = new StatementDescriptor<>(mock(Category.class), "testing"); + assertNull("entry does not exist", expCache.get(notExisting)); + + expires = System.nanoTime() - TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS); + expCache = new ExpirableWebPreparedStatementCache(cache, expires); + assertNull("should have expired", expCache.get(desc)); + } + + @Test + public void testGetById() { + long expires = System.nanoTime() + TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS); + ExpirableWebPreparedStatementCache expCache = new ExpirableWebPreparedStatementCache(cache, expires); + assertNotNull("should not yet be expired", expCache.get(STMT_ID)); + assertNull("entry does not exist", expCache.get(new SharedStateId(3, UUID.randomUUID()))); + + expires = System.nanoTime() - TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS); + expCache = new ExpirableWebPreparedStatementCache(cache, expires); + assertNull("should have expired", expCache.get(STMT_ID)); + } + + @Test + public void testIsExpired() { + long expires = System.nanoTime() - TimeUnit.NANOSECONDS.convert(20, TimeUnit.MILLISECONDS); + ExpirableWebPreparedStatementCache cache = new ExpirableWebPreparedStatementCache(null, expires); + assertTrue(cache.isExpired()); + + expires = System.nanoTime() + TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS); + cache = new ExpirableWebPreparedStatementCache(null, expires); + assertFalse(cache.isExpired()); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebPreparedStatementCacheTest.java Mon Dec 01 10:43:12 2014 +0100 @@ -0,0 +1,164 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.web.client.internal; + +import java.util.UUID; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.redhat.thermostat.storage.core.Category; +import com.redhat.thermostat.storage.core.StatementDescriptor; +import com.redhat.thermostat.storage.model.Pojo; +import com.redhat.thermostat.web.common.SharedStateId; + +public class WebPreparedStatementCacheTest { + + private WebPreparedStatementCache cache; + + @Before + public void setup() { + cache = new WebPreparedStatementCache(); + } + + @Test + public void testPut() { + SharedStateId id = new SharedStateId(3, UUID.randomUUID()); + WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class); + when(holder.getStatementId()).thenReturn(id); + @SuppressWarnings("unchecked") + Category<TestPojo> cat = mock(Category.class); + StatementDescriptor<TestPojo> foo = new StatementDescriptor<>(cat, + "something"); + cache.put(foo, holder); + assertSame(holder, cache.get(foo)); + assertSame(foo, cache.get(id)); + WebPreparedStatementHolder holder2 = mock(WebPreparedStatementHolder.class); + when(holder2.getStatementId()).thenReturn(id); + cache.put(foo, holder2); + assertNotSame(holder, cache.get(foo)); + assertSame(holder2, cache.get(foo)); + assertSame(foo, cache.get(id)); + + // Different server token should not result in an look-up-by-id clash. + SharedStateId id2 = new SharedStateId(3, UUID.randomUUID()); + assertNull(cache.get(id2)); + } + + @Test + public void testGetByDesc() { + WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class); + SharedStateId id = mock(SharedStateId.class); + when(holder.getStatementId()).thenReturn(id); + @SuppressWarnings("unchecked") + Category<TestPojo> cat = mock(Category.class); + StatementDescriptor<TestPojo> foo = new StatementDescriptor<>(cat, + "something"); + cache.put(foo, holder); + assertEquals(holder, cache.get(foo)); + StatementDescriptor<TestPojo> other = new StatementDescriptor<>(cat, + "something-else"); + assertNull(cache.get(other)); + } + + @Test + public void testGetById() { + WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class); + SharedStateId id = mock(SharedStateId.class); + when(holder.getStatementId()).thenReturn(id); + @SuppressWarnings("unchecked") + Category<TestPojo> cat = mock(Category.class); + StatementDescriptor<TestPojo> foo = new StatementDescriptor<>(cat, + "something"); + cache.put(foo, holder); + assertEquals(foo, cache.get(id)); + assertNull(cache.get(mock(SharedStateId.class))); + } + + @Test + public void testRemove() { + WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class); + SharedStateId id = mock(SharedStateId.class); + when(holder.getStatementId()).thenReturn(id); + @SuppressWarnings("unchecked") + Category<TestPojo> cat = mock(Category.class); + StatementDescriptor<TestPojo> foo = new StatementDescriptor<>(cat, + "something"); + cache.put(foo, holder); + assertEquals(holder, cache.get(foo)); + + cache.remove(id); + assertNull(cache.get(foo)); + assertNull(cache.get(id)); + } + + /** + * Verify that we are able to create a snapshot of a cache in order to be + * able to look up old ID mappings. + */ + @Test + public void verifyCreateSnapshot() { + WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class); + SharedStateId id = mock(SharedStateId.class); + when(holder.getStatementId()).thenReturn(id); + @SuppressWarnings("unchecked") + Category<TestPojo> cat = mock(Category.class); + StatementDescriptor<TestPojo> foo = new StatementDescriptor<>(cat, + "something"); + cache.put(foo, holder); + assertEquals(holder, cache.get(foo)); + assertNotNull(cache.get(id)); + + WebPreparedStatementCache copy = cache.createSnapshot(); + + cache.remove(id); + + assertNull(cache.get(foo)); + assertNull(cache.get(id)); + + assertNotNull(copy.get(foo)); + assertNotNull(copy.get(id)); + } + + private static class TestPojo implements Pojo { + // nothing + } +}
--- a/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java Sun Dec 07 22:26:27 2014 -0700 +++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java Mon Dec 01 10:43:12 2014 +0100 @@ -41,12 +41,15 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; import java.io.BufferedReader; import java.io.ByteArrayInputStream; @@ -58,9 +61,11 @@ import java.net.URLDecoder; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Enumeration; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -90,6 +95,7 @@ import com.redhat.thermostat.storage.core.BackingStorage; import com.redhat.thermostat.storage.core.Categories; import com.redhat.thermostat.storage.core.Category; +import com.redhat.thermostat.storage.core.CategoryAdapter; import com.redhat.thermostat.storage.core.Connection.ConnectionListener; import com.redhat.thermostat.storage.core.Connection.ConnectionStatus; import com.redhat.thermostat.storage.core.Cursor; @@ -103,6 +109,7 @@ import com.redhat.thermostat.storage.core.StorageCredentials; import com.redhat.thermostat.storage.core.StorageException; import com.redhat.thermostat.storage.core.experimental.BatchCursor; +import com.redhat.thermostat.storage.model.AggregateResult; import com.redhat.thermostat.storage.model.Pojo; import com.redhat.thermostat.test.FreePortFinder; import com.redhat.thermostat.test.FreePortFinder.TryPort; @@ -404,6 +411,57 @@ 3, stmtId.getId()); } + @Test + public void prepareStatementCachesStatements2() throws DescriptorParsingException { + WebPreparedStatementCache cache = mock(WebPreparedStatementCache.class); + final WebPreparedStatementHolder holder = mock(WebPreparedStatementHolder.class); + WebStorage testStorage = new WebStorage(cache, null) { + <T extends Pojo> WebPreparedStatementHolder sendPrepareStmtRequest(StatementDescriptor<T> desc, final int invokationCount) + throws DescriptorParsingException { + if (invokationCount != 0) { + throw new AssertionError("expected invokation count 0 but was: " + invokationCount); + } + return holder; + } + }; + String strDesc = "QUERY test WHERE 'property1' = ?s"; + StatementDescriptor<TestObj> desc = new StatementDescriptor<>(category, strDesc); + testStorage.prepareStatement(desc); + verify(cache).get(desc); + verify(cache).put(desc, holder); + } + + @Test + public void verifySendCategoryReRegistrationRequest() { + @SuppressWarnings("unchecked") + Map<Category<?>, SharedStateId> categoryMap = mock(Map.class); + final List<Category<?>> interceptedCategories = new ArrayList<>(); + WebStorage testStorage = new WebStorage(categoryMap) { + @Override + public void registerCategory(Category<?> category) throws StorageException { + interceptedCategories.add(category); + } + }; + // verify aggregate categories + Category<TestAggregate> aggregateCategory = new CategoryAdapter<TestObj, TestAggregate>(category).getAdapted(TestAggregate.class); + testStorage.sendCategoryReRegistrationRequest(aggregateCategory); + verify(categoryMap).remove(aggregateCategory); + verify(categoryMap).remove(category); + assertEquals(2, interceptedCategories.size()); + assertEquals("Expected actual category to be re-registered first", category, interceptedCategories.get(0)); + assertEquals("Expected aggregate category to be re-registered second", aggregateCategory, interceptedCategories.get(1)); + + interceptedCategories.clear(); + + // verify regular categories + testStorage.sendCategoryReRegistrationRequest(category); + // earlier test above did invoke it already once + verify(categoryMap, times(2)).remove(category); + assertEquals(1, interceptedCategories.size()); + assertEquals(category, interceptedCategories.get(0)); + verifyNoMoreInteractions(categoryMap); + } + /** * Tests a query which returns results in a single batch. * @@ -945,6 +1003,155 @@ assertTrue(listener.disconnectEvent); } + @Test + public void verifyDoWriteExecuteMorethanOne() { + try { + storage.doWriteExecute(null, 2); + fail("Expected excecution exception since invoked count > 1"); + } catch (StatementExecutionException e) { + Throwable cause = e.getCause(); + // pass + assertEquals("Failed to recover from out-of-sync state with server", cause.getMessage()); + } + } + + @Test + public void verifyDoExecuteQueryMorethanOne() { + try { + storage.doExecuteQuery(null, null, 2); + fail("Expected descriptor parsing exception since invoked count > 1"); + } catch (StatementExecutionException e) { + Throwable cause = e.getCause(); + // pass + assertEquals("Failed to recover from out-of-sync state with server", cause.getMessage()); + } + } + + @Test + public void verifySendPreparedStatementRequestMoreThanOne() { + try { + storage.sendPrepareStmtRequest(null, 2); + fail("Expected descriptor parsing exception since invoked count > 1"); + } catch (DescriptorParsingException e) { + // pass + assertEquals("Failed to recover from out-of-sync state with server", e.getMessage()); + } + } + + /** + * Test the base case in {@link WebStorage#handlePreparedStmtStateOutOfSync(WebPreparedStatement)}. + * with null transition cache and non-null statement cache. This simulates the + * case where state got out of sync and handlePreparedStmtStateOutOfSync() is + * called the first time after that out-of-sync-event happened. In that case + * it is expected for prepareStatement() and sendCategoryReRegistrationRequest() + * to be called. + * + * @throws DescriptorParsingException + * + */ + @Test + public void testHandleStatementStateOutOfSyncBaseCase() throws DescriptorParsingException { + WebPreparedStatementHolder mockHolder = mock(WebPreparedStatementHolder.class); + SharedStateId id = new SharedStateId(300, UUID.randomUUID()); + when(mockHolder.getStatementId()).thenReturn(id); + @SuppressWarnings("unchecked") + Category<Pojo> foo = mock(Category.class); + StatementDescriptor<Pojo> desc = new StatementDescriptor<>(foo, "testing"); + WebPreparedStatementCache stmtCache = mock(WebPreparedStatementCache.class); + // this is called twice. Once for creating the snapshot and another time + // for getting the descriptor. + when(stmtCache.get(id)).thenReturn(desc).thenReturn(desc); + final boolean[] sendCategoryReRegistrationRequest = new boolean[1]; + final boolean[] prepareStatement = new boolean[1]; + final WebPreparedStatement<?> newStmt = mock(WebPreparedStatement.class); + WebStorage webStorage = new WebStorage(stmtCache, null) { + + @Override + protected synchronized <T extends Pojo> void sendCategoryReRegistrationRequest(Category<T> category) { + sendCategoryReRegistrationRequest[0] = true; + } + + @SuppressWarnings("unchecked") + @Override + public <T extends Pojo> PreparedStatement<T> prepareStatement(StatementDescriptor<T> desc) + throws DescriptorParsingException { + prepareStatement[0] = true; + return (PreparedStatement<T>)newStmt; + } + }; + @SuppressWarnings("unchecked") + WebPreparedStatement<Pojo> mockStmt = mock(WebPreparedStatement.class); + PreparedParameters mockParams = new PreparedParameters(3); + when(mockStmt.getParams()).thenReturn(mockParams); + when(mockStmt.getStatementId()).thenReturn(id); + + WebPreparedStatementCache stmtCacheSnapshot = mock(WebPreparedStatementCache.class); + when(stmtCache.createSnapshot()).thenReturn(stmtCacheSnapshot); + webStorage.handlePreparedStmtStateOutOfSync(mockStmt); + verify(stmtCache).createSnapshot(); + verify(stmtCache).get(id); + verify(stmtCache).remove(id); + assertTrue("expected sendCategoryReRegistrationRequest() to be called", sendCategoryReRegistrationRequest[0]); + assertTrue("expected prepareStatement() to be called", prepareStatement[0]); + verify(newStmt).setParams(mockParams); + verifyNoMoreInteractions(stmtCache); + } + + /** + * Test the transition case in {@link WebStorage#handlePreparedStmtStateOutOfSync(WebPreparedStatement)}. + * with non-null transition cache and non-null statement cache. + * + * This simulates the case where state got out of sync and handlePreparedStmtStateOutOfSync() is + * called <strong>not</strong> the first time after an out-of-sync-event happened. I.e. + * the base-case path has been entered first when a similar statement tried + * to execute, in turn, getting the statement id removed from the main + * statement cache. + * + * In that case it is expected for the transition cache to become active + * allowing the statement to execute successfully. + * + * @throws DescriptorParsingException + * + */ + @Test + public void testHandleStatementStateOutOfSyncTransitionCase() throws DescriptorParsingException { + WebPreparedStatementHolder mockHolder = mock(WebPreparedStatementHolder.class); + SharedStateId id = new SharedStateId(300, UUID.randomUUID()); + when(mockHolder.getStatementId()).thenReturn(id); + @SuppressWarnings("unchecked") + Category<Pojo> foo = mock(Category.class); + StatementDescriptor<Pojo> desc = new StatementDescriptor<>(foo, "testing"); + WebPreparedStatementCache stmtCache = mock(WebPreparedStatementCache.class); + // no setup for the id in stmtCache, however the transitionCache, + // a snapshot cache - created when the first call to handlePreparedStatementStateOutOfSync() + // came in - still "knows" about this record. + ExpirableWebPreparedStatementCache transitionCache = mock(ExpirableWebPreparedStatementCache.class); + when(transitionCache.isExpired()).thenReturn(false); + when(transitionCache.get(id)).thenReturn(desc); + when(transitionCache.get(desc)).thenReturn(mockHolder); + SharedStateId updatedId = new SharedStateId(301, UUID.randomUUID()); + WebPreparedStatementHolder newHolder = mock(WebPreparedStatementHolder.class); + when(mockHolder.getStatementId()).thenReturn(id); + // called twice. once for equality check. once for getting the id and + // using it to update the prepared statement id. + when(newHolder.getStatementId()).thenReturn(updatedId).thenReturn(updatedId); + when(stmtCache.get(desc)).thenReturn(newHolder); + WebStorage webStorage = new WebStorage(stmtCache, transitionCache); + @SuppressWarnings("unchecked") + WebPreparedStatement<Pojo> mockStmt = mock(WebPreparedStatement.class); + when(mockStmt.getStatementId()).thenReturn(id); + WebPreparedStatement<Pojo> result = webStorage.handlePreparedStmtStateOutOfSync(mockStmt); + verify(mockStmt).setStatementId(updatedId); + assertSame(mockStmt, result); + verify(stmtCache).get(id); + verify(stmtCache).get(desc); + verify(transitionCache).isExpired(); + verify(transitionCache).get(id); + verify(transitionCache).get(desc); + verifyNoMoreInteractions(stmtCache); + verifyNoMoreInteractions(transitionCache); + } + static class MyListener implements ConnectionListener { private CountDownLatch latch; @@ -1000,7 +1207,7 @@ } @Override - <T extends Pojo> WebPreparedStatementHolder sendPrepareStmtRequest(StatementDescriptor<T> desc) + <T extends Pojo> WebPreparedStatementHolder sendPrepareStmtRequest(StatementDescriptor<T> desc, int invokationCounter) throws DescriptorParsingException { int numParams = counter++; int stmtId = counter++; @@ -1008,5 +1215,9 @@ return new WebPreparedStatementHolder(TestObj.class, numParams, id); } } + + private static class TestAggregate implements AggregateResult { + // nothing + } }
--- a/web/server/src/main/java/com/redhat/thermostat/web/server/WebStorageEndPoint.java Sun Dec 07 22:26:27 2014 -0700 +++ b/web/server/src/main/java/com/redhat/thermostat/web/server/WebStorageEndPoint.java Mon Dec 01 10:43:12 2014 +0100 @@ -395,6 +395,7 @@ .getParsedStatement(); response.setNumFreeVariables(parsed.getNumParams()); response.setStatementId(stmtId); + logger.log(Level.INFO, "Server: prepare-statement: stmt: " + desc + " got assigned id: " + stmtId.getId()); writeResponse(resp, response, WebPreparedStatementResponse.class); } } @@ -835,7 +836,7 @@ // perform the patching of the target statement. targetStatement = (DataModifyingStatement<T>)parsed.patchStatement(params); } catch (IllegalPatchException e) { - logger.log(Level.INFO, "Failed to execute write", e); + logger.log(Level.INFO, "Failed to execute write. Stmt id was: " + stmtId, e); writeResponse(resp, PreparedStatementResponseCode.ILLEGAL_PATCH, int.class); return; }