changeset 1048:edaf77f17cbd

Make thread harvester publish status to storage Reviewed-by: jerboaa Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2013-March/006191.html
author Omair Majid <omajid@redhat.com>
date Tue, 26 Mar 2013 11:17:08 -0400
parents 7dc61d031410
children 3f0416e9db80
files thread/client-common/src/main/java/com/redhat/thermostat/thread/client/common/collector/impl/ThreadMXBeanCollector.java thread/client-common/src/test/java/com/redhat/thermostat/thread/client/common/collector/impl/ThreadCollectorTest.java thread/collector/src/main/java/com/redhat/thermostat/thread/collector/HarvesterCommand.java thread/collector/src/main/java/com/redhat/thermostat/thread/dao/ThreadDao.java thread/collector/src/main/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImpl.java thread/collector/src/main/java/com/redhat/thermostat/thread/model/ThreadHarvestingStatus.java thread/collector/src/test/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImplTest.java thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/ThreadBackend.java thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/ThreadHarvester.java thread/harvester/src/test/java/com/redhat/thermostat/thread/harvester/ThreadBackendTest.java thread/harvester/src/test/java/com/redhat/thermostat/thread/harvester/ThreadHarvesterTest.java
diffstat 11 files changed, 275 insertions(+), 54 deletions(-) [+]
line wrap: on
line diff
--- a/thread/client-common/src/main/java/com/redhat/thermostat/thread/client/common/collector/impl/ThreadMXBeanCollector.java	Fri Mar 22 16:43:10 2013 -0400
+++ b/thread/client-common/src/main/java/com/redhat/thermostat/thread/client/common/collector/impl/ThreadMXBeanCollector.java	Tue Mar 26 11:17:08 2013 -0400
@@ -58,6 +58,7 @@
 import com.redhat.thermostat.thread.client.common.collector.ThreadCollector;
 import com.redhat.thermostat.thread.collector.HarvesterCommand;
 import com.redhat.thermostat.thread.dao.ThreadDao;
+import com.redhat.thermostat.thread.model.ThreadHarvestingStatus;
 import com.redhat.thermostat.thread.model.ThreadInfoData;
 import com.redhat.thermostat.thread.model.ThreadSummary;
 import com.redhat.thermostat.thread.model.VMThreadCapabilities;
@@ -167,34 +168,11 @@
     
     @Override
     public boolean isHarvesterCollecting() {
-        Request harvester = createRequest();
-        harvester.setParameter(HarvesterCommand.class.getName(), HarvesterCommand.IS_COLLECTING.name());
-        harvester.setParameter(HarvesterCommand.VM_ID.name(), ref.getIdString());
-
-        final CountDownLatch latch = new CountDownLatch(1);        
-        final boolean[] result = new boolean[1];
-
-        harvester.addListener(new RequestResponseListener() {
-            @Override
-            public void fireComplete(Request request, Response response) {
-                switch (response.getType()) {
-                case OK:
-                    result[0] = true;
-                    break;
-                default:
-                    break;
-                }
-                latch.countDown();
-            }
-        });
-        
-        try {
-            enqueueRequest(harvester);
-            latch.await();
-        } catch (CommandException e) {
-            logger.log(Level.WARNING, "Failed to enqueue request", e);
-        } catch (InterruptedException ignore) {}
-        return result[0];
+        ThreadHarvestingStatus status = threadDao.getLatestHarvestingStatus(ref);
+        if (status == null) {
+            return false;
+        }
+        return status.isHarvesting();
     }
     
     @Override
--- a/thread/client-common/src/test/java/com/redhat/thermostat/thread/client/common/collector/impl/ThreadCollectorTest.java	Fri Mar 22 16:43:10 2013 -0400
+++ b/thread/client-common/src/test/java/com/redhat/thermostat/thread/client/common/collector/impl/ThreadCollectorTest.java	Tue Mar 26 11:17:08 2013 -0400
@@ -39,6 +39,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -62,6 +63,7 @@
 import com.redhat.thermostat.thread.client.common.collector.ThreadCollector;
 import com.redhat.thermostat.thread.collector.HarvesterCommand;
 import com.redhat.thermostat.thread.dao.ThreadDao;
