# HG changeset patch # User Omair Majid # Date 1222718263 14400 # Node ID 5ab54b0ca4ead63e025c3027eebd176e69a53fe1 # Parent b4390e330ff7309b949162f8756a3ba40ee994d5# Parent b185dd9217bb275c80a252b647dbaf76249f6ae3 2008-09-29 Omair Majid merged changes diff -r b4390e330ff7 -r 5ab54b0ca4ea src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java --- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java Mon Sep 29 14:01:57 2008 -0400 +++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java Mon Sep 29 15:57:43 2008 -0400 @@ -266,8 +266,6 @@ "Line must be open for close() to work"); } - drain(); - synchronized (eventLoop.threadLock) { stream.disconnect(); } @@ -283,6 +281,7 @@ } super.close(); + isStarted = false; } diff -r b4390e330ff7 -r 5ab54b0ca4ea src/java/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLine.java --- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLine.java Mon Sep 29 14:01:57 2008 -0400 +++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLine.java Mon Sep 29 15:57:43 2008 -0400 @@ -45,6 +45,20 @@ public class PulseAudioTargetDataLine extends PulseAudioDataLine implements TargetDataLine { + /* + * This contains the data from the PulseAudio buffer that has since been + * dropped. If 20 bytes of a fragment of size 200 are read, the other 180 + * are dumped in this + */ + byte[] fragmentBuffer; + + /* + * these are set to true only by the respective functions (flush(), drain()) + * set to false only by read() + */ + boolean flushed = false; + boolean drained = false; + public PulseAudioTargetDataLine(EventLoop eventLoop, AudioFormat[] formats, AudioFormat defaultFormat) { supportedFormats = formats; @@ -55,7 +69,7 @@ } @Override - public void close() { + synchronized public void close() { if (!isOpen) { throw new IllegalStateException( "Line cant be closed if it isnt open"); @@ -68,7 +82,7 @@ } @Override - public void open(AudioFormat format, int bufferSize) + synchronized public void open(AudioFormat format, int bufferSize) throws LineUnavailableException { if (isOpen) { throw new IllegalStateException("already open"); @@ -83,14 +97,16 @@ } @Override - public void open(AudioFormat format) throws LineUnavailableException { + synchronized public void open(AudioFormat format) + throws LineUnavailableException { open(format, DEFAULT_BUFFER_SIZE); } protected void connectLine(int bufferSize, Stream masterStream) throws LineUnavailableException { + int fragmentSize = bufferSize / 10 > 500 ? bufferSize / 10 : 500; StreamBufferAttributes bufferAttributes = new StreamBufferAttributes( - bufferSize, 0, 0, 0, bufferSize / 10); + bufferSize, 0, 0, 0, fragmentSize); synchronized (eventLoop.threadLock) { stream.connectForRecording(Stream.DEFAULT_DEVICE, bufferAttributes); } @@ -123,24 +139,113 @@ int remainingLength = length; int sizeRead = 0; - while (remainingLength != 0) { + /* bytes read on each iteration of loop */ + int bytesRead; + + /* + * to read, we first take stuff from the fragmentBuffer + */ + + /* on first read() of the line, fragmentBuffer is null */ + if (fragmentBuffer != null) { + synchronized (this) { - synchronized (eventLoop.threadLock) { - int bytesRead = stream.read(data, remainingLength, position); - if (bytesRead < 0) { - return sizeRead; + boolean fragmentBufferSmaller = fragmentBuffer.length < length; + int smallerBufferLength = Math.min(fragmentBuffer.length, + length); + System.arraycopy(fragmentBuffer, 0, data, position, + smallerBufferLength); + framesSinceOpen += smallerBufferLength + / currentFormat.getFrameSize(); + + if (!fragmentBufferSmaller) { + /* + * if fragment was larger, then we already have all the data + * we need. clean up the buffer before returning. Make a new + * fragmentBuffer from the remaining bytes + */ + int remainingBytesInFragment = (fragmentBuffer.length - length); + byte[] newFragmentBuffer = new byte[remainingBytesInFragment]; + System.arraycopy(fragmentBuffer, length, newFragmentBuffer, + 0, newFragmentBuffer.length); + fragmentBuffer = newFragmentBuffer; + return length; } + + /* done with fragment buffer, remove it */ + bytesRead = smallerBufferLength; sizeRead += bytesRead; position += bytesRead; remainingLength -= bytesRead; - framesSinceOpen += bytesRead / currentFormat.getFrameSize(); + fragmentBuffer = null; + + } + } + + /* + * if we need to read more data, then we read from PulseAudio's buffer + */ + while (remainingLength != 0) { + synchronized (this) { + + if (!isOpen || !isStarted) { + return sizeRead; + } + + if (flushed) { + flushed = false; + return sizeRead; + } + + if (drained) { + drained = false; + return sizeRead; + } + + byte[] currentFragment; + synchronized (eventLoop.threadLock) { + /* read a fragment, and drop it from the server */ + currentFragment = stream.peek(); + stream.drop(); + if (currentFragment == null) { + System.out + .println("DEBUG: PulseAudioTargetDataLine:read(): error in stream.peek()"); + return sizeRead; + } + + bytesRead = Math.min(currentFragment.length, + remainingLength); + + /* + * we read more than we required, save the rest of the data + * in the fragmentBuffer + */ + if (bytesRead < currentFragment.length) { + /* allocate a buffer to store unsaved data */ + fragmentBuffer = new byte[currentFragment.length + - bytesRead]; + + /* copy over the unsaved data */ + System.arraycopy(currentFragment, bytesRead, + fragmentBuffer, 0, currentFragment.length + - bytesRead); + } + + System.arraycopy(currentFragment, 0, data, position, + bytesRead); + + sizeRead += bytesRead; + position += bytesRead; + remainingLength -= bytesRead; + framesSinceOpen += bytesRead / currentFormat.getFrameSize(); + } } } // all the data should have been played by now assert (sizeRead == length); - // currentFramePosition += (sizeWritten/getFormat().getFrameSize()); + /* * FIXME when the stream is flushed() etc, instead of returning length * this should unblock and return the the size of data written so far @@ -156,9 +261,18 @@ throw new IllegalStateException("must call open() before drain()"); } + synchronized (this) { + drained = true; + } + // blocks when there is data on the line // http://www.jsresources.org/faq_audio.html#stop_drain_tdl - while (isStarted) { + while (true) { + synchronized (this) { + if (!isStarted || !isOpen) { + break; + } + } try { Thread.sleep(100); } catch (InterruptedException e) { @@ -180,6 +294,10 @@ operation.waitForCompletion(); operation.releaseReference(); + synchronized (this) { + flushed = true; + } + } public int available() { @@ -211,14 +329,14 @@ */ @Override - public void start() { + synchronized public void start() { super.start(); fireLineEvent(new LineEvent(this, LineEvent.Type.START, framesSinceOpen)); } @Override - public void stop() { + synchronized public void stop() { super.stop(); fireLineEvent(new LineEvent(this, LineEvent.Type.STOP, framesSinceOpen)); diff -r b4390e330ff7 -r 5ab54b0ca4ea src/java/org/classpath/icedtea/pulseaudio/Stream.java --- a/src/java/org/classpath/icedtea/pulseaudio/Stream.java Mon Sep 29 14:01:57 2008 -0400 +++ b/src/java/org/classpath/icedtea/pulseaudio/Stream.java Mon Sep 29 15:57:43 2008 -0400 @@ -54,7 +54,7 @@ public interface StateListener { public void update(); } - + public interface CorkListener { public void update(); } @@ -164,9 +164,6 @@ private native int native_pa_stream_write(byte[] data, int offset, int length); - private native int native_pa_stream_read(byte[] array, int length, - int position); - private native byte[] native_pa_stream_peek(); private native int native_pa_stream_drop(); @@ -323,7 +320,7 @@ underflowListeners.remove(listener); } } - + public void addCorkListener(CorkListener listener) { synchronized (corkListeners) { corkListeners.add(listener); @@ -501,10 +498,6 @@ return native_pa_stream_write(data, offset, length); } - public int read(byte[] array, int length, int position) { - return native_pa_stream_read(array, length, position); - } - /** * Read the next fragment from the buffer (for recording). * @@ -636,7 +629,7 @@ } } } - + @SuppressWarnings("unused") private void corkCallback() { synchronized (corkListeners) { diff -r b4390e330ff7 -r 5ab54b0ca4ea src/native/org_classpath_icedtea_pulseaudio_Stream.c --- a/src/native/org_classpath_icedtea_pulseaudio_Stream.c Mon Sep 29 14:01:57 2008 -0400 +++ b/src/native/org_classpath_icedtea_pulseaudio_Stream.c Mon Sep 29 15:57:43 2008 -0400 @@ -525,6 +525,7 @@ return value; } +/* JNIEXPORT jint JNICALL Java_org_classpath_icedtea_pulseaudio_Stream_native_1pa_1stream_1read (JNIEnv *env, jobject obj, jbyteArray array, jint length, jint offset) { pa_stream *stream = getJavaPointer(env, obj, STREAM_POINTER); @@ -540,6 +541,7 @@ pa_stream_drop(stream); return read_length; } +*/ /* * Class: org_classpath_icedtea_pulseaudio_Stream diff -r b4390e330ff7 -r 5ab54b0ca4ea unittests/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLineTest.java --- a/unittests/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLineTest.java Mon Sep 29 14:01:57 2008 -0400 +++ b/unittests/org/classpath/icedtea/pulseaudio/PulseAudioTargetDataLineTest.java Mon Sep 29 15:57:43 2008 -0400 @@ -55,6 +55,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; public class PulseAudioTargetDataLineTest { @@ -68,6 +69,28 @@ AudioFormat aSupportedFormat = new AudioFormat( AudioFormat.Encoding.PCM_UNSIGNED, 44100f, 8, 1, 1, 44100f, true); + class ThreadReader extends Thread { + TargetDataLine line; + byte[] buffer; + + public ThreadReader(TargetDataLine line, byte[] buffer) + throws LineUnavailableException { + + this.line = line; + this.buffer = buffer; + + } + + @Override + public void run() { + int bytesRead = 0; + + bytesRead = line.read(buffer, 0, buffer.length); + // System.out.println("read data"); + + } + } + @Before public void setUp() throws LineUnavailableException { Mixer.Info[] mixerInfos = AudioSystem.getMixerInfo(); @@ -147,7 +170,7 @@ @Test public void testOpenWithFormat() throws LineUnavailableException { - System.out.println("This test checks that read() sort of wroks"); + System.out.println("This test checks that open(AudioFormat) works"); targetDataLine = (TargetDataLine) mixer.getLine(new Line.Info( TargetDataLine.class)); @@ -214,6 +237,143 @@ } @Test + public void testReadAndClose() throws LineUnavailableException, + InterruptedException { + System.out.println("This test tries to close a line while " + + "read()ing to check that read() returns"); + targetDataLine = (TargetDataLine) mixer.getLine(new Line.Info( + TargetDataLine.class)); + Assert.assertNotNull(targetDataLine); + AudioFormat breakingFormat = new AudioFormat( + AudioFormat.Encoding.PCM_UNSIGNED, 44100f, 8, 2, 2, 44100f, + true); + targetDataLine.open(breakingFormat); + targetDataLine.start(); + byte[] buffer = new byte[1000000]; + + ThreadReader reader = new ThreadReader(targetDataLine, buffer); + reader.start(); + + Thread.sleep(100); + + Assert.assertTrue(reader.isAlive()); + + targetDataLine.close(); + + reader.join(500); + + Assert.assertFalse(reader.isAlive()); + + } + + @Test + public void testReadAndStop() throws LineUnavailableException, + InterruptedException { + targetDataLine = (TargetDataLine) mixer.getLine(new Line.Info( + TargetDataLine.class)); + Assert.assertNotNull(targetDataLine); + AudioFormat breakingFormat = new AudioFormat( + AudioFormat.Encoding.PCM_UNSIGNED, 44100f, 8, 2, 2, 44100f, + true); + targetDataLine.open(breakingFormat); + targetDataLine.start(); + byte[] buffer = new byte[10000000]; + + ThreadReader reader = new ThreadReader(targetDataLine, buffer); + reader.start(); + + Thread.sleep(100); + + Assert.assertTrue(reader.isAlive()); + + targetDataLine.stop(); + + Thread.sleep(100); + + Assert.assertFalse(reader.isAlive()); + + targetDataLine.close(); + + } + + // this is kind of messed up + // drain should hang on a started data line + // but read should return + @Test + public void testReadAndDrain() throws LineUnavailableException, + InterruptedException { + targetDataLine = (TargetDataLine) mixer.getLine(new Line.Info( + TargetDataLine.class)); + Assert.assertNotNull(targetDataLine); + AudioFormat breakingFormat = new AudioFormat( + AudioFormat.Encoding.PCM_UNSIGNED, 44100f, 8, 2, 2, 44100f, + true); + targetDataLine.open(breakingFormat); + targetDataLine.start(); + byte[] buffer = new byte[10000000]; + + ThreadReader reader = new ThreadReader(targetDataLine, buffer); + reader.start(); + + Thread.sleep(100); + + Assert.assertTrue(reader.isAlive()); + + Thread drainer = new Thread() { + + @Override + public void run() { + targetDataLine.drain(); + + } + + }; + + drainer.start(); + + Thread.sleep(100); + + Assert.assertFalse(reader.isAlive()); + + targetDataLine.stop(); + + Thread.sleep(100); + Assert.assertFalse(drainer.isAlive()); + + targetDataLine.close(); + } + + @Test + public void testReadAndFlush() throws LineUnavailableException, + InterruptedException { + targetDataLine = (TargetDataLine) mixer.getLine(new Line.Info( + TargetDataLine.class)); + Assert.assertNotNull(targetDataLine); + AudioFormat breakingFormat = new AudioFormat( + AudioFormat.Encoding.PCM_UNSIGNED, 44100f, 8, 2, 2, 44100f, + true); + targetDataLine.open(breakingFormat); + targetDataLine.start(); + byte[] buffer = new byte[10000000]; + + ThreadReader reader = new ThreadReader(targetDataLine, buffer); + reader.start(); + + Thread.sleep(100); + + Assert.assertTrue(reader.isAlive()); + + targetDataLine.flush(); + + Thread.sleep(100); + + Assert.assertFalse(reader.isAlive()); + + targetDataLine.stop(); + targetDataLine.close(); + } + + @Test public void testDrain() throws LineUnavailableException, InterruptedException { System.out