Mercurial > hg > release > thermostat-0.11
changeset 1048:edaf77f17cbd
Make thread harvester publish status to storage
Reviewed-by: jerboaa
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2013-March/006191.html
line wrap: on
line diff
--- a/thread/client-common/src/main/java/com/redhat/thermostat/thread/client/common/collector/impl/ThreadMXBeanCollector.java Fri Mar 22 16:43:10 2013 -0400 +++ b/thread/client-common/src/main/java/com/redhat/thermostat/thread/client/common/collector/impl/ThreadMXBeanCollector.java Tue Mar 26 11:17:08 2013 -0400 @@ -58,6 +58,7 @@ import com.redhat.thermostat.thread.client.common.collector.ThreadCollector; import com.redhat.thermostat.thread.collector.HarvesterCommand; import com.redhat.thermostat.thread.dao.ThreadDao; +import com.redhat.thermostat.thread.model.ThreadHarvestingStatus; import com.redhat.thermostat.thread.model.ThreadInfoData; import com.redhat.thermostat.thread.model.ThreadSummary; import com.redhat.thermostat.thread.model.VMThreadCapabilities; @@ -167,34 +168,11 @@ @Override public boolean isHarvesterCollecting() { - Request harvester = createRequest(); - harvester.setParameter(HarvesterCommand.class.getName(), HarvesterCommand.IS_COLLECTING.name()); - harvester.setParameter(HarvesterCommand.VM_ID.name(), ref.getIdString()); - - final CountDownLatch latch = new CountDownLatch(1); - final boolean[] result = new boolean[1]; - - harvester.addListener(new RequestResponseListener() { - @Override - public void fireComplete(Request request, Response response) { - switch (response.getType()) { - case OK: - result[0] = true; - break; - default: - break; - } - latch.countDown(); - } - }); - - try { - enqueueRequest(harvester); - latch.await(); - } catch (CommandException e) { - logger.log(Level.WARNING, "Failed to enqueue request", e); - } catch (InterruptedException ignore) {} - return result[0]; + ThreadHarvestingStatus status = threadDao.getLatestHarvestingStatus(ref); + if (status == null) { + return false; + } + return status.isHarvesting(); } @Override
--- a/thread/client-common/src/test/java/com/redhat/thermostat/thread/client/common/collector/impl/ThreadCollectorTest.java Fri Mar 22 16:43:10 2013 -0400 +++ b/thread/client-common/src/test/java/com/redhat/thermostat/thread/client/common/collector/impl/ThreadCollectorTest.java Tue Mar 26 11:17:08 2013 -0400 @@ -39,6 +39,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -62,6 +63,7 @@ import com.redhat.thermostat.thread.client.common.collector.ThreadCollector; import com.redhat.thermostat.thread.collector.HarvesterCommand; import com.redhat.thermostat.thread.dao.ThreadDao; +import com.redhat.thermostat.thread.model.ThreadHarvestingStatus; import com.redhat.thermostat.thread.model.VMThreadCapabilities; public class ThreadCollectorTest { @@ -117,6 +119,18 @@ verify(threadDao).loadCapabilities(reference); assertSame(resCaps, caps); } + + @Test + public void testHarvesterCollecting() { + ThreadHarvestingStatus status = mock(ThreadHarvestingStatus.class); + when(status.isHarvesting()).thenReturn(true); + ThreadCollector collector = new ThreadMXBeanCollector(context, reference); + when(threadDao.getLatestHarvestingStatus(reference)).thenReturn(status); + + collector.setThreadDao(threadDao); + + assertTrue(collector.isHarvesterCollecting()); + } @Test public void testStart() {
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/HarvesterCommand.java Fri Mar 22 16:43:10 2013 -0400 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/HarvesterCommand.java Tue Mar 26 11:17:08 2013 -0400 @@ -40,7 +40,6 @@ START, STOP, - IS_COLLECTING, VM_ID;
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/ThreadDao.java Fri Mar 22 16:43:10 2013 -0400 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/ThreadDao.java Tue Mar 26 11:17:08 2013 -0400 @@ -43,6 +43,7 @@ import com.redhat.thermostat.storage.core.Storage; import com.redhat.thermostat.storage.core.VmRef; import com.redhat.thermostat.thread.model.ThreadInfoData; +import com.redhat.thermostat.thread.model.ThreadHarvestingStatus; import com.redhat.thermostat.thread.model.ThreadSummary; import com.redhat.thermostat.thread.model.VMThreadCapabilities; @@ -80,6 +81,18 @@ ThreadSummary loadLastestSummary(VmRef ref); List<ThreadSummary> loadSummary(VmRef ref, long since); + static final String HARVESTING_DATA = "harvesting"; + static final Key<String> HARVESTING_STATUS_KEY = new Key<String> (HARVESTING_DATA, false); + static final Category<ThreadHarvestingStatus> THREAD_HARVESTING_STATUS = + new Category<>("vm-thread-harvesting", ThreadHarvestingStatus.class, + Key.AGENT_ID, + Key.VM_ID, + Key.TIMESTAMP, + HARVESTING_STATUS_KEY); + + ThreadHarvestingStatus getLatestHarvestingStatus(VmRef vm); + void saveHarvestingStatus(ThreadHarvestingStatus status); + static final String THREAD_STATE = "threadState"; static final Key<String> THREAD_STATE_KEY = new Key<String>(THREAD_STATE, false); static final String THREAD_ID = "threadId";
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImpl.java Fri Mar 22 16:43:10 2013 -0400 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImpl.java Tue Mar 26 11:17:08 2013 -0400 @@ -49,6 +49,7 @@ import com.redhat.thermostat.storage.core.VmRef; import com.redhat.thermostat.storage.model.Pojo; import com.redhat.thermostat.thread.dao.ThreadDao; +import com.redhat.thermostat.thread.model.ThreadHarvestingStatus; import com.redhat.thermostat.thread.model.ThreadInfoData; import com.redhat.thermostat.thread.model.ThreadSummary; import com.redhat.thermostat.thread.model.VMThreadCapabilities; @@ -60,6 +61,7 @@ this.storage = storage; storage.registerCategory(THREAD_CAPABILITIES); storage.registerCategory(THREAD_SUMMARY); + storage.registerCategory(THREAD_HARVESTING_STATUS); storage.registerCategory(THREAD_INFO); } @@ -123,7 +125,27 @@ return result; } - + + @Override + public void saveHarvestingStatus(ThreadHarvestingStatus status) { + Put add = storage.createAdd(THREAD_HARVESTING_STATUS); + add.setPojo(status); + add.apply(); + } + + @Override + public ThreadHarvestingStatus getLatestHarvestingStatus(VmRef vm) { + Query<ThreadHarvestingStatus> query = prepareQuery(THREAD_HARVESTING_STATUS, vm); + query.sort(Key.TIMESTAMP, Query.SortDirection.DESCENDING); + query.limit(1); + + Cursor<ThreadHarvestingStatus> cursor = query.execute(); + if (cursor.hasNext()) { + return cursor.next(); + } + return null; + } + @Override public void saveThreadInfo(ThreadInfoData info) { Put add = storage.createAdd(THREAD_INFO);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/model/ThreadHarvestingStatus.java Tue Mar 26 11:17:08 2013 -0400 @@ -0,0 +1,80 @@ +/* + * Copyright 2012, 2013 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.thread.model; + +import com.redhat.thermostat.storage.core.Entity; +import com.redhat.thermostat.storage.core.Persist; +import com.redhat.thermostat.storage.model.BasePojo; + +@Entity +public class ThreadHarvestingStatus extends BasePojo { + + private int vmId; + private long timeStamp; + private boolean collecting; + + @Persist + public int getVmId() { + return vmId; + } + + @Persist + public void setVmId(int newVmId) { + this.vmId = newVmId; + } + + @Persist + public long getTimeStamp() { + return timeStamp; + } + + @Persist + public void setTimeStamp(long newTimeStamp) { + this.timeStamp = newTimeStamp; + } + + @Persist + public boolean isHarvesting() { + return collecting; + } + + @Persist + public void setHarvesting(boolean collecting) { + this.collecting = collecting; + } + +}
--- a/thread/collector/src/test/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImplTest.java Fri Mar 22 16:43:10 2013 -0400 +++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImplTest.java Tue Mar 26 11:17:08 2013 -0400 @@ -38,6 +38,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -49,17 +50,19 @@ import org.junit.Test; +import com.redhat.thermostat.storage.core.Add; import com.redhat.thermostat.storage.core.Category; import com.redhat.thermostat.storage.core.Cursor; import com.redhat.thermostat.storage.core.HostRef; import com.redhat.thermostat.storage.core.Key; import com.redhat.thermostat.storage.core.Query; import com.redhat.thermostat.storage.core.Query.Criteria; +import com.redhat.thermostat.storage.core.Query.SortDirection; import com.redhat.thermostat.storage.core.Replace; import com.redhat.thermostat.storage.core.Storage; import com.redhat.thermostat.storage.core.VmRef; -import com.redhat.thermostat.storage.core.Query.Criteria; import com.redhat.thermostat.thread.dao.ThreadDao; +import com.redhat.thermostat.thread.model.ThreadHarvestingStatus; import com.redhat.thermostat.thread.model.VMThreadCapabilities; public class ThreadDaoImplTest { @@ -72,6 +75,7 @@ ThreadDaoImpl dao = new ThreadDaoImpl(storage); verify(storage).registerCategory(ThreadDao.THREAD_CAPABILITIES); + verify(storage).registerCategory(ThreadDao.THREAD_HARVESTING_STATUS); verify(storage).registerCategory(ThreadDao.THREAD_INFO); verify(storage).registerCategory(ThreadDao.THREAD_SUMMARY); } @@ -159,7 +163,55 @@ verify(storage).createReplace(ThreadDao.THREAD_CAPABILITIES); verify(replace).setPojo(caps); verify(replace).apply(); + } + @Test + public void testGetLatestHarvestingStatus() { + VmRef vm = mock(VmRef.class); + when(vm.getId()).thenReturn(42); + when(vm.getIdString()).thenReturn("42"); + + HostRef agent = mock(HostRef.class); + when(agent.getAgentId()).thenReturn("0xcafe"); + when(vm.getAgent()).thenReturn(agent); + + Storage storage = mock(Storage.class); + Query<ThreadHarvestingStatus> query = mock(Query.class); + Cursor<ThreadHarvestingStatus> cursor = mock(Cursor.class); + ThreadHarvestingStatus status = mock(ThreadHarvestingStatus.class); + + when(cursor.hasNext()).thenReturn(true); + when(cursor.next()).thenReturn(status); + when(query.execute()).thenReturn(cursor); + + when(storage.createQuery(ThreadDaoImpl.THREAD_HARVESTING_STATUS)).thenReturn(query); + + ThreadDaoImpl dao = new ThreadDaoImpl(storage); + ThreadHarvestingStatus result = dao.getLatestHarvestingStatus(vm); + + verify(query).where(Key.AGENT_ID, Criteria.EQUALS, agent.getAgentId()); + verify(query).where(Key.VM_ID, Criteria.EQUALS, vm.getId()); + verify(query).sort(Key.TIMESTAMP, SortDirection.DESCENDING); + verify(query).execute(); + verify(query).limit(1); + verifyNoMoreInteractions(query); + + assertSame(status, result); + } + + @Test + public void testSetHarvestingStatus() { + Storage storage = mock(Storage.class); + Add add = mock(Add.class); + when(storage.createAdd(ThreadDaoImpl.THREAD_HARVESTING_STATUS)).thenReturn(add); + + ThreadHarvestingStatus status = mock(ThreadHarvestingStatus.class); + + ThreadDaoImpl dao = new ThreadDaoImpl(storage); + dao.saveHarvestingStatus(status); + + verify(add).setPojo(status); + verify(add).apply(); } }
--- a/thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/ThreadBackend.java Fri Mar 22 16:43:10 2013 -0400 +++ b/thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/ThreadBackend.java Tue Mar 26 11:17:08 2013 -0400 @@ -36,14 +36,19 @@ package com.redhat.thermostat.thread.harvester; +import java.util.logging.Logger; + import com.redhat.thermostat.agent.VmStatusListener; import com.redhat.thermostat.agent.VmStatusListenerRegistrar; import com.redhat.thermostat.agent.command.ReceiverRegistry; import com.redhat.thermostat.backend.BaseBackend; import com.redhat.thermostat.common.Version; +import com.redhat.thermostat.common.utils.LoggingUtils; public class ThreadBackend extends BaseBackend implements VmStatusListener { + private static final Logger logger = LoggingUtils.getLogger(ThreadBackend.class); + private final ReceiverRegistry registry; private final ThreadHarvester harvester; @@ -70,6 +75,7 @@ } vmListener.register(this); registry.registerReceiver(harvester); + // FIXME enable harvester active = true; return true; } @@ -81,6 +87,7 @@ } vmListener.unregister(this); registry.unregisterReceivers(); + // FIXME disable harvester active = false; return true; } @@ -92,15 +99,19 @@ @Override public void vmStatusChanged(Status newStatus, int pid) { + String vmId = String.valueOf(pid); switch (newStatus) { case VM_STARTED: case VM_ACTIVE: /* this is blocking */ - harvester.saveVmCaps(String.valueOf(pid)); + harvester.saveVmCaps(vmId); + harvester.addThreadHarvestingStatus(vmId); break; case VM_STOPPED: + harvester.stopHarvester(vmId); + harvester.addThreadHarvestingStatus(vmId); + break; default: - /* nothing to do */ - break; + logger.warning("Unexpected VM state: " + newStatus); } }
--- a/thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/ThreadHarvester.java Fri Mar 22 16:43:10 2013 -0400 +++ b/thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/ThreadHarvester.java Tue Mar 26 11:17:08 2013 -0400 @@ -43,12 +43,15 @@ import java.util.concurrent.ScheduledExecutorService; import com.redhat.thermostat.agent.command.RequestReceiver; +import com.redhat.thermostat.common.Clock; +import com.redhat.thermostat.common.SystemClock; import com.redhat.thermostat.common.command.Request; import com.redhat.thermostat.common.command.Response; import com.redhat.thermostat.common.command.Response.ResponseType; import com.redhat.thermostat.thread.collector.HarvesterCommand; import com.redhat.thermostat.thread.dao.ThreadDao; +import com.redhat.thermostat.thread.model.ThreadHarvestingStatus; public class ThreadHarvester implements RequestReceiver { @@ -56,10 +59,16 @@ Map<String, Harvester> connectors; private ThreadDao dao; - + private Clock clock; + public ThreadHarvester(ScheduledExecutorService executor) { + this(executor, new SystemClock()); + } + + public ThreadHarvester(ScheduledExecutorService executor, Clock clock) { this.executor = executor; - connectors = new HashMap<>(); + this.connectors = new HashMap<>(); + this.clock = clock; } public void setThreadDao(ThreadDao dao) { @@ -89,12 +98,6 @@ result = stopHarvester(vmId); break; } - case IS_COLLECTING: { - // this is blocking too - String vmId = request.getParameter(HarvesterCommand.VM_ID.name()); - // FIXME: this need to be replaced when we support response parameters - return isCollecting(vmId) ? new Response(ResponseType.OK) : new Response(ResponseType.NOK); - } default: result = false; break; @@ -107,17 +110,13 @@ } } - private boolean isCollecting(String vmId) { - Harvester harvester = connectors.get(vmId); - if (harvester == null) { - return false; + public boolean startHarvester(String vmId) { + Harvester harvester = getHarvester(vmId); + boolean result = harvester.start(); + if (result) { + updateHarvestingStatus(Integer.valueOf(vmId), result); } - return harvester.isConnected(); - } - - private boolean startHarvester(String vmId) { - Harvester harvester = getHarvester(vmId); - return harvester.start(); + return result; } boolean saveVmCaps(String vmId) { @@ -125,14 +124,26 @@ return harvester.saveVmCaps(); } - private boolean stopHarvester(String vmId) { + public boolean stopHarvester(String vmId) { Harvester harvester = connectors.get(vmId); if (harvester != null) { return harvester.stop(); } + updateHarvestingStatus(Integer.valueOf(vmId), false); return true; } - + + public void addThreadHarvestingStatus(String pid) { + updateHarvestingStatus(Integer.valueOf(pid), connectors.containsKey(pid)); + } + + private void updateHarvestingStatus(int vmId, boolean harvesting) { + ThreadHarvestingStatus status = new ThreadHarvestingStatus(); + status.setTimeStamp(clock.getRealTimeMillis()); + status.setVmId(vmId); + status.setHarvesting(harvesting); + dao.saveHarvestingStatus(status); + } Harvester getHarvester(String vmId) { Harvester harvester = connectors.get(vmId); if (harvester == null) {
--- a/thread/harvester/src/test/java/com/redhat/thermostat/thread/harvester/ThreadBackendTest.java Fri Mar 22 16:43:10 2013 -0400 +++ b/thread/harvester/src/test/java/com/redhat/thermostat/thread/harvester/ThreadBackendTest.java Tue Mar 26 11:17:08 2013 -0400 @@ -38,6 +38,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -46,6 +47,7 @@ import org.junit.Test; import com.redhat.thermostat.agent.VmStatusListenerRegistrar; +import com.redhat.thermostat.agent.VmStatusListener.Status; import com.redhat.thermostat.agent.command.ReceiverRegistry; import com.redhat.thermostat.common.Version; @@ -87,4 +89,20 @@ verify(vmListenerRegistrar).unregister(backend); verify(receiverRegistry).unregisterReceivers(); } + + @Test + public void testVmStarts() { + backend.vmStatusChanged(Status.VM_STARTED, 10); + + verify(threadHarvester).saveVmCaps("10"); + verify(threadHarvester).addThreadHarvestingStatus("10"); + } + + @Test + public void testVmStops() { + backend.vmStatusChanged(Status.VM_STOPPED, 10); + + verify(threadHarvester).stopHarvester("10"); + verify(threadHarvester).addThreadHarvestingStatus("10"); + } }
--- a/thread/harvester/src/test/java/com/redhat/thermostat/thread/harvester/ThreadHarvesterTest.java Fri Mar 22 16:43:10 2013 -0400 +++ b/thread/harvester/src/test/java/com/redhat/thermostat/thread/harvester/ThreadHarvesterTest.java Tue Mar 26 11:17:08 2013 -0400 @@ -48,11 +48,13 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; +import com.redhat.thermostat.common.Clock; import com.redhat.thermostat.common.command.Request; import com.redhat.thermostat.common.command.Response; import com.redhat.thermostat.common.command.Response.ResponseType; import com.redhat.thermostat.thread.collector.HarvesterCommand; import com.redhat.thermostat.thread.dao.ThreadDao; +import com.redhat.thermostat.thread.model.ThreadHarvestingStatus; public class ThreadHarvesterTest { @@ -160,5 +162,26 @@ assertEquals(ResponseType.ERROR, response.getType()); } + + @Test + public void testHarvestingStatus() { + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + Clock clock = mock(Clock.class); + when(clock.getRealTimeMillis()).thenReturn(1l); + ThreadDao dao = mock(ThreadDao.class); + + ThreadHarvester harvester = new ThreadHarvester(executor, clock); + harvester.setThreadDao(dao); + + harvester.addThreadHarvestingStatus("10"); + + ArgumentCaptor<ThreadHarvestingStatus> statusCaptor = ArgumentCaptor.forClass(ThreadHarvestingStatus.class); + verify(dao).saveHarvestingStatus(statusCaptor.capture()); + + ThreadHarvestingStatus status = statusCaptor.getValue(); + assertEquals(10, status.getVmId()); + assertEquals(false, status.isHarvesting()); + assertEquals(1, status.getTimeStamp()); + } }