+import com.redhat.thermostat.thread.model.ThreadHarvestingStatus;
 import com.redhat.thermostat.thread.model.VMThreadCapabilities;
 
 public class ThreadCollectorTest {
@@ -117,6 +119,18 @@
         verify(threadDao).loadCapabilities(reference);
         assertSame(resCaps, caps);
     }
+
+    @Test
+    public void testHarvesterCollecting() {
+        ThreadHarvestingStatus status = mock(ThreadHarvestingStatus.class);
+        when(status.isHarvesting()).thenReturn(true);
+        ThreadCollector collector = new ThreadMXBeanCollector(context, reference);
+        when(threadDao.getLatestHarvestingStatus(reference)).thenReturn(status);
+
+        collector.setThreadDao(threadDao);
+
+        assertTrue(collector.isHarvesterCollecting());
+    }
     
     @Test
     public void testStart() {
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/HarvesterCommand.java	Fri Mar 22 16:43:10 2013 -0400
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/HarvesterCommand.java	Tue Mar 26 11:17:08 2013 -0400
@@ -40,7 +40,6 @@
 
     START,
     STOP,
-    IS_COLLECTING,
     
     VM_ID;
 
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/ThreadDao.java	Fri Mar 22 16:43:10 2013 -0400
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/ThreadDao.java	Tue Mar 26 11:17:08 2013 -0400
@@ -43,6 +43,7 @@
 import com.redhat.thermostat.storage.core.Storage;
 import com.redhat.thermostat.storage.core.VmRef;
 import com.redhat.thermostat.thread.model.ThreadInfoData;
+import com.redhat.thermostat.thread.model.ThreadHarvestingStatus;
 import com.redhat.thermostat.thread.model.ThreadSummary;
 import com.redhat.thermostat.thread.model.VMThreadCapabilities;
 
@@ -80,6 +81,18 @@
     ThreadSummary loadLastestSummary(VmRef ref);
     List<ThreadSummary> loadSummary(VmRef ref, long since);
 
+    static final String HARVESTING_DATA = "harvesting";
+    static final Key<String> HARVESTING_STATUS_KEY = new Key<String> (HARVESTING_DATA, false);
+    static final Category<ThreadHarvestingStatus> THREAD_HARVESTING_STATUS =
+            new Category<>("vm-thread-harvesting", ThreadHarvestingStatus.class,
+                    Key.AGENT_ID,
+                    Key.VM_ID,
+                    Key.TIMESTAMP,
+                    HARVESTING_STATUS_KEY);
+
+    ThreadHarvestingStatus getLatestHarvestingStatus(VmRef vm);
+    void saveHarvestingStatus(ThreadHarvestingStatus status);
+
     static final String THREAD_STATE = "threadState";
     static final Key<String> THREAD_STATE_KEY = new Key<String>(THREAD_STATE, false);
     static final String THREAD_ID = "threadId";
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImpl.java	Fri Mar 22 16:43:10 2013 -0400
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImpl.java	Tue Mar 26 11:17:08 2013 -0400
@@ -49,6 +49,7 @@
 import com.redhat.thermostat.storage.core.VmRef;
 import com.redhat.thermostat.storage.model.Pojo;
 import com.redhat.thermostat.thread.dao.ThreadDao;
+import com.redhat.thermostat.thread.model.ThreadHarvestingStatus;
 import com.redhat.thermostat.thread.model.ThreadInfoData;
 import com.redhat.thermostat.thread.model.ThreadSummary;
 import com.redhat.thermostat.thread.model.VMThreadCapabilities;
@@ -60,6 +61,7 @@
         this.storage = storage;
         storage.registerCategory(THREAD_CAPABILITIES);
         storage.registerCategory(THREAD_SUMMARY);
+        storage.registerCategory(THREAD_HARVESTING_STATUS);
         storage.registerCategory(THREAD_INFO);
     }
 
@@ -123,7 +125,27 @@
         
         return result;
     }
