changeset 2037:15c2184f097b

Better error checking in Backend code. Reviewed-by: neugens Review-thread: http://icedtea.classpath.org/pipermail/thermostat/2016-December/021795.html PR3257
author Severin Gehwolf <sgehwolf@redhat.com>
date Mon, 05 Dec 2016 12:22:44 +0100
parents b188a7f445f2
children d9578542296b
files agent/core/src/main/java/com/redhat/thermostat/backend/HostPollingBackend.java agent/core/src/main/java/com/redhat/thermostat/backend/PollingBackend.java agent/core/src/main/java/com/redhat/thermostat/backend/VmListenerBackend.java agent/core/src/main/java/com/redhat/thermostat/backend/VmPollingBackend.java agent/core/src/main/java/com/redhat/thermostat/backend/internal/VmListenerWrapper.java agent/core/src/test/java/com/redhat/thermostat/backend/HostPollingBackendTest.java agent/core/src/test/java/com/redhat/thermostat/backend/VmPollingBackendTest.java agent/core/src/test/java/com/redhat/thermostat/backend/internal/VmListenerWrapperTest.java
diffstat 8 files changed, 228 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/agent/core/src/main/java/com/redhat/thermostat/backend/HostPollingBackend.java	Thu Nov 24 12:34:53 2016 -0500
+++ b/agent/core/src/main/java/com/redhat/thermostat/backend/HostPollingBackend.java	Mon Dec 05 12:22:44 2016 +0100
@@ -36,12 +36,16 @@
 
 package com.redhat.thermostat.backend;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Logger;
 
 import com.redhat.thermostat.common.NotImplementedException;
 import com.redhat.thermostat.common.Version;
