changeset 61:a4ac69374d2f

Add chunk updating to storage layer and use it to push vm stop time.
author Jon VanAlten <jon.vanalten@redhat.com>
date Tue, 31 Jan 2012 12:01:59 -0500
parents a55a4ed7e6b6
children c1cc0b37e8c1
files src/com/redhat/thermostat/agent/storage/MongoStorage.java src/com/redhat/thermostat/agent/storage/Storage.java src/com/redhat/thermostat/backend/Backend.java src/com/redhat/thermostat/backend/system/JvmStatHostListener.java src/com/redhat/thermostat/backend/system/SystemBackend.java
diffstat 5 files changed, 129 insertions(+), 29 deletions(-) [+]
line wrap: on
line diff
--- a/src/com/redhat/thermostat/agent/storage/MongoStorage.java	Mon Jan 30 13:42:10 2012 -0500
+++ b/src/com/redhat/thermostat/agent/storage/MongoStorage.java	Tue Jan 31 12:01:59 2012 -0500
@@ -66,11 +66,13 @@
 public class MongoStorage extends Storage {
 
     public static final String KEY_AGENT_ID = "agent-id";
+    public static final String SET_MODIFIER = "$set";
 
     private static final Logger logger = LoggingUtils.getLogger(MongoStorage.class);
 
     private Mongo mongo = null;
     private DB db = null;
+    private Map<String, DBCollection> collectionCache = new HashMap<String, DBCollection>();
 
     private UUID agentId = null;
 
@@ -79,7 +81,7 @@
         connect(new MongoURI(uri));
     }
 
-    public void connect(MongoURI uri) throws UnknownHostException {
+    private void connect(MongoURI uri) throws UnknownHostException {
         mongo = new Mongo(uri);
         db = mongo.getDB(StorageConstants.THERMOSTAT_DB_NAME);
     }
@@ -123,17 +125,17 @@
     }
 
     @Override
