changeset 570:7d4bdce24e21

Move tread collectors to agent review-thread: http://icedtea.classpath.org/pipermail/thermostat/2012-August/002957.html reviewed-by: vanaltj
author Mario Torre <neugens.limasoftware@gmail.com>
date Wed, 29 Aug 2012 21:22:47 +0200
parents 8a0323e7e0ac
children e8838f118b34
files agent/cli/src/main/java/com/redhat/thermostat/agent/cli/AgentApplication.java agent/command/src/main/java/com/redhat/thermostat/agent/command/ReceiverRegistry.java distribution/config/bundles.properties distribution/pom.xml thread/collector/pom.xml thread/collector/src/main/java/com/redhat/thermostat/thread/collector/impl/ThreadCollectorFactoryImpl.java thread/collector/src/main/java/com/redhat/thermostat/thread/collector/impl/ThreadMXBeanCollector.java thread/collector/src/main/java/com/redhat/thermostat/thread/collector/osgi/Activator.java thread/collector/src/main/java/com/redhat/thermostat/thread/dao/ThreadDao.java thread/collector/src/main/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImpl.java thread/collector/src/main/java/com/redhat/thermostat/thread/harvester/Harvester.java thread/collector/src/main/java/com/redhat/thermostat/thread/harvester/HarvesterCommand.java thread/collector/src/main/java/com/redhat/thermostat/thread/harvester/ThreadHarvester.java thread/collector/src/main/java/com/redhat/thermostat/utils/management/MXBeanConnector.java thread/collector/src/test/java/com/redhat/thermostat/thread/collector/ThreadCollectorFactoryTest.java thread/collector/src/test/java/com/redhat/thermostat/thread/collector/ThreadCollectorTest.java thread/collector/src/test/java/com/redhat/thermostat/thread/collector/impl/ThreadCollectorTest.java thread/collector/src/test/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImplTest.java thread/collector/src/test/java/com/redhat/thermostat/thread/harvester/HarvesterTest.java thread/collector/src/test/java/com/redhat/thermostat/thread/harvester/ThreadHarvesterTest.java thread/harvester/pom.xml thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/osgi/Activator.java thread/pom.xml
diffstat 23 files changed, 1487 insertions(+), 465 deletions(-) [+]
line wrap: on
line diff
--- a/agent/cli/src/main/java/com/redhat/thermostat/agent/cli/AgentApplication.java	Wed Aug 29 14:32:51 2012 -0400
+++ b/agent/cli/src/main/java/com/redhat/thermostat/agent/cli/AgentApplication.java	Wed Aug 29 21:22:47 2012 +0200
@@ -64,6 +64,7 @@
 import com.redhat.thermostat.common.storage.Connection.ConnectionListener;
 import com.redhat.thermostat.common.storage.Connection.ConnectionStatus;
 import com.redhat.thermostat.common.storage.MongoStorageProvider;
+import com.redhat.thermostat.common.storage.Storage;
 import com.redhat.thermostat.common.storage.StorageProvider;
 import com.redhat.thermostat.common.tools.BasicCommand;
 import com.redhat.thermostat.common.utils.LoggingUtils;
@@ -106,7 +107,7 @@
         final Logger logger = LoggingUtils.getLogger(AgentApplication.class);
 
         StorageProvider connProv = new MongoStorageProvider(configuration);
-        DAOFactory daoFactory = new MongoDAOFactory(connProv);
+        final DAOFactory daoFactory = new MongoDAOFactory(connProv);
         ApplicationContext.getInstance().setDAOFactory(daoFactory);
         TimerFactory timerFactory = new ThreadPoolTimerFactory(1);
         ApplicationContext.getInstance().setTimerFactory(timerFactory);
@@ -123,7 +124,9 @@
                     logger.fine("Connecting to storage.");
                     break;
                 case CONNECTED:
-                    logger.fine("Connected to storage.");
+                    logger.fine("Connected to storage, registering storage as service");
+                    Storage storage = daoFactory.getStorage();
+                    OSGIUtils.getInstance().registerService(Storage.class, storage);
                     break;
                 case FAILED_TO_CONNECT:
                     logger.warning("Could not connect to storage.");
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/ReceiverRegistry.java	Wed Aug 29 14:32:51 2012 -0400
+++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/ReceiverRegistry.java	Wed Aug 29 21:22:47 2012 +0200
@@ -60,3 +60,4 @@
         proxy.unregisterAll();
     }
 }
+
--- a/distribution/config/bundles.properties	Wed Aug 29 14:32:51 2012 -0400
+++ b/distribution/config/bundles.properties	Wed Aug 29 21:22:47 2012 +0200
@@ -20,7 +20,10 @@
       thermostat-thread-client-swing-@project.version@.jar, \
       thermostat-thread-client-controllers-@project.version@.jar, \
       thermostat-thread-client-common-@project.version@.jar, \
-      thermostat-osgi-process-handler-@project.version@.jar
+      thermostat-osgi-process-handler-@project.version@.jar, \
+      thermostat-client-command-@project.version@.jar, \
+      thermostat-client-command-@project.version@.jar, \
+      thermostat-agent-command-@project.version@.jar
 
 service = thermostat-agent-core-@project.version@.jar, \
           thermostat-osgi-process-handler-@project.version@.jar, \
@@ -34,7 +37,10 @@
         thermostat-common-core-@project.version@.jar, \
         thermostat-agent-cli-@project.version@.jar, \
         thermostat-common-command-@project.version@.jar, \
-        thermostat-agent-command-@project.version@.jar
+        thermostat-agent-command-@project.version@.jar, \
+        thermostat-thread-collector-@project.version@.jar, \
+        thermostat-thread-harvester-@project.version@.jar, \
+        thermostat-client-command-@project.version@.jar
 
 storage = thermostat-agent-core-@project.version@.jar, \
           thermostat-osgi-process-handler-@project.version@.jar, \
--- a/distribution/pom.xml	Wed Aug 29 14:32:51 2012 -0400
+++ b/distribution/pom.xml	Wed Aug 29 21:22:47 2012 +0200
@@ -347,6 +347,11 @@
         <artifactId>thermostat-thread-collector</artifactId>
         <version>${project.version}</version>
     </dependency>      
-  
+    <dependency>
+        <groupId>com.redhat.thermostat</groupId>
+        <artifactId>thermostat-thread-harvester</artifactId>
+        <version>${project.version}</version>
+    </dependency>
+     
   </dependencies>
 </project>
--- a/thread/collector/pom.xml	Wed Aug 29 14:32:51 2012 -0400
+++ b/thread/collector/pom.xml	Wed Aug 29 21:22:47 2012 +0200
@@ -66,6 +66,16 @@
       <artifactId>thermostat-common-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.redhat.thermostat</groupId>
+      <artifactId>thermostat-agent-command</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.redhat.thermostat</groupId>
+      <artifactId>thermostat-client-command</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     
     <dependency>
       <groupId>org.osgi</groupId>
@@ -100,6 +110,7 @@
             <Export-Package>
               com.redhat.thermostat.thread.collector,
               com.redhat.thermostat.thread.dao,
+              com.redhat.thermostat.thread.harvester,
             </Export-Package>
             <Private-Package>
               com.redhat.thermostat.thread.collector.osgi,
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/impl/ThreadCollectorFactoryImpl.java	Wed Aug 29 14:32:51 2012 -0400
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/impl/ThreadCollectorFactoryImpl.java	Wed Aug 29 21:22:47 2012 +0200
@@ -36,43 +36,21 @@
 
 package com.redhat.thermostat.thread.collector.impl;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
 import com.redhat.thermostat.common.dao.VmRef;
 import com.redhat.thermostat.thread.collector.ThreadCollector;
 import com.redhat.thermostat.thread.collector.ThreadCollectorFactory;
 import com.redhat.thermostat.thread.dao.ThreadDao;
