Mercurial > hg > release > thermostat-0.9
changeset 61:a4ac69374d2f
Add chunk updating to storage layer and use it to push vm stop time.
author | Jon VanAlten <jon.vanalten@redhat.com> |
---|---|
date | Tue, 31 Jan 2012 12:01:59 -0500 |
parents | a55a4ed7e6b6 |
children | c1cc0b37e8c1 |
files | src/com/redhat/thermostat/agent/storage/MongoStorage.java src/com/redhat/thermostat/agent/storage/Storage.java src/com/redhat/thermostat/backend/Backend.java src/com/redhat/thermostat/backend/system/JvmStatHostListener.java src/com/redhat/thermostat/backend/system/SystemBackend.java |
diffstat | 5 files changed, 129 insertions(+), 29 deletions(-) [+] |
line wrap: on
line diff
--- a/src/com/redhat/thermostat/agent/storage/MongoStorage.java Mon Jan 30 13:42:10 2012 -0500 +++ b/src/com/redhat/thermostat/agent/storage/MongoStorage.java Tue Jan 31 12:01:59 2012 -0500 @@ -66,11 +66,13 @@ public class MongoStorage extends Storage { public static final String KEY_AGENT_ID = "agent-id"; + public static final String SET_MODIFIER = "$set"; private static final Logger logger = LoggingUtils.getLogger(MongoStorage.class); private Mongo mongo = null; private DB db = null; + private Map<String, DBCollection> collectionCache = new HashMap<String, DBCollection>(); private UUID agentId = null; @@ -79,7 +81,7 @@ connect(new MongoURI(uri)); } - public void connect(MongoURI uri) throws UnknownHostException { + private void connect(MongoURI uri) throws UnknownHostException { mongo = new Mongo(uri); db = mongo.getDB(StorageConstants.THERMOSTAT_DB_NAME); } @@ -123,17 +125,17 @@ } @Override - protected void addChunkImpl(Chunk chunk) { + protected void putChunkImpl(Chunk chunk) { Category cat = chunk.getCategory(); - DBCollection coll = db.getCollection(cat.getName()); + DBCollection coll = getCachedCollection(cat.getName()); BasicDBObject toInsert = getAgentDBObject(); - BasicDBObject toDelete = null; + BasicDBObject replaceKey = null; boolean replace = chunk.getReplace(); Map<String, BasicDBObject> nestedParts = new HashMap<String, BasicDBObject>(); - Map<String, BasicDBObject> deleteNestedParts = null; + Map<String, BasicDBObject> replaceKeyNestedParts = null; if (replace) { - toDelete = getAgentDBObject(); - deleteNestedParts = new HashMap<String, BasicDBObject>(); + replaceKey = getAgentDBObject(); + replaceKeyNestedParts = new HashMap<String, BasicDBObject>(); } for (Iterator<com.redhat.thermostat.agent.storage.Key> iter = cat.getEntryIterator(); iter.hasNext();) { com.redhat.thermostat.agent.storage.Key key = iter.next(); @@ -142,24 +144,30 @@ if (entryParts.length == 2) { BasicDBObject nested = nestedParts.get(entryParts[0]); if (nested == null) { + if (isKey) { + throwMissingKey(key.getName()); + } nested = new BasicDBObject(); nestedParts.put(entryParts[0], nested); } nested.append(entryParts[1], chunk.get(key)); if (replace && isKey) { - BasicDBObject deleteNested = deleteNestedParts.get(entryParts[0]); - if (deleteNested == null) { - deleteNested = new BasicDBObject(); - deleteNestedParts.put(entryParts[0], deleteNested); + BasicDBObject replaceKeyNested = replaceKeyNestedParts.get(entryParts[0]); + if (replaceKeyNested == null) { + replaceKeyNested = new BasicDBObject(); + replaceKeyNestedParts.put(entryParts[0], replaceKeyNested); } - deleteNested.append(entryParts[1], deleteNested); + replaceKeyNested.append(entryParts[1], replaceKeyNested); } } else { String mongoKey = key.getName(); String value = chunk.get(key); + if ((value == null) && isKey) { + throwMissingKey(key.getName()); + } toInsert.append(mongoKey, value); if (replace && isKey) { - toDelete.append(mongoKey, value); + replaceKey.append(mongoKey, value); } } } @@ -167,12 +175,83 @@ toInsert.append(mongoKey, nestedParts.get(mongoKey)); } if (replace) { - for (String mongoKey : deleteNestedParts.keySet()) { - toDelete.append(mongoKey, deleteNestedParts.get(mongoKey)); + for (String mongoKey : replaceKeyNestedParts.keySet()) { + replaceKey.append(mongoKey, replaceKeyNestedParts.get(mongoKey)); } - coll.remove(toDelete); + coll.update(replaceKey, toInsert, true, false); + } else { + coll.insert(toInsert); } - coll.insert(toInsert); + } + + @Override + protected void updateChunkImpl(Chunk chunk) { + Category cat = chunk.getCategory(); + DBCollection coll = getCachedCollection(cat.getName()); + BasicDBObject toUpdate = new BasicDBObject(); + BasicDBObject updateKey = getAgentDBObject(); + Map<String, BasicDBObject> nestedParts = new HashMap<String, BasicDBObject>(); + Map<String, BasicDBObject> updateKeyNestedParts = new HashMap<String, BasicDBObject>(); + for (Iterator<com.redhat.thermostat.agent.storage.Key> iter = cat.getEntryIterator(); iter.hasNext();) { + com.redhat.thermostat.agent.storage.Key key = iter.next(); + boolean isKey = key.isPartialCategoryKey(); + String[] entryParts = key.getName().split("\\."); + if (entryParts.length == 2) { + BasicDBObject nested = nestedParts.get(entryParts[0]); + if (nested == null) { + if (isKey) { + throwMissingKey(key.getName()); + } + } else { + if (isKey) { + BasicDBObject updateKeyNested = updateKeyNestedParts.get(entryParts[0]); + if (updateKeyNested == null) { + updateKeyNested = new BasicDBObject(); + updateKeyNestedParts.put(entryParts[0], updateKeyNested); + } + updateKeyNested.append(entryParts[1], updateKeyNested); + } else { + nested.append(SET_MODIFIER, new BasicDBObject(entryParts[1], chunk.get(key))); + } + } + } else { + String mongoKey = key.getName(); + String value = chunk.get(key); + if (value == null) { + if (isKey) { + throwMissingKey(key.getName()); + } + } else { + if (isKey) { + updateKey.append(mongoKey, value); + } else { + toUpdate.append(SET_MODIFIER, new BasicDBObject(mongoKey, value)); + } + } + } + } + for (String mongoKey : nestedParts.keySet()) { + toUpdate.append(mongoKey, nestedParts.get(mongoKey)); + } + for (String mongoKey : updateKeyNestedParts.keySet()) { + updateKey.append(mongoKey, updateKeyNestedParts.get(mongoKey)); + } + coll.update(updateKey, toUpdate); + } + + private void throwMissingKey(String keyName) { + throw new IllegalArgumentException("Attempt to insert chunk with incomplete partial key. Missing: " + keyName); + } + + private DBCollection getCachedCollection(String collName) { + DBCollection coll = collectionCache.get(collName); + if (coll == null) { + coll = db.getCollection(collName); + if (coll != null) { + collectionCache.put(collName, coll); + } + } + return coll; } private DBObject createConfigDBObject(StartupConfiguration config, BackendRegistry registry) {
--- a/src/com/redhat/thermostat/agent/storage/Storage.java Mon Jan 30 13:42:10 2012 -0500 +++ b/src/com/redhat/thermostat/agent/storage/Storage.java Tue Jan 31 12:01:59 2012 -0500 @@ -73,12 +73,24 @@ } public final void putChunk(Chunk chunk, Backend backend) { + validateChunkOrigin(chunk, backend); + putChunkImpl(chunk); + } + + public final void updateChunk(Chunk chunk, Backend backend) { + validateChunkOrigin(chunk, backend); + updateChunkImpl(chunk); + } + + private void validateChunkOrigin(Chunk chunk, Backend origin) { Category category = chunk.getCategory(); - if (backend != categoryMap.get(category.getName())) { // This had better be not just equivalent, but actually the same object. - throw new IllegalArgumentException("Invalid category-backend combination while inserting data. Category: " + category.getName() + " Backend: " + backend.getName()); + if (origin != categoryMap.get(category.getName())) { // This had better be not just equivalent, but actually the same object. + throw new IllegalArgumentException("Invalid category-backend combination while inserting data. Category: " + category.getName() + " Backend: " + origin.getName()); } - addChunkImpl(chunk); } - protected abstract void addChunkImpl(Chunk chunk); + protected abstract void putChunkImpl(Chunk chunk); + + protected abstract void updateChunkImpl(Chunk chunk); + }
--- a/src/com/redhat/thermostat/backend/Backend.java Mon Jan 30 13:42:10 2012 -0500 +++ b/src/com/redhat/thermostat/backend/Backend.java Tue Jan 31 12:01:59 2012 -0500 @@ -191,4 +191,8 @@ public final void store(Chunk chunk) { storage.putChunk(chunk, this); } + + public final void update(Chunk chunk) { + storage.updateChunk(chunk, this); + } }
--- a/src/com/redhat/thermostat/backend/system/JvmStatHostListener.java Mon Jan 30 13:42:10 2012 -0500 +++ b/src/com/redhat/thermostat/backend/system/JvmStatHostListener.java Tue Jan 31 12:01:59 2012 -0500 @@ -122,13 +122,12 @@ logger.warning("Disconnected from host"); } + @SuppressWarnings("unchecked") // Unchecked casts to (Set<Integer>). @Override public void vmStatusChanged(VmStatusChangeEvent event) { MonitoredHost host = event.getMonitoredHost(); - Iterator<Integer> newActive = event.getStarted().iterator(); - while (newActive.hasNext()) { - Integer newVm = newActive.next(); + for (Integer newVm : (Set<Integer>) event.getStarted()) { try { logger.fine("New vm: " + newVm); sendNewVM(newVm, host); @@ -139,9 +138,7 @@ } } - Iterator<Integer> newStopped = event.getTerminated().iterator(); - while (newStopped.hasNext()) { - Integer stoppedVm = newStopped.next(); + for (Integer stoppedVm : (Set<Integer>) event.getTerminated()) { try { logger.fine("stopped vm: " + stoppedVm); sendStoppedVM(stoppedVm, host); @@ -195,11 +192,12 @@ VmIdentifier resolvedVmID = host.getHostIdentifier().resolve( new VmIdentifier(vmId.toString())); if (resolvedVmID != null) { + long stopTime = System.currentTimeMillis(); listenerMap.remove(vmId); for (JvmStatusListener statusListener : statusListeners) { statusListener.jvmStopped(vmId); } - // TODO store updated vminfo chunk with stop time. + backend.update(makeVmInfoUpdateStoppedChunk(vmId, stopTime)); } } @@ -225,6 +223,13 @@ return chunk; } + private Chunk makeVmInfoUpdateStoppedChunk(int vmId, long stopTimeStamp) { + Chunk chunk = new Chunk(vmInfoCategory, false); + chunk.put(vmInfoIdKey, String.valueOf(vmId)); + chunk.put(vmInfoStopTimeKey, String.valueOf(stopTimeStamp)); + return chunk; + } + @Override public void addJvmStatusListener(JvmStatusListener listener) { statusListeners.add(listener);
--- a/src/com/redhat/thermostat/backend/system/SystemBackend.java Mon Jan 30 13:42:10 2012 -0500 +++ b/src/com/redhat/thermostat/backend/system/SystemBackend.java Tue Jan 31 12:01:59 2012 -0500 @@ -266,7 +266,7 @@ } private Chunk makeCpuChunk(CpuStat cpuStat) { - Chunk chunk = new Chunk(/* cpuStat.getTimeStamp(), */cpuStatCategory, false); + Chunk chunk = new Chunk(cpuStatCategory, false); chunk.put(Key.TIMESTAMP, Long.toString(cpuStat.getTimeStamp())); chunk.put(cpu5LoadKey, Double.toString(cpuStat.getLoad5())); chunk.put(cpu10LoadKey, Double.toString(cpuStat.getLoad10()));