changeset 153:5ab54b0ca4ea

2008-09-29 Omair Majid <omajid@redhat.com> merged changes
author Omair Majid <omajid@redhat.com>
date Mon, 29 Sep 2008 15:57:43 -0400
parents b4390e330ff7 (current diff) b185dd9217bb (diff)
children 59f5e34743f6
files src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java
diffstat 5 files changed, 299 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java	Mon Sep 29 14:01:57 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java	Mon Sep 29 15:57:43 2008 -0400
@@ -266,8 +266,6 @@
 					"Line must be open for close() to work");
 		}
 
-		drain();
-
 		synchronized (eventLoop.threadLock) {
 			stream.disconnect();
 		}
@@ -283,6 +281,7 @@
 		}
 
 		super.close();
+		isStarted = false;
 
 	}
 
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLine.java	Mon Sep 29 14:01:57 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLine.java	Mon Sep 29 15:57:43 2008 -0400
@@ -45,6 +45,20 @@
 public class PulseAudioTargetDataLine extends PulseAudioDataLine implements
 		TargetDataLine {
 
+	/*
+	 * This contains the data from the PulseAudio buffer that has since been
+	 * dropped. If 20 bytes of a fragment of size 200 are read, the other 180
+	 * are dumped in this
+	 */
+	byte[] fragmentBuffer;
+
+	/*
+	 * these are set to true only by the respective functions (flush(), drain())
+	 * set to false only by read()
+	 */
+	boolean flushed = false;
+	boolean drained = false;
+
 	public PulseAudioTargetDataLine(EventLoop eventLoop, AudioFormat[] formats,
 			AudioFormat defaultFormat) {
 		supportedFormats = formats;
@@ -55,7 +69,7 @@
 	}
 
 	@Override
-	public void close() {
+	synchronized public void close() {
 		if (!isOpen) {
 			throw new IllegalStateException(
 					"Line cant be closed if it isnt open");
@@ -68,7 +82,7 @@
 	}
 
 	@Override
-	public void open(AudioFormat format, int bufferSize)
+	synchronized public void open(AudioFormat format, int bufferSize)
 			throws LineUnavailableException {
 		if (isOpen) {
 			throw new IllegalStateException("already open");
@@ -83,14 +97,16 @@
 	}
 
 	@Override
-	public void open(AudioFormat format) throws LineUnavailableException {
+	synchronized public void open(AudioFormat format)
+			throws LineUnavailableException {
 		open(format, DEFAULT_BUFFER_SIZE);
 	}
 
 	protected void connectLine(int bufferSize, Stream masterStream)
 			throws LineUnavailableException {
+		int fragmentSize = bufferSize / 10 > 500 ? bufferSize / 10 : 500;
 		StreamBufferAttributes bufferAttributes = new StreamBufferAttributes(
-				bufferSize, 0, 0, 0, bufferSize / 10);
+				bufferSize, 0, 0, 0, fragmentSize);
 		synchronized (eventLoop.threadLock) {
 			stream.connectForRecording(Stream.DEFAULT_DEVICE, bufferAttributes);
 		}
@@ -123,24 +139,113 @@
 		int remainingLength = length;
 		int sizeRead = 0;
 
-		while (remainingLength != 0) {
+		/* bytes read on each iteration of loop */
+		int bytesRead;
+
+		/*
+		 * to read, we first take stuff from the fragmentBuffer
+		 */
+
+		/* on first read() of the line, fragmentBuffer is null */
+		if (fragmentBuffer != null) {
+			synchronized (this) {
 
-			synchronized (eventLoop.threadLock) {
-				int bytesRead = stream.read(data, remainingLength, position);
-				if (bytesRead < 0) {
-					return sizeRead;
+				boolean fragmentBufferSmaller = fragmentBuffer.length < length;
+				int smallerBufferLength = Math.min(fragmentBuffer.length,
+						length);
+				System.arraycopy(fragmentBuffer, 0, data, position,
+						smallerBufferLength);
+				framesSinceOpen += smallerBufferLength
+						/ currentFormat.getFrameSize();
+
+				if (!fragmentBufferSmaller) {
+					/*
+					 * if fragment was larger, then we already have all the data
+					 * we need. clean up the buffer before returning. Make a new
+					 * fragmentBuffer from the remaining bytes
+					 */
+					int remainingBytesInFragment = (fragmentBuffer.length - length);
+					byte[] newFragmentBuffer = new byte[remainingBytesInFragment];
+					System.arraycopy(fragmentBuffer, length, newFragmentBuffer,
+							0, newFragmentBuffer.length);
+					fragmentBuffer = newFragmentBuffer;
+					return length;
 				}
+
+				/* done with fragment buffer, remove it */
+				bytesRead = smallerBufferLength;
 				sizeRead += bytesRead;
 				position += bytesRead;
 				remainingLength -= bytesRead;
-				framesSinceOpen += bytesRead / currentFormat.getFrameSize();
+				fragmentBuffer = null;
+
+			}
+		}
+
+		/*
+		 * if we need to read more data, then we read from PulseAudio's buffer
+		 */
+		while (remainingLength != 0) {
+			synchronized (this) {
+
+				if (!isOpen || !isStarted) {
+					return sizeRead;
+				}
+
+				if (flushed) {
+					flushed = false;
+					return sizeRead;
+				}
+
+				if (drained) {
+					drained = false;
+					return sizeRead;
+				}
+
+				byte[] currentFragment;
+				synchronized (eventLoop.threadLock) {
 
+					/* read a fragment, and drop it from the server */
+					currentFragment = stream.peek();
+					stream.drop();
+					if (currentFragment == null) {
+						System.out
+								.println("DEBUG: PulseAudioTargetDataLine:read(): error in stream.peek()");
+						return sizeRead;
+					}
+
+					bytesRead = Math.min(currentFragment.length,
+							remainingLength);
+
+					/*
+					 * we read more than we required, save the rest of the data
+					 * in the fragmentBuffer
+					 */
+					if (bytesRead < currentFragment.length) {
+						/* allocate a buffer to store unsaved data */
+						fragmentBuffer = new byte[currentFragment.length
+								- bytesRead];
+
+						/* copy over the unsaved data */
+						System.arraycopy(currentFragment, bytesRead,
+								fragmentBuffer, 0, currentFragment.length
+										- bytesRead);
+					}
+
+					System.arraycopy(currentFragment, 0, data, position,
+							bytesRead);
+
+					sizeRead += bytesRead;
+					position += bytesRead;
+					remainingLength -= bytesRead;
+					framesSinceOpen += bytesRead / currentFormat.getFrameSize();
+				}
 			}
 		}
 
 		// all the data should have been played by now
 		assert (sizeRead == length);
-		// currentFramePosition += (sizeWritten/getFormat().getFrameSize());
+
 		/*
 		 * FIXME when the stream is flushed() etc, instead of returning length
 		 * this should unblock and return the the size of data written so far
@@ -156,9 +261,18 @@
 			throw new IllegalStateException("must call open() before drain()");
 		}
 
+		synchronized (this) {
+			drained = true;
+		}
+
 		// blocks when there is data on the line
 		// http://www.jsresources.org/faq_audio.html#stop_drain_tdl
-		while (isStarted) {
+		while (true) {
+			synchronized (this) {
+				if (!isStarted || !isOpen) {
+					break;
+				}
+			}
 			try {
 				Thread.sleep(100);
 			} catch (InterruptedException e) {
@@ -180,6 +294,10 @@
 		operation.waitForCompletion();
 		operation.releaseReference();
 
+		synchronized (this) {
+			flushed = true;
+		}
+
 	}
 
 	public int available() {
@@ -211,14 +329,14 @@
 	 */
 
 	@Override
-	public void start() {
+	synchronized public void start() {
 		super.start();
 
 		fireLineEvent(new LineEvent(this, LineEvent.Type.START, framesSinceOpen));
 	}
 
 	@Override
-	public void stop() {
+	synchronized public void stop() {
 		super.stop();
 
 		fireLineEvent(new LineEvent(this, LineEvent.Type.STOP, framesSinceOpen));
--- a/src/java/org/classpath/icedtea/pulseaudio/Stream.java	Mon Sep 29 14:01:57 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/Stream.java	Mon Sep 29 15:57:43 2008 -0400
@@ -54,7 +54,7 @@
 	public interface StateListener {
 		public void update();
 	}
-	
+
 	public interface CorkListener {
 		public void update();
 	}
@@ -164,9 +164,6 @@
 	private native int native_pa_stream_write(byte[] data, int offset,
 			int length);
 
-	private native int native_pa_stream_read(byte[] array, int length,
-			int position);
-
 	private native byte[] native_pa_stream_peek();
 
 	private native int native_pa_stream_drop();
@@ -323,7 +320,7 @@
 			underflowListeners.remove(listener);
 		}
 	}
-	
+
 	public void addCorkListener(CorkListener listener) {
 		synchronized (corkListeners) {
 			corkListeners.add(listener);
@@ -501,10 +498,6 @@
 		return native_pa_stream_write(data, offset, length);
 	}
 
-	public int read(byte[] array, int length, int position) {
-		return native_pa_stream_read(array, length, position);
-	}
-
 	/**
 	 * Read the next fragment from the buffer (for recording).
 	 * 
@@ -636,7 +629,7 @@
 			}
 		}
 	}
-	
+
 	@SuppressWarnings("unused")
 	private void corkCallback() {
 		synchronized (corkListeners) {
--- a/src/native/org_classpath_icedtea_pulseaudio_Stream.c	Mon Sep 29 14:01:57 2008 -0400
+++ b/src/native/org_classpath_icedtea_pulseaudio_Stream.c	Mon Sep 29 15:57:43 2008 -0400
@@ -525,6 +525,7 @@
 	return value;
 }
 
+/*
 JNIEXPORT jint JNICALL Java_org_classpath_icedtea_pulseaudio_Stream_native_1pa_1stream_1read
 (JNIEnv *env, jobject obj, jbyteArray array, jint length, jint offset) {
 	pa_stream *stream = getJavaPointer(env, obj, STREAM_POINTER);
@@ -540,6 +541,7 @@
 	pa_stream_drop(stream);
 	return read_length;
 }
+*/
 
 /*
  * Class:     org_classpath_icedtea_pulseaudio_Stream
--- a/unittests/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLineTest.java	Mon Sep 29 14:01:57 2008 -0400
+++ b/unittests/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLineTest.java	Mon Sep 29 15:57:43 2008 -0400
@@ -55,6 +55,7 @@
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class PulseAudioTargetDataLineTest {
@@ -68,6 +69,28 @@
 	AudioFormat aSupportedFormat = new AudioFormat(
 			AudioFormat.Encoding.PCM_UNSIGNED, 44100f, 8, 1, 1, 44100f, true);
 
+	class ThreadReader extends Thread {
+		TargetDataLine line;
+		byte[] buffer;
+
+		public ThreadReader(TargetDataLine line, byte[] buffer)
+				throws LineUnavailableException {
+
+			this.line = line;
+			this.buffer = buffer;
+
+		}
+
+		@Override
+		public void run() {
+			int bytesRead = 0;
+
+			bytesRead = line.read(buffer, 0, buffer.length);
+			// System.out.println("read data");
+
+		}
+	}
+
 	@Before
 	public void setUp() throws LineUnavailableException {
 		Mixer.Info[] mixerInfos = AudioSystem.getMixerInfo();
@@ -147,7 +170,7 @@
 
 	@Test
 	public void testOpenWithFormat() throws LineUnavailableException {
-		System.out.println("This test checks that read() sort of wroks");
+		System.out.println("This test checks that open(AudioFormat) works");
 
 		targetDataLine = (TargetDataLine) mixer.getLine(new Line.Info(
 				TargetDataLine.class));
@@ -214,6 +237,143 @@
 	}
 
 	@Test
+	public void testReadAndClose() throws LineUnavailableException,
+			InterruptedException {
+		System.out.println("This test tries to close a line while "
+				+ "read()ing to check that read() returns");
+		targetDataLine = (TargetDataLine) mixer.getLine(new Line.Info(
+				TargetDataLine.class));
+		Assert.assertNotNull(targetDataLine);
+		AudioFormat breakingFormat = new AudioFormat(
+				AudioFormat.Encoding.PCM_UNSIGNED, 44100f, 8, 2, 2, 44100f,
+				true);
+		targetDataLine.open(breakingFormat);
+		targetDataLine.start();
+		byte[] buffer = new byte[1000000];
+
+		ThreadReader reader = new ThreadReader(targetDataLine, buffer);
+		reader.start();
+
+		Thread.sleep(100);
+
+		Assert.assertTrue(reader.isAlive());
+
+		targetDataLine.close();
+
+		reader.join(500);
+
+		Assert.assertFalse(reader.isAlive());
+
+	}
+
+	@Test
+	public void testReadAndStop() throws LineUnavailableException,
+			InterruptedException {
+		targetDataLine = (TargetDataLine) mixer.getLine(new Line.Info(
+				TargetDataLine.class));
+		Assert.assertNotNull(targetDataLine);
+		AudioFormat breakingFormat = new AudioFormat(
+				AudioFormat.Encoding.PCM_UNSIGNED, 44100f, 8, 2, 2, 44100f,
+				true);
+		targetDataLine.open(breakingFormat);
+		targetDataLine.start();
+		byte[] buffer = new byte[10000000];
+
+		ThreadReader reader = new ThreadReader(targetDataLine, buffer);
+		reader.start();
+
+		Thread.sleep(100);
+
+		Assert.assertTrue(reader.isAlive());
+
+		targetDataLine.stop();
+
+		Thread.sleep(100);
+
+		Assert.assertFalse(reader.isAlive());
+
+		targetDataLine.close();
+
+	}
+
+	// this is kind of messed up
+	// drain should hang on a started data line
+	// but read should return
+	@Test
+	public void testReadAndDrain() throws LineUnavailableException,
+			InterruptedException {
+		targetDataLine = (TargetDataLine) mixer.getLine(new Line.Info(
+				TargetDataLine.class));
+		Assert.assertNotNull(targetDataLine);
+		AudioFormat breakingFormat = new AudioFormat(
+				AudioFormat.Encoding.PCM_UNSIGNED, 44100f, 8, 2, 2, 44100f,
+				true);
+		targetDataLine.open(breakingFormat);
+		targetDataLine.start();
+		byte[] buffer = new byte[10000000];
+
+		ThreadReader reader = new ThreadReader(targetDataLine, buffer);
+		reader.start();
+
+		Thread.sleep(100);
+
+		Assert.assertTrue(reader.isAlive());
+
+		Thread drainer = new Thread() {
+
+			@Override
+			public void run() {
+				targetDataLine.drain();
+
+			}
+
+		};
+
+		drainer.start();
+
+		Thread.sleep(100);
+
+		Assert.assertFalse(reader.isAlive());
+
+		targetDataLine.stop();
+
+		Thread.sleep(100);
+		Assert.assertFalse(drainer.isAlive());
+
+		targetDataLine.close();
+	}
+
+	@Test
+	public void testReadAndFlush() throws LineUnavailableException,
+			InterruptedException {
+		targetDataLine = (TargetDataLine) mixer.getLine(new Line.Info(
+				TargetDataLine.class));
+		Assert.assertNotNull(targetDataLine);
+		AudioFormat breakingFormat = new AudioFormat(
+				AudioFormat.Encoding.PCM_UNSIGNED, 44100f, 8, 2, 2, 44100f,
+				true);
+		targetDataLine.open(breakingFormat);
+		targetDataLine.start();
+		byte[] buffer = new byte[10000000];
+
+		ThreadReader reader = new ThreadReader(targetDataLine, buffer);
+		reader.start();
+
+		Thread.sleep(100);
+
+		Assert.assertTrue(reader.isAlive());
+
+		targetDataLine.flush();
+
+		Thread.sleep(100);
+
+		Assert.assertFalse(reader.isAlive());
+
+		targetDataLine.stop();
+		targetDataLine.close();
+	}
+
+	@Test
 	public void testDrain() throws LineUnavailableException,
 			InterruptedException {
 		System.out