Mercurial > hg > release > thermostat-0.4
changeset 570:7d4bdce24e21
Move tread collectors to agent
review-thread: http://icedtea.classpath.org/pipermail/thermostat/2012-August/002957.html
reviewed-by: vanaltj
line wrap: on
line diff
--- a/agent/cli/src/main/java/com/redhat/thermostat/agent/cli/AgentApplication.java Wed Aug 29 14:32:51 2012 -0400 +++ b/agent/cli/src/main/java/com/redhat/thermostat/agent/cli/AgentApplication.java Wed Aug 29 21:22:47 2012 +0200 @@ -64,6 +64,7 @@ import com.redhat.thermostat.common.storage.Connection.ConnectionListener; import com.redhat.thermostat.common.storage.Connection.ConnectionStatus; import com.redhat.thermostat.common.storage.MongoStorageProvider; +import com.redhat.thermostat.common.storage.Storage; import com.redhat.thermostat.common.storage.StorageProvider; import com.redhat.thermostat.common.tools.BasicCommand; import com.redhat.thermostat.common.utils.LoggingUtils; @@ -106,7 +107,7 @@ final Logger logger = LoggingUtils.getLogger(AgentApplication.class); StorageProvider connProv = new MongoStorageProvider(configuration); - DAOFactory daoFactory = new MongoDAOFactory(connProv); + final DAOFactory daoFactory = new MongoDAOFactory(connProv); ApplicationContext.getInstance().setDAOFactory(daoFactory); TimerFactory timerFactory = new ThreadPoolTimerFactory(1); ApplicationContext.getInstance().setTimerFactory(timerFactory); @@ -123,7 +124,9 @@ logger.fine("Connecting to storage."); break; case CONNECTED: - logger.fine("Connected to storage."); + logger.fine("Connected to storage, registering storage as service"); + Storage storage = daoFactory.getStorage(); + OSGIUtils.getInstance().registerService(Storage.class, storage); break; case FAILED_TO_CONNECT: logger.warning("Could not connect to storage.");
--- a/agent/command/src/main/java/com/redhat/thermostat/agent/command/ReceiverRegistry.java Wed Aug 29 14:32:51 2012 -0400 +++ b/agent/command/src/main/java/com/redhat/thermostat/agent/command/ReceiverRegistry.java Wed Aug 29 21:22:47 2012 +0200 @@ -60,3 +60,4 @@ proxy.unregisterAll(); } } +
--- a/distribution/config/bundles.properties Wed Aug 29 14:32:51 2012 -0400 +++ b/distribution/config/bundles.properties Wed Aug 29 21:22:47 2012 +0200 @@ -20,7 +20,10 @@ thermostat-thread-client-swing-@project.version@.jar, \ thermostat-thread-client-controllers-@project.version@.jar, \ thermostat-thread-client-common-@project.version@.jar, \ - thermostat-osgi-process-handler-@project.version@.jar + thermostat-osgi-process-handler-@project.version@.jar, \ + thermostat-client-command-@project.version@.jar, \ + thermostat-client-command-@project.version@.jar, \ + thermostat-agent-command-@project.version@.jar service = thermostat-agent-core-@project.version@.jar, \ thermostat-osgi-process-handler-@project.version@.jar, \ @@ -34,7 +37,10 @@ thermostat-common-core-@project.version@.jar, \ thermostat-agent-cli-@project.version@.jar, \ thermostat-common-command-@project.version@.jar, \ - thermostat-agent-command-@project.version@.jar + thermostat-agent-command-@project.version@.jar, \ + thermostat-thread-collector-@project.version@.jar, \ + thermostat-thread-harvester-@project.version@.jar, \ + thermostat-client-command-@project.version@.jar storage = thermostat-agent-core-@project.version@.jar, \ thermostat-osgi-process-handler-@project.version@.jar, \
--- a/distribution/pom.xml Wed Aug 29 14:32:51 2012 -0400 +++ b/distribution/pom.xml Wed Aug 29 21:22:47 2012 +0200 @@ -347,6 +347,11 @@ <artifactId>thermostat-thread-collector</artifactId> <version>${project.version}</version> </dependency> - + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-thread-harvester</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> </project>
--- a/thread/collector/pom.xml Wed Aug 29 14:32:51 2012 -0400 +++ b/thread/collector/pom.xml Wed Aug 29 21:22:47 2012 +0200 @@ -66,6 +66,16 @@ <artifactId>thermostat-common-core</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-agent-command</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-client-command</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.osgi</groupId> @@ -100,6 +110,7 @@ <Export-Package> com.redhat.thermostat.thread.collector, com.redhat.thermostat.thread.dao, + com.redhat.thermostat.thread.harvester, </Export-Package> <Private-Package> com.redhat.thermostat.thread.collector.osgi,
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/impl/ThreadCollectorFactoryImpl.java Wed Aug 29 14:32:51 2012 -0400 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/impl/ThreadCollectorFactoryImpl.java Wed Aug 29 21:22:47 2012 +0200 @@ -36,43 +36,21 @@ package com.redhat.thermostat.thread.collector.impl; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; - import com.redhat.thermostat.common.dao.VmRef; import com.redhat.thermostat.thread.collector.ThreadCollector; import com.redhat.thermostat.thread.collector.ThreadCollectorFactory; import com.redhat.thermostat.thread.dao.ThreadDao; -import com.redhat.thermostat.utils.management.MXBeanConnector; public class ThreadCollectorFactoryImpl implements ThreadCollectorFactory { - private Map<VmRef, ThreadMXBeanCollector> collectors; - private ThreadDao threadDao; - private ScheduledExecutorService threadPool; - public ThreadCollectorFactoryImpl(ThreadDao threadDao, ScheduledExecutorService threadPool) { + public ThreadCollectorFactoryImpl(ThreadDao threadDao) { this.threadDao = threadDao; - this.threadPool = threadPool; - - collectors = new HashMap<VmRef, ThreadMXBeanCollector>(); } @Override public synchronized ThreadCollector getCollector(VmRef reference) { - ThreadMXBeanCollector collector = collectors.get(reference); - if (collector == null) { - collector = new ThreadMXBeanCollector(threadDao, reference, new MXBeanConnector(reference), threadPool); - collectors.put(reference, collector); - } - return collector; - } - - public void shutdown() { - for (ThreadMXBeanCollector collector : collectors.values()) { - collector.stopHarvester(); - } + return new ThreadMXBeanCollector(threadDao, reference); } }
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/impl/ThreadMXBeanCollector.java Wed Aug 29 14:32:51 2012 -0400 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/impl/ThreadMXBeanCollector.java Wed Aug 29 21:22:47 2012 +0200 @@ -36,98 +36,71 @@ package com.redhat.thermostat.thread.collector.impl; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.util.ArrayList; +import java.net.InetSocketAddress; import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CountDownLatch; -import javax.management.MalformedObjectNameException; - +import com.redhat.thermostat.client.command.RequestQueue; +import com.redhat.thermostat.common.command.Request; +import com.redhat.thermostat.common.command.RequestResponseListener; +import com.redhat.thermostat.common.command.Response; +import com.redhat.thermostat.common.command.Request.RequestType; +import com.redhat.thermostat.common.dao.HostRef; import com.redhat.thermostat.common.dao.VmRef; +import com.redhat.thermostat.common.utils.OSGIUtils; import com.redhat.thermostat.thread.collector.ThreadCollector; import com.redhat.thermostat.thread.collector.ThreadSummary; import com.redhat.thermostat.thread.collector.VMThreadCapabilities; import com.redhat.thermostat.thread.dao.ThreadDao; -import com.redhat.thermostat.utils.management.MXBeanConnection; -import com.redhat.thermostat.utils.management.MXBeanConnector; +import com.redhat.thermostat.thread.harvester.HarvesterCommand; +import com.redhat.thermostat.thread.harvester.ThreadHarvester; -@SuppressWarnings("restriction") public class ThreadMXBeanCollector implements ThreadCollector { private ThreadDao threadDao; private VmRef ref; - private MXBeanConnector connector; - private MXBeanConnection connection; - private ThreadMXBean collectorBean; - - private boolean isConnected; - private ScheduledExecutorService threadPool; - - private ScheduledFuture<?> harvester; - - public ThreadMXBeanCollector(ThreadDao threadDao, VmRef ref, MXBeanConnector connector, ScheduledExecutorService threadPool) { + public ThreadMXBeanCollector(ThreadDao threadDao, VmRef ref) { this.threadDao = threadDao; this.ref = ref; - this.connector = connector; - this.threadPool = threadPool; + } + + Request createRequest() { + HostRef targetHostRef = ref.getAgent(); + + // todo + String address = threadDao.getStorage().getConfigListenAddress(targetHostRef); + String [] host = address.split(":"); + + InetSocketAddress target = new InetSocketAddress(host[0], Integer.parseInt(host[1])); + Request harvester = new Request(RequestType.RESPONSE_EXPECTED, target); + + harvester.setReceiver(ThreadHarvester.class.getName()); + + return harvester; + } + + @Override + public void startHarvester() { + + Request harvester = createRequest(); + harvester.setParameter(HarvesterCommand.class.getName(), HarvesterCommand.START.name()); + harvester.setParameter(HarvesterCommand.VM_ID.name(), ref.getIdString()); + harvester.setParameter(HarvesterCommand.AGENT_ID.name(), ref.getAgent().getAgentId()); + + RequestQueue queue = getRequestQueue(); + queue.putRequest(harvester); } @Override - public synchronized void startHarvester() { - if (isConnected) return; - - if (!connector.isAttached()) { - try { - connector.attach(); - } catch (Exception ignore) { - ignore.printStackTrace(); - } - } - - try { - connection = connector.connect(); - } catch (Exception e) { - e.printStackTrace(); - } - isConnected = true; + public void stopHarvester() { - harvester = threadPool.scheduleAtFixedRate(new Harvester(), 0, 1, TimeUnit.SECONDS); - } - - private class Harvester implements Runnable { - @Override - public void run() { - harvestData(); - } - } - - @Override - public synchronized void stopHarvester() { - if (!isConnected) return; - - harvester.cancel(false); - - try { - connection.close(); - } catch (IOException e) { - e.printStackTrace(); - } - - if (connector.isAttached()) { - try { - connector.close(); - } catch (Exception ignore) { - ignore.printStackTrace(); - } - } - - isConnected = false; + Request harvester = createRequest(); + harvester.setParameter(HarvesterCommand.class.getName(), HarvesterCommand.STOP.name()); + harvester.setParameter(HarvesterCommand.VM_ID.name(), ref.getIdString()); + + RequestQueue queue = getRequestQueue(); + queue.putRequest(harvester); } @Override @@ -160,127 +133,39 @@ public List<com.redhat.thermostat.thread.collector.ThreadInfo> getThreadInfo(long since) { return threadDao.loadThreadInfo(ref, since); } - - public synchronized void harvestData() { - try { - - long timestamp = System.currentTimeMillis(); - - ThreadMXSummary summary = new ThreadMXSummary(); - - collectorBean = getDataCollectorBean(connection); - - summary.setCurrentLiveThreads(collectorBean.getThreadCount()); - summary.setDaemonThreads(collectorBean.getDaemonThreadCount()); - summary.setTimestamp(timestamp); - - threadDao.saveSummary(ref, summary); - - long [] ids = collectorBean.getAllThreadIds(); - long[] allocatedBytes = null; - - // now the details for the threads - if (collectorBean instanceof com.sun.management.ThreadMXBean) { - com.sun.management.ThreadMXBean sunBean = (com.sun.management.ThreadMXBean) collectorBean; - boolean wasEnabled = false; - if (sunBean.isThreadAllocatedMemorySupported()) { - wasEnabled = sunBean.isThreadAllocatedMemoryEnabled(); - sunBean.setThreadAllocatedMemoryEnabled(true); - allocatedBytes = sunBean.getThreadAllocatedBytes(ids); - sunBean.setThreadAllocatedMemoryEnabled(wasEnabled); - } - } - ThreadInfo[] threadInfos = collectorBean.getThreadInfo(ids, true, true); - - for (int i = 0; i < ids.length; i++) { - ThreadMXInfo info = new ThreadMXInfo(); - ThreadInfo beanInfo = threadInfos[i]; - - info.setTimeStamp(timestamp); - - info.setName(beanInfo.getThreadName()); - info.setID(beanInfo.getThreadId()); - info.setState(beanInfo.getThreadState()); - info.setStackTrace(beanInfo.getStackTrace()); - - info.setCPUTime(collectorBean.getThreadCpuTime(info.getThreadID())); - info.setUserTime(collectorBean.getThreadUserTime(info.getThreadID())); - - info.setBlockedCount(beanInfo.getBlockedCount()); - info.setWaitedCount(beanInfo.getWaitedCount()); - - if (allocatedBytes != null) { - info.setAllocatedBytes(allocatedBytes[i]); - } - - threadDao.saveThreadInfo(ref, info); - } - - } catch (MalformedObjectNameException e) { - e.printStackTrace(); - } - } - @Override - public synchronized VMThreadCapabilities getVMThreadCapabilities() { + public VMThreadCapabilities getVMThreadCapabilities() { VMThreadCapabilities caps = threadDao.loadCapabilities(ref); if (caps == null) { - if (!connector.isAttached()) { - try { - connector.attach(); - } catch (Exception ignore) { - ignore.printStackTrace(); - } - } + Request harvester = createRequest(); + harvester.setParameter(HarvesterCommand.class.getName(), HarvesterCommand.VM_CAPS.name()); + harvester.setParameter(HarvesterCommand.VM_ID.name(), ref.getIdString()); + harvester.setParameter(HarvesterCommand.AGENT_ID.name(), ref.getAgent().getAgentId()); - // query caps to the vm, then save for later - try (MXBeanConnection connection = connector.connect()) { - - ThreadMXBean bean = getDataCollectorBean(connection); - VMThreadMXCapabilities _caps = new VMThreadMXCapabilities(); - - if (bean.isThreadCpuTimeSupported()) _caps.addFeature(ThreadDao.CPU_TIME); - if (bean.isThreadContentionMonitoringSupported()) _caps.addFeature(ThreadDao.CONTENTION_MONITOR); - - if (bean instanceof com.sun.management.ThreadMXBean) { - com.sun.management.ThreadMXBean sunBean = (com.sun.management.ThreadMXBean) bean; - if (sunBean.isThreadAllocatedMemorySupported()) - _caps.addFeature(ThreadDao.THREAD_ALLOCATED_MEMORY); + final CountDownLatch latch = new CountDownLatch(1); + harvester.addListener(new RequestResponseListener() { + @Override + public void fireComplete(Request request, Response response) { + latch.countDown(); } - - caps = _caps; - - threadDao.saveCapabilities(ref, caps); - - } catch (Exception e) { - e.printStackTrace(); - } + }); - if (connector.isAttached()) { - try { - connector.close(); - } catch (Exception ignore) { - ignore.printStackTrace(); - } + RequestQueue queue = getRequestQueue(); + queue.putRequest(harvester); + + try { + latch.await(); + caps = threadDao.loadCapabilities(ref); + } catch (InterruptedException ignore) { + caps = new VMThreadMXCapabilities(); } } - return caps; } - private ThreadMXBean getDataCollectorBean(MXBeanConnection connection) throws MalformedObjectNameException { - ThreadMXBean bean = null; - try { - bean = connection.createProxy(ManagementFactory.THREAD_MXBEAN_NAME, - com.sun.management.ThreadMXBean.class); - } catch (MalformedObjectNameException ignore) {} - - if (bean == null) { - bean = connection.createProxy(ManagementFactory.THREAD_MXBEAN_NAME, - ThreadMXBean.class); - } - return bean; + RequestQueue getRequestQueue() { + return OSGIUtils.getInstance().getService(RequestQueue.class); } }
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/osgi/Activator.java Wed Aug 29 14:32:51 2012 -0400 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/collector/osgi/Activator.java Wed Aug 29 21:22:47 2012 +0200 @@ -36,9 +36,6 @@ package com.redhat.thermostat.thread.collector.osgi; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; @@ -52,11 +49,11 @@ public class Activator implements BundleActivator { - private ScheduledExecutorService executor = Executors.newScheduledThreadPool(24); private ThreadCollectorFactoryImpl collectorFactory; @Override public void start(final BundleContext context) throws Exception { + @SuppressWarnings({ "rawtypes", "unchecked" }) ServiceTracker tracker = new ServiceTracker(context, Storage.class.getName(), null) { @Override @@ -65,10 +62,11 @@ Storage storage = (Storage) context.getService(reference); ThreadDao threadDao = new ThreadDaoImpl(storage); - collectorFactory = new ThreadCollectorFactoryImpl(threadDao, executor); + collectorFactory = new ThreadCollectorFactoryImpl(threadDao); context.registerService(ThreadCollectorFactory.class.getName(), collectorFactory, null); - + context.registerService(ThreadDao.class.getName(), threadDao, null); + return super.addingService(reference); } }; @@ -77,11 +75,5 @@ @Override public void stop(BundleContext context) throws Exception { - if (collectorFactory != null) { - collectorFactory.shutdown(); - } - if (executor != null) { - executor.shutdown(); - } } }
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/ThreadDao.java Wed Aug 29 14:32:51 2012 -0400 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/ThreadDao.java Wed Aug 29 21:22:47 2012 +0200 @@ -41,6 +41,7 @@ import com.redhat.thermostat.common.dao.VmRef; import com.redhat.thermostat.common.storage.Category; import com.redhat.thermostat.common.storage.Key; +import com.redhat.thermostat.common.storage.Storage; import com.redhat.thermostat.thread.collector.ThreadInfo; import com.redhat.thermostat.thread.collector.ThreadSummary; import com.redhat.thermostat.thread.collector.VMThreadCapabilities; @@ -62,7 +63,7 @@ VMThreadCapabilities loadCapabilities(VmRef ref); - void saveCapabilities(VmRef ref, VMThreadCapabilities caps); + void saveCapabilities(String vmId, String agentId, VMThreadCapabilities caps); static final String LIVE_THREADS = "thread-living"; static final Key<Long> LIVE_THREADS_KEY = new Key<Long>(LIVE_THREADS, false); @@ -74,7 +75,7 @@ Key.TIMESTAMP, LIVE_THREADS_KEY, DAEMON_THREADS_KEY); - void saveSummary(VmRef vm, ThreadSummary summary); + void saveSummary(String vmId, String agentId, ThreadSummary summary); ThreadSummary loadLastestSummary(VmRef ref); List<ThreadSummary> loadSummary(VmRef ref, long since); @@ -104,6 +105,8 @@ THREAD_USER_TIME_KEY, THREAD_BLOCKED_COUNT_KEY, THREAD_WAIT_COUNT_KEY, THREAD_STACK_TRACE_ID_KEY); - void saveThreadInfo(VmRef ref, ThreadInfo info); + void saveThreadInfo(String vmId, String agentId, ThreadInfo info); List<ThreadInfo> loadThreadInfo(VmRef ref, long since); + + Storage getStorage(); }
--- a/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImpl.java Wed Aug 29 14:32:51 2012 -0400 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImpl.java Wed Aug 29 21:22:47 2012 +0200 @@ -84,8 +84,8 @@ } @Override - public void saveCapabilities(VmRef vm, VMThreadCapabilities caps) { - Chunk chunk = prepareChunk(THREAD_CAPABILITIES, true, vm); + public void saveCapabilities(String vmId, String agentId, VMThreadCapabilities caps) { + Chunk chunk = prepareChunk(THREAD_CAPABILITIES, true, vmId, agentId); chunk.put(CONTENTION_MONITOR_KEY, caps.supportContentionMonitor()); chunk.put(CPU_TIME_KEY, caps.supportCPUTime()); @@ -95,8 +95,8 @@ } @Override - public void saveSummary(VmRef vm, ThreadSummary summary) { - Chunk chunk = prepareChunk(THREAD_SUMMARY, false, vm); + public void saveSummary(String vmId, String agentId, ThreadSummary summary) { + Chunk chunk = prepareChunk(THREAD_SUMMARY, false, vmId, agentId); chunk.put(LIVE_THREADS_KEY, summary.currentLiveThreads()); chunk.put(DAEMON_THREADS_KEY, summary.currentDaemonThreads()); @@ -145,8 +145,8 @@ } @Override - public void saveThreadInfo(VmRef ref, ThreadInfo info) { - Chunk chunk = prepareChunk(THREAD_INFO, false, ref); + public void saveThreadInfo(String vmId, String agentId, ThreadInfo info) { + Chunk chunk = prepareChunk(THREAD_INFO, false, vmId, agentId); chunk.put(Key.TIMESTAMP, info.getTimeStamp()); @@ -191,10 +191,19 @@ return result; } - private Chunk prepareChunk(Category category, boolean replace, VmRef vm) { + private Chunk prepareChunk(Category category, boolean replace, String vmId, String agentId) { Chunk chunk = new Chunk(category, replace); - chunk.put(Key.AGENT_ID, vm.getAgent().getAgentId()); - chunk.put(Key.VM_ID, vm.getId()); + chunk.put(Key.AGENT_ID, agentId); + chunk.put(Key.VM_ID, Integer.valueOf(vmId)); return chunk; } + + private Chunk prepareChunk(Category category, boolean replace, VmRef vm) { + return prepareChunk(category, replace, vm.getIdString(), vm.getAgent().getAgentId()); + } + + @Override + public Storage getStorage() { + return storage; + } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/harvester/Harvester.java Wed Aug 29 21:22:47 2012 +0200 @@ -0,0 +1,253 @@ +/* + * Copyright 2012 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.thread.harvester; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import javax.management.MalformedObjectNameException; + +import com.redhat.thermostat.common.dao.VmRef; +import com.redhat.thermostat.thread.collector.impl.ThreadMXInfo; +import com.redhat.thermostat.thread.collector.impl.ThreadMXSummary; +import com.redhat.thermostat.thread.collector.impl.VMThreadMXCapabilities; +import com.redhat.thermostat.thread.dao.ThreadDao; +import com.redhat.thermostat.utils.management.MXBeanConnection; +import com.redhat.thermostat.utils.management.MXBeanConnector; + +@SuppressWarnings("restriction") +class Harvester { + + private boolean isConnected; + private ScheduledExecutorService threadPool; + private ScheduledFuture<?> harvester; + + MXBeanConnector connector; + + private MXBeanConnection connection; + private ThreadMXBean collectorBean; + private ThreadDao threadDao; + private String vmId; + private String agentId; + + Harvester(ThreadDao threadDao, ScheduledExecutorService threadPool, String vmId, String agentId) { + this.connector = new MXBeanConnector(vmId); + this.threadDao = threadDao; + this.vmId = vmId; + this.agentId = agentId; + this.threadPool = threadPool; + } + + synchronized void start() { + if (isConnected) return; + + if (!connector.isAttached()) { + try { + connector.attach(); + } catch (Exception ignore) { + ignore.printStackTrace(); + } + } + + try { + connection = connector.connect(); + } catch (Exception e) { + e.printStackTrace(); + } + isConnected = true; + + harvester = threadPool.scheduleAtFixedRate(new HarvesterAction(), 0, 1, TimeUnit.SECONDS); + } + + boolean isConnected() { + return isConnected; + } + + synchronized void stop() { + if (!isConnected) + return; + + harvester.cancel(false); + + try { + connection.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + if (connector.isAttached()) { + try { + connector.close(); + } catch (Exception ignore) { + ignore.printStackTrace(); + } + } + + isConnected = false; + } + + ThreadMXBean getDataCollectorBean(MXBeanConnection connection) + throws MalformedObjectNameException + { + ThreadMXBean bean = null; + try { + bean = connection.createProxy(ManagementFactory.THREAD_MXBEAN_NAME, + com.sun.management.ThreadMXBean.class); + } catch (MalformedObjectNameException ignore) {} + + if (bean == null) { + bean = connection.createProxy(ManagementFactory.THREAD_MXBEAN_NAME, + ThreadMXBean.class); + } + return bean; + } + + synchronized void harvestData() { + try { + long timestamp = System.currentTimeMillis(); + + ThreadMXSummary summary = new ThreadMXSummary(); + + collectorBean = getDataCollectorBean(connection); + + summary.setCurrentLiveThreads(collectorBean.getThreadCount()); + summary.setDaemonThreads(collectorBean.getDaemonThreadCount()); + summary.setTimestamp(timestamp); + + threadDao.saveSummary(vmId, agentId, summary); + + long [] ids = collectorBean.getAllThreadIds(); + long[] allocatedBytes = null; + + // now the details for the threads + if (collectorBean instanceof com.sun.management.ThreadMXBean) { + com.sun.management.ThreadMXBean sunBean = (com.sun.management.ThreadMXBean) collectorBean; + boolean wasEnabled = false; + if (sunBean.isThreadAllocatedMemorySupported()) { + wasEnabled = sunBean.isThreadAllocatedMemoryEnabled(); + sunBean.setThreadAllocatedMemoryEnabled(true); + allocatedBytes = sunBean.getThreadAllocatedBytes(ids); + sunBean.setThreadAllocatedMemoryEnabled(wasEnabled); + } + } + + ThreadInfo[] threadInfos = collectorBean.getThreadInfo(ids, true, true); + + for (int i = 0; i < ids.length; i++) { + ThreadMXInfo info = new ThreadMXInfo(); + ThreadInfo beanInfo = threadInfos[i]; + + info.setTimeStamp(timestamp); + + info.setName(beanInfo.getThreadName()); + info.setID(beanInfo.getThreadId()); + info.setState(beanInfo.getThreadState()); + info.setStackTrace(beanInfo.getStackTrace()); + + info.setCPUTime(collectorBean.getThreadCpuTime(info.getThreadID())); + info.setUserTime(collectorBean.getThreadUserTime(info.getThreadID())); + + info.setBlockedCount(beanInfo.getBlockedCount()); + info.setWaitedCount(beanInfo.getWaitedCount()); + + if (allocatedBytes != null) { + info.setAllocatedBytes(allocatedBytes[i]); + } + + threadDao.saveThreadInfo(vmId, agentId, info); + } + + } catch (MalformedObjectNameException e) { + e.printStackTrace(); + } + } + + private class HarvesterAction implements Runnable { + @Override + public void run() { + harvestData(); + } + } + + synchronized void saveVmCaps() { + + boolean closeAfter = false; + if (!connector.isAttached()) { + closeAfter = true; + try { + connector.attach(); + } catch (Exception ignore) { + ignore.printStackTrace(); + } + } + + try (MXBeanConnection connection = connector.connect()) { + + ThreadMXBean bean = getDataCollectorBean(connection); + VMThreadMXCapabilities caps = new VMThreadMXCapabilities(); + + if (bean.isThreadCpuTimeSupported()) + caps.addFeature(ThreadDao.CPU_TIME); + if (bean.isThreadContentionMonitoringSupported()) + caps.addFeature(ThreadDao.CONTENTION_MONITOR); + + if (bean instanceof com.sun.management.ThreadMXBean) { + com.sun.management.ThreadMXBean sunBean = (com.sun.management.ThreadMXBean) bean; + if (sunBean.isThreadAllocatedMemorySupported()) + caps.addFeature(ThreadDao.THREAD_ALLOCATED_MEMORY); + } + + threadDao.saveCapabilities(vmId, agentId, caps); + + } catch (Exception e) { + e.printStackTrace(); + } + + if (closeAfter) { + try { + connector.close(); + } catch (Exception ignore) { + ignore.printStackTrace(); + } + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/harvester/HarvesterCommand.java Wed Aug 29 21:22:47 2012 +0200 @@ -0,0 +1,11 @@ +package com.redhat.thermostat.thread.harvester; + +public enum HarvesterCommand { + + START, + STOP, + VM_CAPS, + + AGENT_ID, + VM_ID; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/thread/collector/src/main/java/com/redhat/thermostat/thread/harvester/ThreadHarvester.java Wed Aug 29 21:22:47 2012 +0200 @@ -0,0 +1,119 @@ +/* + * Copyright 2012 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.thread.harvester; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; + +import com.redhat.thermostat.agent.command.RequestReceiver; +import com.redhat.thermostat.common.command.Request; +import com.redhat.thermostat.common.command.Response; +import com.redhat.thermostat.common.command.Response.ResponseType; + +import com.redhat.thermostat.thread.dao.ThreadDao; + +public class ThreadHarvester implements RequestReceiver { + + private ScheduledExecutorService executor; + Map<String, Harvester> connectors; + + private ThreadDao dao; + + public ThreadHarvester(ScheduledExecutorService executor, ThreadDao dao) { + this.executor = executor; + connectors = new HashMap<>(); + this.dao = dao; + } + + @Override + public Response receive(Request request) { + + String command = request.getParameter(HarvesterCommand.class.getName()); + switch (HarvesterCommand.valueOf(command)) { + case START: { + String vmId = request.getParameter(HarvesterCommand.VM_ID.name()); + String agentId = request.getParameter(HarvesterCommand.AGENT_ID.name()); + startHarvester(vmId, agentId); + break; + } + case STOP: { + String vmId = request.getParameter(HarvesterCommand.VM_ID.name()); + stopHarvester(vmId); + break; + } + case VM_CAPS: { + // this is blocking + String vmId = request.getParameter(HarvesterCommand.VM_ID.name()); + String agentId = request.getParameter(HarvesterCommand.AGENT_ID.name()); + saveVmCaps(vmId, agentId); + break; + } + default: + break; + } + + return new Response(ResponseType.OK); + } + + private void startHarvester(String vmId, String agentId) { + Harvester harvester = getHarvester(vmId, agentId); + harvester.start(); + } + + private void saveVmCaps(String vmId, String agentId) { + Harvester harvester = getHarvester(vmId, agentId); + harvester.saveVmCaps(); + } + + private void stopHarvester(String vmId) { + Harvester harvester = connectors.get(vmId); + if (harvester != null) { + harvester.stop(); + } + } + + Harvester getHarvester(String vmId, String agentId) { + Harvester harvester = connectors.get(vmId); + if (harvester == null) { + harvester = new Harvester(dao, executor, vmId, agentId); + connectors.put(vmId, harvester); + } + + return harvester; + } +}
--- a/thread/collector/src/main/java/com/redhat/thermostat/utils/management/MXBeanConnector.java Wed Aug 29 14:32:51 2012 -0400 +++ b/thread/collector/src/main/java/com/redhat/thermostat/utils/management/MXBeanConnector.java Wed Aug 29 21:22:47 2012 +0200 @@ -54,20 +54,25 @@ private static final String CONNECTOR_ADDRESS_PROPERTY = "com.sun.management.jmxremote.localConnectorAddress"; private String connectorAddress; - private VmRef reference; private VirtualMachine vm; private boolean attached; + private String reference; + + public MXBeanConnector(String reference) { + this.reference = reference; + } + public MXBeanConnector(VmRef reference) { - this.reference = reference; + this.reference = reference.getStringID(); } public synchronized void attach() throws Exception { if (attached) throw new IOException("Already attached"); - vm = VirtualMachine.attach(reference.getStringID()); + vm = VirtualMachine.attach(reference); attached = true; Properties props = vm.getAgentProperties();
--- a/thread/collector/src/test/java/com/redhat/thermostat/thread/collector/ThreadCollectorFactoryTest.java Wed Aug 29 14:32:51 2012 -0400 +++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/collector/ThreadCollectorFactoryTest.java Wed Aug 29 21:22:47 2012 +0200 @@ -53,20 +53,9 @@ public void testThreadCollectorFactory() { ThreadDao threadDao = mock(ThreadDao.class); VmRef reference = mock(VmRef.class); - - ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class); - - ThreadCollectorFactory factory = new ThreadCollectorFactoryImpl(threadDao, threadPool); + + ThreadCollectorFactory factory = new ThreadCollectorFactoryImpl(threadDao); ThreadCollector collector = factory.getCollector(reference); assertNotNull(collector); - - // ask again, it must be the same instance as before - ThreadCollector collector2 = factory.getCollector(reference); - assertSame(collector, collector2); - - // and now of course it must be different - VmRef reference2 = mock(VmRef.class); - ThreadCollector collector3 = factory.getCollector(reference2); - assertNotSame(collector, collector3); } }
--- a/thread/collector/src/test/java/com/redhat/thermostat/thread/collector/ThreadCollectorTest.java Wed Aug 29 14:32:51 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,204 +0,0 @@ -/* - * Copyright 2012 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - -package com.redhat.thermostat.thread.collector; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadMXBean; -import java.util.concurrent.ScheduledExecutorService; - -import javax.management.MalformedObjectNameException; - -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -import com.redhat.thermostat.common.dao.VmRef; -import com.redhat.thermostat.thread.collector.impl.ThreadMXBeanCollector; -import com.redhat.thermostat.thread.dao.ThreadDao; -import com.redhat.thermostat.utils.management.MXBeanConnection; -import com.redhat.thermostat.utils.management.MXBeanConnector; - -public class ThreadCollectorTest { - - @Test - public void testVMCapabilitiesNotInDAO() throws Exception { - - VMThreadCapabilities referenceCaps = mock(VMThreadCapabilities.class); - when(referenceCaps.supportContentionMonitor()).thenReturn(true); - when(referenceCaps.supportCPUTime()).thenReturn(false); - - VmRef reference = mock(VmRef.class); - - ThreadDao threadDao = mock(ThreadDao.class); - when(threadDao.loadCapabilities(reference)).thenReturn(null); - - MXBeanConnector connector = mock(MXBeanConnector.class); - when(connector.isAttached()).thenReturn(false).thenReturn(true); - - MXBeanConnection connection = mock(MXBeanConnection.class); - when(connector.connect()).thenReturn(connection); - - ArgumentCaptor<String> captor1 = ArgumentCaptor.forClass(String.class); - ArgumentCaptor<Class> captor2 = ArgumentCaptor.forClass(Class.class); - - ThreadMXBean bean = mock(ThreadMXBean.class); - when(bean.isThreadCpuTimeSupported()).thenReturn(false); - when(bean.isThreadContentionMonitoringSupported()).thenReturn(true); - - when(connection.createProxy(captor1.capture(), captor2.capture())).thenThrow(new MalformedObjectNameException()).thenReturn(bean); - - ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class); - - /* ************* */ - - ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference, connector, threadPool); - VMThreadCapabilities caps = collector.getVMThreadCapabilities(); - - String beanName = captor1.getValue(); - assertEquals(ManagementFactory.THREAD_MXBEAN_NAME, beanName); - - Class clazz = captor2.getValue(); - assertEquals(ThreadMXBean.class.getName(), clazz.getName()); - - verify(threadDao).loadCapabilities(reference); - - verify(connector).attach(); - verify(connector).connect(); - verify(connector).close(); - - verify(threadDao).saveCapabilities(reference, caps); - - assertTrue(caps.supportContentionMonitor()); - assertFalse(caps.supportCPUTime()); - assertFalse(caps.supportThreadAllocatedMemory()); - } - - @SuppressWarnings("restriction") - @Test - public void testHasThreadMemorySupport() throws Exception { - - VMThreadCapabilities referenceCaps = mock(VMThreadCapabilities.class); - when(referenceCaps.supportContentionMonitor()).thenReturn(true); - when(referenceCaps.supportCPUTime()).thenReturn(false); - - VmRef reference = mock(VmRef.class); - - ThreadDao threadDao = mock(ThreadDao.class); - when(threadDao.loadCapabilities(reference)).thenReturn(null); - - MXBeanConnector connector = mock(MXBeanConnector.class); - when(connector.isAttached()).thenReturn(false).thenReturn(true); - - MXBeanConnection connection = mock(MXBeanConnection.class); - when(connector.connect()).thenReturn(connection); - - ArgumentCaptor<String> captor1 = ArgumentCaptor.forClass(String.class); - ArgumentCaptor<Class> captor2 = ArgumentCaptor.forClass(Class.class); - - com.sun.management.ThreadMXBean bean = mock(com.sun.management.ThreadMXBean.class); - when(bean.isThreadCpuTimeSupported()).thenReturn(false); - when(bean.isThreadContentionMonitoringSupported()).thenReturn(true); - when(bean.isThreadAllocatedMemorySupported()).thenReturn(true); - - when(connection.createProxy(captor1.capture(), captor2.capture())).thenReturn(bean); - - ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class); - - /* ************* */ - - ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference, connector, threadPool); - VMThreadCapabilities caps = collector.getVMThreadCapabilities(); - - String beanName = captor1.getValue(); - assertEquals(ManagementFactory.THREAD_MXBEAN_NAME, beanName); - - Class clazz = captor2.getValue(); - assertEquals(com.sun.management.ThreadMXBean.class.getName(), clazz.getName()); - - verify(threadDao).loadCapabilities(reference); - - verify(connector).attach(); - verify(connector).connect(); - verify(connector).close(); - - verify(threadDao).saveCapabilities(reference, caps); - - assertTrue(caps.supportContentionMonitor()); - assertFalse(caps.supportCPUTime()); - assertTrue(caps.supportThreadAllocatedMemory()); - } - - @Test - public void testVMCapabilitiesInDAO() throws Exception { - - VMThreadCapabilities referenceCaps = mock(VMThreadCapabilities.class); - when(referenceCaps.supportContentionMonitor()).thenReturn(true); - when(referenceCaps.supportCPUTime()).thenReturn(false); - - VmRef reference = mock(VmRef.class); - - ThreadDao threadDao = mock(ThreadDao.class); - when(threadDao.loadCapabilities(reference)).thenReturn(referenceCaps); - - MXBeanConnector connector = mock(MXBeanConnector.class); - - ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class); - - /* ************* */ - - ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference, connector, threadPool); - VMThreadCapabilities caps = collector.getVMThreadCapabilities(); - - verify(threadDao).loadCapabilities(reference); - - verify(connector, times(0)).attach(); - verify(connector, times(0)).connect(); - verify(connector, times(0)).close(); - - verify(threadDao, times(0)).saveCapabilities(reference, caps); - - assertTrue(caps.supportContentionMonitor()); - assertFalse(caps.supportCPUTime()); - } -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/collector/impl/ThreadCollectorTest.java Wed Aug 29 21:22:47 2012 +0200 @@ -0,0 +1,212 @@ +/* + * Copyright 2012 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.thread.collector.impl; + +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; + +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.redhat.thermostat.client.command.RequestQueue; +import com.redhat.thermostat.common.command.Request; +import com.redhat.thermostat.common.command.RequestResponseListener; +import com.redhat.thermostat.common.dao.HostRef; +import com.redhat.thermostat.common.dao.VmRef; +import com.redhat.thermostat.thread.collector.ThreadCollector; +import com.redhat.thermostat.thread.collector.VMThreadCapabilities; +import com.redhat.thermostat.thread.dao.ThreadDao; +import com.redhat.thermostat.thread.harvester.HarvesterCommand; + +public class ThreadCollectorTest { + + @Test + public void testVMCapabilitiesNotInDAO() throws Exception { + + HostRef agent = mock(HostRef.class); + when(agent.getAgentId()).thenReturn("42"); + + VmRef reference = mock(VmRef.class); + when(reference.getIdString()).thenReturn("00101010"); + when(reference.getAgent()).thenReturn(agent); + ThreadDao threadDao = mock(ThreadDao.class); + + VMThreadCapabilities resCaps = mock(VMThreadCapabilities.class); + when(threadDao.loadCapabilities(reference)).thenReturn(null).thenReturn(resCaps); + + final Request request = mock(Request.class); + final RequestQueue requestQueue = mock(RequestQueue.class); + + final ArgumentCaptor<RequestResponseListener> captor = ArgumentCaptor.forClass(RequestResponseListener.class); + doNothing().when(request).addListener(captor.capture()); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Request req = (Request) invocation.getArguments()[0]; + assertSame(request, req); + + RequestResponseListener listener = captor.getValue(); + listener.fireComplete(null, null); + + return null; + } + + }).when(requestQueue).putRequest(request); + + /* ************* */ + + ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference) { + @Override + Request createRequest() { + return request; + } + @Override + RequestQueue getRequestQueue() { + return requestQueue; + } + }; + + VMThreadCapabilities caps = collector.getVMThreadCapabilities(); + + verify(request).setParameter(HarvesterCommand.class.getName(), HarvesterCommand.VM_CAPS.name()); + verify(request).setParameter(HarvesterCommand.VM_ID.name(), "00101010"); + verify(request).setParameter(HarvesterCommand.AGENT_ID.name(), "42"); + + verify(requestQueue).putRequest(request); + + verify(threadDao, times(2)).loadCapabilities(reference); + assertSame(resCaps, caps); + } + + @Test + public void testVMCapabilitiesInDAO() throws Exception { + + VmRef reference = mock(VmRef.class); + ThreadDao threadDao = mock(ThreadDao.class); + + VMThreadCapabilities resCaps = mock(VMThreadCapabilities.class); + when(threadDao.loadCapabilities(reference)).thenReturn(resCaps); + + ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference) { + @Override + Request createRequest() { + fail(); + return null; + } + @Override + RequestQueue getRequestQueue() { + fail(); + return null; + } + }; + + VMThreadCapabilities caps = collector.getVMThreadCapabilities(); + + verify(threadDao, times(1)).loadCapabilities(reference); + assertSame(resCaps, caps); + } + + @Test + public void testStart() { + + HostRef agent = mock(HostRef.class); + when(agent.getAgentId()).thenReturn("42"); + + final Request request = mock(Request.class); + final RequestQueue requestQueue = mock(RequestQueue.class); + ThreadDao threadDao = mock(ThreadDao.class); + VmRef reference = mock(VmRef.class); + when(reference.getIdString()).thenReturn("00101010"); + when(reference.getAgent()).thenReturn(agent); + + ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference) { + @Override + Request createRequest() { + return request; + } + @Override + RequestQueue getRequestQueue() { + return requestQueue; + } + }; + collector.startHarvester(); + + verify(request).setParameter(HarvesterCommand.class.getName(), HarvesterCommand.START.name()); + verify(request).setParameter(HarvesterCommand.VM_ID.name(), "00101010"); + verify(request).setParameter(HarvesterCommand.AGENT_ID.name(), "42"); + + verify(requestQueue).putRequest(request); + } + + + @Test + public void testStop() { + + final Request request = mock(Request.class); + final RequestQueue requestQueue = mock(RequestQueue.class); + ThreadDao threadDao = mock(ThreadDao.class); + VmRef reference = mock(VmRef.class); + when(reference.getIdString()).thenReturn("00101010"); + + ThreadCollector collector = new ThreadMXBeanCollector(threadDao, reference) { + @Override + Request createRequest() { + return request; + } + @Override + RequestQueue getRequestQueue() { + return requestQueue; + } + }; + collector.stopHarvester(); + + verify(request).setParameter(HarvesterCommand.class.getName(), HarvesterCommand.STOP.name()); + verify(request).setParameter(HarvesterCommand.VM_ID.name(), "00101010"); + + verify(requestQueue).putRequest(request); + } +}
--- a/thread/collector/src/test/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImplTest.java Wed Aug 29 14:32:51 2012 -0400 +++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/dao/impl/ThreadDaoImplTest.java Wed Aug 29 21:22:47 2012 +0200 @@ -61,7 +61,9 @@ public void testCreateConnectionKey() { Storage storage = mock(Storage.class); + @SuppressWarnings("unused") ThreadDaoImpl dao = new ThreadDaoImpl(storage); + verify(storage).createConnectionKey(ThreadDao.THREAD_CAPABILITIES); verify(storage).createConnectionKey(ThreadDao.THREAD_INFO); verify(storage).createConnectionKey(ThreadDao.THREAD_SUMMARY); @@ -102,14 +104,6 @@ public void testSaveVMCapabilities() { Storage storage = mock(Storage.class); - VmRef ref = mock(VmRef.class); - when(ref.getId()).thenReturn(42); - - HostRef agent = mock(HostRef.class); - when(agent.getAgentId()).thenReturn("0xcafe"); - - when(ref.getAgent()).thenReturn(agent); - Chunk answer = mock(Chunk.class); when(answer.get(ThreadDao.CONTENTION_MONITOR_KEY)).thenReturn(false); when(answer.get(ThreadDao.CPU_TIME_KEY)).thenReturn(true); @@ -120,7 +114,7 @@ when(caps.supportThreadAllocatedMemory()).thenReturn(true); ThreadDaoImpl dao = new ThreadDaoImpl(storage); - dao.saveCapabilities(ref, caps); + dao.saveCapabilities("42", "0xcafe", caps); ArgumentCaptor<Chunk> queryCaptor = ArgumentCaptor.forClass(Chunk.class); verify(storage).putChunk(queryCaptor.capture());
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/harvester/HarvesterTest.java Wed Aug 29 21:22:47 2012 +0200 @@ -0,0 +1,400 @@ +/* + * Copyright 2012 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.thread.harvester; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doNothing; + +import static org.junit.Assert.*; + +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import javax.management.MalformedObjectNameException; + +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import com.redhat.thermostat.thread.collector.impl.ThreadMXInfo; +import com.redhat.thermostat.thread.collector.impl.ThreadMXSummary; +import com.redhat.thermostat.thread.collector.impl.VMThreadMXCapabilities; +import com.redhat.thermostat.thread.dao.ThreadDao; +import com.redhat.thermostat.utils.management.MXBeanConnection; +import com.redhat.thermostat.utils.management.MXBeanConnector; + +public class HarvesterTest { + + @Test + public void testStart() { + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + ThreadDao dao = mock(ThreadDao.class); + final MXBeanConnector mockedConnector = mock(MXBeanConnector.class); + when(mockedConnector.isAttached()).thenReturn(false); + + ArgumentCaptor<Runnable> arg0 = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor<Long> arg1 = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor<Long> arg2 = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor<TimeUnit> arg3 = ArgumentCaptor.forClass(TimeUnit.class); + + final boolean [] harvestDataCalled = new boolean[1]; + + when(executor.scheduleAtFixedRate(arg0.capture(), arg1.capture(), arg2.capture(), arg3.capture())).thenReturn(null); + + Harvester harvester = new Harvester(dao, executor, "42", "0xcafe") { + { connector = mockedConnector; } + @Override + synchronized void harvestData() { + harvestDataCalled[0] = true; + } + }; + + harvester.start(); + + verify(executor).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); + + assertTrue(arg1.getValue() == 0); + assertTrue(arg2.getValue() == 1); + assertEquals(TimeUnit.SECONDS, arg3.getValue()); + + Runnable action = arg0.getValue(); + assertNotNull(action); + + action.run(); + + assertTrue(harvestDataCalled[0]); + + assertTrue(harvester.isConnected()); + } + + /** + * Mostly the same as testStart, but we call harvester.start() twice + */ + @Test + public void testStartOnce() throws Exception { + + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + ThreadDao dao = mock(ThreadDao.class); + final MXBeanConnector mockedConnector = mock(MXBeanConnector.class); + when(mockedConnector.isAttached()).thenReturn(false); + + ArgumentCaptor<Runnable> arg0 = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor<Long> arg1 = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor<Long> arg2 = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor<TimeUnit> arg3 = ArgumentCaptor.forClass(TimeUnit.class); + + final boolean [] harvestDataCalled = new boolean[1]; + + when(executor.scheduleAtFixedRate(arg0.capture(), arg1.capture(), arg2.capture(), arg3.capture())).thenReturn(null); + + Harvester harvester = new Harvester(dao, executor, "42", "0xcafe") { + { connector = mockedConnector; } + @Override + synchronized void harvestData() { + harvestDataCalled[0] = true; + } + }; + + harvester.start(); + harvester.start(); + + verify(mockedConnector, times(1)).isAttached(); + verify(mockedConnector, times(1)).attach(); + verify(executor, times(1)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); + + assertTrue(arg1.getValue() == 0); + assertTrue(arg2.getValue() == 1); + assertEquals(TimeUnit.SECONDS, arg3.getValue()); + + Runnable action = arg0.getValue(); + assertNotNull(action); + + action.run(); + + assertTrue(harvestDataCalled[0]); + + assertTrue(harvester.isConnected()); + } + + @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testStopAfterStarting() throws Exception { + + ScheduledFuture future = mock(ScheduledFuture.class); + + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + ThreadDao dao = mock(ThreadDao.class); + final MXBeanConnector mockedConnector = mock(MXBeanConnector.class); + + MXBeanConnection connection = mock(MXBeanConnection.class); + + when(mockedConnector.connect()).thenReturn(connection); + + when(mockedConnector.isAttached()).thenReturn(true); + + when(executor.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future); + + Harvester harvester = new Harvester(dao, executor, "42", "0xcafe") + {{ connector = mockedConnector; }}; + + harvester.start(); + + assertTrue(harvester.isConnected()); + + harvester.stop(); + + verify(future).cancel(false); + verify(connection).close(); + + // needs to be 2 times, since is called once in start + verify(mockedConnector, times(2)).isAttached(); + verify(mockedConnector).close(); + + assertFalse(harvester.isConnected()); + } + + /** + * Mostly the same as testStopAfterStarting, but we call harvester.stop() + * twice + */ + @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testStopTwiceAfterStarting() throws Exception { + + ScheduledFuture future = mock(ScheduledFuture.class); + + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + ThreadDao dao = mock(ThreadDao.class); + final MXBeanConnector mockedConnector = mock(MXBeanConnector.class); + + MXBeanConnection connection = mock(MXBeanConnection.class); + + when(mockedConnector.connect()).thenReturn(connection); + + when(mockedConnector.isAttached()).thenReturn(true); + + when(executor.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future); + + Harvester harvester = new Harvester(dao, executor, "42", "0xcafe") + {{ connector = mockedConnector; }}; + + harvester.start(); + + assertTrue(harvester.isConnected()); + + harvester.stop(); + harvester.stop(); + + verify(future, times(1)).cancel(false); + verify(connection, times(1)).close(); + + // needs to be 2 times, since is called once in start + verify(mockedConnector, times(2)).isAttached(); + + verify(mockedConnector, times(1)).close(); + + assertFalse(harvester.isConnected()); + } + + @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testStopNotStarted() throws Exception { + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + ThreadDao dao = mock(ThreadDao.class); + + ScheduledFuture future = mock(ScheduledFuture.class); + when(executor.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future); + + final MXBeanConnector mockedConnector = mock(MXBeanConnector.class); + MXBeanConnection connection = mock(MXBeanConnection.class); + when(mockedConnector.connect()).thenReturn(connection); + when(mockedConnector.isAttached()).thenReturn(true); + + Harvester harvester = new Harvester(dao, executor, "42", "0xcafe") + {{ connector = mockedConnector; }}; + + verify(executor, times(0)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); + + assertFalse(harvester.isConnected()); + + harvester.stop(); + + assertFalse(harvester.isConnected()); + verify(mockedConnector, times(0)).isAttached(); + + verify(future, times(0)).cancel(false); + } + + @Test + public void testHarvestData() { + + long ids[] = new long [] { + 0, 1 + }; + + ThreadInfo info1 = mock(ThreadInfo.class); + when(info1.getThreadName()).thenReturn("fluff1"); + when(info1.getThreadId()).thenReturn(1l); + + ThreadInfo info2 = mock(ThreadInfo.class); + when(info2.getThreadName()).thenReturn("fluff2"); + when(info2.getThreadId()).thenReturn(2l); + + ThreadInfo[] infos = new ThreadInfo[] { + info1, + info2 + }; + + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + + ArgumentCaptor<String> vmCapture = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> agentCapture = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<ThreadMXSummary> summaryCapture = ArgumentCaptor.forClass(ThreadMXSummary.class); + + ThreadDao dao = mock(ThreadDao.class); + doNothing().when(dao).saveSummary(vmCapture.capture(), agentCapture.capture(), summaryCapture.capture()); + + ArgumentCaptor<String> vmCapture2 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> agentCapture2 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<ThreadMXInfo> threadInfoCapture = ArgumentCaptor.forClass(ThreadMXInfo.class); + doNothing().when(dao).saveThreadInfo(vmCapture2.capture(), agentCapture2.capture(), threadInfoCapture.capture()); + + final ThreadMXBean collectorBean = mock(ThreadMXBean.class); + + when(collectorBean.getThreadCount()).thenReturn(42); + when(collectorBean.getAllThreadIds()).thenReturn(ids); + when(collectorBean.getThreadInfo(ids, true, true)).thenReturn(infos); + + final boolean [] getDataCollectorBeanCalled = new boolean[1]; + + Harvester harvester = new Harvester(dao, executor, "42", "0xcafe") { + @Override + ThreadMXBean getDataCollectorBean(MXBeanConnection connection) + throws MalformedObjectNameException { + getDataCollectorBeanCalled[0] = true; + return collectorBean; + } + }; + + harvester.harvestData(); + + assertTrue(getDataCollectorBeanCalled[0]); + + verify(collectorBean).getThreadInfo(ids, true, true); + + verify(dao).saveSummary(anyString(), anyString(), any(ThreadMXSummary.class)); + + // once for each thread info + verify(dao, times(2)).saveThreadInfo(anyString(), anyString(), any(ThreadMXInfo.class)); + + assertEquals(42, summaryCapture.getValue().currentLiveThreads()); + assertEquals("42", vmCapture.getValue()); + assertEquals("0xcafe", agentCapture.getValue()); + + assertEquals(42, summaryCapture.getValue().currentLiveThreads()); + assertEquals("42", vmCapture2.getAllValues().get(0)); + assertEquals("42", vmCapture2.getAllValues().get(1)); + + assertEquals("0xcafe", agentCapture2.getAllValues().get(0)); + assertEquals("0xcafe", agentCapture2.getAllValues().get(1)); + + List<ThreadMXInfo> threadInfos = threadInfoCapture.getAllValues(); + assertEquals(2, threadInfos.size()); + + assertEquals("fluff1", threadInfos.get(0).getName()); + assertEquals("fluff2", threadInfos.get(1).getName()); + + verify(collectorBean, times(1)).getThreadCpuTime(1l); + verify(collectorBean, times(1)).getThreadCpuTime(2l); + } + + @Test + public void testSaveVmCaps() { + + final MXBeanConnector mockedConnector = mock(MXBeanConnector.class); + when(mockedConnector.isAttached()).thenReturn(true); + + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + + ArgumentCaptor<String> vmCapture = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> agentCapture = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<VMThreadMXCapabilities> capsCapture = ArgumentCaptor.forClass(VMThreadMXCapabilities.class); + + ThreadDao dao = mock(ThreadDao.class); + doNothing().when(dao).saveCapabilities(vmCapture.capture(), agentCapture.capture(), capsCapture.capture()); + + final ThreadMXBean collectorBean = mock(ThreadMXBean.class); + when(collectorBean.isThreadCpuTimeSupported()).thenReturn(true); + when(collectorBean.isThreadContentionMonitoringSupported()).thenReturn(true); + + final boolean [] getDataCollectorBeanCalled = new boolean[1]; + + Harvester harvester = new Harvester(dao, executor, "42", "0xcafe") { + { connector = mockedConnector; } + @Override + ThreadMXBean getDataCollectorBean(MXBeanConnection connection) + throws MalformedObjectNameException { + getDataCollectorBeanCalled[0] = true; + return collectorBean; + } + }; + + harvester.saveVmCaps(); + assertTrue(getDataCollectorBeanCalled[0]); + + verify(dao, times(1)).saveCapabilities(anyString(), anyString(), any(VMThreadMXCapabilities.class)); + assertEquals("42", vmCapture.getValue()); + assertEquals("0xcafe", agentCapture.getValue()); + + List<String> features = capsCapture.getValue().getSupportedFeaturesList(); + assertEquals(2, features.size()); + assertTrue(features.contains(ThreadDao.CPU_TIME)); + assertTrue(features.contains(ThreadDao.CONTENTION_MONITOR)); + assertFalse(features.contains(ThreadDao.THREAD_ALLOCATED_MEMORY)); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/thread/collector/src/test/java/com/redhat/thermostat/thread/harvester/ThreadHarvesterTest.java Wed Aug 29 21:22:47 2012 +0200 @@ -0,0 +1,165 @@ +/* + * Copyright 2012 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.thread.harvester; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import com.redhat.thermostat.common.command.Request; +import com.redhat.thermostat.thread.dao.ThreadDao; + +public class ThreadHarvesterTest { + + @Test + public void testStart() { + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + ThreadDao dao = mock(ThreadDao.class); + Request request = mock(Request.class); + + final boolean[] getHarvesterCalled = new boolean[1]; + final Harvester harverster = mock(Harvester.class); + + ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class); + + when(request.getParameter(captor.capture())). + thenReturn(HarvesterCommand.START.name()). + thenReturn("42"). + thenReturn("0xcafe"); + + ThreadHarvester threadHarvester = new ThreadHarvester(executor, dao) { + @Override + Harvester getHarvester(String vmId, String agentId) { + + getHarvesterCalled[0] = true; + assertEquals("42", vmId); + assertEquals("0xcafe", agentId); + + return harverster; + } + }; + threadHarvester.receive(request); + + List<String> values = captor.getAllValues(); + assertEquals(3, values.size()); + + assertEquals(HarvesterCommand.class.getName(), values.get(0)); + assertEquals(HarvesterCommand.VM_ID.name(), values.get(1)); + assertEquals(HarvesterCommand.AGENT_ID.name(), values.get(2)); + + assertTrue(getHarvesterCalled[0]); + + verify(harverster).start(); + } + + @Test + public void testStop() { + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + ThreadDao dao = mock(ThreadDao.class); + Request request = mock(Request.class); + + final Harvester harverster = mock(Harvester.class); + + ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class); + + when(request.getParameter(captor.capture())). + thenReturn(HarvesterCommand.STOP.name()). + thenReturn("42"); + + ThreadHarvester threadHarvester = new ThreadHarvester(executor, dao) { + { connectors.put("42", harverster); } + }; + threadHarvester.receive(request); + + List<String> values = captor.getAllValues(); + assertEquals(2, values.size()); + + assertEquals(HarvesterCommand.class.getName(), values.get(0)); + assertEquals(HarvesterCommand.VM_ID.name(), values.get(1)); + + verify(harverster).stop(); + } + + @Test + public void testSaveVmCaps() { + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + ThreadDao dao = mock(ThreadDao.class); + Request request = mock(Request.class); + + final boolean[] getHarvesterCalled = new boolean[1]; + final Harvester harverster = mock(Harvester.class); + + ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class); + + when(request.getParameter(captor.capture())). + thenReturn(HarvesterCommand.VM_CAPS.name()). + thenReturn("42"). + thenReturn("0xcafe"); + + ThreadHarvester threadHarvester = new ThreadHarvester(executor, dao) { + @Override + Harvester getHarvester(String vmId, String agentId) { + + getHarvesterCalled[0] = true; + assertEquals("42", vmId); + assertEquals("0xcafe", agentId); + + return harverster; + } + }; + threadHarvester.receive(request); + + List<String> values = captor.getAllValues(); + assertEquals(3, values.size()); + + assertEquals(HarvesterCommand.class.getName(), values.get(0)); + assertEquals(HarvesterCommand.VM_ID.name(), values.get(1)); + assertEquals(HarvesterCommand.AGENT_ID.name(), values.get(2)); + + assertTrue(getHarvesterCalled[0]); + + verify(harverster).saveVmCaps(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/thread/harvester/pom.xml Wed Aug 29 21:22:47 2012 +0200 @@ -0,0 +1,105 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Copyright 2012 Red Hat, Inc. + + This file is part of Thermostat. + + Thermostat is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + Thermostat is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Thermostat; see the file COPYING. If not see + <http://www.gnu.org/licenses />. + + Linking this code with other modules is making a combined work + based on this code. Thus, the terms and conditions of the GNU + General Public License cover the whole combination. + + As a special exception, the copyright holders of this code give + you permission to link this code with independent modules to + produce an executable, regardless of the license terms of these + independent modules, and to copy and distribute the resulting + executable under terms of your choice, provided that you also + meet, for each linked independent module, the terms and conditions + of the license of that module. An independent module is a module + which is not derived from or based on this code. If you modify + this code, you may extend this exception to your version of the + library, but you are not obligated to do so. If you do not wish + to do so, delete this exception statement from your version. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-thread</artifactId> + <version>0.4.0-SNAPSHOT</version> + </parent> + + <artifactId>thermostat-thread-harvester</artifactId> + <packaging>bundle</packaging> + + <name>Thermostat Thread Info Harvester</name> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.redhat.thermostat</groupId> + <artifactId>thermostat-thread-collector</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.compendium</artifactId> + <scope>provided</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <instructions> + <Bundle-Vendor>Red Hat, Inc.</Bundle-Vendor> + <Bundle-Activator>com.redhat.thermostat.thread.harvester.osgi.Activator</Bundle-Activator> + <Bundle-SymbolicName>com.redhat.thermostat.thread.harvester</Bundle-SymbolicName> + <Private-Package> + com.redhat.thermostat.thread.harvester.osgi, + </Private-Package> + <!-- Do not autogenerate uses clauses in Manifests --> + <_nouses>true</_nouses> + </instructions> + </configuration> + </plugin> + </plugins> + </build> + +</project>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/thread/harvester/src/main/java/com/redhat/thermostat/thread/harvester/osgi/Activator.java Wed Aug 29 21:22:47 2012 +0200 @@ -0,0 +1,80 @@ +/* + * Copyright 2012 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.thread.harvester.osgi; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.util.tracker.ServiceTracker; + +import com.redhat.thermostat.agent.command.ReceiverRegistry; +import com.redhat.thermostat.thread.dao.ThreadDao; +import com.redhat.thermostat.thread.harvester.ThreadHarvester; + +public class Activator implements BundleActivator { + + private ScheduledExecutorService executor = Executors.newScheduledThreadPool(24); + + @Override + public void start(final BundleContext context) throws Exception { + @SuppressWarnings({ "rawtypes", "unchecked" }) + ServiceTracker tracker = new ServiceTracker(context, ThreadDao.class.getName(), null) { + @Override + public Object addingService(ServiceReference reference) { + ThreadDao threadDao = (ThreadDao) context.getService(reference); + + ThreadHarvester harvester = new ThreadHarvester(executor, threadDao); + + ReceiverRegistry registry = new ReceiverRegistry(context); + registry.registerReceiver(harvester); + + return super.addingService(reference); + } + }; + tracker.open(); + } + + @Override + public void stop(BundleContext context) throws Exception { + if (executor != null) { + executor.shutdown(); + } + } +}
--- a/thread/pom.xml Wed Aug 29 14:32:51 2012 -0400 +++ b/thread/pom.xml Wed Aug 29 21:22:47 2012 +0200 @@ -60,10 +60,10 @@ <modules> <module>collector</module> + <module>harvester</module> <module>client-common</module> <module>client-swing</module> <module>client-controllers</module> </modules> - </project>