+import com.redhat.thermostat.common.utils.LoggingUtils;
 
 /**
  * Convenience {@link Backend} class for implementations that will take some
@@ -51,17 +55,41 @@
  */
 public abstract class HostPollingBackend extends PollingBackend {
 
+    private static final Logger logger = LoggingUtils.getLogger(HostPollingBackend.class);
+    private static final int EXCEPTIONS_THRESHOLD = 10;
     private final Set<HostPollingAction> actions;
+    private final Map<String, Integer> badActions;
 
     public HostPollingBackend(String name, String description,
                               String vendor, Version version, ScheduledExecutorService executor) {
         super(name, description, vendor, version, executor);
         actions = new CopyOnWriteArraySet<>();
+        badActions = new HashMap<>();
     }
 
     final void doScheduledActions() {
         for (HostPollingAction action : actions) {
-            action.run();
+            try {
+                action.run();
+            } catch (Throwable t) {
+                handleActionException(action, t); // keep polling the remaining actions
+            }
+        }
+    }
+
+    // Counts failures per action class name; the EXCEPTIONS_THRESHOLD-th failure unregisters the action.
+    private synchronized void handleActionException(HostPollingAction action, Throwable cause) {
+        final String actionName = action.getClass().getName();
+        Integer count = badActions.remove(actionName);
+        if (count == null) {
+            count = Integer.valueOf(1); // Already called once when this happens
+        }
+        if (count < EXCEPTIONS_THRESHOLD) {
+            logger.log(java.util.logging.Level.INFO, HostPollingAction.class.getSimpleName() + " " + actionName + " threw an exception.", cause);
+            badActions.put(actionName, count + 1);
+        } else {
+            logger.log(java.util.logging.Level.WARNING, "Removing " + actionName + " due to too many repeated exceptions.", cause);
+            unregisterAction(action);
         }
     }
 
--- a/agent/core/src/main/java/com/redhat/thermostat/backend/PollingBackend.java	Thu Nov 24 12:34:53 2016 -0500
+++ b/agent/core/src/main/java/com/redhat/thermostat/backend/PollingBackend.java	Mon Dec 05 12:22:44 2016 +0100
@@ -38,8 +38,11 @@
 
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import com.redhat.thermostat.common.Version;
+import com.redhat.thermostat.common.utils.LoggingUtils;
 
 /*
  * Convenience {@link Backend} class for implementations that will take action
@@ -49,6 +52,7 @@
  */
 abstract class PollingBackend extends BaseBackend {
 
+    private static final Logger logger = LoggingUtils.getLogger(PollingBackend.class);
     static final long DEFAULT_INTERVAL = 1000; // TODO make this configurable.
 
     private ScheduledExecutorService executor;
@@ -69,7 +73,11 @@
             executor.scheduleAtFixedRate(new Runnable() {
                 @Override
                 public void run() {
-                    doScheduledActions();
+                    try {
+                        doScheduledActions();
+                    } catch (Throwable e) { // an uncaught throwable would suppress all later scheduled runs
+                        logger.log(Level.WARNING, "Polling action threw exception", e);
+                    }
                 }
             }, 0, DEFAULT_INTERVAL, TimeUnit.MILLISECONDS);
 
--- a/agent/core/src/main/java/com/redhat/thermostat/backend/VmListenerBackend.java	Thu Nov 24 12:34:53 2016 -0500
+++ b/agent/core/src/main/java/com/redhat/thermostat/backend/VmListenerBackend.java	Mon Dec 05 12:22:44 2016 +0100
@@ -129,8 +129,15 @@
         case VM_ACTIVE:
             if (getObserveNewJvm()) {
                 String wId = writerId.getWriterID();
-                VmUpdateListener listener = createVmListener(wId, vmId, pid);
-                monitor.handleNewVm(listener, pid);
+                VmUpdateListener listener = null;
+                try {
+                    listener = createVmListener(wId, vmId, pid);
+                } catch (Throwable t) {
+                    logger.log(Level.INFO, "Creating the VM listener for a VmListenerBackend threw an exception. Going to ignore the backend!", t);
+                }
+                if (listener != null) {
+                    monitor.handleNewVm(listener, pid);
+                }
             } else {
                 logger.log(Level.FINE, "skipping new vm " + pid);
             }
--- a/agent/core/src/main/java/com/redhat/thermostat/backend/VmPollingBackend.java	Thu Nov 24 12:34:53 2016 -0500
+++ b/agent/core/src/main/java/com/redhat/thermostat/backend/VmPollingBackend.java	Mon Dec 05 12:22:44 2016 +0100
@@ -36,6 +36,7 @@
 
 package com.redhat.thermostat.backend;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -47,6 +48,7 @@
 
 import com.redhat.thermostat.agent.VmStatusListener;
 import com.redhat.thermostat.agent.VmStatusListenerRegistrar;
+import com.redhat.thermostat.common.Pair;
 import com.redhat.thermostat.common.Version;
 import com.redhat.thermostat.common.utils.LoggingUtils;
 
@@ -58,9 +60,11 @@
  */
 public abstract class VmPollingBackend extends PollingBackend implements VmStatusListener {
 
+    private final Map<String, Pair<Integer, VmPollingAction>> badActions;
     private final Set<VmPollingAction> actions;
     private final Map<Integer, String> pidsToMonitor = new ConcurrentHashMap<>();
     private final VmStatusListenerRegistrar registrar;
+    private static final int EXCEPTIONS_THRESHOLD = 10;
     private static final Logger logger = LoggingUtils.getLogger(VmPollingBackend.class);
 
     public VmPollingBackend(String name, String description,
@@ -68,7 +72,8 @@
                             VmStatusListenerRegistrar registrar) {
         super(name, description, vendor, version, executor);
         this.registrar = registrar;
-        actions = new CopyOnWriteArraySet<>();
+        this.actions = new CopyOnWriteArraySet<>();
+        this.badActions = new HashMap<>();
     }
 
     @Override
@@ -87,11 +92,34 @@
             int pid = entry.getKey();
             String vmId = entry.getValue();
             for (VmPollingAction action : actions) {
-                action.run(vmId, pid);
+                try {
+                    action.run(vmId, pid);
+                } catch (Throwable t) {
+                    handleActionException(action, vmId, t);
+                }
             }
         }
     }
 
+    // Counts failures per (action class, vmId); the EXCEPTIONS_THRESHOLD-th failure for a key unregisters the action.
+    private synchronized void handleActionException(VmPollingAction action, String vmId, Throwable cause) {
+        final String actionName = action.getClass().getName();
+        final String actionKey = actionName + vmId;
+        Pair<Integer, VmPollingAction> actionPair = badActions.remove(actionKey);
+        if (actionPair == null) {
+            actionPair = new Pair<>(Integer.valueOf(1), action);
+        }
+        int exceptionsPerAction = actionPair.getFirst();
+        if (exceptionsPerAction < EXCEPTIONS_THRESHOLD) {
+            exceptionsPerAction++;
+            logger.log(java.util.logging.Level.INFO, VmPollingAction.class.getSimpleName() + " " + actionName + " threw an exception", cause);
+            badActions.put(actionKey, new Pair<>(exceptionsPerAction, action));
+        } else {
+            logger.log(java.util.logging.Level.WARNING, "Removing " + actionName + " due to too many repeated exceptions.", cause);
+            unregisterAction(action);
+        }
+    }
+
     /**
      * Register an action to be performed at each polling interval.  It is
      * recommended that implementations register all such actions during
--- a/agent/core/src/main/java/com/redhat/thermostat/backend/internal/VmListenerWrapper.java	Thu Nov 24 12:34:53 2016 -0500
+++ b/agent/core/src/main/java/com/redhat/thermostat/backend/internal/VmListenerWrapper.java	Mon Dec 05 12:22:44 2016 +0100
@@ -36,6 +36,14 @@
 
 package com.redhat.thermostat.backend.internal;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.redhat.thermostat.backend.VmUpdate;
+import com.redhat.thermostat.backend.VmUpdateException;
+import com.redhat.thermostat.backend.VmUpdateListener;
+import com.redhat.thermostat.common.utils.LoggingUtils;
+
 import sun.jvmstat.monitor.Monitor;
 import sun.jvmstat.monitor.MonitorException;
 import sun.jvmstat.monitor.MonitoredVm;
@@ -43,15 +51,16 @@
 import sun.jvmstat.monitor.event.VmEvent;
 import sun.jvmstat.monitor.event.VmListener;
 
-import com.redhat.thermostat.backend.VmUpdate;
-import com.redhat.thermostat.backend.VmUpdateException;
-import com.redhat.thermostat.backend.VmUpdateListener;
-
 public class VmListenerWrapper implements VmListener {
     
-    private VmUpdateListener listener;
-    private MonitoredVm vm;
-    private VmUpdate update;
+    private static final Logger logger = LoggingUtils.getLogger(VmListenerWrapper.class);
+    // Threshold until this listener gets removed from the JVM in case of it throwing
+    // exceptions on countersUpdated()
+    private static final int EXCEPTION_THRESHOLD = 10;
+    private final VmUpdateListener listener;
+    private final MonitoredVm vm;
+    private final VmUpdate update;
+    private int exceptionCount;
 
     public VmListenerWrapper(VmUpdateListener listener, MonitoredVm vm) {
         this.listener = listener;
@@ -64,7 +73,26 @@
         if (!vm.equals(event.getMonitoredVm())) {
             throw new AssertionError("Received change event for wrong VM");
         }
-        listener.countersUpdated(update);
+        try {
+            listener.countersUpdated(update);
+        } catch (Throwable t) {
+            handleListenerException(t);
+        }
+    }
+    
+    private void handleListenerException(Throwable t) { // called for every throwable escaping countersUpdated()
+        final String listenerName = listener.getClass().getName();
+        if (exceptionCount < EXCEPTION_THRESHOLD) {
+            logger.log(Level.INFO, "VM listener " + listenerName + " threw an exception", t);
+            exceptionCount++;
+        } else {
+            logger.log(Level.WARNING, "Removing bad listener " + listenerName + " due to too many repeated exceptions.", t);
+            try {
+                vm.removeVmListener(this);
+            } catch (MonitorException e) {
+                logger.log(Level.FINE, "Failed to remove bad listener " + listenerName + "; ignoring", e);
+            }
+        }
     }
 
     @Override
--- a/agent/core/src/test/java/com/redhat/thermostat/backend/HostPollingBackendTest.java	Thu Nov 24 12:34:53 2016 -0500
+++ b/agent/core/src/test/java/com/redhat/thermostat/backend/HostPollingBackendTest.java	Mon Dec 05 12:22:44 2016 +0100
@@ -36,6 +36,7 @@
 
 package com.redhat.thermostat.backend;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -75,6 +76,21 @@
             backend.setObserveNewJvm(true);
         }
     }
+    
+    /**
+     * If an action throws exceptions repeatedly, that action shall get
+     * disabled/unregistered.
+     */
+    @Test
+    public void testDoScheduledActionsWithExceptions() {
+        final BadHostPollingAction failingAction = new BadHostPollingAction();
+        backend.registerAction(failingAction);
+        int remainingPolls = 13; // more polls than the 10 failures the backend tolerates
+        while (remainingPolls-- > 0) {
+            backend.doScheduledActions();
+        }
+        assertEquals(10, failingAction.callCount);
+    }
 
     @Test
     public void verifySetObserveNewJvmThrowsException() {
@@ -113,5 +129,17 @@
         backend.doScheduledActions();
         verify(action, never()).run();
     }
+    
+    /** Action that fails on every invocation while recording how often it ran. */
+    private static class BadHostPollingAction implements HostPollingAction {
+
+        private int callCount;
+
+        @Override
+        public void run() {
+            callCount += 1;
+            throw new RuntimeException("HostPollingBackend.doScheduledActions() testing!");
+        }
+    }
 
 }
