Mercurial > hg > release > thermostat-1.4
changeset 1514:f1eeee8d9f80
Make thread pool size in QueuedStorage configurable.
Reviewed-by: neugens, jkang
Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2014-September/011078.html
author | Severin Gehwolf <sgehwolf@redhat.com> |
---|---|
date | Tue, 30 Sep 2014 11:57:25 +0200 |
parents | b201e8db8a63 |
children | 1744e42fc8b5 |
files | storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedStorage.java storage/core/src/main/java/com/redhat/thermostat/storage/internal/ThreadPoolSizeRetriever.java storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageExecutorTest.java storage/core/src/test/java/com/redhat/thermostat/storage/internal/ThreadPoolSizeRetrieverTest.java |
diffstat | 4 files changed, 244 insertions(+), 137 deletions(-) [+] |
line wrap: on
line diff
--- a/storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedStorage.java Thu Oct 09 10:30:44 2014 -0400 +++ b/storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedStorage.java Tue Sep 30 11:57:25 2014 +0200 @@ -50,6 +50,7 @@ import com.redhat.thermostat.shared.perflog.PerformanceLogFormatter; import com.redhat.thermostat.shared.perflog.PerformanceLogFormatter.LogTag; import com.redhat.thermostat.storage.internal.CountingDecorator; +import com.redhat.thermostat.storage.internal.ThreadPoolSizeRetriever; import com.redhat.thermostat.storage.model.Pojo; import com.redhat.thermostat.storage.query.Expression; @@ -493,15 +494,12 @@ this(delegate, null); } - /* - * NOTE: We intentionally use single-thread executor. All updates are put into - * a queue, from which a single dispatch thread calls the underlying - * storage. Using multiple dispatch threads could cause out-of-order issues, - * e.g. a VM death being reported before its VM start, which could confuse - * the heck out of clients. - */ public QueuedStorage(Storage delegate, PerformanceLogFormatter perfLogFormatter) { - this(delegate, Executors.newFixedThreadPool(250), Executors.newFixedThreadPool(250)); + this(delegate, perfLogFormatter, new ThreadPoolSizeRetriever().getPoolSize()); + } + + QueuedStorage(Storage delegate, PerformanceLogFormatter perflogFormatter, int poolSize) { + this(delegate, Executors.newFixedThreadPool(poolSize), Executors.newFixedThreadPool(poolSize), perflogFormatter); } QueuedStorage(Storage delegate, ExecutorService executor, ExecutorService fileExecutor) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/storage/core/src/main/java/com/redhat/thermostat/storage/internal/ThreadPoolSizeRetriever.java Tue Sep 30 11:57:25 2014 +0200 @@ -0,0 +1,111 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.storage.internal; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.redhat.thermostat.common.utils.LoggingUtils; +import com.redhat.thermostat.shared.config.InvalidConfigurationException; + +public class ThreadPoolSizeRetriever { + + private static final Logger logger = LoggingUtils.getLogger(ThreadPoolSizeRetriever.class); + + /* + * Integer. The size of the desired thread pool size. + */ + static final String THREAD_POOL_SIZE = "com.redhat.thermostat.storage.queue.poolSize"; + + /* + * Boolean. If set to true no bound is enforced. Otherwise an upper bound + * of 100 (DEFAULT_THREAD_POOL_SIZE) is enforced. + */ + static final String THREAD_POOL_SIZE_UNBOUNDED = "com.redhat.thermostat.storage.queue.unbounded"; + + /* + * Default process limit (ulimit -u) on Linux is 1024. Since this includes + * threads (threads are tasks on Linux), consider the following reasoning. + * + * We assume that for a current Linux desktop system one needs approximately + * 400 processes to run. Thus, leaves 624 processes for a user system with + * a limit of 1024. Next, MongoClient uses a default thread pool with a max + * thread limit of 100. Jetty 9 (web-storage-service) uses a default thread + * pool limit of 200. + * + * So if we were to use a thread limit of 250 for queued storage this would + * mean for the web-storage-service case (ignoring other Java threads): + * 250 threads for web-client (agent) + 250 threads for backing storage + * (the webapp) + 200 threads Jetty uses as dispatch threads + 100 threads + * (mongodb creates one thread per connection) equals to 800 threads. + * 800 + 400 = 1200 which is beyond the default user process + * ulimit of 1024. Note that thermostat clients would likely create only + * a few threads, since they mostly do reads (and queries are not queued). + * + * Hence a default of 100 should work well: 100 (agent) + 100 (webapp) + + * 100 (mongodb) + 200 (jetty) = 500. 500 + 400 = 900. 900 < 1024 + */ + static final int DEFAULT_THREAD_POOL_SIZE = 100; + + public int getPoolSize() { + Integer candidate = getPoolSizeFromProperty(); + if (candidate == null) { + logger.log(Level.CONFIG, THREAD_POOL_SIZE + " system property unset." + + " Using default: " + DEFAULT_THREAD_POOL_SIZE); + return DEFAULT_THREAD_POOL_SIZE; + } + if (candidate <= 0) { + throw new InvalidConfigurationException("Value of property " + + THREAD_POOL_SIZE +": " + candidate + " <= 0"); + } + if (isPoolSizeCapped() && candidate > DEFAULT_THREAD_POOL_SIZE) { + throw new InvalidConfigurationException("Value of property " + + THREAD_POOL_SIZE +": " + candidate + " > " + DEFAULT_THREAD_POOL_SIZE + + " and property " + THREAD_POOL_SIZE_UNBOUNDED + " unset or set to false"); + } + logger.log(Level.CONFIG, "Using a thread pool size of " + candidate + " for QueuedStorage"); + return candidate; + } + + private Integer getPoolSizeFromProperty() { + return Integer.getInteger(THREAD_POOL_SIZE); + } + + private boolean isPoolSizeCapped() { + return !Boolean.getBoolean(THREAD_POOL_SIZE_UNBOUNDED); + } +}
--- a/storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageExecutorTest.java Thu Oct 09 10:30:44 2014 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,129 +0,0 @@ -/* - * Copyright 2012-2014 Red Hat, Inc. - * - * This file is part of Thermostat. - * - * Thermostat is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation; either version 2, or (at your - * option) any later version. - * - * Thermostat is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Thermostat; see the file COPYING. If not see - * <http://www.gnu.org/licenses/>. - * - * Linking this code with other modules is making a combined work - * based on this code. Thus, the terms and conditions of the GNU - * General Public License cover the whole combination. - * - * As a special exception, the copyright holders of this code give - * you permission to link this code with independent modules to - * produce an executable, regardless of the license terms of these - * independent modules, and to copy and distribute the resulting - * executable under terms of your choice, provided that you also - * meet, for each linked independent module, the terms and conditions - * of the license of that module. An independent module is a module - * which is not derived from or based on this code. If you modify - * this code, you may extend this exception to your version of the - * library, but you are not obligated to do so. If you do not wish - * to do so, delete this exception statement from your version. - */ - - -package com.redhat.thermostat.storage.core; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - -public class QueuedStorageExecutorTest { - - private static final int NUM_THREADS = 50; - private static final int TASK_DURATION_MS = 50; - - private QueuedStorage queuedStorage; - - private volatile int activeTasks; - - private CountDownLatch latch; - - private class TestRunnable implements Runnable { - public void run() { - synchronized (QueuedStorageExecutorTest.class) { - activeTasks++; - try { - Thread.sleep(TASK_DURATION_MS); - } catch (InterruptedException e) { - // Get out of here ASAP. - } - activeTasks--; - } - latch.countDown(); - } - } - - @Before - public void setUp() { - Storage mockStorage = mock(Storage.class); - queuedStorage = new QueuedStorage(mockStorage); - activeTasks = 0; - latch = null; - } - - @After - public void tearDown() { - queuedStorage = null; - activeTasks = 0; - latch = null; - } - - @Test - public void testMainExecutor() { - testExecutor(queuedStorage.getExecutor()); - } - - @Test - public void testFileExecutor() { - testExecutor(queuedStorage.getFileExecutor()); - } - - private void testExecutor(final Executor executor) { - latch = new CountDownLatch(NUM_THREADS); - Thread[] threads = new Thread[NUM_THREADS]; - for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new Thread() { - public void run() { - executor.execute(new TestRunnable()); - } - }; - } - for (int i = 0; i < NUM_THREADS; i++) { - threads[i].start(); - } - try { - latch.await(); - } catch (InterruptedException e) { - // Get out as soon as possible. - } - assertTrue(activeTasks == 0); - } - - - @Test - public void testShutdown() { - queuedStorage.shutdown(); - assertTrue(queuedStorage.getExecutor().isShutdown()); - assertTrue(queuedStorage.getFileExecutor().isShutdown()); - } -} -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/storage/core/src/test/java/com/redhat/thermostat/storage/internal/ThreadPoolSizeRetrieverTest.java Tue Sep 30 11:57:25 2014 +0200 @@ -0,0 +1,127 @@ +/* + * Copyright 2012-2014 Red Hat, Inc. + * + * This file is part of Thermostat. + * + * Thermostat is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation; either version 2, or (at your + * option) any later version. + * + * Thermostat is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Thermostat; see the file COPYING. If not see + * <http://www.gnu.org/licenses/>. + * + * Linking this code with other modules is making a combined work + * based on this code. Thus, the terms and conditions of the GNU + * General Public License cover the whole combination. + * + * As a special exception, the copyright holders of this code give + * you permission to link this code with independent modules to + * produce an executable, regardless of the license terms of these + * independent modules, and to copy and distribute the resulting + * executable under terms of your choice, provided that you also + * meet, for each linked independent module, the terms and conditions + * of the license of that module. An independent module is a module + * which is not derived from or based on this code. If you modify + * this code, you may extend this exception to your version of the + * library, but you are not obligated to do so. If you do not wish + * to do so, delete this exception statement from your version. + */ + +package com.redhat.thermostat.storage.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.junit.After; +import org.junit.Test; + +import com.redhat.thermostat.shared.config.InvalidConfigurationException; + +public class ThreadPoolSizeRetrieverTest { + + @After + public void tearDown() { + System.clearProperty(ThreadPoolSizeRetriever.THREAD_POOL_SIZE); + System.clearProperty(ThreadPoolSizeRetriever.THREAD_POOL_SIZE_UNBOUNDED); + } + + /* + * No properties set. Should return the default pool size. + */ + @Test + public void canGetDefaultPoolSize() { + ThreadPoolSizeRetriever retriever = new ThreadPoolSizeRetriever(); + assertEquals(ThreadPoolSizeRetriever.DEFAULT_THREAD_POOL_SIZE, retriever.getPoolSize()); + } + + /* + * The pool size may be set via a property. However, if the parsed value + * from that property is too large, we fall back to the default pool size. + */ + @Test + public void getPoolSizeWithTooLargeValue() { + System.setProperty(ThreadPoolSizeRetriever.THREAD_POOL_SIZE, "300"); + ThreadPoolSizeRetriever retriever = new ThreadPoolSizeRetriever(); + try { + retriever.getPoolSize(); + fail("Pool size capped at 100"); + } catch (InvalidConfigurationException e) { + // pass + assertEquals("Value of property com.redhat.thermostat.storage.queue.poolSize: " + + "300 > 100 and property com.redhat.thermostat.storage.queue.unbounded unset or set to false", e.getMessage()); + } + } + + @Test + public void testInvalidPoolSize() { + System.setProperty(ThreadPoolSizeRetriever.THREAD_POOL_SIZE, "-2"); + ThreadPoolSizeRetriever retriever = new ThreadPoolSizeRetriever(); + try { + retriever.getPoolSize(); + fail("Pool size invalid: <= 0"); + } catch (InvalidConfigurationException e) { + // pass + assertEquals("Value of property com.redhat.thermostat.storage.queue.poolSize: -2 <= 0", + e.getMessage()); + } + System.setProperty(ThreadPoolSizeRetriever.THREAD_POOL_SIZE, "0"); + try { + retriever.getPoolSize(); + fail("Pool size invalid: <= 0"); + } catch (InvalidConfigurationException e) { + // pass + assertEquals("Value of property com.redhat.thermostat.storage.queue.poolSize: 0 <= 0", + e.getMessage()); + } + } + + /* + * Value in range: > 0 && <= 100 + */ + @Test + public void validCappedValue() { + System.setProperty(ThreadPoolSizeRetriever.THREAD_POOL_SIZE, "59"); + ThreadPoolSizeRetriever retriever = new ThreadPoolSizeRetriever(); + assertEquals(59, retriever.getPoolSize()); + } + + /* + * Out of range but overriden via unbounded property set to true. + */ + @Test + public void validUnboundedValue() { + System.setProperty(ThreadPoolSizeRetriever.THREAD_POOL_SIZE_UNBOUNDED, "true"); + System.setProperty(ThreadPoolSizeRetriever.THREAD_POOL_SIZE, "300"); + ThreadPoolSizeRetriever retriever = new ThreadPoolSizeRetriever(); + assertTrue(retriever.getPoolSize() > ThreadPoolSizeRetriever.DEFAULT_THREAD_POOL_SIZE); + assertEquals(300, retriever.getPoolSize()); + } +}