-import com.redhat.thermostat.utils.management.MXBeanConnector;
 
 public class ThreadCollectorFactoryImpl implements ThreadCollectorFactory {
 
-    private Map<VmRef, ThreadMXBeanCollector> collectors;
-    
     private ThreadDao threadDao;
-    private ScheduledExecutorService threadPool;
     
-    public ThreadCollectorFactoryImpl(ThreadDao threadDao, ScheduledExecutorService threadPool) {
+    public ThreadCollectorFactoryImpl(ThreadDao threadDao) {
         this.threadDao = threadDao;
-        this.threadPool = threadPool;
-        
-        collectors = new HashMap<VmRef, ThreadMXBeanCollector>();
     }
     
     @Override
     public synchronized ThreadCollector getCollector(VmRef reference) {
-        ThreadMXBeanCollector collector = collectors.get(reference);
-        if (collector == null) {
-            collector = new ThreadMXBeanCollector(threadDao, reference, new MXBeanConnector(reference), threadPool);
-            collectors.put(reference, collector);
-        }
-        return collector;
-    }
-    
-    public void shutdown() {
-        for (ThreadMXBeanCollector collector : collectors.values()) {
-            collector.stopHarvester();
-        }
+        return new ThreadMXBeanCollector(threadDao, reference);
     }
 }
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/impl/ThreadMXBeanCollector.java	Wed Aug 29 14:32:51 2012 -0400
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/impl/ThreadMXBeanCollector.java	Wed Aug 29 21:22:47 2012 +0200
@@ -36,98 +36,71 @@
 
 package com.redhat.thermostat.thread.collector.impl;
 
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-import java.util.ArrayList;
+import java.net.InetSocketAddress;
 import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
 
-import javax.management.MalformedObjectNameException;
-
+import com.redhat.thermostat.client.command.RequestQueue;
+import com.redhat.thermostat.common.command.Request;
+import com.redhat.thermostat.common.command.RequestResponseListener;
+import com.redhat.thermostat.common.command.Response;
+import com.redhat.thermostat.common.command.Request.RequestType;
+import com.redhat.thermostat.common.dao.HostRef;
 import com.redhat.thermostat.common.dao.VmRef;
+import com.redhat.thermostat.common.utils.OSGIUtils;
 import com.redhat.thermostat.thread.collector.ThreadCollector;
 import com.redhat.thermostat.thread.collector.ThreadSummary;
 import com.redhat.thermostat.thread.collector.VMThreadCapabilities;
 import com.redhat.thermostat.thread.dao.ThreadDao;
-import com.redhat.thermostat.utils.management.MXBeanConnection;
-import com.redhat.thermostat.utils.management.MXBeanConnector;
+import com.redhat.thermostat.thread.harvester.HarvesterCommand;
+import com.redhat.thermostat.thread.harvester.ThreadHarvester;
 
-@SuppressWarnings("restriction")
 public class ThreadMXBeanCollector implements ThreadCollector {
 
     private ThreadDao threadDao;
     private VmRef ref;
-    private MXBeanConnector connector;
 
-    private MXBeanConnection connection;
-    private ThreadMXBean collectorBean;
-    
-    private boolean isConnected;
-    private ScheduledExecutorService threadPool;
-    
-    private ScheduledFuture<?> harvester;
-    
-    public ThreadMXBeanCollector(ThreadDao threadDao, VmRef ref, MXBeanConnector connector, ScheduledExecutorService threadPool) {
+    public ThreadMXBeanCollector(ThreadDao threadDao, VmRef ref) {
         this.threadDao = threadDao;
         this.ref = ref;
-        this.connector = connector;
-        this.threadPool = threadPool;
+    }
+
+    Request createRequest() {
+        HostRef targetHostRef = ref.getAgent();
+        
+        // todo
+        String address = threadDao.getStorage().getConfigListenAddress(targetHostRef);
+        String [] host = address.split(":");
+        
+        InetSocketAddress target = new InetSocketAddress(host[0], Integer.parseInt(host[1]));
+        Request harvester = new Request(RequestType.RESPONSE_EXPECTED, target);
+
+        harvester.setReceiver(ThreadHarvester.class.getName());
+        
+        return harvester;
+    }
+    
+    @Override
+    public void startHarvester() {
+        
+        Request harvester = createRequest();
+        harvester.setParameter(HarvesterCommand.class.getName(), HarvesterCommand.START.name());
+        harvester.setParameter(HarvesterCommand.VM_ID.name(), ref.getIdString());
+        harvester.setParameter(HarvesterCommand.AGENT_ID.name(), ref.getAgent().getAgentId());
+
+        RequestQueue queue = getRequestQueue();
+        queue.putRequest(harvester);
     }
 
     @Override
-    public synchronized void startHarvester() {
-        if (isConnected) return;
-        
-        if (!connector.isAttached()) {
-            try {
-                connector.attach();
-            } catch (Exception ignore) {
-                ignore.printStackTrace();
-            }
-        }
-        
-        try {
-            connection = connector.connect();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        isConnected = true;
+    public void stopHarvester() {
         
-        harvester = threadPool.scheduleAtFixedRate(new Harvester(), 0, 1, TimeUnit.SECONDS);
-    }
-    
-    private class Harvester implements Runnable {
-        @Override
-        public void run() {
-            harvestData();
-        }
-    }
-    
-    @Override
-    public synchronized void stopHarvester() {
-        if (!isConnected) return;
-        
-        harvester.cancel(false);
-        
-        try {
-            connection.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        
-        if (connector.isAttached()) {
-            try {
-                connector.close();
-            } catch (Exception ignore) {
-                ignore.printStackTrace();
-            }
-        }
-        
-        isConnected = false;
+        Request harvester = createRequest();
+        harvester.setParameter(HarvesterCommand.class.getName(), HarvesterCommand.STOP.name());
+        harvester.setParameter(HarvesterCommand.VM_ID.name(), ref.getIdString());
+
+        RequestQueue queue = getRequestQueue();
+        queue.putRequest(harvester);
     }
     
     @Override
@@ -160,127 +133,39 @@
     public List<com.redhat.thermostat.thread.collector.ThreadInfo> getThreadInfo(long since) {
         return threadDao.loadThreadInfo(ref, since);
     }
-    
-    public synchronized void harvestData() {
-        try {
-            
-            long timestamp = System.currentTimeMillis();
-            
-            ThreadMXSummary summary = new ThreadMXSummary();
-            
-            collectorBean = getDataCollectorBean(connection);
-            
-            summary.setCurrentLiveThreads(collectorBean.getThreadCount());
-            summary.setDaemonThreads(collectorBean.getDaemonThreadCount());
-            summary.setTimestamp(timestamp);
-            
-            threadDao.saveSummary(ref, summary);
-            
-            long [] ids = collectorBean.getAllThreadIds();
-            long[] allocatedBytes = null;
-            
-            // now the details for the threads
-            if (collectorBean instanceof com.sun.management.ThreadMXBean) {
-                com.sun.management.ThreadMXBean sunBean = (com.sun.management.ThreadMXBean) collectorBean;
-                boolean wasEnabled = false;
-                if (sunBean.isThreadAllocatedMemorySupported()) {
-                    wasEnabled = sunBean.isThreadAllocatedMemoryEnabled();
-                    sunBean.setThreadAllocatedMemoryEnabled(true);
-                    allocatedBytes = sunBean.getThreadAllocatedBytes(ids);
-                    sunBean.setThreadAllocatedMemoryEnabled(wasEnabled);
-                }
-            }
 
-            ThreadInfo[] threadInfos = collectorBean.getThreadInfo(ids, true, true);
-            
-            for (int i = 0; i < ids.length; i++) {
-                ThreadMXInfo info = new ThreadMXInfo();
-                ThreadInfo beanInfo = threadInfos[i];
-
-                info.setTimeStamp(timestamp);
-
-                info.setName(beanInfo.getThreadName());
-                info.setID(beanInfo.getThreadId());
-                info.setState(beanInfo.getThreadState());
-                info.setStackTrace(beanInfo.getStackTrace());
-
-                info.setCPUTime(collectorBean.getThreadCpuTime(info.getThreadID()));
-                info.setUserTime(collectorBean.getThreadUserTime(info.getThreadID()));
-                
-                info.setBlockedCount(beanInfo.getBlockedCount());
-                info.setWaitedCount(beanInfo.getWaitedCount());
-                
-                if (allocatedBytes != null) {
-                    info.setAllocatedBytes(allocatedBytes[i]);
-                }
-
-                threadDao.saveThreadInfo(ref, info);
-            }
-            
-        } catch (MalformedObjectNameException e) {
-            e.printStackTrace();
-        }
-    }
-    
     @Override
-    public synchronized VMThreadCapabilities getVMThreadCapabilities() {
+    public VMThreadCapabilities getVMThreadCapabilities() {
         
         VMThreadCapabilities caps = threadDao.loadCapabilities(ref);
         if (caps == null) {
-            if (!connector.isAttached()) {
-                try {
-                    connector.attach();
-                } catch (Exception ignore) {
-                    ignore.printStackTrace();
-                }
-            }
+            Request harvester = createRequest();
+            harvester.setParameter(HarvesterCommand.class.getName(), HarvesterCommand.VM_CAPS.name());
+            harvester.setParameter(HarvesterCommand.VM_ID.name(), ref.getIdString());
+            harvester.setParameter(HarvesterCommand.AGENT_ID.name(), ref.getAgent().getAgentId());
             
-            // query caps to the vm, then save for later
-            try (MXBeanConnection connection = connector.connect()) {
-                             
-                ThreadMXBean bean = getDataCollectorBean(connection);
-                VMThreadMXCapabilities _caps = new VMThreadMXCapabilities();
-                
-                if (bean.isThreadCpuTimeSupported()) _caps.addFeature(ThreadDao.CPU_TIME);
-                if (bean.isThreadContentionMonitoringSupported()) _caps.addFeature(ThreadDao.CONTENTION_MONITOR);
-                
-                if (bean instanceof com.sun.management.ThreadMXBean) {
-                    com.sun.management.ThreadMXBean sunBean = (com.sun.management.ThreadMXBean) bean;
-                    if (sunBean.isThreadAllocatedMemorySupported())
-                        _caps.addFeature(ThreadDao.THREAD_ALLOCATED_MEMORY);
+            final CountDownLatch latch = new CountDownLatch(1);
+            harvester.addListener(new RequestResponseListener() {
+                @Override
+                public void fireComplete(Request request, Response response) {
+                    latch.countDown();
                 }
-                
-                caps = _caps;
-                
-                threadDao.saveCapabilities(ref, caps);
-                
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+            });
 
-            if (connector.isAttached()) {
-                try {
-                    connector.close();
-                } catch (Exception ignore) {
-                    ignore.printStackTrace();
-                }
+            RequestQueue queue = getRequestQueue();
+            queue.putRequest(harvester);
+        
+            try {
+                latch.await();
+                caps = threadDao.loadCapabilities(ref);
+            } catch (InterruptedException ignore) {
+                caps = new VMThreadMXCapabilities();
             }
         }
-        
         return caps;
     }
     
-    private ThreadMXBean getDataCollectorBean(MXBeanConnection connection) throws MalformedObjectNameException {
-        ThreadMXBean bean = null;
-        try {
-            bean = connection.createProxy(ManagementFactory.THREAD_MXBEAN_NAME,
-                                          com.sun.management.ThreadMXBean.class);
-        } catch (MalformedObjectNameException ignore) {}
-        
-        if (bean == null) {
-            bean = connection.createProxy(ManagementFactory.THREAD_MXBEAN_NAME,
-                                          ThreadMXBean.class);
-        }
-        return bean;
+    RequestQueue getRequestQueue() {
+        return OSGIUtils.getInstance().getService(RequestQueue.class); 
     }
 }
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/osgi/Activator.java	Wed Aug 29 14:32:51 2012 -0400
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/osgi/Activator.java	Wed Aug 29 21:22:47 2012 +0200
@@ -36,9 +36,6 @@
 
 package com.redhat.thermostat.thread.collector.osgi;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
@@ -52,11 +49,11 @@
 
 public class Activator implements BundleActivator {
     
-    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(24);
     private ThreadCollectorFactoryImpl collectorFactory;
     
     @Override
     public void start(final BundleContext context) throws Exception {
+        
         @SuppressWarnings({ "rawtypes", "unchecked" })
         ServiceTracker tracker = new ServiceTracker(context, Storage.class.getName(), null) {
             @Override
@@ -65,10 +62,11 @@
                 Storage storage = (Storage) context.getService(reference);
                 
                 ThreadDao threadDao = new ThreadDaoImpl(storage);
-                collectorFactory = new ThreadCollectorFactoryImpl(threadDao, executor);
+                collectorFactory = new ThreadCollectorFactoryImpl(threadDao);
 
                 context.registerService(ThreadCollectorFactory.class.getName(), collectorFactory, null);
-                
+                context.registerService(ThreadDao.class.getName(), threadDao, null);
+
                 return super.addingService(reference);
             }
         };
@@ -77,11 +75,5 @@
 
     @Override
     public void stop(BundleContext context) throws Exception {
-        if (collectorFactory != null) {
-            collectorFactory.shutdown();
-        }
-        if (executor != null) {
-            executor.shutdown();
-        }
     }
 }
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/ThreadDao.java	Wed Aug 29 14:32:51 2012 -0400
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/ThreadDao.java	Wed Aug 29 21:22:47 2012 +0200
@@ -41,6 +41,7 @@
 import com.redhat.thermostat.common.dao.VmRef;
 import com.redhat.thermostat.common.storage.Category;
 import com.redhat.thermostat.common.storage.Key;
+import com.redhat.thermostat.common.storage.Storage;
 import com.redhat.thermostat.thread.collector.ThreadInfo;
 import com.redhat.thermostat.thread.collector.ThreadSummary;
 import com.redhat.thermostat.thread.collector.VMThreadCapabilities;
@@ -62,7 +63,7 @@
 
 
     VMThreadCapabilities loadCapabilities(VmRef ref);
-    void saveCapabilities(VmRef ref, VMThreadCapabilities caps);
+    void saveCapabilities(String vmId, String agentId, VMThreadCapabilities caps);
 
     static final String LIVE_THREADS = "thread-living";
     static final Key<Long> LIVE_THREADS_KEY = new Key<Long>(LIVE_THREADS, false);
@@ -74,7 +75,7 @@
                          Key.TIMESTAMP,
                          LIVE_THREADS_KEY, DAEMON_THREADS_KEY);
     
-    void saveSummary(VmRef vm, ThreadSummary summary);
+    void saveSummary(String vmId, String agentId, ThreadSummary summary);
     ThreadSummary loadLastestSummary(VmRef ref);
     List<ThreadSummary> loadSummary(VmRef ref, long since);
 
@@ -104,6 +105,8 @@
                          THREAD_USER_TIME_KEY, THREAD_BLOCKED_COUNT_KEY,
                          THREAD_WAIT_COUNT_KEY, THREAD_STACK_TRACE_ID_KEY);
     
-    void saveThreadInfo(VmRef ref, ThreadInfo info);
+    void saveThreadInfo(String vmId, String agentId, ThreadInfo info);
     List<ThreadInfo> loadThreadInfo(VmRef ref, long since);
+    
+    Storage getStorage();
 }
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImpl.java	Wed Aug 29 14:32:51 2012 -0400
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImpl.java	Wed Aug 29 21:22:47 2012 +0200
@@ -84,8 +84,8 @@
     }
     
     @Override
-    public void saveCapabilities(VmRef vm, VMThreadCapabilities caps) {
-        Chunk chunk = prepareChunk(THREAD_CAPABILITIES, true, vm);
+    public void saveCapabilities(String vmId, String agentId, VMThreadCapabilities caps) {
+        Chunk chunk = prepareChunk(THREAD_CAPABILITIES, true, vmId, agentId);
         
         chunk.put(CONTENTION_MONITOR_KEY, caps.supportContentionMonitor());
         chunk.put(CPU_TIME_KEY, caps.supportCPUTime());
@@ -95,8 +95,8 @@
     }
     
     @Override
-    public void saveSummary(VmRef vm, ThreadSummary summary) {
-        Chunk chunk = prepareChunk(THREAD_SUMMARY, false, vm);
+    public void saveSummary(String vmId, String agentId, ThreadSummary summary) {
+        Chunk chunk = prepareChunk(THREAD_SUMMARY, false, vmId, agentId);
         
         chunk.put(LIVE_THREADS_KEY, summary.currentLiveThreads());
         chunk.put(DAEMON_THREADS_KEY, summary.currentDaemonThreads());
@@ -145,8 +145,8 @@
     }
     
     @Override
-    public void saveThreadInfo(VmRef ref, ThreadInfo info) {
-        Chunk chunk = prepareChunk(THREAD_INFO, false, ref);
+    public void saveThreadInfo(String vmId, String agentId, ThreadInfo info) {
+        Chunk chunk = prepareChunk(THREAD_INFO, false, vmId, agentId);
         
         chunk.put(Key.TIMESTAMP, info.getTimeStamp());
 
@@ -191,10 +191,19 @@
         return result;
     }
     
-    private Chunk prepareChunk(Category category, boolean replace, VmRef vm) {
+    private Chunk prepareChunk(Category category, boolean replace, String vmId, String agentId) {
         Chunk chunk = new Chunk(category, replace);
-        chunk.put(Key.AGENT_ID, vm.getAgent().getAgentId());
-        chunk.put(Key.VM_ID, vm.getId());
+        chunk.put(Key.AGENT_ID, agentId);
+        chunk.put(Key.VM_ID, Integer.valueOf(vmId));
         return chunk;
     }
+    
+    private Chunk prepareChunk(Category category, boolean replace, VmRef vm) {
+        return prepareChunk(category, replace, vm.getIdString(), vm.getAgent().getAgentId());
+    }
+    
+    @Override
+    public Storage getStorage() {
+        return storage;
+    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/harvester/Harvester.java	Wed Aug 29 21:22:47 2012 +0200
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.thread.harvester;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.MalformedObjectNameException;
+
+import com.redhat.thermostat.common.dao.VmRef;
+import com.redhat.thermostat.thread.collector.impl.ThreadMXInfo;
+import com.redhat.thermostat.thread.collector.impl.ThreadMXSummary;
+import com.redhat.thermostat.thread.collector.impl.VMThreadMXCapabilities;
+import com.redhat.thermostat.thread.dao.ThreadDao;
+import com.redhat.thermostat.utils.management.MXBeanConnection;
+import com.redhat.thermostat.utils.management.MXBeanConnector;
+
+@SuppressWarnings("restriction")
+class Harvester {
+    
+    private boolean isConnected;
+    private ScheduledExecutorService threadPool;
+    private ScheduledFuture<?> harvester;
+    
+    MXBeanConnector connector;
+    
+    private MXBeanConnection connection;
+    private ThreadMXBean collectorBean;
+    private ThreadDao threadDao;
+    private String vmId;
+    private String agentId;
+    
+    Harvester(ThreadDao threadDao, ScheduledExecutorService threadPool, String vmId, String agentId) {
+        this.connector = new MXBeanConnector(vmId);
+        this.threadDao = threadDao;
+        this.vmId = vmId;
+        this.agentId = agentId;
+        this.threadPool = threadPool;
+    }
+    
+    synchronized void start() {
+        if (isConnected) return;
+              
+        if (!connector.isAttached()) {
+            try {
+                connector.attach();
+            } catch (Exception ignore) {
+                ignore.printStackTrace();
+            }
+        }
+      
+        try {
+            connection = connector.connect();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        isConnected = true;
+        
+        harvester = threadPool.scheduleAtFixedRate(new HarvesterAction(), 0, 1, TimeUnit.SECONDS);
+    }
+    
+    boolean isConnected() {
+        return isConnected;
+    }
+    
+    synchronized void stop() {
+        if (!isConnected)
+            return;
+        
+        harvester.cancel(false);
+
+        try {
+            connection.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        if (connector.isAttached()) {
+            try {
+                connector.close();
+            } catch (Exception ignore) {
+                ignore.printStackTrace();
+            }
+        }
+
+        isConnected = false;
+    }
+    
+    ThreadMXBean getDataCollectorBean(MXBeanConnection connection)
+            throws MalformedObjectNameException
+    {
+        ThreadMXBean bean = null;
+        try {
+            bean = connection.createProxy(ManagementFactory.THREAD_MXBEAN_NAME,
+                                          com.sun.management.ThreadMXBean.class);
+        } catch (MalformedObjectNameException ignore) {}
+
+        if (bean == null) {
+            bean = connection.createProxy(ManagementFactory.THREAD_MXBEAN_NAME,
+                                          ThreadMXBean.class);
+        }
+        return bean;
+    }
+    
+    synchronized void harvestData() {
+      try {
+          long timestamp = System.currentTimeMillis();
+          
+          ThreadMXSummary summary = new ThreadMXSummary();
+          
+          collectorBean = getDataCollectorBean(connection);
+          
+          summary.setCurrentLiveThreads(collectorBean.getThreadCount());
+          summary.setDaemonThreads(collectorBean.getDaemonThreadCount());
+          summary.setTimestamp(timestamp);
+          
+          threadDao.saveSummary(vmId, agentId, summary);
+          
+          long [] ids = collectorBean.getAllThreadIds();
+          long[] allocatedBytes = null;
+          
+          // now the details for the threads
+          if (collectorBean instanceof com.sun.management.ThreadMXBean) {
+              com.sun.management.ThreadMXBean sunBean = (com.sun.management.ThreadMXBean) collectorBean;
+              boolean wasEnabled = false;
+              if (sunBean.isThreadAllocatedMemorySupported()) {
+                  wasEnabled = sunBean.isThreadAllocatedMemoryEnabled();
+                  sunBean.setThreadAllocatedMemoryEnabled(true);
+                  allocatedBytes = sunBean.getThreadAllocatedBytes(ids);
+                  sunBean.setThreadAllocatedMemoryEnabled(wasEnabled);
+              }
+          }
+
+          ThreadInfo[] threadInfos = collectorBean.getThreadInfo(ids, true, true);
+          
+          for (int i = 0; i < ids.length; i++) {
+              ThreadMXInfo info = new ThreadMXInfo();
+              ThreadInfo beanInfo = threadInfos[i];
+
+              info.setTimeStamp(timestamp);
+
+              info.setName(beanInfo.getThreadName());
+              info.setID(beanInfo.getThreadId());
+              info.setState(beanInfo.getThreadState());
+              info.setStackTrace(beanInfo.getStackTrace());
+
+              info.setCPUTime(collectorBean.getThreadCpuTime(info.getThreadID()));
+              info.setUserTime(collectorBean.getThreadUserTime(info.getThreadID()));
+              
+              info.setBlockedCount(beanInfo.getBlockedCount());
+              info.setWaitedCount(beanInfo.getWaitedCount());
+              
+              if (allocatedBytes != null) {
+                  info.setAllocatedBytes(allocatedBytes[i]);
+              }
+
+              threadDao.saveThreadInfo(vmId, agentId, info);
+          }
+          
+      } catch (MalformedObjectNameException e) {
+          e.printStackTrace();
+      }
+    }
+    
+    private class HarvesterAction implements Runnable {
+        @Override
+        public void run() {
+            harvestData();
+        }
+    }
+    
+    synchronized void saveVmCaps() {
+        
+        boolean closeAfter = false;
+        if (!connector.isAttached()) {
+            closeAfter = true; 
+            try {
+                connector.attach();
+            } catch (Exception ignore) {
+                ignore.printStackTrace();
+            }
+        }
+
+        try (MXBeanConnection connection = connector.connect()) {
+
+            ThreadMXBean bean = getDataCollectorBean(connection);
+            VMThreadMXCapabilities caps = new VMThreadMXCapabilities();
+
+            if (bean.isThreadCpuTimeSupported())
+                caps.addFeature(ThreadDao.CPU_TIME);
+            if (bean.isThreadContentionMonitoringSupported())
+                caps.addFeature(ThreadDao.CONTENTION_MONITOR);
+
+            if (bean instanceof com.sun.management.ThreadMXBean) {
+                com.sun.management.ThreadMXBean sunBean = (com.sun.management.ThreadMXBean) bean;
+                if (sunBean.isThreadAllocatedMemorySupported())
+                    caps.addFeature(ThreadDao.THREAD_ALLOCATED_MEMORY);
+            }
+
+            threadDao.saveCapabilities(vmId, agentId, caps);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        if (closeAfter) {
+            try {
+                connector.close();
+            } catch (Exception ignore) {
+                ignore.printStackTrace();
+            }
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/harvester/HarvesterCommand.java	Wed Aug 29 21:22:47 2012 +0200
@@ -0,0 +1,11 @@
+package com.redhat.thermostat.thread.harvester;
+
+public enum HarvesterCommand {
+
+    START,
+    STOP,
+    VM_CAPS,
+    
+    AGENT_ID,
+    VM_ID;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/harvester/ThreadHarvester.java	Wed Aug 29 21:22:47 2012 +0200
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.thread.harvester;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.redhat.thermostat.agent.command.RequestReceiver;
+import com.redhat.thermostat.common.command.Request;
+import com.redhat.thermostat.common.command.Response;
+import com.redhat.thermostat.common.command.Response.ResponseType;
+
+import com.redhat.thermostat.thread.dao.ThreadDao;
+
+public class ThreadHarvester implements RequestReceiver {
+
+    private ScheduledExecutorService executor;
+    Map<String, Harvester> connectors;
+
+    private ThreadDao dao;
+    
+    public ThreadHarvester(ScheduledExecutorService executor, ThreadDao dao) {
+        this.executor = executor;
+        connectors = new HashMap<>();
+        this.dao = dao;
+    }
+    
+    @Override
+    public Response receive(Request request) {
+        
+        String command = request.getParameter(HarvesterCommand.class.getName());
+        switch (HarvesterCommand.valueOf(command)) {
+        case START: {
+            String vmId = request.getParameter(HarvesterCommand.VM_ID.name());
+            String agentId = request.getParameter(HarvesterCommand.AGENT_ID.name());
+            startHarvester(vmId, agentId);
+            break;
+        }   
+        case STOP: {
+            String vmId = request.getParameter(HarvesterCommand.VM_ID.name());
+            stopHarvester(vmId);
+            break;
+        }
+        case VM_CAPS: {
+            // this is blocking
+            String vmId = request.getParameter(HarvesterCommand.VM_ID.name());
+            String agentId = request.getParameter(HarvesterCommand.AGENT_ID.name());
+            saveVmCaps(vmId, agentId);
+            break;
+        }
+        default:
+            break;
+        }
+        
+        return new Response(ResponseType.OK);
+    }
+    
+    private void startHarvester(String vmId, String agentId) {
+        Harvester harvester = getHarvester(vmId, agentId);
+        harvester.start();
+    }
+    
+    private void saveVmCaps(String vmId, String agentId) {
+        Harvester harvester = getHarvester(vmId, agentId);
+        harvester.saveVmCaps();
+    }
+    
+    private void stopHarvester(String vmId) {
+        Harvester harvester = connectors.get(vmId);
+        if (harvester != null) {
+            harvester.stop();
+        }
+    }
+    
+    Harvester getHarvester(String vmId, String agentId) {
+        Harvester harvester = connectors.get(vmId);
+        if (harvester == null) {
+            harvester = new Harvester(dao, executor, vmId, agentId);
+            connectors.put(vmId, harvester);
+        }
+        
+        return harvester;
+    }
+}
--- a/thread/collector/src/main/java/com/redhat/thermostat/utils/management/MXBeanConnector.java	Wed Aug 29 14:32:51 2012 -0400
+++ b/thread/collector/src/main/java/com/redhat/thermostat/utils/management/MXBeanConnector.java	Wed Aug 29 21:22:47 2012 +0200
@@ -54,20 +54,25 @@
     private static final String CONNECTOR_ADDRESS_PROPERTY = "com.sun.management.jmxremote.localConnectorAddress";
     private String connectorAddress;
     
-    private VmRef reference;
     private VirtualMachine vm;
     
     private boolean attached;
     
+    private String reference;
+    
+    public MXBeanConnector(String reference) {
+        this.reference = reference;
+    }
+    
     public MXBeanConnector(VmRef reference) {
-        this.reference = reference;
+        this.reference = reference.getStringID();
     }
     
     public synchronized void attach() throws Exception {
         if (attached)
             throw new IOException("Already attached");
         
-        vm = VirtualMachine.attach(reference.getStringID());
+        vm = VirtualMachine.attach(reference);
         attached = true;
         
         Properties props = vm.getAgentProperties();
--- a/thread/collector/src/test/java/com/redhat/thermostat/thread/collector/ThreadCollectorFactoryTest.java	Wed Aug 29 14:32:51 2012 -0400
+++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/collector/ThreadCollectorFactoryTest.java	Wed Aug 29 21:22:47 2012 +0200
@@ -53,20 +53,9 @@
     public void testThreadCollectorFactory() {
         ThreadDao threadDao = mock(ThreadDao.class);
         VmRef reference = mock(VmRef.class);
-        
-        ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class);
-        
-        ThreadCollectorFactory factory = new ThreadCollectorFactoryImpl(threadDao, threadPool);
+                
+        ThreadCollectorFactory factory = new ThreadCollectorFactoryImpl(threadDao);
         ThreadCollector collector = factory.getCollector(reference);
         assertNotNull(collector);
-        
-        // ask again, it must be the same instance as before
-        ThreadCollector collector2 = factory.getCollector(reference);
-        assertSame(collector, collector2);
-        
-        // and now of course it must be different
-        VmRef reference2 = mock(VmRef.class);
-        ThreadCollector collector3 = factory.getCollector(reference2);
-        assertNotSame(collector, collector3);
     }
 }
--- a/thread/collector/src/test/java/com/redhat/thermostat/thread/collector/ThreadCollectorTest.java	Wed Aug 29 14:32:51 2012 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,204 +0,0 @@
-/*
- * Copyright 2012 Red Hat, Inc.
- *
- * This file is part of Thermostat.
- *
- * Thermostat is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published
- * by the Free Software Foundation; either version 2, or (at your
- * option) any later version.
- *
- * Thermostat is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Thermostat; see the file COPYING.  If not see
- * <http://www.gnu.org/licenses/>.
- *
- * Linking this code with other modules is making a combined work
- * based on this code.  Thus, the terms and conditions of the GNU
- * General Public License cover the whole combination.
- *
- * As a special exception, the copyright holders of this code give
- * you permission to link this code with independent modules to
- * produce an executable, regardless of the license terms of these
- * independent modules, and to copy and distribute the resulting
- * executable under terms of your choice, provided that you also
- * meet, for each linked independent module, the terms and conditions
- * of the license of that module.  An independent module is a module
- * which is not derived from or based on this code.  If you modify
- * this code, you may extend this exception to your version of the
- * library, but you are not obligated to do so.  If you do not wish
- * to do so, delete this exception statement from your version.
- */
-
-package com.redhat.thermostat.thread.collector;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.management.MalformedObjectNameException;
-
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import com.redhat.thermostat.common.dao.VmRef;
-import com.redhat.thermostat.thread.collector.impl.ThreadMXBeanCollector;
-import com.redhat.thermostat.thread.dao.ThreadDao;
-import com.redhat.thermostat.utils.management.MXBeanConnection;
-import com.redhat.thermostat.utils.management.MXBeanConnector;
-
-public class ThreadCollectorTest {
-
-    @Test
-    public void testVMCapabilitiesNotInDAO() throws Exception {
-        
-        VMThreadCapabilities referenceCaps = mock(VMThreadCapabilities.class);
-        when(referenceCaps.supportContentionMonitor()).thenReturn(true);
-        when(referenceCaps.supportCPUTime()).thenReturn(false);
-        
-        VmRef reference = mock(VmRef.class);
-        
-        ThreadDao threadDao = mock(ThreadDao.class);
-        when(threadDao.loadCapabilities(reference)).thenReturn(null);
-        
-        MXBeanConnector connector = mock(MXBeanConnector.class);
-        when(connector.isAttached()).thenReturn(false).thenReturn(true);
-        
-        MXBeanConnection connection = mock(MXBeanConnection.class);
-        when(connector.connect()).thenReturn(connection);
-                
-        ArgumentCaptor<String> captor1 = ArgumentCaptor.forClass(String.class);
-        ArgumentCaptor<Class> captor2 = ArgumentCaptor.forClass(Class.class);
-        
-        ThreadMXBean bean = mock(ThreadMXBean.class);
-        when(bean.isThreadCpuTimeSupported()).thenReturn(false);
-        when(bean.isThreadContentionMonitoringSupported()).thenReturn(true);
-        
-        when(connection.createProxy(captor1.capture(), captor2.capture())).thenThrow(new MalformedObjectNameException()).thenReturn(bean);
-        
-        ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class);
-        
-        /* ************* */
-        
-        ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference, connector, threadPool);
-        VMThreadCapabilities caps = collector.getVMThreadCapabilities();
-
-        String beanName = captor1.getValue();
-        assertEquals(ManagementFactory.THREAD_MXBEAN_NAME, beanName);
-        
-        Class clazz = captor2.getValue();
-        assertEquals(ThreadMXBean.class.getName(), clazz.getName());
-        
-        verify(threadDao).loadCapabilities(reference);
-        
-        verify(connector).attach();
-        verify(connector).connect();
-        verify(connector).close();
-        
-        verify(threadDao).saveCapabilities(reference, caps);
-        
-        assertTrue(caps.supportContentionMonitor());
-        assertFalse(caps.supportCPUTime());
-        assertFalse(caps.supportThreadAllocatedMemory());
-    }
-    
-    @SuppressWarnings("restriction")
-    @Test
-    public void testHasThreadMemorySupport() throws Exception {
-        
-        VMThreadCapabilities referenceCaps = mock(VMThreadCapabilities.class);
-        when(referenceCaps.supportContentionMonitor()).thenReturn(true);
-        when(referenceCaps.supportCPUTime()).thenReturn(false);
-        
-        VmRef reference = mock(VmRef.class);
-        
-        ThreadDao threadDao = mock(ThreadDao.class);
-        when(threadDao.loadCapabilities(reference)).thenReturn(null);
-        
-        MXBeanConnector connector = mock(MXBeanConnector.class);
-        when(connector.isAttached()).thenReturn(false).thenReturn(true);
-        
-        MXBeanConnection connection = mock(MXBeanConnection.class);
-        when(connector.connect()).thenReturn(connection);
-                
-        ArgumentCaptor<String> captor1 = ArgumentCaptor.forClass(String.class);
-        ArgumentCaptor<Class> captor2 = ArgumentCaptor.forClass(Class.class);
-        
-        com.sun.management.ThreadMXBean bean = mock(com.sun.management.ThreadMXBean.class);
-        when(bean.isThreadCpuTimeSupported()).thenReturn(false);
-        when(bean.isThreadContentionMonitoringSupported()).thenReturn(true);
-        when(bean.isThreadAllocatedMemorySupported()).thenReturn(true);
-        
-        when(connection.createProxy(captor1.capture(), captor2.capture())).thenReturn(bean);
-        
-        ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class);
-        
-        /* ************* */
-        
-        ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference, connector, threadPool);
-        VMThreadCapabilities caps = collector.getVMThreadCapabilities();
-
-        String beanName = captor1.getValue();
-        assertEquals(ManagementFactory.THREAD_MXBEAN_NAME, beanName);
-        
-        Class clazz = captor2.getValue();
-        assertEquals(com.sun.management.ThreadMXBean.class.getName(), clazz.getName());
-        
-        verify(threadDao).loadCapabilities(reference);
-        
-        verify(connector).attach();
-        verify(connector).connect();
-        verify(connector).close();
-        
-        verify(threadDao).saveCapabilities(reference, caps);
-        
-        assertTrue(caps.supportContentionMonitor());
-        assertFalse(caps.supportCPUTime());
-        assertTrue(caps.supportThreadAllocatedMemory());
-    }
-    
-    @Test
-    public void testVMCapabilitiesInDAO() throws Exception {
-        
-        VMThreadCapabilities referenceCaps = mock(VMThreadCapabilities.class);
-        when(referenceCaps.supportContentionMonitor()).thenReturn(true);
-        when(referenceCaps.supportCPUTime()).thenReturn(false);
-        
-        VmRef reference = mock(VmRef.class);
-        
-        ThreadDao threadDao = mock(ThreadDao.class);
-        when(threadDao.loadCapabilities(reference)).thenReturn(referenceCaps);
-        
-        MXBeanConnector connector = mock(MXBeanConnector.class);
-        
-        ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class);
-        
-        /* ************* */
-        
-        ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference, connector, threadPool);
-        VMThreadCapabilities caps = collector.getVMThreadCapabilities();
-
-        verify(threadDao).loadCapabilities(reference);
-        
-        verify(connector, times(0)).attach();
-        verify(connector, times(0)).connect();
-        verify(connector, times(0)).close();
-        
-        verify(threadDao, times(0)).saveCapabilities(reference, caps);
-        
-        assertTrue(caps.supportContentionMonitor());
-        assertFalse(caps.supportCPUTime());
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/collector/impl/ThreadCollectorTest.java	Wed Aug 29 21:22:47 2012 +0200
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.thread.collector.impl;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.redhat.thermostat.client.command.RequestQueue;
+import com.redhat.thermostat.common.command.Request;
+import com.redhat.thermostat.common.command.RequestResponseListener;
+import com.redhat.thermostat.common.dao.HostRef;
+import com.redhat.thermostat.common.dao.VmRef;
+import com.redhat.thermostat.thread.collector.ThreadCollector;
+import com.redhat.thermostat.thread.collector.VMThreadCapabilities;
+import com.redhat.thermostat.thread.dao.ThreadDao;
+import com.redhat.thermostat.thread.harvester.HarvesterCommand;
+
+public class ThreadCollectorTest {
+
+    @Test
+    public void testVMCapabilitiesNotInDAO() throws Exception {
+        
+        HostRef agent = mock(HostRef.class);
+        when(agent.getAgentId()).thenReturn("42");
+        
+        VmRef reference = mock(VmRef.class);
+        when(reference.getIdString()).thenReturn("00101010");
+        when(reference.getAgent()).thenReturn(agent);
+        ThreadDao threadDao = mock(ThreadDao.class);
+        
+        VMThreadCapabilities resCaps = mock(VMThreadCapabilities.class);
+        when(threadDao.loadCapabilities(reference)).thenReturn(null).thenReturn(resCaps);
+        
+        final Request request = mock(Request.class);
+        final RequestQueue requestQueue = mock(RequestQueue.class);
+        
+        final ArgumentCaptor<RequestResponseListener> captor = ArgumentCaptor.forClass(RequestResponseListener.class);
+        doNothing().when(request).addListener(captor.capture());
+        
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                Request req = (Request) invocation.getArguments()[0];
+                assertSame(request, req);
+                
+                RequestResponseListener listener = captor.getValue();
+                listener.fireComplete(null, null);
+                
+                return null;
+            }
+
+        }).when(requestQueue).putRequest(request);
+                
+        /* ************* */
+        
+        ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference) {
+            @Override
+            Request createRequest() {
+                return request;
+            }
+            @Override
+            RequestQueue getRequestQueue() {
+                return requestQueue;
+            }
+        };
+        
+        VMThreadCapabilities caps = collector.getVMThreadCapabilities();
+
+        verify(request).setParameter(HarvesterCommand.class.getName(), HarvesterCommand.VM_CAPS.name());
+        verify(request).setParameter(HarvesterCommand.VM_ID.name(), "00101010");
+        verify(request).setParameter(HarvesterCommand.AGENT_ID.name(), "42");
+        
+        verify(requestQueue).putRequest(request);
+        
+        verify(threadDao, times(2)).loadCapabilities(reference);
+        assertSame(resCaps, caps);
+    }
+    
+    @Test
+    public void testVMCapabilitiesInDAO() throws Exception {
+        
+        VmRef reference = mock(VmRef.class);
+        ThreadDao threadDao = mock(ThreadDao.class);
+        
+        VMThreadCapabilities resCaps = mock(VMThreadCapabilities.class);
+        when(threadDao.loadCapabilities(reference)).thenReturn(resCaps);
+        
+        ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference) {
+            @Override
+            Request createRequest() {
+                fail();
+                return null;
+            }
+            @Override
+            RequestQueue getRequestQueue() {
+                fail();
+                return null;
+            }
+        };
+        
+        VMThreadCapabilities caps = collector.getVMThreadCapabilities();
+ 
+        verify(threadDao, times(1)).loadCapabilities(reference);
+        assertSame(resCaps, caps);
+    }
+    
+    @Test
+    public void testStart() {
+        
+        HostRef agent = mock(HostRef.class);
+        when(agent.getAgentId()).thenReturn("42");
+        
+        final Request request = mock(Request.class);
+        final RequestQueue requestQueue = mock(RequestQueue.class);
+        ThreadDao threadDao = mock(ThreadDao.class);
+        VmRef reference = mock(VmRef.class);
+        when(reference.getIdString()).thenReturn("00101010");
+        when(reference.getAgent()).thenReturn(agent);
+        
+        ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference) {
+            @Override
+            Request createRequest() {
+                return request;
+            }
+            @Override
+            RequestQueue getRequestQueue() {
+                return requestQueue;
+            }
+        };
+        collector.startHarvester();
+        
+        verify(request).setParameter(HarvesterCommand.class.getName(), HarvesterCommand.START.name());
+        verify(request).setParameter(HarvesterCommand.VM_ID.name(), "00101010");
+        verify(request).setParameter(HarvesterCommand.AGENT_ID.name(), "42");
+        
+        verify(requestQueue).putRequest(request);
+    }
+    
+    
+    @Test
+    public void testStop() {
+
+        final Request request = mock(Request.class);
+        final RequestQueue requestQueue = mock(RequestQueue.class);
+        ThreadDao threadDao = mock(ThreadDao.class);
+        VmRef reference = mock(VmRef.class);
+        when(reference.getIdString()).thenReturn("00101010");
+        
+        ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference) {
+            @Override
+            Request createRequest() {
+                return request;
+            }
+            @Override
+            RequestQueue getRequestQueue() {
+                return requestQueue;
+            }
+        };
+        collector.stopHarvester();
+        
+        verify(request).setParameter(HarvesterCommand.class.getName(), HarvesterCommand.STOP.name());
+        verify(request).setParameter(HarvesterCommand.VM_ID.name(), "00101010");
+        
+        verify(requestQueue).putRequest(request);
+    }    
+}
--- a/thread/collector/src/test/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImplTest.java	Wed Aug 29 14:32:51 2012 -0400
+++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImplTest.java	Wed Aug 29 21:22:47 2012 +0200
@@ -61,7 +61,9 @@
     public void testCreateConnectionKey() {
         Storage storage = mock(Storage.class);
         
+        @SuppressWarnings("unused")
         ThreadDaoImpl dao = new ThreadDaoImpl(storage);
+        
         verify(storage).createConnectionKey(ThreadDao.THREAD_CAPABILITIES);
         verify(storage).createConnectionKey(ThreadDao.THREAD_INFO);
         verify(storage).createConnectionKey(ThreadDao.THREAD_SUMMARY);
@@ -102,14 +104,6 @@
     public void testSaveVMCapabilities() {
         Storage storage = mock(Storage.class);
         
-        VmRef ref = mock(VmRef.class);
-        when(ref.getId()).thenReturn(42);
-        
-        HostRef agent = mock(HostRef.class);
-        when(agent.getAgentId()).thenReturn("0xcafe");
-
-        when(ref.getAgent()).thenReturn(agent);
-
         Chunk answer = mock(Chunk.class);
         when(answer.get(ThreadDao.CONTENTION_MONITOR_KEY)).thenReturn(false);
         when(answer.get(ThreadDao.CPU_TIME_KEY)).thenReturn(true);
@@ -120,7 +114,7 @@
         when(caps.supportThreadAllocatedMemory()).thenReturn(true);
         
         ThreadDaoImpl dao = new ThreadDaoImpl(storage);
-        dao.saveCapabilities(ref, caps);
+        dao.saveCapabilities("42", "0xcafe", caps);
 
         ArgumentCaptor<Chunk> queryCaptor = ArgumentCaptor.forClass(Chunk.class);
         verify(storage).putChunk(queryCaptor.capture());
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/harvester/HarvesterTest.java	Wed Aug 29 21:22:47 2012 +0200
@@ -0,0 +1,400 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.thread.harvester;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doNothing;
+
+import static org.junit.Assert.*;
+
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.MalformedObjectNameException;
+
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.redhat.thermostat.thread.collector.impl.ThreadMXInfo;
+import com.redhat.thermostat.thread.collector.impl.ThreadMXSummary;
+import com.redhat.thermostat.thread.collector.impl.VMThreadMXCapabilities;
+import com.redhat.thermostat.thread.dao.ThreadDao;
+import com.redhat.thermostat.utils.management.MXBeanConnection;
+import com.redhat.thermostat.utils.management.MXBeanConnector;
+
+public class HarvesterTest {
+
+    @Test
+    public void testStart() {
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+        ThreadDao dao = mock(ThreadDao.class);
+        final MXBeanConnector mockedConnector = mock(MXBeanConnector.class);
+        when(mockedConnector.isAttached()).thenReturn(false);
+        
+        ArgumentCaptor<Runnable> arg0 = ArgumentCaptor.forClass(Runnable.class);
+        ArgumentCaptor<Long> arg1 = ArgumentCaptor.forClass(Long.class);
+        ArgumentCaptor<Long> arg2 = ArgumentCaptor.forClass(Long.class);
+        ArgumentCaptor<TimeUnit> arg3 = ArgumentCaptor.forClass(TimeUnit.class);
+        
+        final boolean [] harvestDataCalled = new boolean[1];
+        
+        when(executor.scheduleAtFixedRate(arg0.capture(), arg1.capture(), arg2.capture(), arg3.capture())).thenReturn(null);
+        
+        Harvester harvester = new Harvester(dao, executor, "42", "0xcafe") {
+            { connector = mockedConnector; }
+            @Override
+            synchronized void harvestData() {
+                harvestDataCalled[0] = true;
+            }
+        };
+        
+        harvester.start();
+        
+        verify(executor).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+        
+        assertTrue(arg1.getValue() == 0);
+        assertTrue(arg2.getValue() == 1);
+        assertEquals(TimeUnit.SECONDS, arg3.getValue());
+        
+        Runnable action = arg0.getValue();
+        assertNotNull(action);
+        
+        action.run();
+        
+        assertTrue(harvestDataCalled[0]);
+        
+        assertTrue(harvester.isConnected());
+    }
+    
+    /**
+     *  Mostly the same as testStart, but we call harvester.start() twice
+     */
+    @Test
+    public void testStartOnce() throws Exception {
+
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+        ThreadDao dao = mock(ThreadDao.class);
+        final MXBeanConnector mockedConnector = mock(MXBeanConnector.class);
+        when(mockedConnector.isAttached()).thenReturn(false);
+        
+        ArgumentCaptor<Runnable> arg0 = ArgumentCaptor.forClass(Runnable.class);
+        ArgumentCaptor<Long> arg1 = ArgumentCaptor.forClass(Long.class);
+        ArgumentCaptor<Long> arg2 = ArgumentCaptor.forClass(Long.class);
+        ArgumentCaptor<TimeUnit> arg3 = ArgumentCaptor.forClass(TimeUnit.class);
+        
+        final boolean [] harvestDataCalled = new boolean[1];
+        
+        when(executor.scheduleAtFixedRate(arg0.capture(), arg1.capture(), arg2.capture(), arg3.capture())).thenReturn(null);
+        
+        Harvester harvester = new Harvester(dao, executor, "42", "0xcafe") {
+            { connector = mockedConnector; }
+            @Override
+            synchronized void harvestData() {
+                harvestDataCalled[0] = true;
+            }
+        };
+        
+        harvester.start();
+        harvester.start();
+
+        verify(mockedConnector, times(1)).isAttached();
+        verify(mockedConnector, times(1)).attach();
+        verify(executor, times(1)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+        
+        assertTrue(arg1.getValue() == 0);
+        assertTrue(arg2.getValue() == 1);
+        assertEquals(TimeUnit.SECONDS, arg3.getValue());
+        
+        Runnable action = arg0.getValue();
+        assertNotNull(action);
+        
+        action.run();
+        
+        assertTrue(harvestDataCalled[0]);
+        
+        assertTrue(harvester.isConnected());
+    }
+    
+    @Test
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public void testStopAfterStarting() throws Exception {
+        
+        ScheduledFuture future = mock(ScheduledFuture.class);
+        
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+        ThreadDao dao = mock(ThreadDao.class);
+        final MXBeanConnector mockedConnector = mock(MXBeanConnector.class);
+        
+        MXBeanConnection connection = mock(MXBeanConnection.class);
+        
+        when(mockedConnector.connect()).thenReturn(connection);
+        
+        when(mockedConnector.isAttached()).thenReturn(true);
+        
+        when(executor.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future);
+        
+        Harvester harvester = new Harvester(dao, executor, "42", "0xcafe")
+        {{ connector = mockedConnector; }};
+        
+        harvester.start();
+        
+        assertTrue(harvester.isConnected());
+        
+        harvester.stop();
+        
+        verify(future).cancel(false);
+        verify(connection).close();
+        
+        // needs to be 2 times, since is called once in start
+        verify(mockedConnector, times(2)).isAttached();
+        verify(mockedConnector).close();
+        
+        assertFalse(harvester.isConnected());
+    }
+    
+    /**
+     *  Mostly the same as testStopAfterStarting, but we call harvester.stop()
+     *  twice
+     */
+    @Test
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public void testStopTwiceAfterStarting() throws Exception {
+        
+        ScheduledFuture future = mock(ScheduledFuture.class);
+        
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+        ThreadDao dao = mock(ThreadDao.class);
+        final MXBeanConnector mockedConnector = mock(MXBeanConnector.class);
+        
+        MXBeanConnection connection = mock(MXBeanConnection.class);
+        
+        when(mockedConnector.connect()).thenReturn(connection);
+        
+        when(mockedConnector.isAttached()).thenReturn(true);
+        
+        when(executor.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future);
+        
+        Harvester harvester = new Harvester(dao, executor, "42", "0xcafe")
+        {{ connector = mockedConnector; }};
+        
+        harvester.start();
+        
+        assertTrue(harvester.isConnected());
+        
+        harvester.stop();
+        harvester.stop();
+
+        verify(future, times(1)).cancel(false);
+        verify(connection, times(1)).close();
+        
+        // needs to be 2 times, since is called once in start
+        verify(mockedConnector, times(2)).isAttached();
+        
+        verify(mockedConnector, times(1)).close();
+        
+        assertFalse(harvester.isConnected());
+    }
+    
+    @Test
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public void testStopNotStarted() throws Exception {
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+        ThreadDao dao = mock(ThreadDao.class);
+        
+        ScheduledFuture future = mock(ScheduledFuture.class);
+        when(executor.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future);
+        
+        final MXBeanConnector mockedConnector = mock(MXBeanConnector.class);
+        MXBeanConnection connection = mock(MXBeanConnection.class);
+        when(mockedConnector.connect()).thenReturn(connection);
+        when(mockedConnector.isAttached()).thenReturn(true);
+        
+        Harvester harvester = new Harvester(dao, executor, "42", "0xcafe")
+        {{ connector = mockedConnector; }};
+        
+        verify(executor, times(0)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+        
+        assertFalse(harvester.isConnected());
+        
+        harvester.stop();
+
+        assertFalse(harvester.isConnected());
+        verify(mockedConnector, times(0)).isAttached();
+        
+        verify(future, times(0)).cancel(false);
+    }
+    
+    @Test
+    public void testHarvestData() {
+        
+        long ids[] = new long [] {
+            0, 1
+        };
+        
+        ThreadInfo info1 = mock(ThreadInfo.class);
+        when(info1.getThreadName()).thenReturn("fluff1");
+        when(info1.getThreadId()).thenReturn(1l);
+
+        ThreadInfo info2 = mock(ThreadInfo.class);
+        when(info2.getThreadName()).thenReturn("fluff2");
+        when(info2.getThreadId()).thenReturn(2l);
+
+        ThreadInfo[] infos = new ThreadInfo[] {
+            info1,
+            info2     
+        };
+                
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+
+        ArgumentCaptor<String> vmCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> agentCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<ThreadMXSummary> summaryCapture = ArgumentCaptor.forClass(ThreadMXSummary.class);
+
+        ThreadDao dao = mock(ThreadDao.class);
+        doNothing().when(dao).saveSummary(vmCapture.capture(), agentCapture.capture(), summaryCapture.capture());
+        
+        ArgumentCaptor<String> vmCapture2 = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> agentCapture2 = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<ThreadMXInfo> threadInfoCapture = ArgumentCaptor.forClass(ThreadMXInfo.class);        
+        doNothing().when(dao).saveThreadInfo(vmCapture2.capture(), agentCapture2.capture(), threadInfoCapture.capture());
+
+        final ThreadMXBean collectorBean = mock(ThreadMXBean.class);
+
+        when(collectorBean.getThreadCount()).thenReturn(42);
+        when(collectorBean.getAllThreadIds()).thenReturn(ids);
+        when(collectorBean.getThreadInfo(ids, true, true)).thenReturn(infos);
+
+        final boolean [] getDataCollectorBeanCalled = new boolean[1];
+        
+        Harvester harvester = new Harvester(dao, executor, "42", "0xcafe") {
+            @Override
+            ThreadMXBean getDataCollectorBean(MXBeanConnection connection)
+                    throws MalformedObjectNameException {
+                getDataCollectorBeanCalled[0] = true;
+                return collectorBean;
+            }
+        };
+
+        harvester.harvestData();
+        
+        assertTrue(getDataCollectorBeanCalled[0]);
+        
+        verify(collectorBean).getThreadInfo(ids, true, true);
+        
+        verify(dao).saveSummary(anyString(), anyString(), any(ThreadMXSummary.class));
+        
+        // once for each thread info
+        verify(dao, times(2)).saveThreadInfo(anyString(), anyString(), any(ThreadMXInfo.class));
+        
+        assertEquals(42, summaryCapture.getValue().currentLiveThreads());
+        assertEquals("42", vmCapture.getValue());
+        assertEquals("0xcafe", agentCapture.getValue());
+        
+        assertEquals(42, summaryCapture.getValue().currentLiveThreads());
+        assertEquals("42", vmCapture2.getAllValues().get(0));
+        assertEquals("42", vmCapture2.getAllValues().get(1));
+
+        assertEquals("0xcafe", agentCapture2.getAllValues().get(0));
+        assertEquals("0xcafe", agentCapture2.getAllValues().get(1));
+        
+        List<ThreadMXInfo> threadInfos = threadInfoCapture.getAllValues();
+        assertEquals(2, threadInfos.size());
+        
+        assertEquals("fluff1", threadInfos.get(0).getName());
+        assertEquals("fluff2", threadInfos.get(1).getName());
+        
+        verify(collectorBean, times(1)).getThreadCpuTime(1l);
+        verify(collectorBean, times(1)).getThreadCpuTime(2l);
+    }
+    
+    @Test
+    public void testSaveVmCaps() {
+
+        final MXBeanConnector mockedConnector = mock(MXBeanConnector.class);
+        when(mockedConnector.isAttached()).thenReturn(true);
+        
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+
+        ArgumentCaptor<String> vmCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> agentCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<VMThreadMXCapabilities> capsCapture = ArgumentCaptor.forClass(VMThreadMXCapabilities.class);
+
+        ThreadDao dao = mock(ThreadDao.class);
+        doNothing().when(dao).saveCapabilities(vmCapture.capture(), agentCapture.capture(), capsCapture.capture());
+      
+        final ThreadMXBean collectorBean = mock(ThreadMXBean.class);
+        when(collectorBean.isThreadCpuTimeSupported()).thenReturn(true);
+        when(collectorBean.isThreadContentionMonitoringSupported()).thenReturn(true);
+
+        final boolean [] getDataCollectorBeanCalled = new boolean[1];
+        
+        Harvester harvester = new Harvester(dao, executor, "42", "0xcafe") {
+            { connector = mockedConnector; }
+            @Override
+            ThreadMXBean getDataCollectorBean(MXBeanConnection connection)
+                    throws MalformedObjectNameException {
+                getDataCollectorBeanCalled[0] = true;
+                return collectorBean;
+            }
+        };
+
+        harvester.saveVmCaps();
+        assertTrue(getDataCollectorBeanCalled[0]);
+        
+        verify(dao, times(1)).saveCapabilities(anyString(), anyString(), any(VMThreadMXCapabilities.class));
+        assertEquals("42", vmCapture.getValue());
+        assertEquals("0xcafe", agentCapture.getValue());
+
+        List<String> features = capsCapture.getValue().getSupportedFeaturesList();
+        assertEquals(2, features.size());
+        assertTrue(features.contains(ThreadDao.CPU_TIME));
+        assertTrue(features.contains(ThreadDao.CONTENTION_MONITOR));
+        assertFalse(features.contains(ThreadDao.THREAD_ALLOCATED_MEMORY));
+    }    
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/harvester/ThreadHarvesterTest.java	Wed Aug 29 21:22:47 2012 +0200
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.thread.harvester;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.redhat.thermostat.common.command.Request;
+import com.redhat.thermostat.thread.dao.ThreadDao;
+
+public class ThreadHarvesterTest {
+
+    @Test
+    public void testStart() {
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+        ThreadDao dao = mock(ThreadDao.class);
+        Request request = mock(Request.class);
+        
+        final boolean[] getHarvesterCalled = new boolean[1];
+        final Harvester harverster = mock(Harvester.class);
+        
+        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+        
+        when(request.getParameter(captor.capture())).
+            thenReturn(HarvesterCommand.START.name()).
+            thenReturn("42").
+            thenReturn("0xcafe");
+        
+        ThreadHarvester threadHarvester = new ThreadHarvester(executor, dao) {
+            @Override
+            Harvester getHarvester(String vmId, String agentId) {
+                
+                getHarvesterCalled[0] = true;
+                assertEquals("42", vmId);
+                assertEquals("0xcafe", agentId);
+                
+                return harverster;
+            }
+        };
+        threadHarvester.receive(request);
+        
+        List<String> values = captor.getAllValues();
+        assertEquals(3, values.size());
+        
+        assertEquals(HarvesterCommand.class.getName(), values.get(0));
+        assertEquals(HarvesterCommand.VM_ID.name(), values.get(1));
+        assertEquals(HarvesterCommand.AGENT_ID.name(), values.get(2));
+        
+        assertTrue(getHarvesterCalled[0]);
+        
+        verify(harverster).start();
+    }
+
+    @Test
+    public void testStop() {
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+        ThreadDao dao = mock(ThreadDao.class);
+        Request request = mock(Request.class);
+        
+        final Harvester harverster = mock(Harvester.class);
+        
+        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+        
+        when(request.getParameter(captor.capture())).
+            thenReturn(HarvesterCommand.STOP.name()).
+            thenReturn("42");
+        
+        ThreadHarvester threadHarvester = new ThreadHarvester(executor, dao) {
+            { connectors.put("42", harverster); }
+        };
+        threadHarvester.receive(request);
+        
+        List<String> values = captor.getAllValues();
+        assertEquals(2, values.size());
+        
+        assertEquals(HarvesterCommand.class.getName(), values.get(0));
+        assertEquals(HarvesterCommand.VM_ID.name(), values.get(1));
+                
+        verify(harverster).stop();        
+    }
+    
+    @Test
+    public void testSaveVmCaps() {
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+        ThreadDao dao = mock(ThreadDao.class);
+        Request request = mock(Request.class);
+        
+        final boolean[] getHarvesterCalled = new boolean[1];
+        final Harvester harverster = mock(Harvester.class);
+        
+        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+        
+        when(request.getParameter(captor.capture())).
+            thenReturn(HarvesterCommand.VM_CAPS.name()).
+            thenReturn("42").
+            thenReturn("0xcafe");
+        
+        ThreadHarvester threadHarvester = new ThreadHarvester(executor, dao) {
+            @Override
+            Harvester getHarvester(String vmId, String agentId) {
+                
+                getHarvesterCalled[0] = true;
+                assertEquals("42", vmId);
+                assertEquals("0xcafe", agentId);
+                
+                return harverster;
+            }
+        };
+        threadHarvester.receive(request);
+        
+        List<String> values = captor.getAllValues();
+        assertEquals(3, values.size());
+        
+        assertEquals(HarvesterCommand.class.getName(), values.get(0));
+        assertEquals(HarvesterCommand.VM_ID.name(), values.get(1));
+        assertEquals(HarvesterCommand.AGENT_ID.name(), values.get(2));
+        
+        assertTrue(getHarvesterCalled[0]);
+        
+        verify(harverster).saveVmCaps();
+    }    
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thread/harvester/pom.xml	Wed Aug 29 21:22:47 2012 +0200
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+
+ Copyright 2012 Red Hat, Inc.
+
+ This file is part of Thermostat.
+
+ Thermostat is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ Thermostat is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with Thermostat; see the file COPYING.  If not see
+ <http://www.gnu.org/licenses />.
+
+ Linking this code with other modules is making a combined work
+ based on this code.  Thus, the terms and conditions of the GNU
+ General Public License cover the whole combination.
+
+ As a special exception, the copyright holders of this code give
+ you permission to link this code with independent modules to
+ produce an executable, regardless of the license terms of these
+ independent modules, and to copy and distribute the resulting
+ executable under terms of your choice, provided that you also
+ meet, for each linked independent module, the terms and conditions
+ of the license of that module.  An independent module is a module
+ which is not derived from or based on this code.  If you modify
+ this code, you may extend this exception to your version of the
+ library, but you are not obligated to do so.  If you do not wish
+ to do so, delete this exception statement from your version.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>com.redhat.thermostat</groupId>
+    <artifactId>thermostat-thread</artifactId>
+    <version>0.4.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>thermostat-thread-harvester</artifactId>
+  <packaging>bundle</packaging>
+
+  <name>Thermostat Thread Info Harvester</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.redhat.thermostat</groupId>
+      <artifactId>thermostat-thread-collector</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.osgi</groupId>
+      <artifactId>org.osgi.core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.osgi</groupId>
+      <artifactId>org.osgi.compendium</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <extensions>true</extensions>
+        <configuration>
+          <instructions>
+            <Bundle-Vendor>Red Hat, Inc.</Bundle-Vendor>
+            <Bundle-Activator>com.redhat.thermostat.thread.harvester.osgi.Activator</Bundle-Activator>
+            <Bundle-SymbolicName>com.redhat.thermostat.thread.harvester</Bundle-SymbolicName>
+            <Private-Package>
+              com.redhat.thermostat.thread.harvester.osgi,
+            </Private-Package>
+            <!-- Do not autogenerate uses clauses in Manifests -->
+            <_nouses>true</_nouses>
+          </instructions>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/osgi/Activator.java	Wed Aug 29 21:22:47 2012 +0200
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2012 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.thread.harvester.osgi;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+
+import com.redhat.thermostat.agent.command.ReceiverRegistry;
+import com.redhat.thermostat.thread.dao.ThreadDao;
+import com.redhat.thermostat.thread.harvester.ThreadHarvester;
+
+public class Activator implements BundleActivator {
+    
+    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(24);
+    
+    @Override
+    public void start(final BundleContext context) throws Exception {
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        ServiceTracker tracker = new ServiceTracker(context, ThreadDao.class.getName(), null) {
+            @Override
+            public Object addingService(ServiceReference reference) {
+                ThreadDao threadDao = (ThreadDao) context.getService(reference);
+
+                ThreadHarvester harvester = new ThreadHarvester(executor, threadDao);
+  
+                ReceiverRegistry registry = new ReceiverRegistry(context);
+                registry.registerReceiver(harvester);
+                
+                return super.addingService(reference);
+            }
+        };
+        tracker.open();
+    }
+
+    @Override
+    public void stop(BundleContext context) throws Exception {
+        if (executor != null) {
+            executor.shutdown();
+        }        
+    }
+}
--- a/thread/pom.xml	Wed Aug 29 14:32:51 2012 -0400
+++ b/thread/pom.xml	Wed Aug 29 21:22:47 2012 +0200
@@ -60,10 +60,10 @@
 
   <modules>
     <module>collector</module>
+    <module>harvester</module>
     <module>client-common</module>
     <module>client-swing</module>
     <module>client-controllers</module>
   </modules>
-
 </project>