--- a/agent/core/src/test/java/com/redhat/thermostat/backend/VmPollingBackendTest.java	Thu Nov 24 12:34:53 2016 -0500
+++ b/agent/core/src/test/java/com/redhat/thermostat/backend/VmPollingBackendTest.java	Mon Dec 05 12:22:44 2016 +0100
@@ -36,6 +36,7 @@
 
 package com.redhat.thermostat.backend;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -43,6 +44,8 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.junit.Before;
@@ -80,6 +83,37 @@
         }
     }
 
+    /**
+     * If an action throws exceptions repeatedly, that action shall get
+     * disabled/unregistered.
+     */
+    @Test
+    public void testDoScheduledActionsWithExceptions() {
+        final String firstVmId = "test-vm-id1";
+        final String secondVmId = "test-vm-id2";
+        backend.vmStatusChanged(Status.VM_ACTIVE, firstVmId, 123);
+        backend.vmStatusChanged(Status.VM_ACTIVE, secondVmId, 456);
+        final BadVmPollingAction failingAction = new BadVmPollingAction();
+        backend.registerAction(failingAction);
+
+        // 13 polling rounds: anything beyond the threshold of 10 will do.
+        int remainingRounds = 13;
+        while (remainingRounds-- > 0) {
+            backend.doScheduledActions();
+        }
+
+        // Failures are counted per VM, but unregistering the action stops
+        // it for every VM. The VM whose tenth failure comes first reaches
+        // the threshold of 10 calls; by then the other VM has seen only 9
+        // calls and never gets its tenth.
+        final int callsForFirstVm = failingAction.getCallCount(firstVmId);
+        final int callsForSecondVm = failingAction.getCallCount(secondVmId);
+        final int mostCalls = Math.max(callsForFirstVm, callsForSecondVm);
+        final int fewestCalls = Math.min(callsForFirstVm, callsForSecondVm);
+        assertEquals("Must not be called beyond exception threshold", 10, mostCalls);
+        assertEquals("Other action's exception cancels globally", 9, fewestCalls);
+    }
+
     @Test
     public void verifyCustomActivateRegistersListener() {
         backend.preActivate();
@@ -91,7 +125,7 @@
         backend.postDeactivate();
         verify(mockRegistrar).unregister(backend);
     }