-    
+
+    @Override
+    public void saveHarvestingStatus(ThreadHarvestingStatus status) {
+        Put add = storage.createAdd(THREAD_HARVESTING_STATUS);
+        add.setPojo(status);
+        add.apply();
+    }
+
+    @Override
+    public ThreadHarvestingStatus getLatestHarvestingStatus(VmRef vm) {
+        Query<ThreadHarvestingStatus> query = prepareQuery(THREAD_HARVESTING_STATUS, vm);
+        query.sort(Key.TIMESTAMP, Query.SortDirection.DESCENDING);
+        query.limit(1);
+
+        Cursor<ThreadHarvestingStatus> cursor = query.execute();
+        if (cursor.hasNext()) {
+            return cursor.next();
+        }
+        return null;
+    }
+
     @Override
     public void saveThreadInfo(ThreadInfoData info) {
         Put add = storage.createAdd(THREAD_INFO);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/model/ThreadHarvestingStatus.java	Tue Mar 26 11:17:08 2013 -0400
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2012, 2013 Red Hat, Inc.
+ *
+ * This file is part of Thermostat.
+ *
+ * Thermostat is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 2, or (at your
+ * option) any later version.
+ *
+ * Thermostat is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Thermostat; see the file COPYING.  If not see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Linking this code with other modules is making a combined work
+ * based on this code.  Thus, the terms and conditions of the GNU
+ * General Public License cover the whole combination.
+ *
+ * As a special exception, the copyright holders of this code give
+ * you permission to link this code with independent modules to
+ * produce an executable, regardless of the license terms of these
+ * independent modules, and to copy and distribute the resulting
+ * executable under terms of your choice, provided that you also
+ * meet, for each linked independent module, the terms and conditions
+ * of the license of that module.  An independent module is a module
+ * which is not derived from or based on this code.  If you modify
+ * this code, you may extend this exception to your version of the
+ * library, but you are not obligated to do so.  If you do not wish
+ * to do so, delete this exception statement from your version.
+ */
+
+package com.redhat.thermostat.thread.model;
+
+import com.redhat.thermostat.storage.core.Entity;
+import com.redhat.thermostat.storage.core.Persist;
+import com.redhat.thermostat.storage.model.BasePojo;
+
+@Entity
+public class ThreadHarvestingStatus extends BasePojo {
+
+    private int vmId;
+    private long timeStamp;
+    private boolean collecting;
+
+    @Persist
+    public int getVmId() {
+        return vmId;
+    }
+
+    @Persist
+    public void setVmId(int newVmId) {
+        this.vmId = newVmId;
+    }
+
+    @Persist
+    public long getTimeStamp() {
+        return timeStamp;
+    }
+
+    @Persist
+    public void setTimeStamp(long newTimeStamp) {
+        this.timeStamp = newTimeStamp;
+    }
+
+    @Persist
+    public boolean isHarvesting() {
+        return collecting;
+    }
+
+    @Persist
+    public void setHarvesting(boolean collecting) {
+        this.collecting = collecting;
+    }
+
+}
--- a/thread/collector/src/test/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImplTest.java	Fri Mar 22 16:43:10 2013 -0400
+++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImplTest.java	Tue Mar 26 11:17:08 2013 -0400
@@ -38,6 +38,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -49,17 +50,19 @@
 
 import org.junit.Test;
 
+import com.redhat.thermostat.storage.core.Add;
 import com.redhat.thermostat.storage.core.Category;
 import com.redhat.thermostat.storage.core.Cursor;
 import com.redhat.thermostat.storage.core.HostRef;
 import com.redhat.thermostat.storage.core.Key;
 import com.redhat.thermostat.storage.core.Query;
 import com.redhat.thermostat.storage.core.Query.Criteria;
+import com.redhat.thermostat.storage.core.Query.SortDirection;
 import com.redhat.thermostat.storage.core.Replace;
 import com.redhat.thermostat.storage.core.Storage;
 import com.redhat.thermostat.storage.core.VmRef;
-import com.redhat.thermostat.storage.core.Query.Criteria;
 import com.redhat.thermostat.thread.dao.ThreadDao;
+import com.redhat.thermostat.thread.model.ThreadHarvestingStatus;
 import com.redhat.thermostat.thread.model.VMThreadCapabilities;
 
 public class ThreadDaoImplTest {
@@ -72,6 +75,7 @@
         ThreadDaoImpl dao = new ThreadDaoImpl(storage);
         
         verify(storage).registerCategory(ThreadDao.THREAD_CAPABILITIES);
+        verify(storage).registerCategory(ThreadDao.THREAD_HARVESTING_STATUS);
         verify(storage).registerCategory(ThreadDao.THREAD_INFO);
         verify(storage).registerCategory(ThreadDao.THREAD_SUMMARY);
     }
@@ -159,7 +163,55 @@
         verify(storage).createReplace(ThreadDao.THREAD_CAPABILITIES);
         verify(replace).setPojo(caps);
         verify(replace).apply();
+    }
 
