changeset 136:2d384ad19c3e

Ioana Ivan <iivan@redhat.com> * src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java: -split open() into createStream(), addStreamListeners() and connect(), which can be reused when reconnecting the line for synchronization -added recconectForSynchronization() -made some changes to stop()/start() in order to send START/STOP events both when corking and in case of underflow * src/java/org/classpath/icedtea/pulseaudio/PulseAudioMixer.java: -changes to synchronize() * src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java: -changed connectLine to take the masterStream as a parameter in case we want to synchronize the Line * src/java/org/classpath/icedtea/pulseaudio/PulseAudioClip.java: -changed connectLine to take the masterStream as a parameter in case we want to synchronize the Line
author Ioana Ivan <iivan@redhat.com>
date Tue, 23 Sep 2008 17:01:26 -0400
parents c46f6e0e7959
children 438aa072a80d
files ChangeLog src/java/org/classpath/icedtea/pulseaudio/PulseAudioClip.java src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java src/java/org/classpath/icedtea/pulseaudio/PulseAudioLine.java src/java/org/classpath/icedtea/pulseaudio/PulseAudioMixer.java src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java src/java/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLine.java src/java/org/classpath/icedtea/pulseaudio/Stream.java src/native/org_classpath_icedtea_pulseaudio_Stream.c unittests/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLineRawTest.java unittests/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLineTest.java
diffstat 11 files changed, 392 insertions(+), 96 deletions(-) [+]
line wrap: on
line diff
--- a/ChangeLog	Tue Sep 23 15:02:55 2008 -0400
+++ b/ChangeLog	Tue Sep 23 17:01:26 2008 -0400
@@ -1,3 +1,23 @@
+2008-09-23 Ioana Ivan <iivan@redhat.com>
+	* src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java:
+		-split open() into createStream(), addStreamListeners() and
+		connect(), which can be reused when reconnecting the line for
+		synchronization
+		-added recconectForSynchronization()
+		-made some changes to stop()/start() in order to send
+		START/STOP events both when corking and in case of underflow
+	* src/java/org/classpath/icedtea/pulseaudio/PulseAudioMixer.java:
+		-changes to synchronize()
+	* src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java:
+	         -changed connectLine to take the masterStream as a
+		 parameter in case we want to synchronize the Line
+	* src/java/org/classpath/icedtea/pulseaudio/PulseAudioClip.java:
+		 -changed connectLine to take the masterStream as a
+		 parameter in case we want to synchronize the Line
+						                          
+
+			
+
 2008-08-13 Ioana Ivan  <iivan@redhat.com>
 	* src/java/org/classpath/icedtea/pulseaudio/PulseAudioPort.java:
 		superclass for TargetPort and SourcePort
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioClip.java	Tue Sep 23 15:02:55 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioClip.java	Tue Sep 23 17:01:26 2008 -0400
@@ -192,11 +192,19 @@
 
 	}
 
