Mercurial > hg > release > thermostat-1.2
view web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java @ 1660:c6ae78b6f3ac
[Thermostat 1.2] Update copyright year to 2015
Reviewed-by: omajid
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2015-March/013127.html
PR2273
author | Severin Gehwolf <sgehwolf@redhat.com> |
---|---|
date | Wed, 11 Mar 2015 15:07:27 +0100 |
parents | 605f0c269f1c |
children |
line wrap: on
line source
/* * Copyright 2012-2015 Red Hat, Inc. * * This file is part of Thermostat. * * Thermostat is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published * by the Free Software Foundation; either version 2, or (at your * option) any later version. * * Thermostat is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Thermostat; see the file COPYING. If not see * <http://www.gnu.org/licenses/>. * * Linking this code with other modules is making a combined work * based on this code. Thus, the terms and conditions of the GNU * General Public License cover the whole combination. * * As a special exception, the copyright holders of this code give * you permission to link this code with independent modules to * produce an executable, regardless of the license terms of these * independent modules, and to copy and distribute the resulting * executable under terms of your choice, provided that you also * meet, for each linked independent module, the terms and conditions * of the license of that module. An independent module is a module * which is not derived from or based on this code. If you modify * this code, you may extend this exception to your version of the * library, but you are not obligated to do so. If you do not wish * to do so, delete this exception statement from your version. */ package com.redhat.thermostat.web.client.internal; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.Reader; import java.lang.reflect.Type; import java.net.MalformedURLException; import java.net.URL; import java.security.SecureRandom; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.net.ssl.SSLContext; import org.apache.commons.codec.binary.Base64; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.StatusLine; import org.apache.http.auth.AuthSchemeProvider; import org.apache.http.auth.AuthScope; import org.apache.http.auth.Credentials; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.HttpClient; import org.apache.http.client.config.AuthSchemes; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.config.Lookup; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.entity.mime.content.InputStreamBody; import org.apache.http.impl.auth.BasicSchemeFactory; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.redhat.thermostat.common.ssl.SSLContextFactory; import com.redhat.thermostat.common.ssl.SslInitException; import com.redhat.thermostat.common.utils.LoggingUtils; import com.redhat.thermostat.shared.config.SSLConfiguration; import com.redhat.thermostat.storage.core.AuthToken; import com.redhat.thermostat.storage.core.Categories; import com.redhat.thermostat.storage.core.Category; import com.redhat.thermostat.storage.core.Connection; import com.redhat.thermostat.storage.core.Cursor; import com.redhat.thermostat.storage.core.DescriptorParsingException; import com.redhat.thermostat.storage.core.IllegalDescriptorException; import com.redhat.thermostat.storage.core.IllegalPatchException; import com.redhat.thermostat.storage.core.PreparedStatement; import com.redhat.thermostat.storage.core.SecureStorage; import com.redhat.thermostat.storage.core.StatementDescriptor; import com.redhat.thermostat.storage.core.StatementExecutionException; import com.redhat.thermostat.storage.core.Storage; import com.redhat.thermostat.storage.core.StorageCredentials; import com.redhat.thermostat.storage.core.StorageException; import com.redhat.thermostat.storage.model.AggregateResult; import com.redhat.thermostat.storage.model.Pojo; import com.redhat.thermostat.web.common.PreparedStatementResponseCode; import com.redhat.thermostat.web.common.SharedStateId; import com.redhat.thermostat.web.common.WebPreparedStatement; import com.redhat.thermostat.web.common.WebPreparedStatementResponse; import com.redhat.thermostat.web.common.WebQueryResponse; import com.redhat.thermostat.web.common.typeadapters.PojoTypeAdapterFactory; import com.redhat.thermostat.web.common.typeadapters.PreparedParameterTypeAdapterFactory; import com.redhat.thermostat.web.common.typeadapters.PreparedParametersTypeAdapterFactory; import com.redhat.thermostat.web.common.typeadapters.SharedStateIdTypeAdapterFactory; import com.redhat.thermostat.web.common.typeadapters.WebPreparedStatementResponseTypeAdapterFactory; import com.redhat.thermostat.web.common.typeadapters.WebPreparedStatementTypeAdapterFactory; import com.redhat.thermostat.web.common.typeadapters.WebQueryResponseTypeAdapterFactory; public class WebStorage implements Storage, SecureStorage { private static final int STATUS_OK = 200; private static final int STATUS_NO_CONTENT = 204; private static final String HTTP_PREFIX = "http"; private static final String HTTPS_PREFIX = "https"; // Transition cache is valid for 30 seconds starting from the current time. private static final long TRANSITION_CACHE_OFFSET = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); static final Logger logger = LoggingUtils.getLogger(WebStorage.class); private static class CloseableHttpEntity implements Closeable, HttpEntity { private HttpEntity entity; private int responseCode; CloseableHttpEntity(HttpEntity entity, int responseCode) { this.entity = entity; this.responseCode = responseCode; } @Override public void consumeContent() throws IOException { EntityUtils.consume(entity); } @Override public InputStream getContent() throws IOException, IllegalStateException { return entity.getContent(); } @Override public Header getContentEncoding() { return entity.getContentEncoding(); } @Override public long getContentLength() { return entity.getContentLength(); } @Override public Header getContentType() { return entity.getContentType(); } @Override public boolean isChunked() { return entity.isChunked(); } @Override public boolean isRepeatable() { return entity.isRepeatable(); } @Override public boolean isStreaming() { return entity.isStreaming(); } @Override public void writeTo(OutputStream out) throws IOException { entity.writeTo(out); } @Override public void close() { try { EntityUtils.consume(entity); } catch (IOException ex) { throw new StorageException(ex); } } int getResponseCode() { return responseCode; } } private final class WebConnection extends Connection { WebConnection() { connected = true; } @Override public void disconnect() { connected = false; fireChanged(ConnectionStatus.DISCONNECTED); } @Override public void connect() { try { initAuthentication(); ping(); connected = true; logger.fine("Connected to storage"); fireChanged(ConnectionStatus.CONNECTED); } catch (Exception ex) { logger.log(Level.WARNING, "Could not connect to storage!", ex); fireChanged(ConnectionStatus.FAILED_TO_CONNECT); } } private void initAuthentication() throws MalformedURLException { String username = creds.getUsername(); char[] password = creds.getPassword(); if (username != null && password != null) { URL endpointURL = new URL(endpoint); BasicCredentialsProvider credsProvider = new BasicCredentialsProvider(); // FIXME Password as string? BAD. Limited by apache API here however. Credentials creds = new UsernamePasswordCredentials(username, new String(password)); Arrays.fill(password, '\0'); AuthScope scope = new AuthScope(endpointURL.getHost(), endpointURL.getPort(), "Thermostat Realm"); credsProvider.setCredentials(scope, creds); synchronized (httpClientContextLock) { httpClientContext.setCredentialsProvider(credsProvider); } } } @Override public void setUrl(String url) { super.setUrl(url); endpoint = url; } @Override public String getUrl() { return endpoint; } } private static class WebDataStream extends InputStream { private CloseableHttpEntity entity; private InputStream content; WebDataStream(CloseableHttpEntity entity) { this.entity = entity; try { content = entity.getContent(); } catch (IllegalStateException | IOException e) { throw new StorageException(e); } } @Override public void close() throws IOException { content.close(); entity.close(); } @Override public int read() throws IOException { return content.read(); } @Override public int available() throws IOException { return content.available(); } @Override public void mark(int readlimit) { content.mark(readlimit); } @Override public boolean markSupported() { return content.markSupported(); } @Override public int read(byte[] b) throws IOException { return content.read(b); } @Override public int read(byte[] b, int off, int len) throws IOException { return content.read(b, off, len); } @Override public void reset() throws IOException { content.reset(); } @Override public long skip(long n) throws IOException { return content.skip(n); } } private class WebPreparedStatementImpl<T extends Pojo> extends WebPreparedStatement<T> { // The type of the query result objects we'd get back upon // statement execution private final transient Type parametrizedTypeToken; public WebPreparedStatementImpl(Type parametrizedTypeToken, int numParams, SharedStateId statementId) { super(numParams, statementId); this.parametrizedTypeToken = parametrizedTypeToken; } @Override public int execute() throws StatementExecutionException { return doWriteExecute(this, 0); } @Override public Cursor<T> executeQuery() throws StatementExecutionException { return doExecuteQuery(this, parametrizedTypeToken, 0); } } private String endpoint; private Map<Category<?>, SharedStateId> categoryIds; private Gson gson; // The shared http client we use for execution (uses the context below) private HttpClient httpClient; private Object httpClientContextLock = new Object(); // Http client execution context. Protected via clientContext lock. private HttpClientContext httpClientContext; private StorageCredentials creds; private SecureRandom random; private WebConnection conn; private WebPreparedStatementCache stmtCache; // Temporary cache used for recovering after a server endpoint re-deployment. // Will only be valid for 30 seconds for any server endpoint re-deployment. private ExpirableWebPreparedStatementCache transitionStmtCache; // for testing WebStorage(String url, StorageCredentials creds, HttpClient client) { init(url, creds, client); } // for testing WebStorage(WebPreparedStatementCache stmtCache, ExpirableWebPreparedStatementCache transitionCache) { this.stmtCache = stmtCache; this.transitionStmtCache = transitionCache; } // for testing WebStorage(Map<Category<?>, SharedStateId> categoryIds) { this.categoryIds = categoryIds; } public WebStorage(String url, StorageCredentials creds, SSLConfiguration sslConf) throws StorageException { PoolingHttpClientConnectionManager connManager = getPoolingHttpClientConnManager(sslConf, url); HttpClientBuilder builder = HttpClients.custom(); Lookup<AuthSchemeProvider> authProviders = RegistryBuilder.<AuthSchemeProvider>create() .register(AuthSchemes.BASIC, new BasicSchemeFactory()) .build(); // Set up client with default basic-auth scheme and pooled // connection manager. HttpClient client = builder.setConnectionManager(connManager) .setDefaultAuthSchemeRegistry(authProviders) .build(); init(url, creds, client); } private void init(String url, StorageCredentials creds, HttpClient client) { categoryIds = new HashMap<>(); gson = new GsonBuilder() .registerTypeAdapterFactory(new PojoTypeAdapterFactory()) .registerTypeAdapterFactory(new SharedStateIdTypeAdapterFactory()) .registerTypeAdapterFactory(new WebPreparedStatementResponseTypeAdapterFactory()) .registerTypeAdapterFactory(new WebQueryResponseTypeAdapterFactory()) .registerTypeAdapterFactory(new PreparedParameterTypeAdapterFactory()) .registerTypeAdapterFactory(new WebPreparedStatementTypeAdapterFactory()) .registerTypeAdapterFactory(new PreparedParametersTypeAdapterFactory()) .create(); httpClient = client; synchronized (httpClientContextLock) { httpClientContext = HttpClientContext.create(); } random = new SecureRandom(); conn = new WebConnection(); this.endpoint = url; this.creds = creds; this.stmtCache = new WebPreparedStatementCache(); } // package private for testing PoolingHttpClientConnectionManager getPoolingHttpClientConnManager(SSLConfiguration sslConf, String url) throws StorageException { ConnectionSocketFactory plainsf = new PlainConnectionSocketFactory(); RegistryBuilder<ConnectionSocketFactory> regBuilder = RegistryBuilder.<ConnectionSocketFactory>create() .register(HTTP_PREFIX, plainsf); try { // setup SSL if necessary if (url.startsWith(HTTPS_PREFIX)) { SSLContext sc = SSLContextFactory.getClientContext(sslConf); SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(sc); regBuilder = regBuilder.register(HTTPS_PREFIX, socketFactory); } } catch ( SslInitException e) { throw new StorageException(e); } Registry<ConnectionSocketFactory> r = regBuilder.build(); return new PoolingHttpClientConnectionManager(r); } private void ping() throws StorageException { post(endpoint + "/ping", (HttpEntity) null).close(); } private CloseableHttpEntity post(String url, List<NameValuePair> formparams) throws StorageException { try { return postImpl(url, formparams); } catch (IOException ex) { throw new StorageException(ex); } } private CloseableHttpEntity postImpl(String url, List<NameValuePair> formparams) throws IOException { HttpEntity entity; if (formparams != null) { entity = new UrlEncodedFormEntity(formparams, "UTF-8"); } else { entity = null; } return postImpl(url, entity); } private CloseableHttpEntity post(String url, HttpEntity entity) throws StorageException { try { return postImpl(url, entity, null); } catch (IOException ex) { throw new StorageException(ex); } } private CloseableHttpEntity post(String url, HttpEntity entity, RequestConfig config) throws StorageException { try { return postImpl(url, entity, config); } catch (IOException ex) { throw new StorageException(ex); } } private CloseableHttpEntity postImpl(String url, HttpEntity entity, RequestConfig config) throws IOException { HttpPost httpPost = new HttpPost(url); if (entity != null) { httpPost.setEntity(entity); } HttpResponse response = null; // The client context is not thread-safe. Thus protect execution // via the client context lock. synchronized(httpClientContextLock) { RequestConfig oldConfig = httpClientContext.getRequestConfig(); if (config != null) { httpClientContext.setRequestConfig(config); } try { response = httpClient.execute(httpPost, httpClientContext); } catch (Throwable e) { throw e; } finally { if (config != null) { httpClientContext.setRequestConfig(oldConfig); } } } StatusLine status = response.getStatusLine(); int responseCode = status.getStatusCode(); switch (responseCode) { case (STATUS_NO_CONTENT): // Let calling code handle STATUS_NO_CONTENT break; case (STATUS_OK): // Let calling code handle STATUS_OK break; default: // Properly consume the entity, thus closing the content stream, // by throwing this IOException sub-class. This is important for the // 403 and 500 status code cases. See: // http://hc.apache.org/httpcomponents-core-4.3.x/httpcore/apidocs/org/apache/http/util/EntityUtils.html#consume%28org.apache.http.HttpEntity%29 throw new EntityConsumingIOException(response.getEntity(), "Server returned status: " + status); } return new CloseableHttpEntity(response.getEntity(), responseCode); } private CloseableHttpEntity postImpl(String url, HttpEntity entity) throws StorageException { try { return postImpl(url, entity, null); } catch (IOException ex) { throw new StorageException(ex); } } private static InputStream getContent(HttpEntity entity) { try { return entity.getContent(); } catch (IOException ex) { throw new StorageException(ex); } } private static Reader getContentAsReader(HttpEntity entity) { InputStream in = getContent(entity); return new InputStreamReader(in); } @Override public void registerCategory(Category<?> category) throws StorageException { NameValuePair nameParam = new BasicNameValuePair("name", category.getName()); NameValuePair dataClassParam = new BasicNameValuePair("data-class", category.getDataClass().getName()); NameValuePair categoryParam = new BasicNameValuePair("category", gson.toJson(category)); List<NameValuePair> formparams = Arrays .asList(nameParam, categoryParam, dataClassParam); try (CloseableHttpEntity entity = post(endpoint + "/register-category", formparams)) { Reader reader = getContentAsReader(entity); SharedStateId id = gson.fromJson(reader, SharedStateId.class); categoryIds.put(category, id); } } /** * Executes a prepared query * * @param stmt * The prepared statement to execute * @param parametrizedTypeToken * The parametrized type token to use for deserialization. * Example as to how this was created: * <pre> * Type parametrizedTypeToken = new * TypeToken<WebQueryResponse<AgentInformation>>().getType(); * </pre> * @param invocationCount The number of recursive invocations performed so far. * @return A cursor for the generic type. * @throws StatementExecutionException * If execution of the statement failed. In particular, if * the state got out of sync, it tried to recover and then * failed again. */ <T extends Pojo> Cursor<T> doExecuteQuery(final WebPreparedStatement<T> stmt, Type parametrizedTypeToken, final int invocationCount) throws StatementExecutionException { checkRecursiveInvocationCount(invocationCount); NameValuePair queryParam = new BasicNameValuePair("prepared-stmt", gson.toJson(stmt, WebPreparedStatement.class)); List<NameValuePair> formparams = Arrays.asList(queryParam); WebQueryResponse<T> qResp = null; try (CloseableHttpEntity entity = post(endpoint + "/query-execute", formparams)) { Reader reader = getContentAsReader(entity); qResp = gson.fromJson(reader, parametrizedTypeToken); } catch (Exception e) { throw new StatementExecutionException(e); } switch(qResp.getResponseCode()) { case PreparedStatementResponseCode.QUERY_SUCCESS: return new WebCursor<T>(this, qResp.getResultList(), qResp.hasMoreBatches(), qResp.getCursorId(), parametrizedTypeToken, stmt); case PreparedStatementResponseCode.ILLEGAL_PATCH: { String msg = "Illegal statement argument. See server logs for details."; IllegalArgumentException iae = new IllegalArgumentException(msg); IllegalPatchException e = new IllegalPatchException(iae); throw new StatementExecutionException(e); } case PreparedStatementResponseCode.PREP_STMT_BAD_STOKEN: { // Try to recover from this situation. If this path is // entered more than once than we'll fail on method entry. try { WebPreparedStatement<T> newStmt = handlePreparedStmtStateOutOfSync(stmt); return doExecuteQuery(newStmt, parametrizedTypeToken, invocationCount + 1); } catch (DescriptorParsingException e) { throw new StatementExecutionException(e); } } default: { String msg = "[query-execute] Unknown response from storage endpoint!"; IllegalStateException ise = new IllegalStateException(msg); throw new StatementExecutionException(ise); } } } private void checkRecursiveInvocationCount(int invocationCount) throws StatementExecutionException { if (invocationCount > 1) { // Initial invokation == 0, potential recovery-invocation == 1 String msg = "Failed to recover from out-of-sync state with server"; logger.log(Level.WARNING, msg); throw new StatementExecutionException(new IllegalStateException(msg)); } } /** * This method gets called from WebCursor in order to fetch more results * or refresh the result set since parameters like limit or skip have * changed since the original result set was fetched. * * @param cursorId * @param parametrizedTypeToken The type token for the data class (Pojo). * @param batchSize The desired batchSize or null. null means that the user * did not set an explicit batch size. * @param limit The desired limit for this cursor or null. null means that * a user did not set an explicit limit. * @param skip The desired skip value or null. null means no skip value has * been specified by the user. * @return */ <T extends Pojo> WebQueryResponse<T> getMore(int cursorId, Type parametrizedTypeToken, Integer batchSize, WebPreparedStatement<T> stmt) { String stmtId = gson.toJson(stmt.getStatementId()); NameValuePair preparedStmtIdParam = new BasicNameValuePair("prepared-stmt-id", stmtId); NameValuePair cursorIdParam = new BasicNameValuePair("cursor-id", Integer.toString(cursorId)); NameValuePair batchSizeParam = new BasicNameValuePair("batch-size", batchSize.toString()); List<NameValuePair> formparams = Arrays.asList(preparedStmtIdParam, cursorIdParam, batchSizeParam); WebQueryResponse<T> qResp = null; try (CloseableHttpEntity entity = post(endpoint + "/get-more", formparams)) { Reader reader = getContentAsReader(entity); qResp = gson.fromJson(reader, parametrizedTypeToken); } catch (Exception e) { throw new StorageException(e); } return qResp; } /** * Executes a prepared write * * @param stmt * The prepared statement to execute * @param invocationCount * The number of times this method has been recursively called, * starting at 0. * @return The response code of executing the underlying data modifying * statement. * @throws StatementExecutionException * If execution of the statement failed. For example if the * values set as prepared parameters did not work or were * partially missing for the prepared statement. */ <T extends Pojo> int doWriteExecute(final WebPreparedStatement<T> stmt, final int invocationCount) throws StatementExecutionException { checkRecursiveInvocationCount(invocationCount); NameValuePair queryParam = new BasicNameValuePair("prepared-stmt", gson.toJson(stmt, WebPreparedStatement.class)); List<NameValuePair> formparams = Arrays.asList(queryParam); int responseCode = PreparedStatementResponseCode.WRITE_GENERIC_FAILURE; try (CloseableHttpEntity entity = post(endpoint + "/write-execute", formparams)) { Reader reader = getContentAsReader(entity); responseCode = gson.fromJson(reader, int.class); } catch (Exception e) { throw new StatementExecutionException(e); } if (responseCode == PreparedStatementResponseCode.ILLEGAL_PATCH) { String msg = "Illegal statement argument. See server logs for details. Invokation count: " + invocationCount; IllegalArgumentException iae = new IllegalArgumentException(msg); IllegalPatchException e = new IllegalPatchException(iae); throw new StatementExecutionException(e); } else if (responseCode == PreparedStatementResponseCode.PREP_STMT_BAD_STOKEN) { // Try to recover from this situation. If this path is // entered more than once than we'll fail on method entry. try { WebPreparedStatement<T> newStmt = handlePreparedStmtStateOutOfSync(stmt); return doWriteExecute(newStmt, invocationCount + 1); } catch (DescriptorParsingException e) { throw new StatementExecutionException(e); } } return responseCode; } @Override public Connection getConnection() { return conn; } @Override public InputStream loadFile(String name) throws StorageException { NameValuePair fileParam = new BasicNameValuePair("file", name); List<NameValuePair> formparams = Arrays.asList(fileParam); CloseableHttpEntity entity = post(endpoint + "/load-file", formparams); if (entity.getResponseCode() == STATUS_NO_CONTENT) { return null; } return new WebDataStream(entity); } @Override public void saveFile(String name, InputStream in) throws StorageException { InputStreamBody body = new InputStreamBody(in, name); MultipartEntityBuilder builder = MultipartEntityBuilder.create(); HttpEntity mpEntity = builder.addPart("file", body).build(); // See IcedTea bug #1314. For safe-file we need to do this: // setExcpectContinueEnabled. However, // doing this for other actions messes up authentication when using // jetty (and possibly others). Hence, do this expect-continue thingy // only for save-file. We achieve this via a single request configuration. RequestConfig config = RequestConfig.custom().setExpectContinueEnabled(true).build(); post(endpoint + "/save-file", mpEntity, config).close(); } @Override public void purge(String agentId) throws StorageException { NameValuePair agentIdParam = new BasicNameValuePair("agentId", agentId); List<NameValuePair> agentIdParams = Arrays.asList(agentIdParam); post(endpoint + "/purge", agentIdParams).close(); } @Override public AuthToken generateToken(String actionName) throws StorageException { byte[] clientToken = new byte[256]; random.nextBytes(clientToken); NameValuePair clientTokenParam = new BasicNameValuePair("client-token", Base64.encodeBase64String(clientToken)); NameValuePair actionNameParam = new BasicNameValuePair("action-name", Objects.requireNonNull(actionName)); List<NameValuePair> formparams = Arrays.asList(clientTokenParam, actionNameParam); try (CloseableHttpEntity entity = post(endpoint + "/generate-token", formparams)) { byte[] authToken = EntityUtils.toByteArray(entity); return new AuthToken(authToken, clientToken); } catch (IOException ex) { throw new StorageException(ex); } } @Override public boolean verifyToken(AuthToken authToken, String actionName) { byte[] clientToken = authToken.getClientToken(); byte[] token = authToken.getToken(); NameValuePair clientTokenParam = new BasicNameValuePair("client-token", Base64.encodeBase64String(clientToken)); NameValuePair tokenParam = new BasicNameValuePair("token", Base64.encodeBase64String(token)); NameValuePair actionNameParam = new BasicNameValuePair("action-name", Objects.requireNonNull(actionName)); List<NameValuePair> formparams = Arrays.asList(clientTokenParam, tokenParam, actionNameParam); HttpResponse response = null; try { HttpEntity entity = new UrlEncodedFormEntity(formparams, "UTF-8"); HttpPost httpPost = new HttpPost(endpoint + "/verify-token"); httpPost.setEntity(entity); synchronized (httpClientContextLock) { response = httpClient.execute(httpPost, httpClientContext); } StatusLine status = response.getStatusLine(); return status.getStatusCode() == STATUS_OK; } catch (IOException ex) { throw new StorageException(ex); } finally { if (response != null) { try { EntityUtils.consume(response.getEntity()); } catch (IOException ex) { throw new StorageException(ex); } } } } @Override public void shutdown() { // Nothing to do here. } SharedStateId getCategoryId(Category<?> category) { return categoryIds.get(category); } /** * Package private for testing * * This method handles the recovery mechanism which needs to be done before * an already failed {@link WebPreparedStatement} can be re-submitted * because some state maintained in the client (here) and on the server * need to be in agreement. * * Here is how the recovery mechanism works: * * Pre: client and server agree on an ID for every statement. Any single * statement is uniquely identifiable via the (server-token, int-id) * pair. When this method is called we already know that when we first * tried to execute the statement we had an out-dated server-token in * record. Thus, we need to refresh the local cache with updated * statement IDs. * * Getting the local cache back in sync can be done by: * 1. Removing the old values from the current statement cache and * 2. Re-registering the underlying category and re-preparing the statement * * The above two steps will be done once per statement descriptor. This will * update the statement cache accordingly. However, since there may be other * statements in the local queue waiting to be executed. Those pending * statements still have old statement IDs in record. This is where the * transition cache comes into play. There is no need to re-register categories * and re-prepare statements for the same descriptor. It was already done * once and the main statement cache updated accordingly. The transition cache is then used * to get the descriptor from an old statement ID. I.e. whenever the transition * cache is used it is no longer equal to the main statement cache. In a way * the transition cache is a tool to get a descriptor for an now out-dated * statement ID. Once we have the descriptor again we can look it up in the * regular statement cache in (which has been updated previously) in order * to get the updated values for the statement ID. * * @param origStmt The original statement that failed to execute. * @return A fixed-up statement which should succeed to execute if tried * again. * @throws DescriptorParsingException If re-preparing a statement failed. */ synchronized <T extends Pojo> WebPreparedStatement<T> handlePreparedStmtStateOutOfSync(final WebPreparedStatement<T> origStmt) throws DescriptorParsingException { SharedStateId id = origStmt.getStatementId(); String msg = "Prepared statement failed to execute. Server changed token. Trying to recover stmt with id: " + id; logger.log(Level.FINE, msg); // Transition stmt cache needs to be created in 2 cases: // 1. It might be null if it was the first time the server // re-deployed. // 2. The server did re-deploy at least once and the time it happened // is more than TRANSITION_CACHE_OFFSET in the past. Case for // multiple re-deployments of the server parts. if (transitionStmtCache == null || transitionStmtCache.isExpired()) { // Create a transition cache which expires soon in the future // in order to allow successful executions of queued statements which // did not yet run and have old statement IDs in record. logger.log(Level.FINE, "Re-creating transition cache"); WebPreparedStatementCache cacheSnapshot = stmtCache.createSnapshot(); long timeExpires = System.nanoTime() + TRANSITION_CACHE_OFFSET; transitionStmtCache = new ExpirableWebPreparedStatementCache(cacheSnapshot, timeExpires); } StatementDescriptor<T> desc = stmtCache.get(id); // If the above returned null we most likely tried to execute a statement // with an old server token. Attempt to use the transition cache in order // to still be able to execute it successfully. if (desc == null) { desc = transitionStmtCache.get(id); if (desc == null) { throw new IllegalStateException("Irrecoverable error. GC happened or transition cache expired."); } WebPreparedStatementHolder transCacheHolder = transitionStmtCache.get(desc); WebPreparedStatementHolder cacheHolder = stmtCache.get(desc); if (transCacheHolder.getStatementId().equals(cacheHolder.getStatementId())) { throw new IllegalStateException("Should not happen!"); } // Transition case: // // Fetch the new mapping from the stmt cache since the statement id // must have changed but category-registration and preparing the // updated statement was done already. SharedStateId stmtId = cacheHolder.getStatementId(); logger.log(Level.FINE, "Returning fixed-up statement using updated statement id: " + stmtId); origStmt.setStatementId(stmtId); return origStmt; } // Base case: re-register category and re-prepare statement. This will // be done *once* for every statement. logger.log(Level.FINE, "Re-register category + prepareStatement + setting params: " + desc); sendCategoryReRegistrationRequest(desc.getCategory()); stmtCache.remove(id); // prepareStatement() will return a raw statement (no parameters will be // set in this new datastructure). In order to make it executable right // away we need to set the params via the params we have in record in the // original stmt. WebPreparedStatement<T> newStmt = (WebPreparedStatement<T>)prepareStatement(desc); newStmt.setParams(origStmt.getParams()); return newStmt; } @Override public <T extends Pojo> PreparedStatement<T> prepareStatement(StatementDescriptor<T> desc) throws DescriptorParsingException { /* * Avoid two network round-trips for statements which have already * been prepared. Note that this makes preparing statements not entirely * stateless, since the prepared statement ID might change if the * web endpoint reloads. If those IDs get out-of-sync we do our best * to correct this situation by clearing the relevant cache entry and * preparing the statement again. */ WebPreparedStatementHolder holder = stmtCache.get(desc); // note this is a WeakHashMap-backed cache and may return null if (holder == null) { // Cache-miss, send request over the wire and cache result. holder = sendPrepareStmtRequest(desc, 0); stmtCache.put(desc, holder); } return new WebPreparedStatementImpl<>(holder.getTypeToken(), holder.getNumParams(), holder.getStatementId()); } // package-private for testing <T extends Pojo> WebPreparedStatementHolder sendPrepareStmtRequest(StatementDescriptor<T> desc, final int invokationCount) throws DescriptorParsingException { if (invokationCount > 1) { // Initial invokation == 0, potential recovery-invocation == 1 String msg = "Failed to recover from out-of-sync state with server"; logger.log(Level.WARNING, msg); throw new DescriptorParsingException(msg); } String strDesc = desc.getDescriptor(); SharedStateId categoryId = getCategoryId(desc.getCategory()); NameValuePair nameParam = new BasicNameValuePair("query-descriptor", strDesc); NameValuePair categoryParam = new BasicNameValuePair("category-id", gson.toJson(categoryId, SharedStateId.class)); List<NameValuePair> formparams = Arrays .asList(nameParam, categoryParam); try (CloseableHttpEntity entity = post(endpoint + "/prepare-statement", formparams)) { Reader reader = getContentAsReader(entity); WebPreparedStatementResponse result = gson.fromJson(reader, WebPreparedStatementResponse.class); int numParams = result.getNumFreeVariables(); SharedStateId statementId = result.getStatementId(); int stmtId = statementId.getId(); switch (stmtId) { case WebPreparedStatementResponse.ILLEGAL_STATEMENT: { // we've got a descriptor the endpoint doesn't know about or // refuses to accept for security reasons. String msg = "Unknown query descriptor which endpoint of " + WebStorage.class.getName() + " refused to accept!"; throw new IllegalDescriptorException(msg, desc.getDescriptor()); } case WebPreparedStatementResponse.DESCRIPTOR_PARSE_FAILED: { String msg = "Statement descriptor failed to parse. " + "Please check server logs for details!"; throw new DescriptorParsingException(msg); } case WebPreparedStatementResponse.CATEGORY_OUT_OF_SYNC: { // We tried to prepare a statement and the server's // representation of category IDs changed. Thus, be sure to // clear the category state and get their new IDs. String msg = "Preparing statement failed. Server changed category state. Clearing category ID for statement: " + desc.getDescriptor() + " and trying to recover."; logger.log(Level.FINE, msg); sendCategoryReRegistrationRequest(desc.getCategory()); return sendPrepareStmtRequest(desc, invokationCount + 1); } default: { // Common case where stmtId is the actual ID of the statement // and not an error code. assert(stmtId >= 0); // negative values are error codes // We need this ugly trick in order for WebQueryResponse // deserialization to work properly. I.e. GSON needs this type // info hint. Class<T> dataClass = desc.getCategory().getDataClass(); Type typeToken = new WebQueryResponse<T>().getRuntimeParametrizedType(dataClass); return new WebPreparedStatementHolder(typeToken, numParams, statementId); } } } } // package private for testing synchronized <T extends Pojo> void sendCategoryReRegistrationRequest(Category<T> category) { // There are two possible cases. Category is an aggregate category or // it is not. For aggregate categories we need to re-register the // original first and then the aggregate category. Class<T> dataClass = category.getDataClass(); if (AggregateResult.class.isAssignableFrom(dataClass)) { Category<?> nonAggregateCategory = Categories.getByName(category.getName()); categoryIds.remove(nonAggregateCategory); registerCategory(nonAggregateCategory); } categoryIds.remove(category); registerCategory(category); } }