+    @Test
+    public void testGetLatestHarvestingStatus() {
+        VmRef vm = mock(VmRef.class);
+        when(vm.getId()).thenReturn(42);
+        when(vm.getIdString()).thenReturn("42");
+
+        HostRef agent = mock(HostRef.class);
+        when(agent.getAgentId()).thenReturn("0xcafe");
+        when(vm.getAgent()).thenReturn(agent);
+
+        Storage storage = mock(Storage.class);
+        Query<ThreadHarvestingStatus> query = mock(Query.class);
+        Cursor<ThreadHarvestingStatus> cursor = mock(Cursor.class);
+        ThreadHarvestingStatus status = mock(ThreadHarvestingStatus.class);
+
+        when(cursor.hasNext()).thenReturn(true);
+        when(cursor.next()).thenReturn(status);
+        when(query.execute()).thenReturn(cursor);
+
+        when(storage.createQuery(ThreadDaoImpl.THREAD_HARVESTING_STATUS)).thenReturn(query);
+
+        ThreadDaoImpl dao = new ThreadDaoImpl(storage);
+        ThreadHarvestingStatus result = dao.getLatestHarvestingStatus(vm);
+
+        verify(query).where(Key.AGENT_ID, Criteria.EQUALS, agent.getAgentId());
+        verify(query).where(Key.VM_ID, Criteria.EQUALS, vm.getId());
+        verify(query).sort(Key.TIMESTAMP, SortDirection.DESCENDING);
+        verify(query).execute();
+        verify(query).limit(1);
+        verifyNoMoreInteractions(query);
+
+        assertSame(status, result);
+    }
+
+    @Test
+    public void testSetHarvestingStatus() {
+        Storage storage = mock(Storage.class);
+        Add add = mock(Add.class);
+        when(storage.createAdd(ThreadDaoImpl.THREAD_HARVESTING_STATUS)).thenReturn(add);
+
+        ThreadHarvestingStatus status = mock(ThreadHarvestingStatus.class);
+
+        ThreadDaoImpl dao = new ThreadDaoImpl(storage);
+        dao.saveHarvestingStatus(status);
+
+        verify(add).setPojo(status);
+        verify(add).apply();
     }
 }
 
--- a/thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/ThreadBackend.java	Fri Mar 22 16:43:10 2013 -0400
+++ b/thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/ThreadBackend.java	Tue Mar 26 11:17:08 2013 -0400
@@ -36,14 +36,19 @@
 
 package com.redhat.thermostat.thread.harvester;
 
+import java.util.logging.Logger;
+
 import com.redhat.thermostat.agent.VmStatusListener;
 import com.redhat.thermostat.agent.VmStatusListenerRegistrar;
 import com.redhat.thermostat.agent.command.ReceiverRegistry;
 import com.redhat.thermostat.backend.BaseBackend;
 import com.redhat.thermostat.common.Version;
+import com.redhat.thermostat.common.utils.LoggingUtils;
 
 public class ThreadBackend extends BaseBackend implements VmStatusListener {
 
+    private static final Logger logger = LoggingUtils.getLogger(ThreadBackend.class);
+
     private final ReceiverRegistry registry;
     private final ThreadHarvester harvester;
 
@@ -70,6 +75,7 @@
         }
         vmListener.register(this);
         registry.registerReceiver(harvester);