-    protected void addChunkImpl(Chunk chunk) {
+    protected void putChunkImpl(Chunk chunk) {
         Category cat = chunk.getCategory();
-        DBCollection coll = db.getCollection(cat.getName());
+        DBCollection coll = getCachedCollection(cat.getName());
         BasicDBObject toInsert = getAgentDBObject();
-        BasicDBObject toDelete = null;
+        BasicDBObject replaceKey = null;
         boolean replace = chunk.getReplace();
         Map<String, BasicDBObject> nestedParts = new HashMap<String, BasicDBObject>();
-        Map<String, BasicDBObject> deleteNestedParts = null;
+        Map<String, BasicDBObject> replaceKeyNestedParts = null;
         if (replace) {
-            toDelete = getAgentDBObject();
-            deleteNestedParts = new HashMap<String, BasicDBObject>();
+            replaceKey = getAgentDBObject();
+            replaceKeyNestedParts = new HashMap<String, BasicDBObject>();
         }
         for (Iterator<com.redhat.thermostat.agent.storage.Key> iter = cat.getEntryIterator(); iter.hasNext();) {
             com.redhat.thermostat.agent.storage.Key key = iter.next();
@@ -142,24 +144,30 @@
             if (entryParts.length == 2) {
                 BasicDBObject nested = nestedParts.get(entryParts[0]);
                 if (nested == null) {
+                    if (isKey) {
+                        throwMissingKey(key.getName());
+                    }
                     nested = new BasicDBObject();
                     nestedParts.put(entryParts[0], nested);
                 }
                 nested.append(entryParts[1], chunk.get(key));
                 if (replace && isKey) {
-                    BasicDBObject deleteNested = deleteNestedParts.get(entryParts[0]);
-                    if (deleteNested == null) {
-                        deleteNested = new BasicDBObject();
-                        deleteNestedParts.put(entryParts[0], deleteNested);
+                    BasicDBObject replaceKeyNested = replaceKeyNestedParts.get(entryParts[0]);
+                    if (replaceKeyNested == null) {
+                        replaceKeyNested = new BasicDBObject();
+                        replaceKeyNestedParts.put(entryParts[0], replaceKeyNested);
                     }
-                    deleteNested.append(entryParts[1], deleteNested);
+                    replaceKeyNested.append(entryParts[1], replaceKeyNested);
                 }
             } else {
                 String mongoKey = key.getName();
                 String value = chunk.get(key);
+                if ((value == null) && isKey) {
+                    throwMissingKey(key.getName());
+                }
                 toInsert.append(mongoKey, value);
                 if (replace && isKey) {
-                    toDelete.append(mongoKey, value);
+                    replaceKey.append(mongoKey, value);
                 }
             }
         }
@@ -167,12 +175,83 @@
             toInsert.append(mongoKey, nestedParts.get(mongoKey));
         }
         if (replace) {
-            for (String mongoKey : deleteNestedParts.keySet()) {
-                toDelete.append(mongoKey, deleteNestedParts.get(mongoKey));
+            for (String mongoKey : replaceKeyNestedParts.keySet()) {
+                replaceKey.append(mongoKey, replaceKeyNestedParts.get(mongoKey));
             }
-            coll.remove(toDelete);
+            coll.update(replaceKey, toInsert, true, false);
+        } else {
+            coll.insert(toInsert);
         }
-        coll.insert(toInsert);
+    }
+
+    @Override
+    protected void updateChunkImpl(Chunk chunk) {
+        Category cat = chunk.getCategory();
+        DBCollection coll = getCachedCollection(cat.getName());
+        BasicDBObject toUpdate = new BasicDBObject();
+        BasicDBObject updateKey = getAgentDBObject();
+        Map<String, BasicDBObject> nestedParts = new HashMap<String, BasicDBObject>();
+        Map<String, BasicDBObject> updateKeyNestedParts = new HashMap<String, BasicDBObject>();
+        for (Iterator<com.redhat.thermostat.agent.storage.Key> iter = cat.getEntryIterator(); iter.hasNext();) {
+            com.redhat.thermostat.agent.storage.Key key = iter.next();
+            boolean isKey = key.isPartialCategoryKey();
+            String[] entryParts = key.getName().split("\\.");
+            if (entryParts.length == 2) {
+                BasicDBObject nested = nestedParts.get(entryParts[0]);
+                if (nested == null) {
+                    if (isKey) {
+                        throwMissingKey(key.getName());
+                    }
+                } else {
+                    if (isKey) {
+                        BasicDBObject updateKeyNested = updateKeyNestedParts.get(entryParts[0]);
+                        if (updateKeyNested == null) {
+                            updateKeyNested = new BasicDBObject();
+                            updateKeyNestedParts.put(entryParts[0], updateKeyNested);
+                        }
+                        updateKeyNested.append(entryParts[1], updateKeyNested);
+                    } else {
+                        nested.append(SET_MODIFIER, new BasicDBObject(entryParts[1], chunk.get(key)));
+                    }
+                }
+            } else {
+                String mongoKey = key.getName();
+                String value = chunk.get(key);
+                if (value == null) {
+                    if (isKey) {
+                        throwMissingKey(key.getName());
+                    }
+                } else {
+                    if (isKey) {
+                        updateKey.append(mongoKey, value);
+                    } else {
+                        toUpdate.append(SET_MODIFIER, new BasicDBObject(mongoKey, value));
+                    }
+                }
+            }
+        }
+        for (String mongoKey : nestedParts.keySet()) {
+            toUpdate.append(mongoKey, nestedParts.get(mongoKey));
+        }
+        for (String mongoKey : updateKeyNestedParts.keySet()) {
+            updateKey.append(mongoKey, updateKeyNestedParts.get(mongoKey));
+        }
+        coll.update(updateKey, toUpdate);
+    }
+
+    private void throwMissingKey(String keyName) {
+        throw new IllegalArgumentException("Attempt to insert chunk with incomplete partial key.  Missing: " + keyName);
+    }
+
+    private DBCollection getCachedCollection(String collName) {
+        DBCollection coll = collectionCache.get(collName);
+        if (coll == null) {
+            coll = db.getCollection(collName);
+            if (coll != null) {
+                collectionCache.put(collName, coll);
+            }
+        }
+        return coll;
     }
 
     private DBObject createConfigDBObject(StartupConfiguration config, BackendRegistry registry) {
--- a/src/com/redhat/thermostat/agent/storage/Storage.java	Mon Jan 30 13:42:10 2012 -0500
+++ b/src/com/redhat/thermostat/agent/storage/Storage.java	Tue Jan 31 12:01:59 2012 -0500
@@ -73,12 +73,24 @@
     }
 
     public final void putChunk(Chunk chunk, Backend backend) {
+        validateChunkOrigin(chunk, backend);
+        putChunkImpl(chunk);
+    }
+
+    public final void updateChunk(Chunk chunk, Backend backend) {
+        validateChunkOrigin(chunk, backend);
+        updateChunkImpl(chunk);
+    }
+
+    private void validateChunkOrigin(Chunk chunk, Backend origin) {
         Category category = chunk.getCategory();
-        if (backend != categoryMap.get(category.getName())) { // This had better be not just equivalent, but actually the same object.
-            throw new IllegalArgumentException("Invalid category-backend combination while inserting data.  Category: " + category.getName() + "  Backend: " + backend.getName());
+        if (origin != categoryMap.get(category.getName())) { // This had better be not just equivalent, but actually the same object.
+            throw new IllegalArgumentException("Invalid category-backend combination while inserting data.  Category: " + category.getName() + "  Backend: " + origin.getName());
         }
-        addChunkImpl(chunk);
     }
     
-    protected abstract void addChunkImpl(Chunk chunk);
+    protected abstract void putChunkImpl(Chunk chunk);
+
+    protected abstract void updateChunkImpl(Chunk chunk);
+
 }
--- a/src/com/redhat/thermostat/backend/Backend.java	Mon Jan 30 13:42:10 2012 -0500
+++ b/src/com/redhat/thermostat/backend/Backend.java	Tue Jan 31 12:01:59 2012 -0500
@@ -191,4 +191,8 @@
     public final void store(Chunk chunk) {
         storage.putChunk(chunk, this);
     }
+
+    public final void update(Chunk chunk) {
+        storage.updateChunk(chunk, this);
+    }
 }
--- a/src/com/redhat/thermostat/backend/system/JvmStatHostListener.java	Mon Jan 30 13:42:10 2012 -0500
+++ b/src/com/redhat/thermostat/backend/system/JvmStatHostListener.java	Tue Jan 31 12:01:59 2012 -0500
@@ -122,13 +122,12 @@
         logger.warning("Disconnected from host");
     }
 
