changeset 2201:85e376bde322

Liveconnect message processing design changes. - Redesigned message consumption/processing model to allow arbitrary number of applets to load without increasing MAX_WORKERS count. - Implemented priority queuing to allow blocking threads to get responses first. - Made message consumption non-blocking and fixed thread-safety issues therein 2010-03-13 Deepak Bhole <dbhole@redhat.com> * plugin/icedteanp/java/sun/applet/PluginAppletViewer.java (requestPluginCookieInfo): Register cookie info as a priority message. (requestPluginProxyInfo): Register proxy info as a priority message. * plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java: Re-designed message consumption to implement priority queuing and parallel initialization limits so that an arbitrary number of applets can load without blocking one another, all in a thread-safe manner. (registerPriorityWait): New method. Registers a string that is considered a "priority" string, which gets delegated to dedicated worker threads. (unRegisterPriorityWait): Unregisters a string so that it is no longer a priority. (PluginMessageConsumer): Do not create any workers when starting. (consume): Remove method. Consumption is now done in a separate dedicated thread to prevent blocking. (getPriorityStrIfPriority): New method. If the given message is priority, return why (i.e the 'priority string'it matched). (isInInit): New method. Returns if the plugin is currently initializing an applet. (addToInitWorkers): New method. Adds given worker to list of workers currently initializting applets. (okayToProcess): New method. Returns whether or not it is okay to process the given applet. (notifyWorkerIsFree): New method. Notifies this class that a worker has just become free. (queue): Queues the given message for consumption. (ConsumerThread): New protected inner (thread) class. Responsible for consuming messages in the queue, and regueuing them if consumption if not possible. (getFreeWorker): Changed to be non-blocking, and return either a priority worker or a normal worker depending on what is requested. * plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java (PluginMessageHandlerWorker): Set new priority and consumer variables. (busy): Make thread-safe by waiting on same property that free() waits on. (free): Make thread-safe by waiting on same property that busy() waits on. (isPriority): New method. Returns of worker is a priority worker. (isFree): Made thread-safe, and accounts for priority. * plugin/icedteanp/java/sun/applet/PluginStreamHandler.java (startProcessing): Call consumer.queue() instead of consumer.consume(). (postMessage): Remove unused method.
author Deepak Bhole <dbhole@redhat.com>
date Mon, 15 Mar 2010 13:35:18 -0400
parents 53d08a835f60
children 8a1a5c294219
files ChangeLog plugin/icedteanp/java/sun/applet/PluginAppletViewer.java plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java plugin/icedteanp/java/sun/applet/PluginStreamHandler.java
diffstat 5 files changed, 298 insertions(+), 81 deletions(-) [+]
line wrap: on
line diff
--- a/ChangeLog	Tue Jun 01 18:04:46 2010 +0100
+++ b/ChangeLog	Mon Mar 15 13:35:18 2010 -0400
@@ -1,3 +1,46 @@
+2010-03-13  Deepak Bhole <dbhole@redhat.com>
+
+	* plugin/icedteanp/java/sun/applet/PluginAppletViewer.java
+	(requestPluginCookieInfo): Register cookie info as a priority message.
+	(requestPluginProxyInfo): Register proxy info as a priority message.
+	* plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java: Re-designed
+	message consumption to implement priority queuing and parallel
+	initialization limits so that an arbitrary number of applets can load
+	without blocking one another, all in a thread-safe manner.
+	(registerPriorityWait): New method. Registers a string that is
+	considered a "priority" string, which gets delegated to dedicated worker
+	threads.
+	(unRegisterPriorityWait): Unregisters a string so that it is no longer
+	a priority.
+	(PluginMessageConsumer): Do not create any workers when starting.
+	(consume): Remove method. Consumption is now done in a separate dedicated
+	thread to prevent blocking.
+	(getPriorityStrIfPriority):	New method. If the given message is priority,
+	return why (i.e the 'priority string'it matched).
+	(isInInit): New method. Returns if the plugin is currently initializing an
+	applet.
+	(addToInitWorkers): New method. Adds given worker to list of workers
+	currently initializting applets.
+	(okayToProcess): New method. Returns whether or not it is okay to process
+	the given applet.
+	(notifyWorkerIsFree): New method. Notifies this class that a worker has
+	just become free.
+	(queue): Queues the given message for consumption.
+	(ConsumerThread): New protected inner (thread) class. Responsible for
+	consuming messages in the queue, and regueuing them if consumption if not
+	possible.
+	(getFreeWorker): Changed to be non-blocking, and return either a priority
+	worker or a normal worker depending on what is requested.
+	* plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java
+	(PluginMessageHandlerWorker): Set new priority and consumer variables.
+	(busy): Make thread-safe by waiting on same property that free() waits on.
+	(free): Make thread-safe by waiting on same property that busy() waits on.
+	(isPriority): New method. Returns of worker is a priority worker.
+	(isFree): Made thread-safe, and accounts for priority.
+	* plugin/icedteanp/java/sun/applet/PluginStreamHandler.java
+	(startProcessing): Call consumer.queue() instead of consumer.consume().
+	(postMessage): Remove unused method.
+
 2010-03-12  Andrew John Hughes  <ahughes@redhat.com>
 
 	* Makefile.am:
--- a/plugin/icedteanp/java/sun/applet/PluginAppletViewer.java	Tue Jun 01 18:04:46 2010 +0100
+++ b/plugin/icedteanp/java/sun/applet/PluginAppletViewer.java	Mon Mar 15 13:35:18 2010 -0400
@@ -1278,6 +1278,7 @@
              return null;
          }
 
+         PluginMessageConsumer.registerPriorityWait(reference);
          streamhandler.postCallRequest(request);
          streamhandler.write(request.getMessage());
          try {
@@ -1323,6 +1324,7 @@
              "plugin PluginProxyInfo reference " + reference + " " +
              requestURI, reference);
 
+         PluginMessageConsumer.registerPriorityWait(reference);
          streamhandler.postCallRequest(request);
          streamhandler.write(request.getMessage());
          try {
--- a/plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java	Tue Jun 01 18:04:46 2010 +0100
+++ b/plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java	Mon Mar 15 13:35:18 2010 -0400
@@ -38,82 +38,257 @@
 package sun.applet;
 
 import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
 import java.util.LinkedList;
-
-import sun.applet.AppletSecurity;
+import java.util.Set;
 
 class PluginMessageConsumer {
 
-	int MAX_WORKERS = 20;
+	private static int MAX_PARALLEL_INITS = 1;
+
+	// Each initialization requires 5 responses (tag, handle, width, proxy, cookie) 
+	// before the message stack unlocks/collapses. This works out well because we 
+	// want to allow upto 5 parallel tasks anyway
+	private static int MAX_WORKERS = MAX_PARALLEL_INITS*5;
+	private static int PRIORITY_WORKERS = MAX_PARALLEL_INITS*2;
+
+	private static Hashtable<Integer, PluginMessageHandlerWorker> initWorkers = new Hashtable<Integer, PluginMessageHandlerWorker>(2);
+
 	LinkedList<String> readQueue = new LinkedList<String>();
+	private static LinkedList<String> priorityWaitQueue = new LinkedList<String>();
 	ArrayList<PluginMessageHandlerWorker> workers = new ArrayList<PluginMessageHandlerWorker>();
 	PluginStreamHandler streamHandler = null;
 	AppletSecurity as;
+	ConsumerThread consumerThread = new ConsumerThread();
 
+	/** 
+	 * Registers a reference to wait for. Responses to registered priority 
+	 * references get handled by priority worker if normal workers are busy.
+	 *
+	 * @param reference The reference to give priority to
+	 */
+	public static void registerPriorityWait(Long reference) {
+	    PluginDebug.debug("Registering priority for reference " + reference);
+	    registerPriorityWait("reference " + reference.toString());
+	}
+
+	/** 
+     * Registers a string to wait for.
+     *
+     * @param searchString the string to look for in a response
+     */
+    public static void registerPriorityWait(String searchString) {
+        PluginDebug.debug("Registering priority for string " + searchString);
+        synchronized (priorityWaitQueue) {
+            if (!priorityWaitQueue.contains(searchString))
+                priorityWaitQueue.add(searchString);
+        }
+    }
+
+	/** 
+     * Unregisters a priority reference to wait for.
+     *
+     * @param reference The reference to remove
+     */
+    public static void unRegisterPriorityWait(Long reference) {
+        unRegisterPriorityWait(reference.toString());
+    }
+
+    /** 
+     * Unregisters a priority string to wait for.
+     *
+     * @param searchString The string to unregister from the priority list
+     */
+    public static void unRegisterPriorityWait(String searchString) {
+        synchronized (priorityWaitQueue) {
+            priorityWaitQueue.remove(searchString);
+        }
+    }
+
+    /**
+     * Returns the reference for this message. This method assumes that 
+     * the message has a reference number.
+     * 
+     * @param The message
+     * @return the reference number
+     */
+    private Long getReference(String[] msgParts) {
+        return Long.parseLong(msgParts[3]);
+    }
+    
 	public PluginMessageConsumer(PluginStreamHandler streamHandler) {
 		
 		as = new AppletSecurity();
 		this.streamHandler = streamHandler;
+		this.consumerThread.start();
+	}
 
-		// create some workers at the start...
-		for (int i=0; i < 3; i++) {
-			PluginDebug.debug("Creating worker " + i);
-			PluginMessageHandlerWorker worker = new PluginMessageHandlerWorker(streamHandler, i, as);
-			worker.start();
-			workers.add(worker);
-		}
+	private String getPriorityStrIfPriority(String message) {
+
+	    synchronized (priorityWaitQueue) {
+	        Iterator<String> it = priorityWaitQueue.iterator();
+
+	        while (it.hasNext()) {
+	            String priorityStr = it.next();
+	            if (message.indexOf(priorityStr) > 0)
+	                return priorityStr;
+	        }
+	    }
+
+	    return null;
+	}
+
+	private boolean isInInit(Integer instanceNum) {
+	    return initWorkers.containsKey(instanceNum);
+	}
+
+	private void addToInitWorkers(Integer instanceNum, PluginMessageHandlerWorker worker) {
+        synchronized(initWorkers) {
+            initWorkers.put(instanceNum, worker);
+        }
+	}
+
+	private boolean okayToProcess(String[] msgParts) {
+
+	    if (msgParts[2].equals("tag")) {
+
+	        Integer instanceNum = new Integer(msgParts[1]);
+
+	        synchronized(initWorkers) {
+	            if (initWorkers.size() >= MAX_PARALLEL_INITS) {
+	                return false;
+	            }
+	        }
+	        
+	        registerPriorityWait("instance " + instanceNum + " handle");
+	        registerPriorityWait("instance " + instanceNum + " width");
+
+	    } else if (msgParts[2].equals("handle") || msgParts[2].equals("width")) {
+	            Integer instanceNum = new Integer(msgParts[1]);
+
+	            // If this instance is not in init, return false immediately. 
+	            // Handle/Width messages should NEVER go before tag messages
+	            if (!isInInit(instanceNum))
+	                return false;
+	    }
+
+	    return true;
+	}
+
+	public void notifyWorkerIsFree(PluginMessageHandlerWorker worker) {
+	    synchronized (initWorkers) {
+	        Iterator<Integer> i = initWorkers.keySet().iterator();
+            while (i.hasNext()) {
+                Integer key = i.next();
+                if (initWorkers.get(key).equals(worker))
+                    initWorkers.remove(key);
+            }
+	    }
+	    
+	    consumerThread.interrupt();
 	}
 
-	public void consume(String message) {
-		
-		PluginDebug.debug("Consumer received message " + message);
-		
-		synchronized(readQueue) {
-			readQueue.add(message);
-		}
-
-		PluginDebug.debug("Message " + message + " added to queue. Looking for free worker...");
-		final PluginMessageHandlerWorker worker = getFreeWorker();
-
-		synchronized(readQueue) {
-			if (readQueue.size() > 0) {
-				worker.setmessage(readQueue.poll());
-			}
-		}
-
-		worker.interrupt();
+	public void queue(String message) {
+	    synchronized(readQueue) {
+	        readQueue.addLast(message);
+	    }
+	    
+	    // Wake that lazy consumer thread
+	    consumerThread.interrupt();
 	}
 
-	private PluginMessageHandlerWorker getFreeWorker() {
-		
-		// FIXME: Can be made more efficient by having an idle worker pool
+	protected class ConsumerThread extends Thread { 
+	    public void run() {
+
+	        while (true) {
+
+                String message = null;
+
+	            synchronized(readQueue) {
+	                message = readQueue.poll();
+	            }
+
+	            if (message != null) {
+
+	                String[] msgParts = message.split(" ");
+
+	                // if it is no okay to process just yet, push it back and 
+	                if (!okayToProcess(msgParts)) {
+	                    synchronized(readQueue) {
+	                        readQueue.addLast(message);
+	                    }
+	                    
+	                    continue; // re-loop to try next msg
+	                }
+
+	                String priorityStr = getPriorityStrIfPriority(message);
+	                boolean isPriorityResponse = (priorityStr != null);
 		
-		while (true) {
+	                //PluginDebug.debug("Message " + message + " (priority=" + isPriorityResponse + ") ready to be processed. Looking for free worker...");
+	                final PluginMessageHandlerWorker worker = getFreeWorker(isPriorityResponse);
+	                
+	                if (worker == null) {
+	                    synchronized(readQueue) {
+                            readQueue.addLast(message);
+                        }
+
+	                    continue; // re-loop to try next msg
+	                }
+
+	                if (msgParts[2].equals("tag"))
+	                    addToInitWorkers((new Integer(msgParts[1])), worker);
+
+	                if (isPriorityResponse) {
+	                    unRegisterPriorityWait(priorityStr);
+	                }
+
+                    worker.setmessage(message);
+	                worker.interrupt();
+
+	            } else {
+	                try {
+	                    Thread.sleep(1000);
+	                } catch (InterruptedException ie) {}
+	            }
+	        }
+	    }
+	}
+
+	private PluginMessageHandlerWorker getFreeWorker(boolean prioritized) {
+
 			for (PluginMessageHandlerWorker worker: workers) {
-				if (worker.isFree()) {
-					PluginDebug.debug("Found free worker with id " + worker.getWorkerId());
+				if (worker.isFree(prioritized)) {
+					PluginDebug.debug("Found free worker (" + worker.isPriority() + ") with id " + worker.getWorkerId());
 					// mark it busy before returning
 					worker.busy();
 					return worker;
 				}
 			}
-			
+
 			// If we have less than MAX_WORKERS, create a new worker
 			if (workers.size() <= MAX_WORKERS) {
-			    PluginDebug.debug("Cannot find free worker, creating worker " + workers.size());
-			    PluginMessageHandlerWorker worker = new PluginMessageHandlerWorker(streamHandler, workers.size(), as);
-			    worker.start();
-			    workers.add(worker);
-			    worker.busy();
-			    return worker;
-			} else {
-			    // else wait
+			    PluginMessageHandlerWorker worker = null;
+			    
+			    if (workers.size() <= (MAX_WORKERS - PRIORITY_WORKERS)) {
+			        PluginDebug.debug("Cannot find free worker, creating worker " + workers.size());
+			        worker = new PluginMessageHandlerWorker(this, streamHandler, workers.size(), as, false);
+			    } else if (prioritized) {
+			        PluginDebug.debug("Cannot find free worker, creating priority worker " + workers.size());
+			        worker = new PluginMessageHandlerWorker(this, streamHandler, workers.size(), as, true);
+			    } else {
+			        return null;
+			    }
+
+		        worker.start();
+		        worker.busy();
+		        workers.add(worker);
+		        return worker;
+
 			}
-
-			Thread.yield();
-		}
-
-		//throw new RuntimeException("Out of message handler workers");
+			
+			// No workers available. Better luck next time! 
+			return null;
 	}
 	
 }
--- a/plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java	Tue Jun 01 18:04:46 2010 +0100
+++ b/plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java	Mon Mar 15 13:35:18 2010 -0400
@@ -42,15 +42,25 @@
 class PluginMessageHandlerWorker extends Thread {
 
 	private boolean free = true;
+	private boolean isPriorityWorker = false;
 	private int id;
 	private String message = null;
 	private SecurityManager sm;
 	PluginStreamHandler streamHandler = null;
+	PluginMessageConsumer consumer = null;
 
-	public PluginMessageHandlerWorker(PluginStreamHandler streamHandler, int id, SecurityManager sm) {
+	public PluginMessageHandlerWorker(
+	            PluginMessageConsumer consumer, 
+	            PluginStreamHandler streamHandler, int id, 
+	            SecurityManager sm, boolean isPriorityWorker) {
+
 		this.id = id;
 		this.streamHandler = streamHandler;
 		this.sm = sm;
+		this.isPriorityWorker = isPriorityWorker;
+		this.consumer = consumer;
+		
+		PluginDebug.debug("Worker " + this.id + " (priority=" + isPriorityWorker + ") created."); 
 	}
 
 	public void setmessage(String message) {
@@ -107,15 +117,27 @@
 	}
 
 	public void busy() {
-		this.free = false;
+	    synchronized (this) {
+	        this.free = false;            
+        }
 	}
-
 	
 	public void free() {
-		this.free = true;
+	    synchronized (this) {
+	        this.free = true;
+
+	        // Signal the consumer that we are done in case it was waiting
+	        consumer.notifyWorkerIsFree(this); 
+	    }
 	}
-	
-	public boolean isFree() {
-		return free;
+
+	public boolean isPriority() {
+	    return isPriorityWorker;
+	}
+
+	public boolean isFree(boolean prioritized) {
+	    synchronized (this) {
+	        return free && (prioritized == isPriorityWorker);
+	    }
 	}
 }
--- a/plugin/icedteanp/java/sun/applet/PluginStreamHandler.java	Tue Jun 01 18:04:46 2010 +0100
+++ b/plugin/icedteanp/java/sun/applet/PluginStreamHandler.java	Mon Mar 15 13:35:18 2010 -0400
@@ -141,7 +141,7 @@
     				//System.err.println("Total wait time: " + totalWait);
 
     				if (s != null) {
-    					consumer.consume(s);
+    					consumer.queue(s);
     				} else {
     					try {
     						// Close input/output channels to plugin.
@@ -191,31 +191,6 @@
     	listenerThread.start();
     }
     
-    public void postMessage(String s) {
-
-    	if (s == null || s.equals("shutdown")) {
-    	    try {
-    		// Close input/output channels to plugin.
-    		pluginInputReader.close();
-    		pluginOutputWriter.close();
-    	    } catch (IOException exception) {
-    		// Deliberately ignore IOException caused by broken
-    		// pipe since plugin may have already detached.
-    	    }
-    	    AppletSecurityContextManager.dumpStore(0);
-    	    PluginDebug.debug("APPLETVIEWER: exiting appletviewer");
-    	    System.exit(0);
-    	}
-
-   		//PluginAppletSecurityContext.contexts.get(0).store.dump();
-   		PluginDebug.debug("Plugin posted: " + s);
-
-		PluginDebug.debug("Consuming " + s);
-		consumer.consume(s);
-
-   		PluginDebug.debug("Added to queue");
-    }
-    
     public void handleMessage(String message) throws PluginException {
 
     	int nextIndex = 0;