+        // FIXME enable harvester
         active = true;
         return true;
     }
@@ -81,6 +87,7 @@
         }
         vmListener.unregister(this);
         registry.unregisterReceivers();
+        // FIXME disable harvester
         active = false;
         return true;
     }
@@ -92,15 +99,19 @@
 
     @Override
     public void vmStatusChanged(Status newStatus, int pid) {
+        String vmId = String.valueOf(pid);
         switch (newStatus) {
         case VM_STARTED: case VM_ACTIVE:
             /* this is blocking */
-            harvester.saveVmCaps(String.valueOf(pid));
+            harvester.saveVmCaps(vmId);
+            harvester.addThreadHarvestingStatus(vmId);
             break;
         case VM_STOPPED:
+            harvester.stopHarvester(vmId);
+            harvester.addThreadHarvestingStatus(vmId);
+            break;
         default:
-            /* nothing to do */
-            break;
+            logger.warning("Unexpected VM state: " + newStatus);
         }
     }
 
--- a/thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/ThreadHarvester.java	Fri Mar 22 16:43:10 2013 -0400
+++ b/thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/ThreadHarvester.java	Tue Mar 26 11:17:08 2013 -0400
@@ -43,12 +43,15 @@
 import java.util.concurrent.ScheduledExecutorService;
 
 import com.redhat.thermostat.agent.command.RequestReceiver;
+import com.redhat.thermostat.common.Clock;
+import com.redhat.thermostat.common.SystemClock;
 import com.redhat.thermostat.common.command.Request;
 import com.redhat.thermostat.common.command.Response;
 import com.redhat.thermostat.common.command.Response.ResponseType;
 
 import com.redhat.thermostat.thread.collector.HarvesterCommand;
 import com.redhat.thermostat.thread.dao.ThreadDao;
+import com.redhat.thermostat.thread.model.ThreadHarvestingStatus;
 
 public class ThreadHarvester implements RequestReceiver {
 
@@ -56,10 +59,16 @@
     Map<String, Harvester> connectors;
 
     private ThreadDao dao;
-    
+    private Clock clock;
+
     public ThreadHarvester(ScheduledExecutorService executor) {
+        this(executor, new SystemClock());
+    }
+    
+    public ThreadHarvester(ScheduledExecutorService executor, Clock clock) {
         this.executor = executor;
-        connectors = new HashMap<>();
+        this.connectors = new HashMap<>();
+        this.clock = clock;
     }
 
     public void setThreadDao(ThreadDao dao) {
@@ -89,12 +98,6 @@
             result = stopHarvester(vmId);
             break;
         }
-        case IS_COLLECTING: {
-            // this is blocking too
-            String vmId = request.getParameter(HarvesterCommand.VM_ID.name());
-            // FIXME: this need to be replaced when we support response parameters
-            return isCollecting(vmId) ? new Response(ResponseType.OK) : new Response(ResponseType.NOK);
-        }        
         default:
             result = false;
             break;
@@ -107,17 +110,13 @@
         }
     }
     
