Mercurial > hg > release > icedtea7-2.0
changeset 2201:85e376bde322
Liveconnect message processing design changes.
- Redesigned message consumption/processing model to allow arbitrary number
of applets to load without increasing MAX_WORKERS count.
- Implemented priority queuing to allow blocking threads to get responses first.
- Made message consumption non-blocking and fixed thread-safety issues therein
2010-03-13 Deepak Bhole <dbhole@redhat.com>
* plugin/icedteanp/java/sun/applet/PluginAppletViewer.java
(requestPluginCookieInfo): Register cookie info as a priority message.
(requestPluginProxyInfo): Register proxy info as a priority message.
* plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java: Re-designed
message consumption to implement priority queuing and parallel
initialization limits so that an arbitrary number of applets can load
without blocking one another, all in a thread-safe manner.
(registerPriorityWait): New method. Registers a string that is
considered a "priority" string, which gets delegated to dedicated worker
threads.
(unRegisterPriorityWait): Unregisters a string so that it is no longer
a priority.
(PluginMessageConsumer): Do not create any workers when starting.
(consume): Remove method. Consumption is now done in a separate dedicated
thread to prevent blocking.
(getPriorityStrIfPriority): New method. If the given message is priority,
return why (i.e the 'priority string'it matched).
(isInInit): New method. Returns if the plugin is currently initializing an
applet.
(addToInitWorkers): New method. Adds given worker to list of workers
currently initializting applets.
(okayToProcess): New method. Returns whether or not it is okay to process
the given applet.
(notifyWorkerIsFree): New method. Notifies this class that a worker has
just become free.
(queue): Queues the given message for consumption.
(ConsumerThread): New protected inner (thread) class. Responsible for
consuming messages in the queue, and regueuing them if consumption if not
possible.
(getFreeWorker): Changed to be non-blocking, and return either a priority
worker or a normal worker depending on what is requested.
* plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java
(PluginMessageHandlerWorker): Set new priority and consumer variables.
(busy): Make thread-safe by waiting on same property that free() waits on.
(free): Make thread-safe by waiting on same property that busy() waits on.
(isPriority): New method. Returns of worker is a priority worker.
(isFree): Made thread-safe, and accounts for priority.
* plugin/icedteanp/java/sun/applet/PluginStreamHandler.java
(startProcessing): Call consumer.queue() instead of consumer.consume().
(postMessage): Remove unused method.
author | Deepak Bhole <dbhole@redhat.com> |
---|---|
date | Mon, 15 Mar 2010 13:35:18 -0400 |
parents | 53d08a835f60 |
children | 8a1a5c294219 |
files | ChangeLog plugin/icedteanp/java/sun/applet/PluginAppletViewer.java plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java plugin/icedteanp/java/sun/applet/PluginStreamHandler.java |
diffstat | 5 files changed, 298 insertions(+), 81 deletions(-) [+] |
line wrap: on
line diff
--- a/ChangeLog Tue Jun 01 18:04:46 2010 +0100 +++ b/ChangeLog Mon Mar 15 13:35:18 2010 -0400 @@ -1,3 +1,46 @@ +2010-03-13 Deepak Bhole <dbhole@redhat.com> + + * plugin/icedteanp/java/sun/applet/PluginAppletViewer.java + (requestPluginCookieInfo): Register cookie info as a priority message. + (requestPluginProxyInfo): Register proxy info as a priority message. + * plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java: Re-designed + message consumption to implement priority queuing and parallel + initialization limits so that an arbitrary number of applets can load + without blocking one another, all in a thread-safe manner. + (registerPriorityWait): New method. Registers a string that is + considered a "priority" string, which gets delegated to dedicated worker + threads. + (unRegisterPriorityWait): Unregisters a string so that it is no longer + a priority. + (PluginMessageConsumer): Do not create any workers when starting. + (consume): Remove method. Consumption is now done in a separate dedicated + thread to prevent blocking. + (getPriorityStrIfPriority): New method. If the given message is priority, + return why (i.e the 'priority string'it matched). + (isInInit): New method. Returns if the plugin is currently initializing an + applet. + (addToInitWorkers): New method. Adds given worker to list of workers + currently initializting applets. + (okayToProcess): New method. Returns whether or not it is okay to process + the given applet. + (notifyWorkerIsFree): New method. Notifies this class that a worker has + just become free. + (queue): Queues the given message for consumption. + (ConsumerThread): New protected inner (thread) class. Responsible for + consuming messages in the queue, and regueuing them if consumption if not + possible. + (getFreeWorker): Changed to be non-blocking, and return either a priority + worker or a normal worker depending on what is requested. + * plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java + (PluginMessageHandlerWorker): Set new priority and consumer variables. + (busy): Make thread-safe by waiting on same property that free() waits on. + (free): Make thread-safe by waiting on same property that busy() waits on. + (isPriority): New method. Returns of worker is a priority worker. + (isFree): Made thread-safe, and accounts for priority. + * plugin/icedteanp/java/sun/applet/PluginStreamHandler.java + (startProcessing): Call consumer.queue() instead of consumer.consume(). + (postMessage): Remove unused method. + 2010-03-12 Andrew John Hughes <ahughes@redhat.com> * Makefile.am:
--- a/plugin/icedteanp/java/sun/applet/PluginAppletViewer.java Tue Jun 01 18:04:46 2010 +0100 +++ b/plugin/icedteanp/java/sun/applet/PluginAppletViewer.java Mon Mar 15 13:35:18 2010 -0400 @@ -1278,6 +1278,7 @@ return null; } + PluginMessageConsumer.registerPriorityWait(reference); streamhandler.postCallRequest(request); streamhandler.write(request.getMessage()); try { @@ -1323,6 +1324,7 @@ "plugin PluginProxyInfo reference " + reference + " " + requestURI, reference); + PluginMessageConsumer.registerPriorityWait(reference); streamhandler.postCallRequest(request); streamhandler.write(request.getMessage()); try {
--- a/plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java Tue Jun 01 18:04:46 2010 +0100 +++ b/plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java Mon Mar 15 13:35:18 2010 -0400 @@ -38,82 +38,257 @@ package sun.applet; import java.util.ArrayList; +import java.util.Hashtable; +import java.util.Iterator; import java.util.LinkedList; - -import sun.applet.AppletSecurity; +import java.util.Set; class PluginMessageConsumer { - int MAX_WORKERS = 20; + private static int MAX_PARALLEL_INITS = 1; + + // Each initialization requires 5 responses (tag, handle, width, proxy, cookie) + // before the message stack unlocks/collapses. This works out well because we + // want to allow upto 5 parallel tasks anyway + private static int MAX_WORKERS = MAX_PARALLEL_INITS*5; + private static int PRIORITY_WORKERS = MAX_PARALLEL_INITS*2; + + private static Hashtable<Integer, PluginMessageHandlerWorker> initWorkers = new Hashtable<Integer, PluginMessageHandlerWorker>(2); + LinkedList<String> readQueue = new LinkedList<String>(); + private static LinkedList<String> priorityWaitQueue = new LinkedList<String>(); ArrayList<PluginMessageHandlerWorker> workers = new ArrayList<PluginMessageHandlerWorker>(); PluginStreamHandler streamHandler = null; AppletSecurity as; + ConsumerThread consumerThread = new ConsumerThread(); + /** + * Registers a reference to wait for. Responses to registered priority + * references get handled by priority worker if normal workers are busy. + * + * @param reference The reference to give priority to + */ + public static void registerPriorityWait(Long reference) { + PluginDebug.debug("Registering priority for reference " + reference); + registerPriorityWait("reference " + reference.toString()); + } + + /** + * Registers a string to wait for. + * + * @param searchString the string to look for in a response + */ + public static void registerPriorityWait(String searchString) { + PluginDebug.debug("Registering priority for string " + searchString); + synchronized (priorityWaitQueue) { + if (!priorityWaitQueue.contains(searchString)) + priorityWaitQueue.add(searchString); + } + } + + /** + * Unregisters a priority reference to wait for. + * + * @param reference The reference to remove + */ + public static void unRegisterPriorityWait(Long reference) { + unRegisterPriorityWait(reference.toString()); + } + + /** + * Unregisters a priority string to wait for. + * + * @param searchString The string to unregister from the priority list + */ + public static void unRegisterPriorityWait(String searchString) { + synchronized (priorityWaitQueue) { + priorityWaitQueue.remove(searchString); + } + } + + /** + * Returns the reference for this message. This method assumes that + * the message has a reference number. + * + * @param The message + * @return the reference number + */ + private Long getReference(String[] msgParts) { + return Long.parseLong(msgParts[3]); + } + public PluginMessageConsumer(PluginStreamHandler streamHandler) { as = new AppletSecurity(); this.streamHandler = streamHandler; + this.consumerThread.start(); + } - // create some workers at the start... - for (int i=0; i < 3; i++) { - PluginDebug.debug("Creating worker " + i); - PluginMessageHandlerWorker worker = new PluginMessageHandlerWorker(streamHandler, i, as); - worker.start(); - workers.add(worker); - } + private String getPriorityStrIfPriority(String message) { + + synchronized (priorityWaitQueue) { + Iterator<String> it = priorityWaitQueue.iterator(); + + while (it.hasNext()) { + String priorityStr = it.next(); + if (message.indexOf(priorityStr) > 0) + return priorityStr; + } + } + + return null; + } + + private boolean isInInit(Integer instanceNum) { + return initWorkers.containsKey(instanceNum); + } + + private void addToInitWorkers(Integer instanceNum, PluginMessageHandlerWorker worker) { + synchronized(initWorkers) { + initWorkers.put(instanceNum, worker); + } + } + + private boolean okayToProcess(String[] msgParts) { + + if (msgParts[2].equals("tag")) { + + Integer instanceNum = new Integer(msgParts[1]); + + synchronized(initWorkers) { + if (initWorkers.size() >= MAX_PARALLEL_INITS) { + return false; + } + } + + registerPriorityWait("instance " + instanceNum + " handle"); + registerPriorityWait("instance " + instanceNum + " width"); + + } else if (msgParts[2].equals("handle") || msgParts[2].equals("width")) { + Integer instanceNum = new Integer(msgParts[1]); + + // If this instance is not in init, return false immediately. + // Handle/Width messages should NEVER go before tag messages + if (!isInInit(instanceNum)) + return false; + } + + return true; + } + + public void notifyWorkerIsFree(PluginMessageHandlerWorker worker) { + synchronized (initWorkers) { + Iterator<Integer> i = initWorkers.keySet().iterator(); + while (i.hasNext()) { + Integer key = i.next(); + if (initWorkers.get(key).equals(worker)) + initWorkers.remove(key); + } + } + + consumerThread.interrupt(); } - public void consume(String message) { - - PluginDebug.debug("Consumer received message " + message); - - synchronized(readQueue) { - readQueue.add(message); - } - - PluginDebug.debug("Message " + message + " added to queue. Looking for free worker..."); - final PluginMessageHandlerWorker worker = getFreeWorker(); - - synchronized(readQueue) { - if (readQueue.size() > 0) { - worker.setmessage(readQueue.poll()); - } - } - - worker.interrupt(); + public void queue(String message) { + synchronized(readQueue) { + readQueue.addLast(message); + } + + // Wake that lazy consumer thread + consumerThread.interrupt(); } - private PluginMessageHandlerWorker getFreeWorker() { - - // FIXME: Can be made more efficient by having an idle worker pool + protected class ConsumerThread extends Thread { + public void run() { + + while (true) { + + String message = null; + + synchronized(readQueue) { + message = readQueue.poll(); + } + + if (message != null) { + + String[] msgParts = message.split(" "); + + // if it is no okay to process just yet, push it back and + if (!okayToProcess(msgParts)) { + synchronized(readQueue) { + readQueue.addLast(message); + } + + continue; // re-loop to try next msg + } + + String priorityStr = getPriorityStrIfPriority(message); + boolean isPriorityResponse = (priorityStr != null); - while (true) { + //PluginDebug.debug("Message " + message + " (priority=" + isPriorityResponse + ") ready to be processed. Looking for free worker..."); + final PluginMessageHandlerWorker worker = getFreeWorker(isPriorityResponse); + + if (worker == null) { + synchronized(readQueue) { + readQueue.addLast(message); + } + + continue; // re-loop to try next msg + } + + if (msgParts[2].equals("tag")) + addToInitWorkers((new Integer(msgParts[1])), worker); + + if (isPriorityResponse) { + unRegisterPriorityWait(priorityStr); + } + + worker.setmessage(message); + worker.interrupt(); + + } else { + try { + Thread.sleep(1000); + } catch (InterruptedException ie) {} + } + } + } + } + + private PluginMessageHandlerWorker getFreeWorker(boolean prioritized) { + for (PluginMessageHandlerWorker worker: workers) { - if (worker.isFree()) { - PluginDebug.debug("Found free worker with id " + worker.getWorkerId()); + if (worker.isFree(prioritized)) { + PluginDebug.debug("Found free worker (" + worker.isPriority() + ") with id " + worker.getWorkerId()); // mark it busy before returning worker.busy(); return worker; } } - + // If we have less than MAX_WORKERS, create a new worker if (workers.size() <= MAX_WORKERS) { - PluginDebug.debug("Cannot find free worker, creating worker " + workers.size()); - PluginMessageHandlerWorker worker = new PluginMessageHandlerWorker(streamHandler, workers.size(), as); - worker.start(); - workers.add(worker); - worker.busy(); - return worker; - } else { - // else wait + PluginMessageHandlerWorker worker = null; + + if (workers.size() <= (MAX_WORKERS - PRIORITY_WORKERS)) { + PluginDebug.debug("Cannot find free worker, creating worker " + workers.size()); + worker = new PluginMessageHandlerWorker(this, streamHandler, workers.size(), as, false); + } else if (prioritized) { + PluginDebug.debug("Cannot find free worker, creating priority worker " + workers.size()); + worker = new PluginMessageHandlerWorker(this, streamHandler, workers.size(), as, true); + } else { + return null; + } + + worker.start(); + worker.busy(); + workers.add(worker); + return worker; + } - - Thread.yield(); - } - - //throw new RuntimeException("Out of message handler workers"); + + // No workers available. Better luck next time! + return null; } }
--- a/plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java Tue Jun 01 18:04:46 2010 +0100 +++ b/plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java Mon Mar 15 13:35:18 2010 -0400 @@ -42,15 +42,25 @@ class PluginMessageHandlerWorker extends Thread { private boolean free = true; + private boolean isPriorityWorker = false; private int id; private String message = null; private SecurityManager sm; PluginStreamHandler streamHandler = null; + PluginMessageConsumer consumer = null; - public PluginMessageHandlerWorker(PluginStreamHandler streamHandler, int id, SecurityManager sm) { + public PluginMessageHandlerWorker( + PluginMessageConsumer consumer, + PluginStreamHandler streamHandler, int id, + SecurityManager sm, boolean isPriorityWorker) { + this.id = id; this.streamHandler = streamHandler; this.sm = sm; + this.isPriorityWorker = isPriorityWorker; + this.consumer = consumer; + + PluginDebug.debug("Worker " + this.id + " (priority=" + isPriorityWorker + ") created."); } public void setmessage(String message) { @@ -107,15 +117,27 @@ } public void busy() { - this.free = false; + synchronized (this) { + this.free = false; + } } - public void free() { - this.free = true; + synchronized (this) { + this.free = true; + + // Signal the consumer that we are done in case it was waiting + consumer.notifyWorkerIsFree(this); + } } - - public boolean isFree() { - return free; + + public boolean isPriority() { + return isPriorityWorker; + } + + public boolean isFree(boolean prioritized) { + synchronized (this) { + return free && (prioritized == isPriorityWorker); + } } }
--- a/plugin/icedteanp/java/sun/applet/PluginStreamHandler.java Tue Jun 01 18:04:46 2010 +0100 +++ b/plugin/icedteanp/java/sun/applet/PluginStreamHandler.java Mon Mar 15 13:35:18 2010 -0400 @@ -141,7 +141,7 @@ //System.err.println("Total wait time: " + totalWait); if (s != null) { - consumer.consume(s); + consumer.queue(s); } else { try { // Close input/output channels to plugin. @@ -191,31 +191,6 @@ listenerThread.start(); } - public void postMessage(String s) { - - if (s == null || s.equals("shutdown")) { - try { - // Close input/output channels to plugin. - pluginInputReader.close(); - pluginOutputWriter.close(); - } catch (IOException exception) { - // Deliberately ignore IOException caused by broken - // pipe since plugin may have already detached. - } - AppletSecurityContextManager.dumpStore(0); - PluginDebug.debug("APPLETVIEWER: exiting appletviewer"); - System.exit(0); - } - - //PluginAppletSecurityContext.contexts.get(0).store.dump(); - PluginDebug.debug("Plugin posted: " + s); - - PluginDebug.debug("Consuming " + s); - consumer.consume(s); - - PluginDebug.debug("Added to queue"); - } - public void handleMessage(String message) throws PluginException { int nextIndex = 0;