Mercurial > hg > release > thermostat-1.2
changeset 1538:fb69a13dd11c
Implement fetching of results in batches for WebStorage.
Reviewed-by: vanaltj
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2014-October/011353.html
PR2021
line wrap: on
line diff
--- a/storage/core/pom.xml Wed Nov 05 15:13:32 2014 -0500 +++ b/storage/core/pom.xml Mon Oct 27 18:46:26 2014 +0100 @@ -65,6 +65,7 @@ <Export-Package> com.redhat.thermostat.storage.connect, com.redhat.thermostat.storage.core, + com.redhat.thermostat.storage.core.experimental, com.redhat.thermostat.storage.core.auth, com.redhat.thermostat.storage.config, com.redhat.thermostat.storage.model,
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/storage/core/src/main/java/com/redhat/thermostat/storage/core/experimental/BasicBatchCursor.java Mon Oct 27 18:46:26 2014 +0100 @@ -0,0 +1,57 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.storage.core.experimental; + +import com.redhat.thermostat.storage.model.Pojo; + +public abstract class BasicBatchCursor<T extends Pojo> implements BatchCursor<T> { + + private Integer batchSize; + + @Override + public void setBatchSize(int n) throws IllegalArgumentException { + if (n <= 0) { + throw new IllegalArgumentException("Batch size must be > 0"); + } + this.batchSize = n; + } + + @Override + public int getBatchSize() { + return this.batchSize == null ? BatchCursor.DEFAULT_BATCH_SIZE : this.batchSize; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/storage/core/src/main/java/com/redhat/thermostat/storage/core/experimental/BatchCursor.java Mon Oct 27 18:46:26 2014 +0100 @@ -0,0 +1,73 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.storage.core.experimental; + +import com.redhat.thermostat.storage.core.Cursor; +import com.redhat.thermostat.storage.model.Pojo; + +public interface BatchCursor<T extends Pojo> extends Cursor<T> { + + public static final int DEFAULT_BATCH_SIZE = 100; + + /** + * <p> + * Sets the configured batch size when retrieving more elements from the + * database. That is, no more elements will be loaded into memory than the + * configured batch size. Note that the new batch size will only take effect + * once the current batch is exhausted. + * </p> + * <p> + * The default batch size is 100. + * </p> + * + * @param n + * The number of results to fetch from storage in a single batch. + * @return A cursor with the configured batch size. + * @throws IllegalArgumentException + * If {@code n} is < 1 + */ + void setBatchSize(int n) throws IllegalArgumentException; + + /** + * + * @return The configured batch size set via {@link setBatchSize} or + * {@link BatchCursor#DEFAULT_BATCH_SIZE} if it was never set + * explicitly. + */ + int getBatchSize(); + +}
--- a/storage/core/src/main/java/com/redhat/thermostat/storage/model/AggregateCount.java Wed Nov 05 15:13:32 2014 -0500 +++ b/storage/core/src/main/java/com/redhat/thermostat/storage/model/AggregateCount.java Mon Oct 27 18:46:26 2014 +0100 @@ -42,6 +42,8 @@ import com.redhat.thermostat.storage.core.Cursor; import com.redhat.thermostat.storage.core.Entity; import com.redhat.thermostat.storage.core.Persist; +import com.redhat.thermostat.storage.core.experimental.BasicBatchCursor; +import com.redhat.thermostat.storage.core.experimental.BatchCursor; /** * Model class for aggregate counts. @@ -78,10 +80,10 @@ @SuppressWarnings("unchecked") public <T extends Pojo> Cursor<T> getCursor() { - return (Cursor<T>) new AggregateCursor<>(this); + return (BatchCursor<T>) new AggregateCursor<>(this); } - private static class AggregateCursor<T extends Pojo> implements Cursor<T> { + private static class AggregateCursor<T extends Pojo> extends BasicBatchCursor<T> { private boolean available = true; private final T count;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/storage/core/src/test/java/com/redhat/thermostat/storage/core/experimental/BasicBatchCursorTest.java Mon Oct 27 18:46:26 2014 +0100 @@ -0,0 +1,98 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.storage.core.experimental; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import org.junit.Test; + +import com.redhat.thermostat.storage.model.Pojo; + +public class BasicBatchCursorTest { + + @Test + public void testSetBatchSize() { + BasicBatchCursor<TestPojo> cursor = new BasicBatchCursorImpl<>(); + try { + cursor.setBatchSize(-1); + fail("expected IAE for batch size of -1"); + } catch (IllegalArgumentException e) { + // pass + assertEquals("Batch size must be > 0", e.getMessage()); + } + try { + cursor.setBatchSize(0); + fail("expected IAE for batch size of 0"); + } catch (IllegalArgumentException e) { + // pass + assertEquals("Batch size must be > 0", e.getMessage()); + } + cursor.setBatchSize(BatchCursor.DEFAULT_BATCH_SIZE); + assertEquals(BatchCursor.DEFAULT_BATCH_SIZE, cursor.getBatchSize()); + } + + @Test + public void testGetBatchSize() { + BasicBatchCursor<TestPojo> cursor = new BasicBatchCursorImpl<>(); + assertNotNull("should always return default if never set", cursor.getBatchSize()); + assertEquals(BatchCursor.DEFAULT_BATCH_SIZE, cursor.getBatchSize()); + cursor.setBatchSize(3); + assertEquals(3, cursor.getBatchSize()); + } + + private static class BasicBatchCursorImpl<T extends Pojo> extends BasicBatchCursor<T> { + + @Override + public boolean hasNext() { + // not implemented + return false; + } + + @Override + public T next() { + // not implemented + return null; + } + + } + + private static class TestPojo implements Pojo { + // nothing + } +}
--- a/storage/core/src/test/java/com/redhat/thermostat/storage/model/AggregateCountTest.java Wed Nov 05 15:13:32 2014 -0500 +++ b/storage/core/src/test/java/com/redhat/thermostat/storage/model/AggregateCountTest.java Mon Oct 27 18:46:26 2014 +0100 @@ -46,6 +46,7 @@ import org.junit.Test; import com.redhat.thermostat.storage.core.Cursor; +import com.redhat.thermostat.storage.core.experimental.BatchCursor; public class AggregateCountTest { @@ -65,5 +66,20 @@ // pass } } + + /** + * Setting the batch size for single result lists should be no-op. + * This just makes sure that nothing bad happens (no exceptions being thrown) + */ + @Test + public void testCursorBatchSize() { + AggregateCount count = new AggregateCount(); + count.setCount(3); + Cursor<AggregateCount> cursor = count.getCursor(); + BatchCursor<AggregateCount> advCursor = (BatchCursor<AggregateCount>)cursor; + advCursor.setBatchSize(500); + AggregateCount result = advCursor.next(); + assertEquals(3, result.getCount()); + } }
--- a/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoCursor.java Wed Nov 05 15:13:32 2014 -0500 +++ b/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoCursor.java Mon Oct 27 18:46:26 2014 +0100 @@ -39,11 +39,12 @@ import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.MongoException; -import com.redhat.thermostat.storage.core.Cursor; import com.redhat.thermostat.storage.core.StorageException; +import com.redhat.thermostat.storage.core.experimental.BasicBatchCursor; +import com.redhat.thermostat.storage.core.experimental.BatchCursor; import com.redhat.thermostat.storage.model.Pojo; -class MongoCursor<T extends Pojo> implements Cursor<T> { +class MongoCursor<T extends Pojo> extends BasicBatchCursor<T> { private DBCursor cursor; private Class<T> resultClass; @@ -67,6 +68,8 @@ try { DBObject next = cursor.next(); if (next == null) { + // FIXME: This is inconsistent with other cursors throwing + // NoSuchElementException return null; } MongoPojoConverter converter = new MongoPojoConverter(); @@ -76,5 +79,11 @@ } } + @Override + public void setBatchSize(int n) throws IllegalArgumentException { + super.setBatchSize(n); // validates input + cursor.batchSize(super.getBatchSize()); + } + }
--- a/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java Wed Nov 05 15:13:32 2014 -0500 +++ b/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java Mon Oct 27 18:46:26 2014 +0100 @@ -38,7 +38,6 @@ import java.io.InputStream; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -80,6 +79,7 @@ import com.redhat.thermostat.storage.core.StorageCredentials; import com.redhat.thermostat.storage.core.StorageException; import com.redhat.thermostat.storage.core.Update; +import com.redhat.thermostat.storage.core.experimental.BatchCursor; import com.redhat.thermostat.storage.model.AggregateCount; import com.redhat.thermostat.storage.model.AggregateResult; import com.redhat.thermostat.storage.model.Pojo; @@ -526,7 +526,9 @@ dbCursor = coll.find(); } dbCursor = applySortAndLimit(mongoQuery, dbCursor); - return new MongoCursor<T>(dbCursor, resultClass); + BatchCursor<T> mongoCursor = new MongoCursor<T>(dbCursor, resultClass); + mongoCursor.setBatchSize(BatchCursor.DEFAULT_BATCH_SIZE); + return mongoCursor; } catch (MongoException me) { throw new StorageException(me); }
--- a/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoCursorTest.java Wed Nov 05 15:13:32 2014 -0500 +++ b/storage/mongo/src/test/java/com/redhat/thermostat/storage/mongodb/internal/MongoCursorTest.java Mon Oct 27 18:46:26 2014 +0100 @@ -40,21 +40,24 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import com.mongodb.BasicDBObject; import com.mongodb.DBCursor; import com.mongodb.DBObject; -import com.redhat.thermostat.storage.core.Cursor; import com.redhat.thermostat.storage.core.Entity; import com.redhat.thermostat.storage.core.Persist; +import com.redhat.thermostat.storage.core.experimental.BatchCursor; import com.redhat.thermostat.storage.model.BasePojo; public class MongoCursorTest { @@ -105,7 +108,7 @@ } private DBCursor dbCursor; - private Cursor<TestClass> cursor; + private BatchCursor<TestClass> cursor; @Before public void setUp() { @@ -118,12 +121,12 @@ value2.put("key4", "test4"); dbCursor = mock(DBCursor.class); + when(dbCursor.batchSize(BatchCursor.DEFAULT_BATCH_SIZE)).thenReturn(dbCursor); when(dbCursor.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false); when(dbCursor.next()).thenReturn(value1).thenReturn(value2).thenReturn(null); when(dbCursor.sort(any(DBObject.class))).thenReturn(dbCursor); when(dbCursor.limit(anyInt())).thenReturn(dbCursor); cursor = new MongoCursor<TestClass>(dbCursor, TestClass.class); - } @After @@ -134,7 +137,6 @@ @Test public void verifySimpleCursor() { - assertTrue(cursor.hasNext()); TestClass obj1 = cursor.next(); assertEquals("test1", obj1.getKey1()); @@ -148,6 +150,29 @@ assertFalse(cursor.hasNext()); assertNull(cursor.next()); } + + @Test + public void testBatchSize() { + DBCursor mongoCursor = mock(DBCursor.class); + BatchCursor<TestClass> mC = new MongoCursor<>(mongoCursor, TestClass.class); + try { + mC.setBatchSize(-1); + fail("expected IAE for batch size of -1"); + } catch (IllegalArgumentException e) { + // pass + assertEquals("Batch size must be > 0", e.getMessage()); + } + try { + mC.setBatchSize(0); + fail("expected IAE for batch size of 0"); + } catch (IllegalArgumentException e) { + // pass + assertEquals("Batch size must be > 0", e.getMessage()); + } + mC.setBatchSize(333); + verify(mongoCursor).batchSize(333); + Mockito.verifyNoMoreInteractions(mongoCursor); + } }
--- a/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebCursor.java Wed Nov 05 15:13:32 2014 -0500 +++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebCursor.java Mon Oct 27 18:46:26 2014 +0100 @@ -37,35 +37,90 @@ package com.redhat.thermostat.web.client.internal; +import java.lang.reflect.Type; import java.util.NoSuchElementException; +import java.util.logging.Level; +import java.util.logging.Logger; -import com.redhat.thermostat.storage.core.Cursor; +import com.redhat.thermostat.common.utils.LoggingUtils; +import com.redhat.thermostat.storage.core.StorageException; +import com.redhat.thermostat.storage.core.experimental.BasicBatchCursor; import com.redhat.thermostat.storage.model.Pojo; +import com.redhat.thermostat.web.common.PreparedStatementResponseCode; +import com.redhat.thermostat.web.common.WebPreparedStatement; +import com.redhat.thermostat.web.common.WebQueryResponse; -class WebCursor<T extends Pojo> implements Cursor<T> { +class WebCursor<T extends Pojo> extends BasicBatchCursor<T> { + + private static final Logger logger = LoggingUtils.getLogger(WebCursor.class); - private T[] data; - private int index; + private final Type parametrizedTypeToken; + private final WebStorage storage; + private final int cursorId; + private final WebPreparedStatement<T> stmt; + private int batchIndex; + private T[] dataBatch; + private boolean hasMoreBatches; - WebCursor(T[] data) { - this.data = data; - index = 0; + // Main constructor called from doQueryExecute() + WebCursor(WebStorage storage, T[] dataBatch, boolean hasMoreBatches, int cursorId, Type parametrizedTypeToken, WebPreparedStatement<T> stmt) { + this.storage = storage; + this.cursorId = cursorId; + this.parametrizedTypeToken = parametrizedTypeToken; + this.stmt = stmt; + this.hasMoreBatches = hasMoreBatches; + this.dataBatch = dataBatch; + this.batchIndex = 0; } @Override public boolean hasNext() { - return index < data.length; + return batchIndex < dataBatch.length || hasMoreBatches; } @Override public T next() { - if (index >= data.length) { + if (batchIndex >= dataBatch.length && !hasMoreBatches) { throw new NoSuchElementException(); } - T result = data[index]; - index++; + T result = null; + // Check if we have still results left in batch, + // if not fetch a new batch. + if (batchIndex >= dataBatch.length) { + assert(hasMoreBatches); + // This updates batchIndex, dataBatch and + // hasMoreBatches + fetchBatchFromStorage(); + assert(batchIndex == 0); + assert(dataBatch.length > 0); + } + result = dataBatch[batchIndex]; + batchIndex++; return result; } + private void fetchBatchFromStorage() throws StorageException { + logger.log(Level.FINEST, "Getting more results for cursorId: " + cursorId); + WebQueryResponse<T> nextBatchResponse = storage.getMore(cursorId, parametrizedTypeToken, getBatchSize(), stmt); + switch(nextBatchResponse.getResponseCode()) { + case PreparedStatementResponseCode.QUERY_SUCCESS: + this.batchIndex = 0; + this.hasMoreBatches = nextBatchResponse.hasMoreBatches(); + this.dataBatch = nextBatchResponse.getResultList(); + break; + case PreparedStatementResponseCode.GET_MORE_NULL_CURSOR: + // Advise user about potentially timed-out cursor + String msg = "[get-more] Failed to get more results for cursorId: " + cursorId + + " This may be caused because the cursor timed out." + + " Resubmitting the original query might be an approach to fix it." + + " See server logs for more details."; + throw new StorageException(msg); + default: + msg = "[get-more] Failed to get more results for cursorId: " + cursorId + + ". See server logs for details."; + throw new StorageException(msg); + } + } + }
--- a/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java Wed Nov 05 15:13:32 2014 -0500 +++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java Mon Oct 27 18:46:26 2014 +0100 @@ -576,8 +576,10 @@ throw new StatementExecutionException(e); } if (qResp.getResponseCode() == PreparedStatementResponseCode.QUERY_SUCCESS) { - T[] result = qResp.getResultList(); - return new WebCursor<T>(result); + // Return an empty cursor + return new WebCursor<T>(this, qResp.getResultList(), + qResp.hasMoreBatches(), + qResp.getCursorId(), parametrizedTypeToken, stmt); } else if (qResp.getResponseCode() == PreparedStatementResponseCode.ILLEGAL_PATCH) { String msg = "Illegal statement argument. See server logs for details."; IllegalArgumentException iae = new IllegalArgumentException(msg); @@ -587,13 +589,46 @@ // We only handle success responses and illegal patches, like // we do for other storages. This is just a defensive measure in // order to fail early in case something unexpected comes back. - String msg = "Unknown response from storage endpoint!"; + String msg = "[query-execute] Unknown response from storage endpoint!"; IllegalStateException ise = new IllegalStateException(msg); throw new StatementExecutionException(ise); } } /** + * This method gets called from WebCursor in order to fetch more results + * or refresh the result set since parameters like limit or skip have + * changed since the original result set was fetched. + * + * @param cursorId + * @param parametrizedTypeToken The type token for the data class (Pojo). + * @param batchSize The desired batchSize or null. null means that the user + * did not set an explicit batch size. + * @param limit The desired limit for this cursor or null. null means that + * a user did not set an explicit limit. + * @param skip The desired skip value or null. null means no skip value has + * been specified by the user. + * @return + */ + <T extends Pojo> WebQueryResponse<T> getMore(int cursorId, Type parametrizedTypeToken, Integer batchSize, WebPreparedStatement<T> stmt) { + NameValuePair preparedStmtIdParam = new BasicNameValuePair("prepared-stmt-id", Integer.toString(stmt.getStatementId())); + NameValuePair cursorIdParam = new BasicNameValuePair("cursor-id", Integer.toString(cursorId)); + NameValuePair batchSizeParam = new BasicNameValuePair("batch-size", batchSize.toString()); + + List<NameValuePair> formparams = Arrays.asList(preparedStmtIdParam, + cursorIdParam, + batchSizeParam); + WebQueryResponse<T> qResp = null; + try (CloseableHttpEntity entity = post(endpoint + "/get-more", formparams)) { + Reader reader = getContentAsReader(entity); + qResp = gson.fromJson(reader, parametrizedTypeToken); + } catch (Exception e) { + throw new StorageException(e); + } + return qResp; + } + + /** * Executes a prepared write * * @param stmt @@ -800,5 +835,6 @@ } } + }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebCursorTest.java Mon Oct 27 18:46:26 2014 +0100 @@ -0,0 +1,218 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.web.client.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Type; +import java.util.NoSuchElementException; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.redhat.thermostat.storage.core.StorageException; +import com.redhat.thermostat.storage.core.experimental.BatchCursor; +import com.redhat.thermostat.web.common.PreparedStatementResponseCode; +import com.redhat.thermostat.web.common.WebPreparedStatement; +import com.redhat.thermostat.web.common.WebQueryResponse; + +public class WebCursorTest { + + private WebStorage storage; + private int cursorId; + private Type fakeType; + private WebPreparedStatement<TestObj> stmt; + + @SuppressWarnings("unchecked") + @Before + public void setup() { + storage = mock(WebStorage.class); + cursorId = 4441; + fakeType = mock(Type.class); + stmt = mock(WebPreparedStatement.class); + } + + @Test + public void testHasNext() { + boolean hasMoreBatches = false; + TestObj[] dataBatch = new TestObj[] { }; + WebCursor<TestObj> cursor = new WebCursor<>(storage, dataBatch, hasMoreBatches, cursorId, fakeType, stmt); + assertFalse("no results and no more batches", cursor.hasNext()); + hasMoreBatches = true; + cursor = new WebCursor<>(storage, dataBatch, hasMoreBatches, cursorId, fakeType, stmt); + assertTrue("no results, but more batches", cursor.hasNext()); + TestObj o1 = new TestObj(); + dataBatch = new TestObj[] { o1 }; + hasMoreBatches = false; + cursor = new WebCursor<>(storage, dataBatch, hasMoreBatches, cursorId, fakeType, stmt); + assertTrue("one result, no more batches", cursor.hasNext()); + } + + @Test + public void testNext() { + // test empty results and no more batches + boolean hasMoreBatches = false; + TestObj[] dataBatch = new TestObj[] { }; + WebCursor<TestObj> cursor = new WebCursor<>(storage, dataBatch, hasMoreBatches, cursorId, fakeType, stmt); + try { + cursor.next(); + fail("no results and no more batches, expected NSEE"); + } catch (NoSuchElementException e) { + // pass + } + + // test empty results but more batches + hasMoreBatches = true; + cursor = new WebCursor<>(storage, dataBatch, hasMoreBatches, cursorId, fakeType, stmt); + WebQueryResponse<TestObj> response = new WebQueryResponse<>(); + response.setResponseCode(PreparedStatementResponseCode.QUERY_SUCCESS); + response.setCursorId(cursorId); + response.setHasMoreBatches(false); + TestObj o1 = new TestObj(); + o1.setProperty1("next-test"); + response.setResultList(new TestObj[] { o1 } ); + WebQueryResponse<TestObj> second = new WebQueryResponse<>(); + assertNull(second.getResultList()); + // be sure to return a bad result should storage.getMore() be called + // more than once + when(storage.getMore(cursorId, fakeType, BatchCursor.DEFAULT_BATCH_SIZE, stmt)).thenReturn(response).thenReturn(second); + TestObj actual = cursor.next(); + assertEquals("next-test", actual.getProperty1()); + + // test non-empty results and no more batches + hasMoreBatches = false; + o1.setAgentId("foo-agent-123"); + dataBatch = new TestObj[] { o1 }; + cursor = new WebCursor<>(storage, dataBatch, hasMoreBatches, cursorId, fakeType, stmt); + actual = cursor.next(); + assertEquals("foo-agent-123", actual.getAgentId()); + } + + /** + * Tests next() calls where get-more fails due to an expired or missing + * cursor on the web endpoint. + */ + @Test + public void testNextGetMoreBadCursorFailure() { + boolean hasMoreBatches = true; + TestObj[] dataBatch = new TestObj[] { }; + WebCursor<TestObj> cursor = new WebCursor<>(storage, dataBatch, hasMoreBatches, cursorId, fakeType, stmt); + WebQueryResponse<TestObj> response = new WebQueryResponse<>(); + response.setResponseCode(PreparedStatementResponseCode.GET_MORE_NULL_CURSOR); + response.setCursorId(cursorId); + response.setHasMoreBatches(false); + when(storage.getMore(cursorId, fakeType, BatchCursor.DEFAULT_BATCH_SIZE, stmt)).thenReturn(response); + try { + cursor.next(); + fail("Expected StorageException to be thrown"); + } catch (StorageException e) { + assertEquals("[get-more] Failed to get more results for cursorId: 4441" + + " This may be caused because the cursor timed out. " + + "Resubmitting the original query might be an approach to fix it. " + + "See server logs for more details.", + e.getMessage()); + } + } + + /** + * Tests next() calls where get-more fails due to some unknown reason. + */ + @Test + public void testNextGenericGetMoreFailure() { + boolean hasMoreBatches = true; + TestObj[] dataBatch = new TestObj[] { }; + WebCursor<TestObj> cursor = new WebCursor<>(storage, dataBatch, hasMoreBatches, cursorId, fakeType, stmt); + WebQueryResponse<TestObj> response = new WebQueryResponse<>(); + response.setResponseCode(PreparedStatementResponseCode.QUERY_FAILURE); + response.setCursorId(cursorId); + response.setHasMoreBatches(false); + when(storage.getMore(cursorId, fakeType, BatchCursor.DEFAULT_BATCH_SIZE, stmt)).thenReturn(response); + try { + cursor.next(); + fail("Expected StorageException to be thrown"); + } catch (StorageException e) { + assertEquals("[get-more] Failed to get more results for cursorId: " + + "4441. See server logs for details.", + e.getMessage()); + } + } + + /** + * Verify that if a batch size is explicitly set it gets passed on to + * web storage on the next call to getMore. Default batch size is accounted + * for in other tests (e.g. testNext()). + */ + @Test + public void testSetBatchSize() { + boolean hasMoreBatches = true; + TestObj[] empty = new TestObj[] {}; + WebCursor<TestObj> cursor = new WebCursor<>(storage, empty, hasMoreBatches, cursorId, fakeType, stmt); + try { + cursor.setBatchSize(-1); + fail("expected IAE for batch size of -1"); + } catch (IllegalArgumentException e) { + // pass + assertEquals("Batch size must be > 0", e.getMessage()); + } + try { + cursor.setBatchSize(0); + fail("expected IAE for batch size of 0"); + } catch (IllegalArgumentException e) { + // pass + assertEquals("Batch size must be > 0", e.getMessage()); + } + cursor = new WebCursor<>(storage, empty, hasMoreBatches, cursorId, fakeType, stmt); + cursor.setBatchSize(128); + TestObj o1 = new TestObj(); + WebQueryResponse<TestObj> response = new WebQueryResponse<>(); + response.setResultList(new TestObj[] { o1 }); + response.setResponseCode(PreparedStatementResponseCode.QUERY_SUCCESS); + when(storage.getMore(cursorId, fakeType, 128, stmt)).thenReturn(response); + cursor.next(); + verify(storage).getMore(cursorId, fakeType, 128, stmt); + Mockito.verifyNoMoreInteractions(storage); + } + +}
--- a/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java Wed Nov 05 15:13:32 2014 -0500 +++ b/web/client/src/test/java/com/redhat/thermostat/web/client/internal/WebStorageTest.java Mon Oct 27 18:46:26 2014 +0100 @@ -102,6 +102,7 @@ import com.redhat.thermostat.storage.core.StatementExecutionException; import com.redhat.thermostat.storage.core.StorageCredentials; import com.redhat.thermostat.storage.core.StorageException; +import com.redhat.thermostat.storage.core.experimental.BatchCursor; import com.redhat.thermostat.storage.model.Pojo; import com.redhat.thermostat.test.FreePortFinder; import com.redhat.thermostat.test.FreePortFinder.TryPort; @@ -394,19 +395,162 @@ 3, stmtId); } + /** + * Tests a query which returns results in a single batch. + * + * By setting hasMoreBatches to false in WebQueryResponse we signal that + * there are no more batches available via getMore(). + * + * @see {@link #canPrepareAndExecuteQueryMultiBatchFailure()} + * @see {@link #canPrepareAndExecuteQueryMultiBatchSuccess()} + */ @Test - public void canPrepareAndExecuteQuery() throws UnsupportedEncodingException, IOException { + public void canPrepareAndExecuteQuerySingleBatch() { + WebQueryResponse<TestObj> fakeQueryResponse = new WebQueryResponse<>(); + fakeQueryResponse.setResponseCode(PreparedStatementResponseCode.QUERY_SUCCESS); + fakeQueryResponse.setResultList(getTwoTestObjects()); + fakeQueryResponse.setCursorId(444); + // Setting this to false makes Cursor.hasNext() return false after the + // current result list is exhausted. + fakeQueryResponse.setHasMoreBatches(false); + Cursor<TestObj> results = doBasicPrepareAndExecuteQueryTest(fakeQueryResponse); + assertFalse(results.hasNext()); + try { + results.next(); + fail(); + } catch (NoSuchElementException ex) { + // Pass. + } + } + + /** + * Tests a query which returns results in multiple batches. The get-more + * call is successful in this test. + * + * By setting hasMoreBatches to true in WebQueryResponse we signal that + * there are more batches available via getMore(). + * + * @see {@link #canPrepareAndExecuteQueryMultiBatchFailure()} + */ + @Test + public void canPrepareAndExecuteQueryMultiBatchSuccess() { + WebQueryResponse<TestObj> fakeQueryResponse = new WebQueryResponse<>(); + fakeQueryResponse.setResponseCode(PreparedStatementResponseCode.QUERY_SUCCESS); + fakeQueryResponse.setResultList(getTwoTestObjects()); + fakeQueryResponse.setCursorId(444); + // Setting this to true makes Cursor.hasNext() return true after the + // current result list is exhausted. + fakeQueryResponse.setHasMoreBatches(true); + // doBasicPrepareAndExecuteQueryTest performs two hasNext() and + // next() calls on the cursor. + Cursor<TestObj> results = doBasicPrepareAndExecuteQueryTest(fakeQueryResponse); + assertTrue("Expected cursor to return true, since there are more batches", results.hasNext()); + assertEquals("POST", method); + String path = requestURI.substring(requestURI.lastIndexOf('/')); + assertEquals("/query-execute", path); + + TestObj more = new TestObj(); + more.setProperty1("get-more-result"); + WebQueryResponse<TestObj> getMoreResults = new WebQueryResponse<>(); + getMoreResults.setResponseCode(PreparedStatementResponseCode.QUERY_SUCCESS); + getMoreResults.setCursorId(444); + getMoreResults.setHasMoreBatches(true); // one more batch + getMoreResults.setResultList(new TestObj[] {more}); + final Gson gson = getQueryGson(); + prepareServer(gson.toJson(getMoreResults)); + // the following next() call performs the get-more request + // for which we had to prepare the server + TestObj returnedGetMore = results.next(); + assertEquals("POST", method); + path = requestURI.substring(requestURI.lastIndexOf('/')); + assertEquals("/get-more", path); + // Verify correctly passed parameters + String[] requestParams = requestBody.split("&"); + String prepStmtIdParam = requestParams[0]; + String cursorIdParam = requestParams[1]; + String batchSizeParam = requestParams[2]; + String[] prStmtArray = prepStmtIdParam.split("="); + String[] cursorIdArray = cursorIdParam.split("="); + String[] batchSizeArray = batchSizeParam.split("="); + assertEquals("prepared-stmt-id", prStmtArray[0]); + assertEquals("5", prStmtArray[1]); + assertEquals("cursor-id", cursorIdArray[0]); + assertEquals("444", cursorIdArray[1]); + assertEquals("batch-size", batchSizeArray[0]); + assertEquals(Integer.toString(BatchCursor.DEFAULT_BATCH_SIZE), batchSizeArray[1]); + + assertEquals("get-more-result", returnedGetMore.getProperty1()); + + + // Do it again, this time with a non-default batch size: 5 + + assertTrue(results instanceof BatchCursor); + BatchCursor<TestObj> advCursor = (BatchCursor<TestObj>)results; + advCursor.setBatchSize(5); + + WebQueryResponse<TestObj> getMoreResults2 = new WebQueryResponse<>(); + getMoreResults2.setResponseCode(PreparedStatementResponseCode.QUERY_SUCCESS); + getMoreResults2.setCursorId(444); + getMoreResults2.setHasMoreBatches(false); // no more batches this time + getMoreResults2.setResultList(new TestObj[] { more }); + prepareServer(gson.toJson(getMoreResults2)); + advCursor.next(); + + path = requestURI.substring(requestURI.lastIndexOf('/')); + assertEquals("/get-more", path); + + String[] batchSizeParamPair = requestBody.split("&")[2].split("="); + assertEquals("batch-size", batchSizeParamPair[0]); + assertEquals("5", batchSizeParamPair[1]); + } + + /** + * Tests a query which returns results in multiple batches. The get-more + * call fails on the server side, though. + * + * @see {@link #canPrepareAndExecuteQueryMultiBatchSuccess()} + */ + @Test + public void canPrepareAndExecuteQueryMultiBatchFailure() { + WebQueryResponse<TestObj> fakeQueryResponse = new WebQueryResponse<>(); + fakeQueryResponse.setResponseCode(PreparedStatementResponseCode.QUERY_SUCCESS); + fakeQueryResponse.setResultList(getTwoTestObjects()); + fakeQueryResponse.setCursorId(444); + fakeQueryResponse.setHasMoreBatches(true); + Cursor<TestObj> results = doBasicPrepareAndExecuteQueryTest(fakeQueryResponse); + assertTrue("Expected cursor to return true, since there are more batches", results.hasNext()); + + WebQueryResponse<TestObj> getMoreResults = new WebQueryResponse<>(); + getMoreResults.setResponseCode(PreparedStatementResponseCode.QUERY_FAILURE); + final Gson gson = getQueryGson(); + prepareServer(gson.toJson(getMoreResults)); + try { + results.next(); + fail(); // Expected storage exception + } catch (StorageException e) { + assertEquals("[get-more] Failed to get more results for cursorId: 444. See server logs for details.", e.getMessage()); + } + // Do it again with a generic failure code + getMoreResults.setResponseCode(-0xcafeBabe); // this should be unknown + prepareServer(gson.toJson(getMoreResults)); + try { + results.next(); + fail(); // Expected storage exception + } catch (StorageException e) { + assertEquals("[get-more] Failed to get more results for cursorId: 444. See server logs for details.", e.getMessage()); + } + } + + private TestObj[] getTwoTestObjects() { TestObj obj1 = new TestObj(); obj1.setProperty1("fluffor1"); TestObj obj2 = new TestObj(); obj2.setProperty1("fluffor2"); - Gson gson = new GsonBuilder() - .registerTypeAdapterFactory(new PojoTypeAdapterFactory()) - .registerTypeAdapterFactory(new WebPreparedStatementResponseTypeAdapterFactory()) - .registerTypeAdapterFactory(new WebQueryResponseTypeAdapterFactory()) - .registerTypeAdapterFactory(new PreparedParameterTypeAdapterFactory()) - .registerTypeAdapterFactory(new WebPreparedStatementTypeAdapterFactory()) - .create(); + return new TestObj[] { obj1, obj2 }; + } + + private Cursor<TestObj> doBasicPrepareAndExecuteQueryTest(WebQueryResponse<TestObj> fakeQueryResponse) { + Gson gson = getQueryGson(); String strDesc = "QUERY test WHERE 'property1' = ?s"; StatementDescriptor<TestObj> desc = new StatementDescriptor<>(category, strDesc); @@ -435,9 +579,6 @@ assertEquals("fluff", params.getParams()[0].getValue()); assertEquals(String.class, params.getParams()[0].getType()); - WebQueryResponse<TestObj> fakeQueryResponse = new WebQueryResponse<>(); - fakeQueryResponse.setResponseCode(PreparedStatementResponseCode.QUERY_SUCCESS); - fakeQueryResponse.setResultList(new TestObj[] { obj1, obj2 }); prepareServer(gson.toJson(fakeQueryResponse)); Cursor<TestObj> results = null; try { @@ -449,17 +590,22 @@ } assertNotNull(results); assertTrue(results instanceof WebCursor); + assertTrue("Expected WebCursor to be an AdvancedCursor", results instanceof BatchCursor); assertTrue(results.hasNext()); assertEquals("fluffor1", results.next().getProperty1()); assertTrue(results.hasNext()); assertEquals("fluffor2", results.next().getProperty1()); - assertFalse(results.hasNext()); - try { - results.next(); - fail(); - } catch (NoSuchElementException ex) { - // Pass. - } + return results; + } + + private Gson getQueryGson() { + return new GsonBuilder() + .registerTypeAdapterFactory(new PojoTypeAdapterFactory()) + .registerTypeAdapterFactory(new WebPreparedStatementResponseTypeAdapterFactory()) + .registerTypeAdapterFactory(new WebQueryResponseTypeAdapterFactory()) + .registerTypeAdapterFactory(new PreparedParameterTypeAdapterFactory()) + .registerTypeAdapterFactory(new WebPreparedStatementTypeAdapterFactory()) + .create(); } @Test
--- a/web/common/src/main/java/com/redhat/thermostat/web/common/PreparedStatementResponseCode.java Wed Nov 05 15:13:32 2014 -0500 +++ b/web/common/src/main/java/com/redhat/thermostat/web/common/PreparedStatementResponseCode.java Mon Oct 27 18:46:26 2014 +0100 @@ -50,6 +50,19 @@ public static final int QUERY_SUCCESS = 0; /** + * Generic error code for failed queries. Usually + * returned if get-more failed for an unknown reason. + */ + public static final int QUERY_FAILURE = -100; + + /** + * Failure code for expired cursors. Usually returned if + * get-more requests failed because the underlying cursor + * was null. + */ + public static final int GET_MORE_NULL_CURSOR = -151; + + /** * Response code if patching of a {@link PreparedStatement} failed during * statement execution. * <p>
--- a/web/common/src/main/java/com/redhat/thermostat/web/common/WebQueryResponse.java Wed Nov 05 15:13:32 2014 -0500 +++ b/web/common/src/main/java/com/redhat/thermostat/web/common/WebQueryResponse.java Mon Oct 27 18:46:26 2014 +0100 @@ -47,6 +47,8 @@ */ public class WebQueryResponse<T extends Pojo> { + private int cursorId; + private boolean hasMoreBatches; private int responseCode; private T[] resultList; @@ -66,6 +68,22 @@ this.resultList = resultList; } + public void setHasMoreBatches(boolean hasMoreBatches) { + this.hasMoreBatches = hasMoreBatches; + } + + public boolean hasMoreBatches() { + return hasMoreBatches; + } + + public int getCursorId() { + return cursorId; + } + + public void setCursorId(int cursorId) { + this.cursorId = cursorId; + } + public ParameterizedType getRuntimeParametrizedType(final Class<T> dataClass) { ParameterizedType webQueryResponseType = new ParameterizedType() {
--- a/web/common/src/main/java/com/redhat/thermostat/web/common/typeadapters/WebQueryResponseTypeAdapter.java Wed Nov 05 15:13:32 2014 -0500 +++ b/web/common/src/main/java/com/redhat/thermostat/web/common/typeadapters/WebQueryResponseTypeAdapter.java Mon Oct 27 18:46:26 2014 +0100 @@ -53,6 +53,8 @@ private static final String PROP_RESULT = "payload"; private static final String PROP_ERROR_CODE = "errno"; + private static final String PROP_CURSOR_ID = "cId"; + private static final String PROP_CURSOR_HAS_MORE_BATCHES = "cHasMore"; // The runtime type of the Pojo private final Class<T> runtimePojoType; @@ -78,11 +80,23 @@ out.name(PROP_ERROR_CODE); out.value(value.getResponseCode()); + // cursor id + out.name(PROP_CURSOR_ID); + out.value(value.getCursorId()); + + // has more batches property + out.name(PROP_CURSOR_HAS_MORE_BATCHES); + out.value(value.hasMoreBatches()); + // payload out.name(PROP_RESULT); - @SuppressWarnings("unchecked") - TypeAdapter<T[]> pojoTa = (TypeAdapter<T[]>)gson.getAdapter(value.getResultList().getClass()); - pojoTa.write(out, value.getResultList()); + if (value.getResultList() == null) { + out.nullValue(); + } else { + @SuppressWarnings("unchecked") + TypeAdapter<T[]> pojoTa = (TypeAdapter<T[]>)gson.getAdapter(value.getResultList().getClass()); + pojoTa.write(out, value.getResultList()); + } out.endObject(); } @@ -97,14 +111,18 @@ in.beginObject(); - // response code + // response code (can't be null) int responseCode = 0; String name = in.nextName(); - if (name.equals(PROP_ERROR_CODE)) { - responseCode = in.nextInt(); - } else { - throw new IllegalStateException("Expected " + PROP_ERROR_CODE + " but got " + name); - } + responseCode = in.nextInt(); + + // cursor ID (can't be null) + name = in.nextName(); + int cursorId = in.nextInt(); + + // Has more batches, boolean, can't be null + name = in.nextName(); + boolean hasMoreBatches = in.nextBoolean(); if (runtimePojoType == null) { throw new IllegalStateException("Runtime pojo type unknown"); @@ -130,6 +148,8 @@ WebQueryResponse<T> qResponse = new WebQueryResponse<>(); qResponse.setResponseCode(responseCode); + qResponse.setCursorId(cursorId); + qResponse.setHasMoreBatches(hasMoreBatches); qResponse.setResultList(resultList); return qResponse; }
--- a/web/common/src/test/java/com/redhat/thermostat/web/common/typeadapters/WebQueryResponseTypeAdapterTest.java Wed Nov 05 15:13:32 2014 -0500 +++ b/web/common/src/test/java/com/redhat/thermostat/web/common/typeadapters/WebQueryResponseTypeAdapterTest.java Mon Oct 27 18:46:26 2014 +0100 @@ -37,6 +37,7 @@ package com.redhat.thermostat.web.common.typeadapters; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import java.lang.reflect.Type; @@ -77,18 +78,47 @@ // create the query response WebQueryResponse<AgentInformation> response = new WebQueryResponse<>(); response.setResultList(resultList); + response.setCursorId(300); response.setResponseCode(PreparedStatementResponseCode.ILLEGAL_PATCH); String jsonStr = gson.toJson(response); - String expectedJson = "{\"errno\":-1,\"payload\":[{\"agentId\":\"testing\",\"alive\":false,\"startTime\":0,\"stopTime\":0}]}"; + String expectedJson = "{\"errno\":-1,\"cId\":300,\"cHasMore\":false,\"payload\":[{\"agentId\":\"testing\",\"alive\":false,\"startTime\":0,\"stopTime\":0}]}"; assertEquals(expectedJson, jsonStr); } @Test - public void canDeserializeBasic() { - String rawJson = "{\"errno\":-1,\"payload\":[{\"startTime\":0,\"stopTime\":0,\"alive\":true,\"agentId\":\"testing\"}]}"; + public void canSerializeNoResultList() { + // create the query response + WebQueryResponse<AgentInformation> response = new WebQueryResponse<>(); + response.setResultList(null); // should be left out from serialization + response.setCursorId(-0xdeadbeef); + response.setResponseCode(PreparedStatementResponseCode.ILLEGAL_PATCH); + + String jsonStr = gson.toJson(response); + String expectedJson = "{\"errno\":-1,\"cId\":559038737,\"cHasMore\":false}"; + assertEquals(expectedJson, jsonStr); + } + + @Test + public void canDeserializeNoResultList() { + String rawJson = "{\"errno\":-1,\"cId\":444,\"cHasMore\":true}"; Type queryResponseType = new TypeToken<WebQueryResponse<AgentInformation>>() {}.getType(); WebQueryResponse<AgentInformation> actual = gson.fromJson(rawJson, queryResponseType); + assertEquals(true, actual.hasMoreBatches()); + assertEquals(444, actual.getCursorId()); + + assertNull(actual.getResultList()); + + assertEquals(PreparedStatementResponseCode.ILLEGAL_PATCH, actual.getResponseCode()); + } + + @Test + public void canDeserializeBasic() { + String rawJson = "{\"errno\":-1,\"cId\":444,\"cHasMore\":true,\"payload\":[{\"startTime\":0,\"stopTime\":0,\"alive\":true,\"agentId\":\"testing\"}]}"; + Type queryResponseType = new TypeToken<WebQueryResponse<AgentInformation>>() {}.getType(); + WebQueryResponse<AgentInformation> actual = gson.fromJson(rawJson, queryResponseType); + assertEquals(true, actual.hasMoreBatches()); + assertEquals(444, actual.getCursorId()); AgentInformation[] actualList = actual.getResultList(); @@ -144,7 +174,7 @@ response.setResponseCode(PreparedStatementResponseCode.ILLEGAL_PATCH); String jsonStr = gson.toJson(response); - String expectedJson = "{\"errno\":-1,\"payload\":[{\"agentId\":\"testing\",\"alive\":false,\"startTime\":0,\"stopTime\":0}]}"; + String expectedJson = "{\"errno\":-1,\"cId\":0,\"cHasMore\":false,\"payload\":[{\"agentId\":\"testing\",\"alive\":false,\"startTime\":0,\"stopTime\":0}]}"; assertEquals(expectedJson, jsonStr); // We need to tell GSON which parametrized type we want it to deserialize
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/server/src/main/java/com/redhat/thermostat/web/server/CursorManager.java Mon Oct 27 18:46:26 2014 +0100 @@ -0,0 +1,211 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.web.server; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; + +import com.redhat.thermostat.storage.core.experimental.BatchCursor; + +/** + * Manages (query) cursors for a single user. + * + */ +final class CursorManager { + + public static final int CURSOR_NOT_STORED = -1; + private final Map<Integer, CursorHolder> cursors; + private int cursorIdCounter; + private final Timer sweeperTimer; + + CursorManager() { + this(new Timer()); + } + + // test-only + CursorManager(Map<Integer, CursorHolder> cursors, Timer timer) { + this.sweeperTimer = timer; + this.cursors = cursors; + } + + CursorManager(Timer timer) { + this.cursors = new HashMap<>(); + this.sweeperTimer = timer; + } + + // test-only + CursorManager(int cursorIdCounter) { + this(); + this.cursorIdCounter = cursorIdCounter; + } + + /** + * Add a cursor to the map we know of if it has more results. + * + * @param cursor The potential candidate to add to the state we keep track + * of. + * @return The cursor ID or {@link CursorManager#CURSOR_NOT_STORED} if the + * passed in cursor has no more elements. + */ + synchronized int put(final BatchCursor<?> cursor) { + int cursorId = CURSOR_NOT_STORED; + if (cursor.hasNext()) { + // Be sure we don't overflow. For a long running web storage we + // could potentially run out of id's for a single user. However, + // the time between 0 and Integer.MAX_VALUE should be sufficiently + // large so that any given cursor expires before the id will get + // reused. + if (cursorIdCounter == Integer.MAX_VALUE) { + cursorIdCounter = 0; // start again from 0 + } + cursorId = cursorIdCounter; + cursors.put(cursorId, new CursorHolder(cursor, System.currentTimeMillis())); + cursorIdCounter++; + } + return cursorId; + } + + synchronized BatchCursor<?> get(int cursorId) { + CursorHolder holder = cursors.get(cursorId); + if (holder == null) { + return null; + } + return holder.getCursor(); + } + + synchronized void updateCursorTimeStamp(int cursorId) { + CursorHolder holder = cursors.get(cursorId); + if (holder == null) { + return; + } + holder.updateTimestamp(); + } + + synchronized void removeCursor(int cursorId) { + cursors.remove(cursorId); + } + + synchronized void expireCursors() { + final long currentTime = System.currentTimeMillis(); + List<Integer> expiredCursors = new ArrayList<>(); + for (Entry<Integer, CursorHolder> entry: cursors.entrySet()) { + CursorHolder holder = entry.getValue(); + if (holder.isCursorExpired(currentTime)) { + expiredCursors.add(entry.getKey()); + } + } + for (Integer expiredKey: expiredCursors) { + cursors.remove(expiredKey); + } + } + + void startSweeperTimer() { + startSweeperTimer(new CursorSweeper(this)); + } + + // This is here in order to facilitate better testing. + void startSweeperTimer(final TimerTask task) { + sweeperTimer.scheduleAtFixedRate(task, 0, CursorHolder.TIMEOUT); + } + + /** + * + * A container in order save and track cursors. This holder enables + * expiring cursors, as well as extending the liveness of a cursor. + * + */ + static class CursorHolder { + + private static final int MINUTES = 1000 * 60; + // The time out in minutes + static final int TIMEOUT = 3 * MINUTES; + + private final BatchCursor<?> cursor; + private long lastUpdated; + + CursorHolder(BatchCursor<?> cursor, long lastUpdated) { + this.cursor = cursor; + this.lastUpdated = lastUpdated; + } + + void updateTimestamp() { + this.lastUpdated = System.currentTimeMillis(); + } + + boolean isCursorExpired(long currentTime) { + return checkIsCursorExpired(currentTime, TIMEOUT); + } + + BatchCursor<?> getCursor() { + return cursor; + } + + // here in order to facilitate testing + boolean checkIsCursorExpired(long currentTime, final int timeoutInMillis) { + return lastUpdated < (currentTime - timeoutInMillis); + } + + // test-only + long getLastUpdated() { + return lastUpdated; + } + } + + /** + * {@link TimerTask} which times out cursors. + */ + static class CursorSweeper extends TimerTask { + + private final CursorManager manager; + + CursorSweeper(CursorManager manager) { + this.manager = manager; + } + + @Override + public void run() { + manager.expireCursors(); + } + + } + +}
--- a/web/server/src/main/java/com/redhat/thermostat/web/server/WebStorageEndPoint.java Wed Nov 05 15:13:32 2014 -0500 +++ b/web/server/src/main/java/com/redhat/thermostat/web/server/WebStorageEndPoint.java Mon Oct 27 18:46:26 2014 +0100 @@ -45,6 +45,7 @@ import java.lang.reflect.Array; import java.security.Principal; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,6 +60,7 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.HttpSession; import org.apache.commons.codec.binary.Base64; import org.apache.commons.fileupload.FileItem; @@ -93,6 +95,7 @@ import com.redhat.thermostat.storage.core.StorageCredentials; import com.redhat.thermostat.storage.core.auth.DescriptorMetadata; import com.redhat.thermostat.storage.core.auth.StatementDescriptorMetadataFactory; +import com.redhat.thermostat.storage.core.experimental.BatchCursor; import com.redhat.thermostat.storage.model.AggregateResult; import com.redhat.thermostat.storage.model.Pojo; import com.redhat.thermostat.storage.query.BinaryLogicalExpression; @@ -120,12 +123,17 @@ @SuppressWarnings("serial") public class WebStorageEndPoint extends HttpServlet { + // This is an ugly hack in order to allow for testing of batched querying. + static int DEFAULT_QUERY_BATCH_SIZE = BatchCursor.DEFAULT_BATCH_SIZE; + static final String CMDC_AUTHORIZATION_GRANT_ROLE_PREFIX = "thermostat-cmdc-grant-"; static final String FILES_READ_GRANT_ROLE_PREFIX = "thermostat-files-grant-read-filename-"; static final String FILES_WRITE_GRANT_ROLE_PREFIX = "thermostat-files-grant-write-filename-"; private static final String TOKEN_MANAGER_TIMEOUT_PARAM = "token-manager-timeout"; private static final String TOKEN_MANAGER_KEY = "token-manager"; private static final String USER_PRINCIPAL_CALLBACK_KEY = "user-principal-callback"; + private static final String CURSOR_MANAGER_KEY = "cursor-manager"; + private static final int UNKNOWN_CURSOR_ID = -0xdeadbeef; private static final String CATEGORY_KEY_FORMAT = "%s|%s"; // our strings can contain non-ASCII characters. Use UTF-8 @@ -272,6 +280,8 @@ generateToken(req, resp); } else if (cmd.equals("verify-token")) { verifyToken(req, resp); + } else if (cmd.equals("get-more")) { + getMore(req, resp); } } @@ -585,13 +595,21 @@ } } - @SuppressWarnings("unchecked") + /** + * Handler for query executions (except for getting more results). See + * {@link #getMore(HttpServletRequest, HttpServletResponse)}. + * + * @param req + * @param resp + * @throws IOException + */ @WebStoragePathHandler( path = "query-execute" ) private <T extends Pojo> void queryExecute(HttpServletRequest req, HttpServletResponse resp) throws IOException { if (! isAuthorized(req, resp, Roles.READ)) { return; } String queryParam = req.getParameter("prepared-stmt"); + @SuppressWarnings("unchecked") WebPreparedStatement<T> stmt = gson.fromJson(queryParam, WebPreparedStatement.class); PreparedParameters p = stmt.getParams(); @@ -600,7 +618,6 @@ PreparedStatement<T> targetStmt = targetStmtHolder.getStmt(); ParsedStatement<T> parsed = targetStmt.getParsedStatement(); Query<T> targetQuery = null; - ArrayList<T> resultList = new ArrayList<>(); WebQueryResponse<T> response = new WebQueryResponse<>(); try { targetQuery = (Query<T>)parsed.patchStatement(params); @@ -618,16 +635,157 @@ UserPrincipal userPrincipal = getUserPrincipal(req); targetQuery = getQueryForPrincipal(userPrincipal, targetQuery, desc, actualMetadata); + // While the signature still says the retval of query execute is + // cursor, we return an instance of AdvancedCursor instead for new code. + // This is the case for MongoStorage. However, in order to work + // around potential third-party implementations perform the check + // and fall back to legacy behaviour. Cursor<T> cursor = targetQuery.execute(); + List<T> resultsList = null; + if (cursor instanceof BatchCursor) { + BatchCursor<T> batchCursor = (BatchCursor<T>)cursor; + resultsList = getBatchFromCursor(batchCursor, DEFAULT_QUERY_BATCH_SIZE); + assert(resultsList.size() <= DEFAULT_QUERY_BATCH_SIZE); + CursorManager cursorManager = null; + HttpSession userSession = req.getSession(); + synchronized(userSession) { + cursorManager = (CursorManager)userSession.getAttribute(CURSOR_MANAGER_KEY); + if (cursorManager == null) { + // Not yet set for this user, create a new cursor manager + // and start the sweeper timer so as to prevent memory + // leaks due to cursors kept as a reference in cursor manager + cursorManager = new CursorManager(); + cursorManager.startSweeperTimer(); + userSession.setAttribute(CURSOR_MANAGER_KEY, cursorManager); + } + } + // Only record cursor if there are more results to return than the + // first batch size. + int cursorId = cursorManager.put(batchCursor); + response.setCursorId(cursorId); + response.setHasMoreBatches(batchCursor.hasNext()); + } else { + // fallback to old behaviour + resultsList = getLegacyResultList(cursor); + response.setHasMoreBatches(false); // only one batch + response.setCursorId(UNKNOWN_CURSOR_ID); + } + writeQueryResponse(resp, response, resultsList, targetStmtHolder); + } + + private <T extends Pojo> void writeQueryResponse(HttpServletResponse resp, WebQueryResponse<T> response, List<T> resultsList, PreparedStatementHolder<T> targetStmtHolder) throws IOException { + @SuppressWarnings("unchecked") + T[] results = (T[])Array.newInstance(targetStmtHolder.getDataClass(), resultsList.size()); + for (int i = 0; i < resultsList.size(); i++) { + results[i] = resultsList.get(i); + } + response.setResultList(results); + writeResponse(resp, response, WebQueryResponse.class); + } + + /** + * Handler for getting more results for a query. Queries return results + * in batches. The first batch is returned via {@link #queryExecute(HttpServletRequest, HttpServletResponse)}. Subsequent results will get returned using + * this path. + * + * @param req + * @param resp + * @throws IOException + */ + @WebStoragePathHandler( path = "get-more" ) + private <T extends Pojo> void getMore(HttpServletRequest req, HttpServletResponse resp) throws IOException { + if (! isAuthorized(req, resp, Roles.READ)) { + return; + } + String stmtIdParam = req.getParameter("prepared-stmt-id"); + String cursorIdParam = req.getParameter("cursor-id"); + String batchSizeParam = req.getParameter("batch-size"); + + int stmtId = Integer.parseInt(stmtIdParam); + int cursorId = Integer.parseInt(cursorIdParam); + int batchSize = Integer.parseInt(batchSizeParam); + + HttpSession userSession = req.getSession(); + CursorManager cursorManager = null; + synchronized(userSession) { + cursorManager = (CursorManager)userSession.getAttribute(CURSOR_MANAGER_KEY); + } + if (cursorManager == null) { + // Trying to get a cursorManager for a user which does not + // have it in the session as an attribute? Perhaps a cluster + // deployment problem? + throw new IllegalStateException("[get-more] No cursor manager available in session for " + req.getRemoteUser()); + } + @SuppressWarnings("unchecked") + BatchCursor<T> batchCursor = (BatchCursor<T>)cursorManager.get(cursorId); + + PreparedStatementHolder<T> targetStmtHolder = getStatementHolderFromId(stmtId); + if (batchCursor == null) { + // This either means: + // 1. The underlying (backing-storage) cursor didn't have + // more results, thus WebQueryResponse.hasMoreBatches() == false, + // when queryExecute() returned its WebQueryResponse. Still, + // the client requested more elements anyway and ended up here. + // That's really a bug in the client which performed this request. + // 2. The cursor expired via the sweeper timer in CursorManager, + // before the client actually managed to request more results. In + // that case the client is advised to re-issue the query in order + // to get a new cursor, since the underlying data in the DB might + // have changed anyway and results returned would be surprising. + // See http://docs.mongodb.org/manual/core/cursors/ + // (section "Cursor Isolation") + String msg = "No cursor found for user " + + req.getRemoteUser() + " and cursor id: " + cursorId + + ". Query was: " + targetStmtHolder.getStatementDescriptor(); + logger.log(Level.WARNING, msg); + WebQueryResponse<T> response = new WebQueryResponse<>(); + response.setResponseCode(PreparedStatementResponseCode.GET_MORE_NULL_CURSOR); + response.setHasMoreBatches(false); + response.setCursorId(cursorId); + List<T> empty = Collections.emptyList(); + writeQueryResponse(resp, response, empty, targetStmtHolder); + return; + } + // Update backing storage cursor with (possibly) changed params. + // This will validate batchSize input + batchCursor.setBatchSize(batchSize); + + List<T> nextBatch = getBatchFromCursor(batchCursor, batchCursor.getBatchSize()); + boolean stillMoreResults = batchCursor.hasNext(); + if (stillMoreResults) { + // Refresh timestamp of a live cursor so that it won't expire. + cursorManager.updateCursorTimeStamp(cursorId); + } else { + // no more results, remove cursor + cursorManager.removeCursor(cursorId); + } + logger.log(Level.FINEST, "Fetched more results (" + nextBatch.size() + ") for user '" + req.getRemoteUser() + "' cursorId " + cursorId + + ". Statement: " + targetStmtHolder.getStatementDescriptor()); + WebQueryResponse<T> response = new WebQueryResponse<>(); + response.setResponseCode(PreparedStatementResponseCode.QUERY_SUCCESS); + response.setHasMoreBatches(stillMoreResults); + response.setCursorId(cursorId); + writeQueryResponse(resp, response, nextBatch, targetStmtHolder); + } + + // Fetches the first batch of results. Number of results are determined + // by the default batch size in AdvancedCursor + private <T extends Pojo> List<T> getBatchFromCursor(final BatchCursor<T> cursor, final int batchSize) { + ArrayList<T> resultList = new ArrayList<>(batchSize); + for (int i = 0; i < batchSize && cursor.hasNext(); i++) { + resultList.add(cursor.next()); + } + return resultList; + } + + // Fetches all results imposing no bound on the result set if the underlying + // query was unbounded. + private <T extends Pojo> ArrayList<T> getLegacyResultList(Cursor<T> cursor) { + ArrayList<T> resultList = new ArrayList<>(); while (cursor.hasNext()) { resultList.add(cursor.next()); } - T[] results = (T[])Array.newInstance(targetStmtHolder.getDataClass(), resultList.size()); - for (int i = 0; i < resultList.size(); i++) { - results[i] = resultList.get(i); - } - response.setResultList(results); - writeResponse(resp, response, WebQueryResponse.class); + return resultList; } @SuppressWarnings("unchecked")
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/server/src/test/java/com/redhat/thermostat/web/server/CursorManagerTest.java Mon Oct 27 18:46:26 2014 +0100 @@ -0,0 +1,251 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.web.server; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import org.junit.Test; + +import com.redhat.thermostat.storage.core.experimental.BatchCursor; +import com.redhat.thermostat.web.server.CursorManager.CursorHolder; +import com.redhat.thermostat.web.server.CursorManager.CursorSweeper; + +public class CursorManagerTest { + + /** + * Test putting cursors which return true on hasNext() + * those cursors need to get tracked. + */ + @Test + public void testPutBasicCursorHasMore() { + CursorManager manager = new CursorManager(); + int id = manager.put(getHasMoreBatchCursor()); + assertTrue(id >= 0); + assertEquals(0, id); + id = manager.put(getHasMoreBatchCursor()); + assertTrue(id != 0); + assertEquals(1, id); + } + + /** + * Verifies that cursor IDs won't overflow. + */ + @Test + public void testPutCursorOverflow() { + int startValue = Integer.MAX_VALUE - 5; + // construct a cursor manager with high enough counter to simulate + // overflow. + CursorManager manager = new CursorManager(startValue); + int numIterations = 10; + boolean rollOver = false; + for (int i = 0, id; i < numIterations; i++) { + id = manager.put(getHasMoreBatchCursor()); + assertTrue(id >= 0); + assertFalse(id == Integer.MAX_VALUE); + if (id == 0) { + rollOver = true; + } + } + assertTrue("Expected id to start again from 0", rollOver); + } + + /** + * Test putting a cursor which returns false on hasNext(). + * CursorManager should not add such cursors and should + * return CURSOR_NOT_STORED. + */ + @Test + public void testPutNoHasMoreCursor() { + CursorManager manager = new CursorManager(); + int id = manager.put(mock(BatchCursor.class)); + assertEquals(CursorManager.CURSOR_NOT_STORED, id); + } + + private BatchCursor<?> getHasMoreBatchCursor() { + BatchCursor<?> c = mock(BatchCursor.class); + when(c.hasNext()).thenReturn(true); + return c; + } + + @Test + public void testGetInvalidId() { + CursorManager manager = new CursorManager(); + BatchCursor<?> c = manager.get(CursorManager.CURSOR_NOT_STORED); + assertNull(c); + } + + /** + * Basic test for CursorManager.get(). Add some cursors, + * then add one we track the id for and verify that we + * can get the BatchCursor reference. + */ + @Test + public void testGetHasMore() { + CursorManager manager = new CursorManager(); + int num = (int)(Math.random() * 300); + addCursors(num, manager); + BatchCursor<?> cursor = getHasMoreBatchCursor(); + int interestingId = manager.put(cursor); + num = (int)(Math.random() * 40); + addCursors(num, manager); + BatchCursor<?> actual = manager.get(interestingId); + assertSame(actual, cursor); + } + + @Test + public void testExpireCursors() { + Map<Integer, CursorHolder> cursors = new HashMap<>(); + long expiredTime = System.currentTimeMillis() - ( 5 * 60 * 1000); + long notExpiredTime = System.currentTimeMillis(); + cursors.put(3, new CursorHolder(mock(BatchCursor.class), expiredTime)); + cursors.put(4, new CursorHolder(mock(BatchCursor.class), expiredTime)); + cursors.put(5, new CursorHolder(mock(BatchCursor.class), notExpiredTime)); + cursors.put(7, new CursorHolder(mock(BatchCursor.class), expiredTime)); + CursorManager manager = new CursorManager(cursors, mock(Timer.class)); + manager.expireCursors(); // should remove old cursors + assertEquals(1, cursors.keySet().size()); + assertNotNull(cursors.get(5)); + } + + @Test + public void testRemoveCursor() { + Map<Integer, CursorHolder> cursors = new HashMap<>(); + cursors.put(3, new CursorHolder(mock(BatchCursor.class), 3)); + cursors.put(4, new CursorHolder(mock(BatchCursor.class), 4)); + cursors.put(5, new CursorHolder(mock(BatchCursor.class), 5)); + CursorManager manager = new CursorManager(cursors, mock(Timer.class)); + manager.removeCursor(3); + assertEquals(2, cursors.keySet().size()); + assertNotNull(cursors.get(4)); + assertNotNull(cursors.get(5)); + assertNull(cursors.get(3)); + } + + @Test + public void testUpdateCursorTimeStamp() { + Map<Integer, CursorHolder> cursors = new HashMap<>(); + long expiredTime = System.currentTimeMillis() - ( 5 * 60 * 1000); + long notExpiredTime = System.currentTimeMillis(); + cursors.put(4, new CursorHolder(mock(BatchCursor.class), expiredTime)); + cursors.put(5, new CursorHolder(mock(BatchCursor.class), notExpiredTime)); + cursors.put(7, new CursorHolder(mock(BatchCursor.class), expiredTime)); + CursorManager manager = new CursorManager(cursors, mock(Timer.class)); + // refresh 4's timestamp so that it's now no longer expired + manager.updateCursorTimeStamp(4); + manager.expireCursors(); + assertEquals(2, cursors.keySet().size()); + assertNotNull("4 has been updated and is thus still alive", cursors.get(4)); + assertNotNull("5 was expired from the outset", cursors.get(5)); + } + + @Test + public void canStartSweeperTimerViaManager() { + Timer mockTimer = mock(Timer.class); + CursorManager manager = new CursorManager(mockTimer); + TimerTask timerTask = mock(TimerTask.class); + manager.startSweeperTimer(timerTask); + long threeMinutes = 3 * 60 * 1000; + verify(mockTimer).scheduleAtFixedRate(timerTask, 0, threeMinutes); + } + + private void addCursors(int num, CursorManager manager) { + for (int i = 0; i < num; i++) { + manager.put(getHasMoreBatchCursor()); + } + } + + // CursorHolder tests + + @Test + public void testUpdateTimeStamp() { + long now = System.currentTimeMillis(); + CursorHolder holder = new CursorHolder(mock(BatchCursor.class), now); + sleep(10); + holder.updateTimestamp(); + assertTrue(now != holder.getLastUpdated()); + } + + @Test + public void testCheckIsCursorExpired() { + long now = System.currentTimeMillis(); + CursorHolder holder = new CursorHolder(mock(BatchCursor.class), now); + sleep(10); + long laterTime = System.currentTimeMillis(); + assertFalse("cursor still valid. timeout == 20ms, but only 10ms old.", + holder.checkIsCursorExpired(laterTime, 20)); + assertTrue("cursor older than 5 milliseconds", holder.checkIsCursorExpired(laterTime, 5)); + } + + @Test + public void testGetCursor() { + long now = System.currentTimeMillis(); + BatchCursor<?> cursor = mock(BatchCursor.class); + CursorHolder holder = new CursorHolder(cursor, now); + assertSame(cursor, holder.getCursor()); + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + // ignore + } + } + + // CursorSweeper tests + + public void testCursorSweeper() { + CursorManager manager = mock(CursorManager.class); + CursorSweeper sweeper = new CursorSweeper(manager); + sweeper.run(); + verify(manager).expireCursors(); + } + +}
--- a/web/server/src/test/java/com/redhat/thermostat/web/server/WebStorageEndPointUnitTest.java Wed Nov 05 15:13:32 2014 -0500 +++ b/web/server/src/test/java/com/redhat/thermostat/web/server/WebStorageEndPointUnitTest.java Mon Oct 27 18:46:26 2014 +0100 @@ -102,7 +102,8 @@ // authorization checks final String[] authPaths = new String[] { "prepare-statement", "query-execute", "write-execute", "register-category", - "save-file", "load-file", "purge", "ping", "generate-token", "verify-token" + "save-file", "load-file", "purge", "ping", "generate-token", "verify-token", + "get-more" }; Map<String, Boolean> checkedAutPaths = new HashMap<>(); for (String path: authPaths) {
--- a/web/server/src/test/java/com/redhat/thermostat/web/server/WebStorageEndpointTest.java Wed Nov 05 15:13:32 2014 -0500 +++ b/web/server/src/test/java/com/redhat/thermostat/web/server/WebStorageEndpointTest.java Mon Oct 27 18:46:26 2014 +0100 @@ -37,6 +37,8 @@ package com.redhat.thermostat.web.server; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -119,6 +121,7 @@ import com.redhat.thermostat.storage.core.auth.CategoryRegistration; import com.redhat.thermostat.storage.core.auth.DescriptorMetadata; import com.redhat.thermostat.storage.core.auth.StatementDescriptorRegistration; +import com.redhat.thermostat.storage.core.experimental.BatchCursor; import com.redhat.thermostat.storage.dao.HostInfoDAO; import com.redhat.thermostat.storage.model.AggregateCount; import com.redhat.thermostat.storage.model.BasePojo; @@ -349,38 +352,197 @@ assertEquals("application/json; charset=UTF-8", conn.getContentType()); } - @SuppressWarnings({ "unchecked", "rawtypes" }) + /** + * Tests a successful query execution, starting from preparing the query, + * then executing the query and finally getting more results for the query. + * + * @throws Exception + * + * @see {@link #authorizedPrepareQueryWithTrustedDescriptorGetMoreFail()} + */ + @SuppressWarnings({ "rawtypes" }) @Test - public void authorizedPrepareQueryWithTrustedDescriptor() throws Exception { - String strDescriptor = "QUERY " + category.getName() + " WHERE '" + key1.getName() + "' = ?s SORT '" + key1.getName() + "' DSC LIMIT 42"; - // metadata which basically does no filtering. There's another test which - // asserts only allowed data (via ACL) gets returned. - DescriptorMetadata metadata = new DescriptorMetadata(); - setupTrustedStatementRegistry(strDescriptor, metadata); + public void authorizedPrepareQueryWithTrustedDescriptorSuccessfulGetMore() throws Exception { + // Get the trusted descriptor + String strDescriptor = setupPreparedQueryWithTrustedDescriptor(); + + // Prepare the query + boolean moreBatches = true; + TrustedPreparedQueryTestResult prepareQueryResult = prepareQuery(strDescriptor, moreBatches); + + Type typeToken = new TypeToken<WebQueryResponse<TestClass>>(){}.getType(); + // now execute the query we've just prepared + WebPreparedStatement<TestClass> stmt = new WebPreparedStatement<>(1, 0); + stmt.setString(0, "fluff"); + + // Execute the query, preserver the cookie + String cookieValue = executeQuery(prepareQueryResult.gson, prepareQueryResult.mockMongoQuery, typeToken, stmt, moreBatches); + + // Simulate getting more elements + int cursorId = 0; + int batchSize = 3; + // stub the underlying cursor so that it can fill up a single batch (3) + // and leaves one element more => 3 + 1. + when(prepareQueryResult.cursor.hasNext()).thenReturn(true) // 1 + .thenReturn(true) // 2 + .thenReturn(true) // 3 + .thenReturn(true) // 4 + .thenReturn(false); + + + when(prepareQueryResult.cursor.next()) + .thenReturn(new TestClass()) // 1 + .thenReturn(new TestClass()) // 2 + .thenReturn(new TestClass()) // 3 + .thenReturn(new TestClass()) // 4 + .thenReturn(null); + when(prepareQueryResult.cursor.getBatchSize()).thenReturn(batchSize); + URL url = new URL(getEndpoint() + "/get-more"); + HttpURLConnection getMoreConn = (HttpURLConnection) url.openConnection(); + getMoreConn.setRequestMethod("POST"); + setCookie(getMoreConn, cookieValue); + sendAuthentication(getMoreConn, "ignored1", "ignored2"); + getMoreConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); + getMoreConn.setDoInput(true); + getMoreConn.setDoOutput(true); + + OutputStreamWriter out = new OutputStreamWriter(getMoreConn.getOutputStream()); + String body = "prepared-stmt-id=" + stmt.getStatementId() + "&"; + body += "cursor-id=" + cursorId + "&"; + body += "batch-size=" + batchSize; + out.write(body); + out.flush(); + + InputStreamReader in = new InputStreamReader(getMoreConn.getInputStream()); + WebQueryResponse result = prepareQueryResult.gson.fromJson(in, typeToken); + assertEquals(3, result.getResultList().length); + assertEquals(cursorId, result.getCursorId()); + assertTrue("There was one more result than the batch size", result.hasMoreBatches()); + assertEquals("application/json; charset=UTF-8", getMoreConn.getContentType()); + // expected setBatchSize to be called via get-more request + verify(prepareQueryResult.cursor).setBatchSize(batchSize); + } + + /** + * Tests authorized prepared query which attempts to do a get-more but does + * not have a cursor in the user's cursor manager. + * + * @throws Exception + * + * @see {@link #authorizedPrepareQueryWithTrustedDescriptorSuccessfulGetMore} + */ + @SuppressWarnings({ "rawtypes" }) + @Test + public void authorizedPrepareQueryWithTrustedDescriptorGetMoreFail() throws Exception { + // Get the trusted descriptor + String strDescriptor = setupPreparedQueryWithTrustedDescriptor(); + + // Prepare the query + boolean moreBatches = false; + TrustedPreparedQueryTestResult prepareQueryResult = prepareQuery(strDescriptor, moreBatches); + + Type typeToken = new TypeToken<WebQueryResponse<TestClass>>(){}.getType(); + // now execute the query we've just prepared + WebPreparedStatement<TestClass> stmt = new WebPreparedStatement<>(1, 0); + stmt.setString(0, "fluff"); - Set<BasicRole> roles = new HashSet<>(); - roles.add(new RolePrincipal(Roles.REGISTER_CATEGORY)); - roles.add(new RolePrincipal(Roles.PREPARE_STATEMENT)); - roles.add(new RolePrincipal(Roles.READ)); - roles.add(new RolePrincipal(Roles.ACCESS_REALM)); - UserPrincipal testUser = new UserPrincipal("ignored1"); - testUser.setRoles(roles); + // Execute the query, preserver the cookie + String cookieValue = executeQuery(prepareQueryResult.gson, prepareQueryResult.mockMongoQuery, typeToken, stmt, moreBatches); - final JAASLoginService loginService = getConfiguredLoginService(testUser, roles); + // Simulate getting more elements + int cursorId = 0; + int batchSize = 3; + // stub the underlying cursor so that it can fill up a single batch (3) + // and leaves one element more => 3 + 1. + when(prepareQueryResult.cursor.hasNext()).thenReturn(true) // 1 + .thenReturn(true) // 2 + .thenReturn(true) // 3 + .thenReturn(false) // 4 + .thenReturn(false); + + + when(prepareQueryResult.cursor.next()) + .thenReturn(new TestClass()) // 1 + .thenReturn(new TestClass()) // 2 + .thenReturn(new TestClass()) // 3 + .thenReturn(null) // 4 + .thenReturn(null); + when(prepareQueryResult.cursor.getBatchSize()).thenReturn(batchSize); + URL url = new URL(getEndpoint() + "/get-more"); + HttpURLConnection getMoreConn = (HttpURLConnection) url.openConnection(); + getMoreConn.setRequestMethod("POST"); + setCookie(getMoreConn, cookieValue); + sendAuthentication(getMoreConn, "ignored1", "ignored2"); + getMoreConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); + getMoreConn.setDoInput(true); + getMoreConn.setDoOutput(true); + + OutputStreamWriter out = new OutputStreamWriter(getMoreConn.getOutputStream()); + String body = "prepared-stmt-id=" + stmt.getStatementId() + "&"; + body += "cursor-id=" + cursorId + "&"; + body += "batch-size=" + batchSize; + out.write(body); + out.flush(); - //final LoginService loginService = new TestJAASLoginService(testUser); - port = FreePortFinder.findFreePort(new TryPort() { - - @Override - public void tryPort(int port) throws Exception { - startServer(port, loginService); - } - }); - // This makes register category work for the "test" category. - // Undone via @After - setupTrustedCategory(categoryName); - registerCategory("ignored1", "ignored2"); + InputStreamReader in = new InputStreamReader(getMoreConn.getInputStream()); + WebQueryResponse result = prepareQueryResult.gson.fromJson(in, typeToken); + assertEquals(PreparedStatementResponseCode.GET_MORE_NULL_CURSOR, result.getResponseCode()); + assertNotNull(result.getResultList()); + assertEquals(0, result.getResultList().length); + assertEquals(cursorId, result.getCursorId()); + assertFalse("This is a failure response, no more batches", result.hasMoreBatches()); + assertEquals("application/json; charset=UTF-8", getMoreConn.getContentType()); + } + + private String executeQuery(Gson gson, Query<TestClass> mockMongoQuery, + Type typeToken, WebPreparedStatement<TestClass> stmt, boolean moreBatches) + throws MalformedURLException, IOException, ProtocolException { + URL url = new URL(getEndpoint() + "/query-execute"); + HttpURLConnection queryExecuteConn = (HttpURLConnection) url.openConnection(); + queryExecuteConn.setRequestMethod("POST"); + sendAuthentication(queryExecuteConn, "ignored1", "ignored2"); + queryExecuteConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); + queryExecuteConn.setDoInput(true); + queryExecuteConn.setDoOutput(true); + OutputStreamWriter out = new OutputStreamWriter(queryExecuteConn.getOutputStream()); + String body = "prepared-stmt=" + gson.toJson(stmt, WebPreparedStatement.class); + out.write(body + "\n"); + out.flush(); + + String cookieValue = queryExecuteConn.getHeaderField("Set-Cookie"); + InputStreamReader in = new InputStreamReader(queryExecuteConn.getInputStream()); + WebQueryResponse<TestClass> result = gson.fromJson(in, typeToken); + assertEquals("Expected more batches", moreBatches, result.hasMoreBatches()); + TestClass[] results = result.getResultList(); + assertEquals(2, results.length); + assertEquals("fluff1", results[0].getKey1()); + assertEquals(42, results[0].getKey2()); + assertEquals("fluff2", results[1].getKey1()); + assertEquals(43, results[1].getKey2()); + + assertEquals("application/json; charset=UTF-8", queryExecuteConn.getContentType()); + verify(mockMongoQuery).execute(); + verify(mockMongoQuery).getWhereExpression(); + verifyNoMoreInteractions(mockMongoQuery); + return cookieValue; + } + + private static class TrustedPreparedQueryTestResult { + + private final Gson gson; + private final Query<TestClass> mockMongoQuery; + private final BatchCursor<TestClass> cursor; + + private TrustedPreparedQueryTestResult(Gson gson, Query<TestClass> mockMongoQuery, BatchCursor<TestClass> cursor) { + this.cursor = cursor; + this.gson = gson; + this.mockMongoQuery = mockMongoQuery; + } + } + + @SuppressWarnings("unchecked") + private TrustedPreparedQueryTestResult prepareQuery(String strDescriptor, boolean moreBatches) throws Exception { TestClass expected1 = new TestClass(); expected1.setKey1("fluff1"); expected1.setKey2(42); @@ -391,14 +553,19 @@ Query<TestClass> mockMongoQuery = mock(Query.class); when(mockStorage.createQuery(eq(category))).thenReturn(mockMongoQuery); - Cursor<TestClass> cursor = mock(Cursor.class); - when(cursor.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false); + BatchCursor<TestClass> cursor = mock(BatchCursor.class); + WebStorageEndPoint.DEFAULT_QUERY_BATCH_SIZE = 2; + // Assuming: moreBatches == true then we have + // WebStorageEndpoint.getBatchFromCursor() method calls hasNext() twice, + // CursorManager.put() calls it once and WebStorageEndpoint.queryExecute() + // calls it once. Thus, 2 + 1 + 1 = 4 x true, then return false; + when(cursor.hasNext()).thenReturn(true).thenReturn(true).thenReturn(moreBatches).thenReturn(moreBatches).thenReturn(false); when(cursor.next()).thenReturn(expected1).thenReturn(expected2); - PreparedStatement mockPreparedQuery = mock(PreparedStatement.class); + PreparedStatement<TestClass> mockPreparedQuery = mock(PreparedStatement.class); when(mockStorage.prepareStatement(any(StatementDescriptor.class))).thenReturn(mockPreparedQuery); - ParsedStatement mockParsedStatement = mock(ParsedStatement.class); + ParsedStatement<TestClass> mockParsedStatement = mock(ParsedStatement.class); when(mockParsedStatement.getNumParams()).thenReturn(1); when(mockParsedStatement.patchStatement(any(PreparedParameter[].class))).thenReturn(mockMongoQuery); when(mockPreparedQuery.getParsedStatement()).thenReturn(mockParsedStatement); @@ -435,39 +602,38 @@ assertEquals(0, response.getStatementId()); assertEquals("application/json; charset=UTF-8", conn.getContentType()); - - - // now execute the query we've just prepared - WebPreparedStatement<TestClass> stmt = new WebPreparedStatement<>(1, 0); - stmt.setString(0, "fluff"); - - url = new URL(endpoint + "/query-execute"); - HttpURLConnection conn2 = (HttpURLConnection) url.openConnection(); - conn2.setRequestMethod("POST"); - sendAuthentication(conn2, "ignored1", "ignored2"); - conn2.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); - conn2.setDoInput(true); - conn2.setDoOutput(true); + return new TrustedPreparedQueryTestResult(gson, mockMongoQuery, cursor); + } + + private String setupPreparedQueryWithTrustedDescriptor() throws Exception { + String strDescriptor = "QUERY " + category.getName() + " WHERE '" + key1.getName() + "' = ?s SORT '" + key1.getName() + "' DSC LIMIT 42"; + // metadata which basically does no filtering. There's another test which + // asserts only allowed data (via ACL) gets returned. + DescriptorMetadata metadata = new DescriptorMetadata(); + setupTrustedStatementRegistry(strDescriptor, metadata); - out = new OutputStreamWriter(conn2.getOutputStream()); - body = "prepared-stmt=" + gson.toJson(stmt, WebPreparedStatement.class); - out.write(body + "\n"); - out.flush(); - - in = new InputStreamReader(conn2.getInputStream()); - Type typeToken = new TypeToken<WebQueryResponse<TestClass>>(){}.getType(); - WebQueryResponse<TestClass> result = gson.fromJson(in, typeToken); - TestClass[] results = result.getResultList(); - assertEquals(2, results.length); - assertEquals("fluff1", results[0].getKey1()); - assertEquals(42, results[0].getKey2()); - assertEquals("fluff2", results[1].getKey1()); - assertEquals(43, results[1].getKey2()); - - assertEquals("application/json; charset=UTF-8", conn2.getContentType()); - verify(mockMongoQuery).execute(); - verify(mockMongoQuery).getWhereExpression(); - verifyNoMoreInteractions(mockMongoQuery); + Set<BasicRole> roles = new HashSet<>(); + roles.add(new RolePrincipal(Roles.REGISTER_CATEGORY)); + roles.add(new RolePrincipal(Roles.PREPARE_STATEMENT)); + roles.add(new RolePrincipal(Roles.READ)); + roles.add(new RolePrincipal(Roles.ACCESS_REALM)); + UserPrincipal testUser = new UserPrincipal("ignored1"); + testUser.setRoles(roles); + + final JAASLoginService loginService = getConfiguredLoginService(testUser, roles); + + port = FreePortFinder.findFreePort(new TryPort() { + + @Override + public void tryPort(int port) throws Exception { + startServer(port, loginService); + } + }); + // This makes register category work for the "test" category. + // Undone via @After + setupTrustedCategory(categoryName); + registerCategory("ignored1", "ignored2"); + return strDescriptor; } /* @@ -991,6 +1157,10 @@ Integer id = gson.fromJson(reader, Integer.class); return id; } + + private void setCookie(HttpURLConnection conn, String cookieVal) { + conn.setRequestProperty("Cookie", cookieVal); + } private void sendAuthentication(HttpURLConnection conn, String username, String passwd) { String userpassword = username + ":" + passwd;