-    private boolean isCollecting(String vmId) {
-        Harvester harvester = connectors.get(vmId);
-        if (harvester == null) {
-            return false;
+    public boolean startHarvester(String vmId) {
+        Harvester harvester = getHarvester(vmId);
+        boolean result = harvester.start();
+        if (result) {
+            updateHarvestingStatus(Integer.valueOf(vmId), result);
         }
-        return harvester.isConnected();
-    }
-    
-    private boolean startHarvester(String vmId) {
-        Harvester harvester = getHarvester(vmId);
-        return harvester.start();
+        return result;
     }
     
     boolean saveVmCaps(String vmId) {
@@ -125,14 +124,26 @@
         return harvester.saveVmCaps();
     }
     
-    private boolean stopHarvester(String vmId) {
+    public boolean stopHarvester(String vmId) {
         Harvester harvester = connectors.get(vmId);
         if (harvester != null) {
             return harvester.stop();
         }
+        updateHarvestingStatus(Integer.valueOf(vmId), false);
         return true;
     }
-    
+
+    public void addThreadHarvestingStatus(String pid) {
+        updateHarvestingStatus(Integer.valueOf(pid), connectors.containsKey(pid));
+    }
+
+    private void updateHarvestingStatus(int vmId, boolean harvesting) {
+        ThreadHarvestingStatus status = new ThreadHarvestingStatus();
+        status.setTimeStamp(clock.getRealTimeMillis());
+        status.setVmId(vmId);
+        status.setHarvesting(harvesting);
+        dao.saveHarvestingStatus(status);
+    }
     Harvester getHarvester(String vmId) {
         Harvester harvester = connectors.get(vmId);
         if (harvester == null) {
--- a/thread/harvester/src/test/java/com/redhat/thermostat/thread/harvester/ThreadBackendTest.java	Fri Mar 22 16:43:10 2013 -0400
+++ b/thread/harvester/src/test/java/com/redhat/thermostat/thread/harvester/ThreadBackendTest.java	Tue Mar 26 11:17:08 2013 -0400
@@ -38,6 +38,7 @@
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -46,6 +47,7 @@
 import org.junit.Test;
 
 import com.redhat.thermostat.agent.VmStatusListenerRegistrar;
+import com.redhat.thermostat.agent.VmStatusListener.Status;
 import com.redhat.thermostat.agent.command.ReceiverRegistry;
 import com.redhat.thermostat.common.Version;
 
@@ -87,4 +89,20 @@
         verify(vmListenerRegistrar).unregister(backend);
         verify(receiverRegistry).unregisterReceivers();
     }
+
+    @Test
+    public void testVmStarts() {
+        backend.vmStatusChanged(Status.VM_STARTED, 10);
+
+        verify(threadHarvester).saveVmCaps("10");
+        verify(threadHarvester).addThreadHarvestingStatus("10");
+    }
+
+    @Test
+    public void testVmStops() {
+        backend.vmStatusChanged(Status.VM_STOPPED, 10);
+
+        verify(threadHarvester).stopHarvester("10");
+        verify(threadHarvester).addThreadHarvestingStatus("10");
+    }
 }
--- a/thread/harvester/src/test/java/com/redhat/thermostat/thread/harvester/ThreadHarvesterTest.java	Fri Mar 22 16:43:10 2013 -0400
+++ b/thread/harvester/src/test/java/com/redhat/thermostat/thread/harvester/ThreadHarvesterTest.java	Tue Mar 26 11:17:08 2013 -0400
@@ -48,11 +48,13 @@
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import com.redhat.thermostat.common.Clock;
 import com.redhat.thermostat.common.command.Request;
 import com.redhat.thermostat.common.command.Response;
 import com.redhat.thermostat.common.command.Response.ResponseType;
 import com.redhat.thermostat.thread.collector.HarvesterCommand;
 import com.redhat.thermostat.thread.dao.ThreadDao;
+import com.redhat.thermostat.thread.model.ThreadHarvestingStatus;
 
 public class ThreadHarvesterTest {
 
@@ -160,5 +162,26 @@
 
         assertEquals(ResponseType.ERROR, response.getType());
     }
+
+    @Test
+    public void testHarvestingStatus() {
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+        Clock clock = mock(Clock.class);
+        when(clock.getRealTimeMillis()).thenReturn(1l);
+        ThreadDao dao = mock(ThreadDao.class);
+
+        ThreadHarvester harvester = new ThreadHarvester(executor, clock);
+        harvester.setThreadDao(dao);
+
+        harvester.addThreadHarvestingStatus("10");
+
+        ArgumentCaptor<ThreadHarvestingStatus> statusCaptor = ArgumentCaptor.forClass(ThreadHarvestingStatus.class);
+        verify(dao).saveHarvestingStatus(statusCaptor.capture());
+
+        ThreadHarvestingStatus status = statusCaptor.getValue();
+        assertEquals(10, status.getVmId());
+        assertEquals(false, status.isHarvesting());
+        assertEquals(1, status.getTimeStamp());
+    }
 }