-
+    
     @Test
     public void verifyRegisteredActionPerformed() {
         String vmId = "test-vm-id";
@@ -212,4 +246,26 @@
         verify(action).run(eq(vmId1), eq(pid1));
         verify(action, never()).run(eq(vmId2), eq(pid2));
     }
+    
+    /** Polling action that always throws, tallying its invocations per VM id. */
+    private static class BadVmPollingAction implements VmPollingAction {
+
+        private final Map<String, Integer> callCounts = new HashMap<>();
+
+        @Override
+        public void run(String vmId, int pid) {
+            // Record the call before failing so that the test can inspect
+            // the per-VM tally afterwards.
+            final Integer previous = callCounts.get(vmId);
+            final int updated = (previous == null) ? 1 : previous + 1;
+            callCounts.put(vmId, updated);
+            throw new RuntimeException("doScheduledActions() testing!");
+        }
+
+        /** Returns how often run() was called for vmId, or null if never. */
+        private Integer getCallCount(String vmId) {
+            return callCounts.get(vmId);
+        }
+
+    }
 }
--- a/agent/core/src/test/java/com/redhat/thermostat/backend/internal/VmListenerWrapperTest.java	Thu Nov 24 12:34:53 2016 -0500
+++ b/agent/core/src/test/java/com/redhat/thermostat/backend/internal/VmListenerWrapperTest.java	Mon Dec 05 12:22:44 2016 +0100
@@ -39,6 +39,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -46,14 +47,15 @@
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import com.redhat.thermostat.backend.VmUpdate;
+import com.redhat.thermostat.backend.VmUpdateException;
+import com.redhat.thermostat.backend.VmUpdateListener;
+
 import sun.jvmstat.monitor.Monitor;
 import sun.jvmstat.monitor.MonitorException;
 import sun.jvmstat.monitor.MonitoredVm;
 import sun.jvmstat.monitor.event.VmEvent;
 