-	protected void connectLine(int bufferSize) throws LineUnavailableException {
+	protected void connectLine(int bufferSize, Stream masterStream) throws LineUnavailableException{
 		StreamBufferAttributes bufferAttributes = new StreamBufferAttributes(
 				bufferSize, bufferSize / 2, bufferSize / 2, bufferSize / 2, 0);
 
-		stream.connectForPlayback(Stream.DEFAULT_DEVICE, bufferAttributes);
+		if(masterStream != null) {
+			synchronized (eventLoop.threadLock) {
+				stream.connectForPlayback(Stream.DEFAULT_DEVICE, bufferAttributes, masterStream.getStreamPointer());
+			}
+		} else {
+			synchronized (eventLoop.threadLock) {
+				stream.connectForPlayback(Stream.DEFAULT_DEVICE, bufferAttributes, null);
+			}
+		}
 	}
 
 	@Override
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java	Tue Sep 23 15:02:55 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java	Tue Sep 23 17:01:26 2008 -0400
@@ -37,6 +37,7 @@
 
 package org.classpath.icedtea.pulseaudio;
 
+import java.util.ArrayList;
 import java.util.concurrent.Semaphore;
 
 import javax.sound.sampled.AudioFormat;
@@ -45,6 +46,8 @@
 import javax.sound.sampled.LineEvent;
 import javax.sound.sampled.LineUnavailableException;
 
+import org.classpath.icedtea.pulseaudio.Stream.WriteListener;
+
 public abstract class PulseAudioDataLine extends PulseAudioLine implements
 		DataLine {
 
@@ -56,7 +59,8 @@
 	// true between start() and stop()
 	protected boolean isStarted = false;
 
-	protected boolean isEngagedInIo = false;
+	protected boolean dataWritten = false;
+	protected boolean corked = true;
 
 	// true if a stream has been paused
 	// protected boolean isPaused = false;
@@ -64,12 +68,14 @@
 	protected AudioFormat[] supportedFormats = null;
 	protected AudioFormat currentFormat = null;
 	protected AudioFormat defaultFormat = null;
+	protected boolean sendEvents = true;
 
 	protected int bufferSize = 0;
 
 	protected EventLoop eventLoop = null;
 	protected Semaphore semaphore = new Semaphore(0);
 	protected Stream stream;
+	private ArrayList<PulseAudioDataLine> synchronizedLines;
 
 	public void open(AudioFormat format, int bufferSize)
 			throws LineUnavailableException {
@@ -78,6 +84,16 @@
 			throw new IllegalStateException("DataLine is already open");
 		}
 
+		createStream(format);
+
+		addStreamListeners();
+		connect(null, bufferSize);
+
+	}
+
+	public void createStream(AudioFormat format)
+			throws LineUnavailableException {
+
 		for (AudioFormat myFormat : supportedFormats) {
 			if (format.matches(myFormat)) {
 				synchronized (eventLoop.threadLock) {
@@ -91,11 +107,16 @@
 				super.open();
 			}
 		}
-		// no matches found
+
 		if (!isOpen) {
 			throw new IllegalArgumentException("Invalid format");
 		}
 
+		System.out.println("Stream " + stream + " created");
+
+	}
+
+	public void addStreamListeners() {
 		Stream.StateListener openCloseListener = new Stream.StateListener() {
 
 			@Override
@@ -116,14 +137,24 @@
 					 */
 
 					if (stream.getState() == Stream.State.READY) {
-						fireLineEvent(new LineEvent(PulseAudioDataLine.this,
-								LineEvent.Type.OPEN, AudioSystem.NOT_SPECIFIED));
+						if (sendEvents) {
+							fireLineEvent(new LineEvent(
+									PulseAudioDataLine.this,
+									LineEvent.Type.OPEN,
+									AudioSystem.NOT_SPECIFIED));
+						}
 						semaphore.release();
+
 					} else if (stream.getState() == Stream.State.TERMINATED
 							|| stream.getState() == Stream.State.FAILED) {
-						fireLineEvent((new LineEvent(PulseAudioDataLine.this,
-								LineEvent.Type.CLOSE, AudioSystem.NOT_SPECIFIED)));
+						if (sendEvents) {
+							fireLineEvent((new LineEvent(
+									PulseAudioDataLine.this,
+									LineEvent.Type.CLOSE,
+									AudioSystem.NOT_SPECIFIED)));
+						}
 						semaphore.release();
+
 					}
 				}
 			}
@@ -134,9 +165,13 @@
 		Stream.UnderflowListener stoppedListener = new Stream.UnderflowListener() {
 			@Override
 			public void update() {
-				isEngagedInIo = false;
-				fireLineEvent(new LineEvent(PulseAudioDataLine.this,
-						LineEvent.Type.STOP, AudioSystem.NOT_SPECIFIED));
+				dataWritten = false;
+				
+				if (!corked) {
+					fireLineEvent(new LineEvent(PulseAudioDataLine.this,
+							LineEvent.Type.STOP, AudioSystem.NOT_SPECIFIED));
+				}
+
 			}
 		};
 		stream.addUnderflowListener(stoppedListener);
@@ -144,34 +179,64 @@
 		Stream.PlaybackStartedListener startedListener = new Stream.PlaybackStartedListener() {
 			@Override
 			public void update() {
-				isEngagedInIo = true;
-				fireLineEvent(new LineEvent(PulseAudioDataLine.this,
-						LineEvent.Type.START, AudioSystem.NOT_SPECIFIED));
+
+				dataWritten = true;
+				if (!corked) {
+					fireLineEvent(new LineEvent(PulseAudioDataLine.this,
+							LineEvent.Type.START, AudioSystem.NOT_SPECIFIED));
+				}
+
 			}
-
 		};
 
 		stream.addPlaybackStartedListener(startedListener);
 
-		try {
-			synchronized (eventLoop.threadLock) {
-				connectLine(bufferSize);
+		WriteListener writeNotifier = new WriteListener() {
+
+			@Override
+			public void update() {
+				// System.out.println("can write");
+				eventLoop.threadLock.notifyAll();
 			}
 
+		};
+		stream.addWriteListener(writeNotifier);
+
+		Stream.CorkListener corkListener = new Stream.CorkListener() {
+
+			@Override
+			public void update() {
+
+				eventLoop.threadLock.notifyAll();
+			}
+
+		};
+		stream.addCorkListener(corkListener);
+	}
+
+	public void connect(Stream masterStream, int bufferSize)
+			throws LineUnavailableException {
+
+		try {
+			synchronized (eventLoop.threadLock) {
+				connectLine(bufferSize, masterStream);
+			}
 		} catch (LineUnavailableException e) {
 			// error connecting to the server!
-			stream.removePlaybackStartedListener(startedListener);
-			stream.removeUnderflowListener(stoppedListener);
-			stream.removeStateListener(openCloseListener);
+			//stream.removePlaybackStartedListener(startedListener);
+			//stream.removeUnderflowListener(stoppedListener);
+			//stream.removeStateListener(openCloseListener);
 			stream.free();
 			stream = null;
 			throw e;
+
 		}
 		this.bufferSize = bufferSize;
 		try {
 			semaphore.acquire();
 			synchronized (eventLoop.threadLock) {
 				if (stream.getState() != Stream.State.READY) {
+					System.out.println(stream.getState());
 					stream.disconnect();
 					stream.free();
 					throw new LineUnavailableException(
@@ -200,6 +265,8 @@
 					"Line must be open for close() to work");
 		}
 
+		drain();
+
 		synchronized (eventLoop.threadLock) {
 			stream.disconnect();
 		}
@@ -218,21 +285,88 @@
 
 	}
 
+	public void reconnectforSynchronization(Stream masterStream) {
+		sendEvents = false;
+		drain();
+
+		synchronized (eventLoop.threadLock) {
+			stream.disconnect();
+		}
+		try {
+			semaphore.acquire();
+		} catch (InterruptedException e) {
+			throw new RuntimeException("unable to prepare stream");
+		}
+		try {
+			createStream(getFormat());
+		} catch (LineUnavailableException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		addStreamListeners();
+		try {
+			connect(masterStream, getBufferSize());
+		} catch (LineUnavailableException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		sendEvents = true;
+	}
+
 	public void start() {
 		if (!isOpen) {
 			throw new IllegalStateException(
 					"Line must be open()ed before it can be start()ed");
 		}
 
+		if (!corked) {
+			System.out.println("Already started, returning");
+			return;
+		}
+
+		corked = false;
+		Operation op;
+		synchronized (eventLoop.threadLock) {
+			op = stream.unCork();
+		}
+
+		op.waitForCompletion();
+		op.releaseReference();
 		isStarted = true;
+		
+		/*if (dataWritten) {
+			fireLineEvent(new LineEvent(PulseAudioDataLine.this,
+					LineEvent.Type.START, AudioSystem.NOT_SPECIFIED));
+		}*/
+
 	}
 
-	public void stop() {
+	public synchronized void stop() {
 		if (!isOpen) {
 			throw new IllegalStateException(
 					"Line must be open()ed before it can be start()ed");
+
 		}
+		if (corked) {
+			return;
+		}
+		corked = true;
+		Operation op;
+		synchronized (eventLoop.threadLock) {
+			op = stream.cork();
+		}
+
+		op.waitForCompletion();
+		op.releaseReference();
+
 		isStarted = false;
+		if (dataWritten) {
+			fireLineEvent(new LineEvent(PulseAudioDataLine.this,
+					LineEvent.Type.STOP, AudioSystem.NOT_SPECIFIED));
+		}
+
+		isStarted = false;
+
 	}
 
 	/*
@@ -257,10 +391,10 @@
 	}
 
 	public boolean isRunning() {
-		return isEngagedInIo;
+		return !corked && dataWritten;
 	}
 
-	protected abstract void connectLine(int bufferSize)
+	protected abstract void connectLine(int bufferSize, Stream masterStream)
 			throws LineUnavailableException;
 
 	public abstract void drain();
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioLine.java	Tue Sep 23 15:02:55 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioLine.java	Tue Sep 23 17:01:26 2008 -0400
@@ -114,9 +114,9 @@
 
 	@Override
 	public void open() throws LineUnavailableException {
-		if (isOpen) {
+		/*if (isOpen) {
 			throw new IllegalStateException("Line is already open");
-		}
+		}*/
 		isOpen = true;
 	}
 
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioMixer.java	Tue Sep 23 15:02:55 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioMixer.java	Tue Sep 23 17:01:26 2008 -0400
@@ -38,6 +38,7 @@
 package org.classpath.icedtea.pulseaudio;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -61,6 +62,7 @@
 import javax.sound.sampled.Port;
 import javax.sound.sampled.SourceDataLine;
 import javax.sound.sampled.TargetDataLine;
+import javax.sound.sampled.UnsupportedAudioFileException;
 import javax.sound.sampled.AudioFormat.Encoding;
 import javax.sound.sampled.Control.Type;
 
@@ -393,13 +395,32 @@
 	@Override
 	public boolean isSynchronizationSupported(Line[] lines, boolean maintainSync) {
 		// FIXME
-		return false;
+		return true;
 	}
 
 	@Override
 	public void synchronize(Line[] lines, boolean maintainSync) {
-		// FIXME pulse audio supports this
-		throw new IllegalArgumentException();
+		Line masterStream = null;
+		for (Line line : lines) {
+			if (line.isOpen()) {
+				masterStream = line;
+				break;
+			}
+		}
+		if(masterStream == null) {
+			//for now, can't synchronize lines if none of them is open (no stream pointer to pass)
+			//will see what to do about this later
+			throw new IllegalArgumentException();
+		}
+		
+		for(Line line : lines) {
+			if(line != masterStream) {
+			
+					((PulseAudioDataLine) line).reconnectforSynchronization(((PulseAudioDataLine) masterStream).getStream());
+				
+			}
+		}
+	
 	}
 
 	@Override
@@ -631,6 +652,10 @@
 		}
 	}
 
+	public static void debug(String string) {
+		System.out.println("DEBUG: " + string);
+	}
+	
 	public static void main(String[] args) throws Exception {
 		Mixer.Info mixerInfos[] = AudioSystem.getMixerInfo();
 		Mixer.Info selectedMixerInfo = null;
@@ -648,29 +673,99 @@
 				.getMixer(selectedMixerInfo);
 
 		mixer.open();
-		File soundFile = new File("testsounds/startup.wav");
-		AudioInputStream audioInputStream = AudioSystem
-				.getAudioInputStream(soundFile);
-		AudioFormat audioFormat = audioInputStream.getFormat();
+
+		String fileName1 = "testsounds/startup.wav";
+		PulseAudioSourceDataLine line1;
+		line1 = (PulseAudioSourceDataLine) mixer.getLine(new Line.Info(
+				SourceDataLine.class));
+		line1.setName("Line 1");
+
+		String fileName2 = "testsounds/logout.wav";
+		PulseAudioSourceDataLine line2;
+		line2 = (PulseAudioSourceDataLine) mixer.getLine(new Line.Info(
+				SourceDataLine.class));
+		line2.setName("Line 2");
+		
 
-		SourceDataLine line;
-		line = (SourceDataLine) mixer.getLine(new DataLine.Info(
-				SourceDataLine.class, audioFormat));
+		ThreadWriter writer1 = mixer.new ThreadWriter(line1, fileName1);
+		ThreadWriter writer2 = mixer.new ThreadWriter(line2, fileName2);
+		//line2.start();
+	//	line1.start();
+		
+		Line[] lines = { line1, line2 };
+		mixer.synchronize(lines, true);
+		
+	//	line2.stop();
+		
+		debug("PulseAudioMixer: " + line1.getName() + " and " + line2.getName()
+				+ " synchronized");
+		writer1.start();
+		writer2.start();
+
+		debug("PulseAudioMixer: writers started");
+		line2.start();
+		//line1.stop();
+		//line1.start();
+		debug("PulseAudioMixer: Started a line");
+
+		writer1.join();
+		writer2.join();
+
+		debug("PulseAudioMixer: both lines joined");
+
+		line2.close();
+		debug("PulseAudioMixer: " + line2.getName() + " closed");
+
+		line1.close();
+		debug("PulseAudioMixer: " + line1.getName() + " closed");
 
-		line.open();
-		Port.Info info = new Port.Info(Port.class,
-				"alsa_output.pci_8086_24d5_sound_card_0_alsa_playback_0", false);
-		Port port = (Port) mixer.getLine(info);
-		FloatControl control = (FloatControl) port
-				.getControl(FloatControl.Type.VOLUME);
-		control.getValue();
-		control.setValue(0);
-		control.getValue();
-		System.out.println(control.getValue());
 		mixer.close();
+		debug("PulseAudioMixer: mixer closed");
 
 	}
 
+	public class ThreadWriter extends Thread {
+
+		private PulseAudioSourceDataLine line;
+		private AudioInputStream audioInputStream;
+
+		public ThreadWriter(PulseAudioSourceDataLine line, String fileName)
+				throws UnsupportedAudioFileException, IOException,
+				LineUnavailableException {
+			this.line = line;
+			File soundFile1 = new File(fileName);
+			audioInputStream = AudioSystem.getAudioInputStream(soundFile1);
+			AudioFormat audioFormat = audioInputStream.getFormat();
+			line.open(audioFormat);
+		}
+
+		@Override
+		public void run() {
+			debug("PulseAudioMixer: ThreadWriter: run(): entering");
+
+			//line.start();
+			//debug("PulseAudioMixer: " + line.getName() + " started");
+
+			byte[] abData = new byte[1000];
+			int bytesRead = 0;
+			try {
+				while (bytesRead >= 0) {
+					bytesRead = audioInputStream.read(abData, 0, abData.length);
+					if (bytesRead > 0) {
+						line.write(abData, 0, bytesRead);
+						//debug("PulseAudioMixer: wrote " + bytesRead + "data on "
+						//		+ line.getName());
+					}
+				}
+			} catch (IOException e) {
+				debug("PulseAudioMixer: ThreadWriter: run(): exception doing a read()");
+			}
+
+			debug("PulseAudioMixer: ThreadWriter: run(): leaving");
+
+		}
+	}
+
 	void addSourceLine(PulseAudioLine line) {
 		sourceLines.add(line);
 	}
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java	Tue Sep 23 15:02:55 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java	Tue Sep 23 17:01:26 2008 -0400
@@ -53,6 +53,7 @@
 	private PulseAudioVolumeControl volumeControl;
 	private boolean muted;
 	private float volume;
+	private int bytesAvailableToWrite;
 
 	private long currentFramePosition = 0;
 
@@ -104,15 +105,25 @@
 
 	}
 
-	protected void connectLine(int bufferSize) throws LineUnavailableException {
+	protected void connectLine(int bufferSize, Stream masterStream) throws LineUnavailableException {
 		StreamBufferAttributes bufferAttributes = new StreamBufferAttributes(
+
 				bufferSize, bufferSize / 4, bufferSize / 8,
 				((bufferSize / 10) > 100 ? bufferSize / 10 : 100), 0);
 
-		synchronized (eventLoop.threadLock) {
-			stream.connectForPlayback(Stream.DEFAULT_DEVICE, bufferAttributes);
+
+		if(masterStream != null) {
+			synchronized (eventLoop.threadLock) {
+				stream.connectForPlayback(Stream.DEFAULT_DEVICE, bufferAttributes, masterStream.getStreamPointer());
+			}
+		} else {
+			synchronized (eventLoop.threadLock) {
+				stream.connectForPlayback(Stream.DEFAULT_DEVICE, bufferAttributes, null);
+			}
 		}
 	}
+	
+
 
 	@Override
 	public int write(byte[] data, int offset, int length) {
@@ -122,6 +133,12 @@
 			throw new IllegalStateException("must call open() before write()");
 		}
 
+
+		/*if (!isStarted) {
+			throw new IllegalStateException("must call start() before write()");
+		}*/
+
+
 		int frameSize = currentFormat.getFrameSize();
 		if (length % frameSize != 0) {
 			throw new IllegalArgumentException(
@@ -141,15 +158,7 @@
 
 		int sizeWritten = 0;
 
-		WriteListener writeNotifier = new WriteListener() {
-
-			@Override
-			public void update() {
-				eventLoop.threadLock.notifyAll();
-			}
-
-		};
-		stream.addWriteListener(writeNotifier);
+	
 
 		boolean interrupted = false;
 
@@ -157,6 +166,7 @@
 
 			synchronized (eventLoop.threadLock) {
 				availableSize = stream.getWritableSize();
+			
 				do {
 					if (availableSize < 0) {
 						return sizeWritten;
@@ -171,22 +181,29 @@
 						}
 
 					}
+					
 					availableSize = stream.getWritableSize();
+					//System.out.println(availableSize);
+					
 
 				} while (availableSize == 0);
+			
 
 				if (availableSize > remainingLength) {
 					availableSize = remainingLength;
 				}
 				/* write a little bit of the buffer */
-				stream.write(data, position, availableSize);
+		
+					stream.write(data, position, availableSize);
+					//System.out.println("written " + availableSize);
+	
 
 				sizeWritten += availableSize;
 				position += availableSize;
 				remainingLength -= availableSize;
-
+				}
 			}
-		}
+		
 
 		// all the data should have been played by now
 		assert (sizeWritten == length);
@@ -196,10 +213,10 @@
 		 * this should unblock and return the the size of data written so far
 		 */
 
-		stream.removeWriteListener(writeNotifier);
 		if (interrupted) {
 			Thread.currentThread().interrupt();
 		}
+
 		return sizeWritten;
 	}
 
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLine.java	Tue Sep 23 15:02:55 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLine.java	Tue Sep 23 17:01:26 2008 -0400
@@ -83,7 +83,7 @@
 		parentMixer.addTargetLine(this);
 	}
 
-	protected void connectLine(int bufferSize) throws LineUnavailableException {
+	protected void connectLine(int bufferSize, Stream masterStream) throws LineUnavailableException{
 		StreamBufferAttributes bufferAttributes = new StreamBufferAttributes(
 				bufferSize, 0, 0, 0, bufferSize / 10);
 		synchronized (eventLoop.threadLock) {
--- a/src/java/org/classpath/icedtea/pulseaudio/Stream.java	Tue Sep 23 15:02:55 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/Stream.java	Tue Sep 23 17:01:26 2008 -0400
@@ -54,6 +54,10 @@
 	public interface StateListener {
 		public void update();
 	}
+	
+	public interface CorkListener {
+		public void update();
+	}
 
 	public interface WriteListener {
 		public void update();
@@ -124,6 +128,7 @@
 	private List<LatencyUpdateListener> latencyUpdateListeners;
 	private List<MovedListener> movedListeners;
 	private List<SuspendedListener> suspendedListeners;
+	private List<CorkListener> corkListeners;
 
 	private native void native_pa_stream_new(byte[] contextPointer,
 			String name, String format, int sampleRate, int channels);
@@ -248,7 +253,7 @@
 		latencyUpdateListeners = new LinkedList<LatencyUpdateListener>();
 		movedListeners = new LinkedList<MovedListener>();
 		suspendedListeners = new LinkedList<SuspendedListener>();
-
+		corkListeners = new LinkedList<CorkListener>();
 		this.format = format;
 
 		StreamSampleSpecification spec = new StreamSampleSpecification(format,
@@ -318,6 +323,18 @@
 			underflowListeners.remove(listener);
 		}
 	}
+	
+	public void addCorkListener(CorkListener listener) {
+		synchronized (corkListeners) {
+			corkListeners.add(listener);
+		}
+	}
+
+	public void removeCorkListener(CorkListener listener) {
+		synchronized (corkListeners) {
+			corkListeners.remove(listener);
+		}
+	}
 
 	public void addPlaybackStartedListener(PlaybackStartedListener listener) {
 		synchronized (playbackStartedListeners) {
@@ -430,14 +447,14 @@
 	 * @throws LineUnavailableException
 	 */
 	public void connectForPlayback(String deviceName,
-			StreamBufferAttributes bufferAttributes)
+			StreamBufferAttributes bufferAttributes, byte[] syncStreamPointer)
 			throws LineUnavailableException {
 
 		int returnValue = native_pa_stream_connect_playback(deviceName,
 				bufferAttributes.getMaxLength(), bufferAttributes
 						.getTargetLength(), bufferAttributes.getPreBuffering(),
 				bufferAttributes.getMinimumRequest(), bufferAttributes
-						.getFragmentSize(), 0, null, null);
+						.getFragmentSize(), 0, null, syncStreamPointer);
 		if (returnValue < 0) {
 			throw new LineUnavailableException(
 					"Unable To connect a line for playback");
@@ -619,6 +636,15 @@
 			}
 		}
 	}
+	
+	@SuppressWarnings("unused")
+	private void corkCallback() {
+		synchronized (corkListeners) {
+			for (CorkListener listener : corkListeners) {
+				listener.update();
+			}
+		}
+	}
 
 	/**
 	 * whenever the sink/source this stream is connected to is suspended or
--- a/src/native/org_classpath_icedtea_pulseaudio_Stream.c	Tue Sep 23 15:02:55 2008 -0400
+++ b/src/native/org_classpath_icedtea_pulseaudio_Stream.c	Tue Sep 23 17:01:26 2008 -0400
@@ -400,8 +400,17 @@
 		jint bufferMinimumRequest, jint bufferFragmentSize, jint flags,
 		jbyteArray volumePointer, jbyteArray sync_streamPointer) {
 
+	pa_stream *sync_stream;
+        if(sync_streamPointer != NULL) {
+        	sync_stream = convertJavaPointerToNative(env, sync_streamPointer);
+        	printf("Master stream is %p\n", sync_stream);
+        } else {
+        	sync_stream = NULL;
+        }
+
 	pa_stream* stream = (pa_stream*) getJavaPointer(env, obj, STREAM_POINTER);
 
+
 	pa_buffer_attr buffer_attr;
 
 	memset(&buffer_attr, 0, sizeof(buffer_attr));
@@ -426,7 +435,7 @@
 			return -1; // oome thrown
 		}
 	}
-	int value = pa_stream_connect_playback(stream, dev, &buffer_attr, 0, NULL, NULL);
+	int value = pa_stream_connect_playback(stream, dev, &buffer_attr, PA_STREAM_START_CORKED, NULL, sync_stream);
 
 	if (dev != NULL) {
 		(*env)->ReleaseStringUTFChars(env, device, dev);
@@ -631,10 +640,15 @@
 }
 
 static void cork_callback(pa_stream* stream, int success, void* userdata) {
+
+	java_context* context = userdata;
+	assert(stream);
+	assert(context);
 	assert(success);
 	JNIEnv* env = pulse_thread_env;
 	assert(env);
 	notifyWaitingOperations(env);
+
 }
 
 /*
@@ -646,7 +660,11 @@
 (JNIEnv* env, jobject obj, jint yes) {
 	pa_stream* stream = (pa_stream*)getJavaPointer(env, obj, STREAM_POINTER);
 	assert(stream);
-	pa_operation* operation = pa_stream_cork(stream, yes, cork_callback, NULL);
+	java_context* j_context = malloc(sizeof(java_context));
+	assert(j_context);
+	j_context->env = env;
+	j_context->obj = (*env)->NewGlobalRef(env, obj);
+	pa_operation* operation = pa_stream_cork(stream, yes, cork_callback, j_context);
 	assert(operation);
 	return convertNativePointerToJava(env, operation);
 }
--- a/unittests/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLineRawTest.java	Tue Sep 23 15:02:55 2008 -0400
+++ b/unittests/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLineRawTest.java	Tue Sep 23 17:01:26 2008 -0400
@@ -177,36 +177,12 @@
 		Thread.sleep(1000);
 
 		// CORK
-		Operation o;
-		synchronized (EventLoop.getEventLoop().threadLock) {
-			o = line.getStream().cork();
-		}
-
-		o.waitForCompletion();
-		o.releaseReference();
-		synchronized (EventLoop.getEventLoop().threadLock) {
-			System.out.println("corked? " + line.getStream().isCorked());
-			assert (line.getStream().isCorked() == true);
-		}
-
-		System.out.println("corked");
+		line.stop();
 
 		Thread.sleep(1000);
 
 		// UNCORK
-		synchronized (EventLoop.getEventLoop().threadLock) {
-			o = line.getStream().unCork();
-		}
-
-		o.waitForCompletion();
-		o.releaseReference();
-
-		System.out.println("uncorked");
-
-		synchronized (EventLoop.getEventLoop().threadLock) {
-			System.out.println("corked? " + line.getStream().isCorked());
-			assert (line.getStream().isCorked() == false);
-		}
+		line.start();
 
 		Thread.sleep(1000);
 
--- a/unittests/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLineTest.java	Tue Sep 23 15:02:55 2008 -0400
+++ b/unittests/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLineTest.java	Tue Sep 23 17:01:26 2008 -0400
@@ -328,12 +328,14 @@
 
 		System.out.println("corked");
 
-		Thread.sleep(1000);
+		Thread.sleep(5000);
 
 		// UNCORK
+
 		sourceDataLine.start();
 
-		Thread.sleep(1000);
+
+		Thread.sleep(2000);
 
 		// System.out.println("waiting for thread to finish");
 		writer.join();