+    @SuppressWarnings("unchecked") // Unchecked casts to (Set<Integer>).
     @Override
     public void vmStatusChanged(VmStatusChangeEvent event) {
         MonitoredHost host = event.getMonitoredHost();
 
-        Iterator<Integer> newActive = event.getStarted().iterator();
-        while (newActive.hasNext()) {
-            Integer newVm = newActive.next();
+        for (Integer newVm : (Set<Integer>) event.getStarted()) {
             try {
                 logger.fine("New vm: " + newVm);
                 sendNewVM(newVm, host);
@@ -139,9 +138,7 @@
             }
         }
 
-        Iterator<Integer> newStopped = event.getTerminated().iterator();
-        while (newStopped.hasNext()) {
-            Integer stoppedVm = newStopped.next();
+        for (Integer stoppedVm : (Set<Integer>) event.getTerminated()) {
             try {
                 logger.fine("stopped vm: " + stoppedVm);
                 sendStoppedVM(stoppedVm, host);
@@ -195,11 +192,12 @@
         VmIdentifier resolvedVmID = host.getHostIdentifier().resolve(
                 new VmIdentifier(vmId.toString()));
         if (resolvedVmID != null) {
+            long stopTime = System.currentTimeMillis();
             listenerMap.remove(vmId);
             for (JvmStatusListener statusListener : statusListeners) {
                 statusListener.jvmStopped(vmId);
             }
-            // TODO store updated vminfo chunk with stop time.
+            backend.update(makeVmInfoUpdateStoppedChunk(vmId, stopTime));
         }
     }
 
@@ -225,6 +223,13 @@
         return chunk;
     }
 
+    private Chunk makeVmInfoUpdateStoppedChunk(int vmId, long stopTimeStamp) {
+        Chunk chunk = new Chunk(vmInfoCategory, false);
+        chunk.put(vmInfoIdKey, String.valueOf(vmId));
+        chunk.put(vmInfoStopTimeKey, String.valueOf(stopTimeStamp));
+        return chunk;
+    }
+
     @Override
     public void addJvmStatusListener(JvmStatusListener listener) {
         statusListeners.add(listener);
--- a/src/com/redhat/thermostat/backend/system/SystemBackend.java	Mon Jan 30 13:42:10 2012 -0500
+++ b/src/com/redhat/thermostat/backend/system/SystemBackend.java	Tue Jan 31 12:01:59 2012 -0500
@@ -266,7 +266,7 @@
     }
 
     private Chunk makeCpuChunk(CpuStat cpuStat) {
-        Chunk chunk = new Chunk(/* cpuStat.getTimeStamp(), */cpuStatCategory, false);
+        Chunk chunk = new Chunk(cpuStatCategory, false);
         chunk.put(Key.TIMESTAMP, Long.toString(cpuStat.getTimeStamp()));
         chunk.put(cpu5LoadKey, Double.toString(cpuStat.getLoad5()));
         chunk.put(cpu10LoadKey, Double.toString(cpuStat.getLoad10()));