changeset 875:29f815f1bbed

Cleanly shutdown QueuedStorage. Reviewed-by: neugens Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2012-December/004871.html
author Roman Kennke <rkennke@redhat.com>
date Tue, 18 Dec 2012 19:57:03 +0100
parents d1641e56f1ad
children 948273e69e5b
files launcher/src/main/java/com/redhat/thermostat/launcher/internal/LauncherImpl.java launcher/src/test/java/com/redhat/thermostat/launcher/internal/LauncherTest.java storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedStorage.java storage/core/src/main/java/com/redhat/thermostat/storage/core/Storage.java storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageExecutorTest.java storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageTest.java storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java
diffstat 8 files changed, 149 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/launcher/src/main/java/com/redhat/thermostat/launcher/internal/LauncherImpl.java	Tue Dec 18 19:25:39 2012 +0100
+++ b/launcher/src/main/java/com/redhat/thermostat/launcher/internal/LauncherImpl.java	Tue Dec 18 19:57:03 2012 +0100
@@ -44,8 +44,10 @@
 import java.util.logging.Level;
 
 import org.apache.commons.cli.Options;
+import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.BundleException;
+import org.osgi.framework.FrameworkUtil;
 import org.osgi.framework.ServiceReference;
 
 import com.redhat.thermostat.bundles.OSGiRegistry;
@@ -71,6 +73,7 @@
 import com.redhat.thermostat.launcher.CommonCommandOptions;
 import com.redhat.thermostat.launcher.Launcher;
 import com.redhat.thermostat.storage.core.ConnectionException;
+import com.redhat.thermostat.storage.core.Storage;
 import com.redhat.thermostat.storage.core.StorageException;
 import com.redhat.thermostat.utils.keyring.Keyring;
 