-import com.redhat.thermostat.backend.VmUpdateException;
-import com.redhat.thermostat.backend.VmUpdateListener;
-
 public class VmListenerWrapperTest {
 
     private VmListenerWrapper wrapper;
@@ -66,6 +68,31 @@
         monitoredVm = mock(MonitoredVm.class);
         wrapper = new VmListenerWrapper(listener, monitoredVm);
     }
+    
+    /**
+     * A listener that keeps throwing from countersUpdated() must be detached
+     * from the monitored VM once it has failed more often than tolerated.
+     *
+     * @throws MonitorException declared by removeVmListener(), which is only
+     *         called on a mock here
+     */
+    @Test
+    public void testMonitorsUpdatedListenerExceptions() throws MonitorException {
+        final VmUpdateListener throwingListener = new VmUpdateListener() {
+            @Override
+            public void countersUpdated(VmUpdate update) {
+                throw new RuntimeException("countersUpdated() testing!");
+            }
+        };
+        final VmListenerWrapper wrapperUnderTest = new VmListenerWrapper(throwingListener, monitoredVm);
+        final VmEvent event = mock(VmEvent.class);
+        when(event.getMonitoredVm()).thenReturn(monitoredVm); // same stubbing serves every update
+        // 11 updates: one more than the 10 failures the wrapper tolerates.
+        for (int updatesLeft = 11; updatesLeft > 0; updatesLeft--) {
+            wrapperUnderTest.monitorsUpdated(event);
+        }
+        verify(monitoredVm, times(1)).removeVmListener(wrapperUnderTest);
+    }
 
     @Test
     public void testMonitorsUpdated() {