Mercurial > hg > release > thermostat-0.5
view storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java @ 900:916ea399eb6b
Consolidate Storage.putPojo() to replace and add operations.
Reviewed-by: vanaltj
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2012-December/004896.html
author | Roman Kennke <rkennke@redhat.com> |
---|---|
date | Thu, 10 Jan 2013 14:53:32 +0100 |
parents | 29f815f1bbed |
children | eef06eccb01b |
line wrap: on
line source
/* * Copyright 2012 Red Hat, Inc. * * This file is part of Thermostat. * * Thermostat is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published * by the Free Software Foundation; either version 2, or (at your * option) any later version. * * Thermostat is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Thermostat; see the file COPYING. If not see * <http://www.gnu.org/licenses/>. * * Linking this code with other modules is making a combined work * based on this code. Thus, the terms and conditions of the GNU * General Public License cover the whole combination. * * As a special exception, the copyright holders of this code give * you permission to link this code with independent modules to * produce an executable, regardless of the license terms of these * independent modules, and to copy and distribute the resulting * executable under terms of your choice, provided that you also * meet, for each linked independent module, the terms and conditions * of the license of that module. An independent module is a module * which is not derived from or based on this code. If you modify * this code, you may extend this exception to your version of the * library, but you are not obligated to do so. If you do not wish * to do so, delete this exception statement from your version. */ package com.redhat.thermostat.storage.mongodb.internal; import java.io.InputStream; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import com.mongodb.BasicDBObject; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.gridfs.GridFS; import com.mongodb.gridfs.GridFSDBFile; import com.mongodb.gridfs.GridFSInputFile; import com.redhat.thermostat.storage.config.StartupConfiguration; import com.redhat.thermostat.storage.core.AbstractQuery.Sort; import com.redhat.thermostat.storage.core.Add; import com.redhat.thermostat.storage.core.BasePut; import com.redhat.thermostat.storage.core.Category; import com.redhat.thermostat.storage.core.Connection; import com.redhat.thermostat.storage.core.Connection.ConnectionListener; import com.redhat.thermostat.storage.core.Connection.ConnectionStatus; import com.redhat.thermostat.storage.core.Cursor; import com.redhat.thermostat.storage.core.Key; import com.redhat.thermostat.storage.core.Query; import com.redhat.thermostat.storage.core.Remove; import com.redhat.thermostat.storage.core.Replace; import com.redhat.thermostat.storage.core.Storage; import com.redhat.thermostat.storage.core.Update; import com.redhat.thermostat.storage.model.Pojo; /** * Implementation of the Storage interface that uses MongoDB to store the instrumentation data. * * In this implementation, each CATEGORY is given a distinct collection. */ public class MongoStorage implements Storage { public static final String SET_MODIFIER = "$set"; private class MongoAdd extends BasePut implements Add { @Override public void apply() { addImpl(getCategory(), getPojo()); } } private class MongoReplace extends BasePut implements Replace { @Override public void apply() { replaceImpl(getCategory(), getPojo()); } } private MongoConnection conn; private DB db = null; private Map<String, DBCollection> collectionCache = new HashMap<String, DBCollection>(); private UUID agentId; public MongoStorage(StartupConfiguration conf) { conn = new MongoConnection(conf); conn.addListener(new ConnectionListener() { @Override public void changed(ConnectionStatus newStatus) { switch (newStatus) { case DISCONNECTED: db = null; case CONNECTED: db = conn.getDB(); default: // ignore other status events } } }); } @Override public Connection getConnection() { return conn; } @Override public void setAgentId(UUID agentId) { this.agentId = agentId; } @Override public String getAgentId() { return agentId.toString(); } @Override public Add createAdd(Category into) { MongoAdd add = new MongoAdd(); add.setCategory(into); return add; } @Override public Replace createReplace(Category into) { MongoReplace replace = new MongoReplace(); replace.setCategory(into); return replace; } private void addImpl(final Category cat, final Pojo pojo) { DBCollection coll = getCachedCollection(cat); DBObject toInsert = preparePut(pojo); coll.insert(toInsert); } private void replaceImpl(final Category cat, final Pojo pojo) { DBCollection coll = getCachedCollection(cat); DBObject toInsert = preparePut(pojo); DBObject query = new BasicDBObject(); Collection<Key<?>> keys = cat.getKeys(); for (Key<?> key : keys) { if (key.isPartialCategoryKey()) { String name = key.getName(); query.put(name, toInsert.get(name)); } } coll.update(query, toInsert, true, false); } private DBObject preparePut(final Pojo pojo) { MongoPojoConverter converter = new MongoPojoConverter(); DBObject toInsert = converter.convertPojoToMongo(pojo); if (toInsert.get(Key.AGENT_ID.getName()) == null) { toInsert.put(Key.AGENT_ID.getName(), getAgentId()); } return toInsert; } @Override public void updatePojo(Update update) { assert update instanceof MongoUpdate; MongoUpdate mongoUpdate = (MongoUpdate) update; Category cat = mongoUpdate.getCategory(); DBCollection coll = getCachedCollection(cat); DBObject query = mongoUpdate.getQuery(); DBObject values = mongoUpdate.getValues(); coll.update(query, values); } @Override public void removePojo(Remove remove) { assert (remove instanceof MongoRemove); MongoRemove mongoRemove = (MongoRemove) remove; DBObject query = mongoRemove.getQuery(); Category category = mongoRemove.getCategory(); DBCollection coll = getCachedCollection(category); coll.remove(query); } private DBCollection getCachedCollection(Category category) { String collName = category.getName(); DBCollection coll = collectionCache.get(collName); if (coll == null && db.collectionExists(collName)) { throw new IllegalStateException("Categories need to be registered before being used"); } return coll; } // TODO: This method is only temporary to enable tests, until we come up with a better design, // in particular, the collection should be stored in the category itself. It must not be called // from production code. void mapCategoryToDBCollection(Category category, DBCollection coll) { collectionCache.put(category.getName(), coll); } @Override public void purge() { BasicDBObject query = new BasicDBObject(Key.AGENT_ID.getName(), getAgentId()); for (DBCollection coll : collectionCache.values()) { coll.remove(query); } } @Override public void registerCategory(Category category) { String name = category.getName(); if (collectionCache.containsKey(name)) { throw new IllegalStateException("Category may only be associated with one backend."); } DBCollection coll; if (! db.collectionExists(name)) { coll = db.createCollection(name, new BasicDBObject("capped", false)); } else { coll = db.getCollection(name); } collectionCache.put(name, coll); } @Override public Query createQuery() { return new MongoQuery(); } @Override public Update createUpdate() { return new MongoUpdate(); } @Override public Remove createRemove() { return new MongoRemove(); } @Override public <T extends Pojo> Cursor<T> findAllPojos(Query query, Class<T> resultClass) { MongoQuery mongoQuery = checkAndCastQuery(query); DBCollection coll = getCachedCollection(mongoQuery.getCategory()); DBCursor dbCursor; if (mongoQuery.hasClauses()) { dbCursor = coll.find(mongoQuery.getGeneratedQuery()); } else { dbCursor = coll.find(); } dbCursor = applySortAndLimit(mongoQuery, dbCursor); return new MongoCursor<T>(dbCursor, resultClass); } private DBCursor applySortAndLimit(MongoQuery query, DBCursor dbCursor) { BasicDBObject orderBy = new BasicDBObject(); List<Sort> sorts = query.getSorts(); for (Sort sort : sorts) { orderBy.append(sort.getKey().getName(), sort.getDirection().getValue()); } dbCursor.sort(orderBy); int limit = query.getLimit(); if (limit > 0) { dbCursor.limit(limit); } return dbCursor; } @Override public <T extends Pojo> T findPojo(Query query, Class<T> resultClass) { MongoQuery mongoQuery = checkAndCastQuery(query); DBCollection coll = getCachedCollection(mongoQuery.getCategory()); DBObject dbResult = coll.findOne(mongoQuery.getGeneratedQuery()); MongoPojoConverter conv = new MongoPojoConverter(); return conv.convertMongoToPojo(dbResult, resultClass); } private MongoQuery checkAndCastQuery(Query query) { if (!(query instanceof MongoQuery)) { throw new IllegalArgumentException("MongoStorage can only handle MongoQuery"); } return (MongoQuery) query; } @Override public long getCount(Category category) { DBCollection coll = getCachedCollection(category); if (coll != null) { return coll.getCount(); } return 0L; } @Override public void saveFile(String filename, InputStream data) { GridFS gridFS = new GridFS(db); GridFSInputFile inputFile = gridFS.createFile(data, filename); inputFile.save(); } @Override public InputStream loadFile(String filename) { GridFS gridFS = new GridFS(db); GridFSDBFile file = gridFS.findOne(filename); if (file == null) { return null; } else { return file.getInputStream(); } } @Override public void shutdown() { // Nothing to do here. } }