@@ -110,6 +113,7 @@
 
     @Override
     public synchronized void run(Collection<ActionListener<ApplicationState>> listeners) {
+
         usageCount++;
         waitForArgs();
 
@@ -157,6 +161,20 @@
             appSvc.getApplicationExecutor().shutdown();
             appSvc.getTimerFactory().shutdown();
 
+            Bundle bundle = FrameworkUtil.getBundle(LauncherImpl.class);
+            if (bundle != null) {
+                BundleContext ctx = bundle.getBundleContext();
+                if (ctx != null) {
+                    ServiceReference storageRef = ctx.getServiceReference(Storage.class);
+                    if (storageRef != null) {
+                        Storage storage = (Storage) ctx.getService(storageRef);
+                        if (storage != null) {
+                            storage.shutdown();
+                        }
+                    }
+                }
+            }
+
             context.getBundle(0).stop();
         } catch (BundleException e) {
             throw (InternalError) new InternalError().initCause(e);
--- a/launcher/src/test/java/com/redhat/thermostat/launcher/internal/LauncherTest.java	Tue Dec 18 19:25:39 2012 +0100
+++ b/launcher/src/test/java/com/redhat/thermostat/launcher/internal/LauncherTest.java	Tue Dec 18 19:57:03 2012 +0100
@@ -94,6 +94,7 @@
 import com.redhat.thermostat.common.tools.BasicCommand;
 import com.redhat.thermostat.common.utils.OSGIUtils;
 import com.redhat.thermostat.launcher.internal.LauncherImpl.LoggingInitializer;
+import com.redhat.thermostat.storage.core.Storage;
 import com.redhat.thermostat.test.StubBundleContext;
 import com.redhat.thermostat.test.TestCommandContextFactory;
 import com.redhat.thermostat.test.TestTimerFactory;
@@ -150,6 +151,7 @@
     private ActionNotifier<ApplicationState> notifier;
 
     private LauncherImpl launcher;
+    private Storage storage;
 
     @Before
     public void setUp() {
@@ -235,6 +237,15 @@
         when(bCtx.getService(infosRef)).thenReturn(infos);
         when(FrameworkUtil.getBundle(isA(HelpCommand.class.getClass()))).thenReturn(bundle);
 
+        storage = mock(Storage.class);
+        ServiceReference storageRef = mock(ServiceReference.class);
+        Bundle launcherBundle = mock(Bundle.class);
+        BundleContext launcherBundleCtx = mock(BundleContext.class);
+        when(launcherBundleCtx.getServiceReference(Storage.class)).thenReturn(storageRef);
+        when(launcherBundleCtx.getService(storageRef)).thenReturn(storage);
+        when(launcherBundle.getBundleContext()).thenReturn(launcherBundleCtx);
+        when(FrameworkUtil.getBundle(LauncherImpl.class)).thenReturn(launcherBundle);
+
         timerFactory = new TestTimerFactory();
         ExecutorService exec = mock(ExecutorService.class);
         ApplicationService appSvc = mock(ApplicationService.class);
@@ -451,7 +462,6 @@
         
         PowerMockito.mockStatic(FrameworkUtil.class);
         when(FrameworkUtil.getBundle(Version.class)).thenReturn(sysBundle);
-        
         launcher.setArgs(new String[] {Version.VERSION_OPTION});
         launcher.run();
 
--- a/storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedStorage.java	Tue Dec 18 19:25:39 2012 +0100
+++ b/storage/core/src/main/java/com/redhat/thermostat/storage/core/QueuedStorage.java	Tue Dec 18 19:57:03 2012 +0100
@@ -39,7 +39,7 @@
 
 import java.io.InputStream;
 import java.util.UUID;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import com.redhat.thermostat.storage.model.AgentIdPojo;
@@ -48,8 +48,8 @@
 public final class QueuedStorage implements Storage {
 
     private Storage delegate;
-    private Executor executor;
-    private Executor fileExecutor;
+    private ExecutorService executor;
+    private ExecutorService fileExecutor;
 
     /*
      * NOTE: We intentially use single-thread executor. All updates are put into a queue, from which
@@ -64,17 +64,17 @@
     /*
      * This is here solely for use by tests.
      */
-    QueuedStorage(Storage delegate, Executor executor, Executor fileExecutor) {
+    QueuedStorage(Storage delegate, ExecutorService executor, ExecutorService fileExecutor) {
         this.delegate = delegate;
         this.executor = executor;
         this.fileExecutor = fileExecutor;
     }
 
-    Executor getExecutor() {
+    ExecutorService getExecutor() {
         return executor;
     }
 
-    Executor getFileExecutor() {
+    ExecutorService getFileExecutor() {
         return fileExecutor;
     }
 
@@ -221,4 +221,10 @@
         return delegate.getConnection();
     }
 
+    @Override
+    public void shutdown() {
+        executor.shutdown();
+        fileExecutor.shutdown();
+    }
+
 }
--- a/storage/core/src/main/java/com/redhat/thermostat/storage/core/Storage.java	Tue Dec 18 19:25:39 2012 +0100
+++ b/storage/core/src/main/java/com/redhat/thermostat/storage/core/Storage.java	Tue Dec 18 19:57:03 2012 +0100
@@ -83,4 +83,7 @@
     Query createQuery();
     Update createUpdate();
     Remove createRemove();
+
+    void shutdown();
+
 }
--- a/storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageExecutorTest.java	Tue Dec 18 19:25:39 2012 +0100
+++ b/storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageExecutorTest.java	Tue Dec 18 19:57:03 2012 +0100
@@ -136,4 +136,10 @@
     }
 
 
+    @Test
+    public void testShutdown() {
+        queuedStorage.shutdown();
+        assertTrue(queuedStorage.getExecutor().isShutdown());
+        assertTrue(queuedStorage.getFileExecutor().isShutdown());
+    }
 }
--- a/storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageTest.java	Tue Dec 18 19:25:39 2012 +0100
+++ b/storage/core/src/test/java/com/redhat/thermostat/storage/core/QueuedStorageTest.java	Tue Dec 18 19:57:03 2012 +0100
@@ -53,8 +53,15 @@
 import static org.mockito.Mockito.when;
 
 import java.io.InputStream;
+import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Executor;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.junit.After;
 import org.junit.Before;
@@ -66,9 +73,10 @@
 
 public class QueuedStorageTest {
 
-    private static class TestExecutor implements Executor {
+    private static class TestExecutor implements ExecutorService {
 
         private Runnable task;
+        private boolean shutdown;
 
         @Override
         public void execute(Runnable task) {
@@ -78,6 +86,85 @@
         Runnable getTask() {
             return task;
         }
+
+        @Override
+        public void shutdown() {
+            shutdown = true;
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            // Not used.
+            shutdown = true;
+            return null;
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return shutdown;
+        }
+
+        @Override
+        public boolean isTerminated() {
+            // Not used.
+            return shutdown;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit)
+                throws InterruptedException {
+            // Not used.
+            return true;
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            // Not used.
+            return null;
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            // Not used.
+            return null;
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            // Not used.
+            return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(
+                Collection<? extends Callable<T>> tasks)
+                throws InterruptedException {
+            // Not used.
+            return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(
+                Collection<? extends Callable<T>> tasks, long timeout,
+                TimeUnit unit) throws InterruptedException {
+            // Not used.
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+                throws InterruptedException, ExecutionException {
+            // Not used.
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
+                long timeout, TimeUnit unit) throws InterruptedException,
+                ExecutionException, TimeoutException {
+            // Not used.
+            return null;
+        }
     }
 
     private static class TestPojo implements Pojo {
--- a/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java	Tue Dec 18 19:25:39 2012 +0100
+++ b/storage/mongo/src/main/java/com/redhat/thermostat/storage/mongodb/internal/MongoStorage.java	Tue Dec 18 19:57:03 2012 +0100
@@ -312,4 +312,9 @@
         }
     }
 
+    @Override
+    public void shutdown() {
+        // Nothing to do here.
+    }
+
 }
--- a/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java	Tue Dec 18 19:25:39 2012 +0100
+++ b/web/client/src/main/java/com/redhat/thermostat/web/client/internal/WebStorage.java	Tue Dec 18 19:57:03 2012 +0100
@@ -594,4 +594,9 @@
         }
     }
 
+    @Override
+    public void shutdown() {
+        // Nothing to do here.
+    }
+
 }