changeset 171:4260a476a101

2008-10-08 Ioana Ivan <iivan@redhat.com> * src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java (addStreamListener): startedListener always fires a START event strtedListener notifies drain that there is data on the line (stop): sets writeInterrupted to true (start): doesn't send any events (getBytesInBuffer): new function, returns the number of bytes currently present in a stream's buffer * src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java: (write): -checks writeInterrupted instead of drain and flush -if stop() was called before write() it writes data to the buffer until the buffer fills, then blocks -moved some code from the synchronized(this) block, since it was causing one test to hang (close): sets writeInterrupted to true (drain): sets writeInterrupted to true if the line is stopped and there is no data on the line, returns immediately, if there is data, blocks until the line is started * src/java/org/classpath/icedtea/pulseaudio/Stream.java (native_pa_stream_updateTimingInfo): new function ( bytesInBuffer): new function * src/native/org_classpath_icedtea_pulseaudio_Stream.c (JNICALL Java_org_classpath_icedtea_pulseaudio_Stream_bytesInBuffer): new function (Java_org_classpath_icedtea_pulseaudio_Stream_native_1pa_1stream_1updateTimingInfo): new function (update_timing_info_callback): new function (Java_org_classpath_icedtea_pulseaudio_Stream_native_1pa_1stream_1writable_1size): return 0 if the stream has been closed
author Ioana Ivan <iivan@redhat.com>
date Wed, 08 Oct 2008 14:27:39 -0400
parents a3a8e9e19967
children 66bcf656c0fb
files ChangeLog configure.ac src/java/org/classpath/icedtea/pulseaudio/EventLoop.java src/java/org/classpath/icedtea/pulseaudio/Operation.java src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java src/java/org/classpath/icedtea/pulseaudio/PulseAudioMixer.java src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourcePort.java src/java/org/classpath/icedtea/pulseaudio/PulseAudioTargetPort.java src/java/org/classpath/icedtea/pulseaudio/Stream.java src/native/org_classpath_icedtea_pulseaudio_Stream.c unittests/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLineTest.java
diffstat 12 files changed, 244 insertions(+), 92 deletions(-) [+]
line wrap: on
line diff
--- a/ChangeLog	Wed Oct 08 10:38:29 2008 -0400
+++ b/ChangeLog	Wed Oct 08 14:27:39 2008 -0400
@@ -1,3 +1,39 @@
+2008-10-08 Ioana Ivan <iivan@redhat.com>
+	* src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java
+	(addStreamListener): startedListener always fires a START event
+			     strtedListener notifies drain that there is data
+			     on the line
+	(stop): sets writeInterrupted to true
+	(start): doesn't send any events
+	(getBytesInBuffer): new function, returns the number of bytes
+	currently present in a stream's buffer
+
+	* src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java:
+	(write): -checks writeInterrupted instead of drain and flush
+	         -if stop() was called before write() it writes data to the
+		 buffer until the buffer fills, then blocks
+		 -moved some code from the synchronized(this) block, since it
+		 was causing one test to hang
+	(close): sets writeInterrupted to true
+	(drain): sets writeInterrupted to true
+		 if the line is stopped and there is no data on the line,
+		 returns immediately, if there is data, blocks until the line
+		 is started
+	
+	* src/java/org/classpath/icedtea/pulseaudio/Stream.java
+	(native_pa_stream_updateTimingInfo): new function
+	( bytesInBuffer): new function
+
+	* src/native/org_classpath_icedtea_pulseaudio_Stream.c
+	(JNICALL Java_org_classpath_icedtea_pulseaudio_Stream_bytesInBuffer):
+	new function
+	(Java_org_classpath_icedtea_pulseaudio_Stream_native_1pa_1stream_1updateTimingInfo):
+	new function
+	(update_timing_info_callback): new function
+	(Java_org_classpath_icedtea_pulseaudio_Stream_native_1pa_1stream_1writable_1size):
+	return 0 if the stream has been closed
+		 
+
 2008-09-25 Ioana Ivan <iivan@redhat.com>
 	* src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java:
 	removed all references to the boolean variable corked since it was
--- a/configure.ac	Wed Oct 08 10:38:29 2008 -0400
+++ b/configure.ac	Wed Oct 08 14:27:39 2008 -0400
@@ -4,12 +4,12 @@
 AC_PROG_LIBTOOL
 
 dnl Check for pulseaudio libraries.
-PKG_CHECK_MODULES(LIBPULSE,[libpulse >= 0.9.11],[LIBPULSE_FOUND=yes]
+PKG_CHECK_MODULES(LIBPULSE,libpulse,[LIBPULSE_FOUND=yes]
     ,[LIBPULSE_FOUND=no])
 if test "x${LIBPULSE_FOUND}" = xno
 then
-  AC_MSG_ERROR([Could not find pulseaudio>=0.9.11 libraries - \
-  Try installing pulseaudio-libs-devel>=0.9.11.])
+  AC_MSG_ERROR([Could not find pulseaudio libraries - \
+  Try installing pulseaudio-libs-devel.])
 fi
 AC_SUBST(LIBPULSE_CFLAGS)
 AC_SUBST(LIBPULSE_LIBS)
--- a/src/java/org/classpath/icedtea/pulseaudio/EventLoop.java	Wed Oct 08 10:38:29 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/EventLoop.java	Wed Oct 08 14:27:39 2008 -0400
@@ -37,6 +37,7 @@
 
 package org.classpath.icedtea.pulseaudio;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Semaphore;
@@ -105,7 +106,7 @@
 	 */
 
 	static {
-			System.loadLibrary("pulse-java");
+		System.loadLibrary("pulse-java");
 	}
 
 	private EventLoop() {
@@ -197,6 +198,7 @@
 				break;
 			case 5:
 				fireEvent(new ContextEvent(Type.FAILED));
+				System.out.println("context failed");
 				break;
 			case 6:
 				fireEvent(new ContextEvent(Type.TERMINATED));
--- a/src/java/org/classpath/icedtea/pulseaudio/Operation.java	Wed Oct 08 10:38:29 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/Operation.java	Wed Oct 08 14:27:39 2008 -0400
@@ -37,6 +37,7 @@
 
 package org.classpath.icedtea.pulseaudio;
 
+import java.io.IOException;
 
 /*
  * Encapsulates a pa_operation object
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java	Wed Oct 08 10:38:29 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioDataLine.java	Wed Oct 08 14:27:39 2008 -0400
@@ -76,6 +76,7 @@
 	protected EventLoop eventLoop = null;
 	protected Semaphore semaphore = new Semaphore(0);
 	protected Stream stream;
+	boolean writeInterrupted = false;
 
 	protected void open(AudioFormat format, int bufferSize)
 			throws LineUnavailableException {
@@ -149,6 +150,9 @@
 									PulseAudioDataLine.this,
 									LineEvent.Type.CLOSE, framesSinceOpen)));
 						}
+						synchronized (this) {
+							this.notifyAll();
+						}
 						semaphore.release();
 
 					}
@@ -174,15 +178,13 @@
 		Stream.PlaybackStartedListener startedListener = new Stream.PlaybackStartedListener() {
 			@Override
 			public void update() {
-
-				// only send a START event in the beginning and following
-				// an underflow
-				if (!dataWritten) {
-					fireLineEvent(new LineEvent(PulseAudioDataLine.this,
-							LineEvent.Type.START, framesSinceOpen));
-				}
+				fireLineEvent(new LineEvent(PulseAudioDataLine.this,
+						LineEvent.Type.START, framesSinceOpen));
 
 				dataWritten = true;
+				synchronized (this) {
+					this.notifyAll();
+				}
 
 			}
 		};
@@ -260,6 +262,7 @@
 	}
 
 	public void close() {
+
 		if (!isOpen) {
 			throw new IllegalStateException(
 					"Line must be open for close() to work");
@@ -280,10 +283,10 @@
 		}
 
 		super.close();
+
 		isStarted = false;
-
 	}
-
+	
 	public void reconnectforSynchronization(Stream masterStream)
 			throws LineUnavailableException {
 		sendEvents = false;
@@ -319,10 +322,6 @@
 		Operation op;
 		synchronized (eventLoop.threadLock) {
 			op = stream.unCork();
-			if (dataWritten && (!isStarted)) {
-				fireLineEvent(new LineEvent(PulseAudioDataLine.this,
-						LineEvent.Type.START, framesSinceOpen));
-			}
 		}
 
 		op.waitForCompletion();
@@ -331,16 +330,18 @@
 
 	}
 
+
 	public synchronized void stop() {
 		if (!isOpen) {
 			throw new IllegalStateException(
 					"Line must be open()ed before it can be start()ed");
 
 		}
+		writeInterrupted = true;
 		if (!isStarted) {
 			return;
 		}
-		isStarted = true;
+
 		Operation op;
 		synchronized (eventLoop.threadLock) {
 			op = stream.cork();
@@ -442,4 +443,14 @@
 		return streamName;
 	}
 
+	public int getBytesInBuffer() {
+		Operation o;
+		synchronized (eventLoop.threadLock) {
+			o = stream.updateTimingInfo();
+		}
+		o.waitForCompletion();
+		o.releaseReference();
+		return stream.bytesInBuffer();
+	}
+
 }
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioMixer.java	Wed Oct 08 10:38:29 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioMixer.java	Wed Oct 08 14:27:39 2008 -0400
@@ -401,6 +401,7 @@
 
 	@Override
 	public boolean isSynchronizationSupported(Line[] lines, boolean maintainSync) {
+
 		return false;
 	}
 
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java	Wed Oct 08 10:38:29 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java	Wed Oct 08 14:27:39 2008 -0400
@@ -44,6 +44,8 @@
 import javax.sound.sampled.LineUnavailableException;
 import javax.sound.sampled.SourceDataLine;
 
+import org.classpath.icedtea.pulseaudio.Operation.State;
+
 public class PulseAudioSourceDataLine extends PulseAudioDataLine implements
 		SourceDataLine, PulseAudioPlaybackLine {
 
@@ -52,9 +54,6 @@
 	private boolean muted;
 	private float volume;
 
-	boolean flushed = false;
-	boolean drained = false;
-
 	public PulseAudioSourceDataLine(EventLoop eventLoop, AudioFormat[] formats,
 			AudioFormat defaultFormat) {
 
@@ -134,6 +133,10 @@
 	public int write(byte[] data, int offset, int length) {
 		// can't call write() without open()ing first, but can call write()
 		// without start()ing
+		synchronized (this) {
+			writeInterrupted = false;
+		}
+		
 		if (!isOpen) {
 			throw new IllegalStateException("must call open() before write()");
 		}
@@ -157,62 +160,57 @@
 
 		int sizeWritten = 0;
 
-		flushed = false;
-
 		boolean interrupted = false;
 
 		while (remainingLength != 0) {
 
-			synchronized (this) {
+			synchronized (eventLoop.threadLock) {
 
-				if (!isStarted || !isOpen) {
-					return sizeWritten;
-				}
+				do {
+					if (writeInterrupted) {
+						return sizeWritten;
+					}
 
-				if (flushed) {
-					flushed = false;
-					return sizeWritten;
-				}
+					if (availableSize == -1) {
+						return sizeWritten;
+					}
+					availableSize = stream.getWritableSize();
 
-				synchronized (eventLoop.threadLock) {
-
-					do {
-						availableSize = stream.getWritableSize();
-
-						if (availableSize < 0) {
-							return sizeWritten;
+					if (availableSize == 0) {
+						try {
+							eventLoop.threadLock.wait(100);
+						} catch (InterruptedException e) {
+							// ignore for now
+							interrupted = true;
 						}
 
-						if (availableSize == 0) {
-							try {
-								eventLoop.threadLock.wait();
-							} catch (InterruptedException e) {
-								// ignore for now
-								interrupted = true;
-							}
-
-						}
-
-					} while (availableSize == 0);
-
-					if (availableSize > remainingLength) {
-						availableSize = remainingLength;
 					}
 
-					// only write entire frames, so round down avialableSize to
-					// a
-					// multiple of frameSize
-					availableSize = (availableSize / frameSize) * frameSize;
+				} while (availableSize == 0);
+
+				if (availableSize > remainingLength) {
+					availableSize = remainingLength;
+				}
 
+				// only write entire frames, so round down avialableSize to
+				// a
+				// multiple of frameSize
+				availableSize = (availableSize / frameSize) * frameSize;
+
+				synchronized (this) {
+					if (writeInterrupted) {
+						return sizeWritten;
+					}
 					/* write a little bit of the buffer */
 					stream.write(data, position, availableSize);
+				}
 
-					sizeWritten += availableSize;
-					position += availableSize;
-					remainingLength -= availableSize;
+				sizeWritten += availableSize;
+				position += availableSize;
+				remainingLength -= availableSize;
 
-					framesSinceOpen += availableSize / frameSize;
-				}
+				framesSinceOpen += availableSize / frameSize;
+
 			}
 		}
 
@@ -260,6 +258,29 @@
 
 		}
 
+		synchronized (this) {
+			writeInterrupted = true;
+		}
+
+		do {
+			synchronized (this) {
+				if (!isOpen) {
+					return;
+				}
+				if (getBytesInBuffer() == 0) {
+					return;
+				}
+				if (isStarted || !isOpen) {
+					break;
+				}
+				try {
+					this.wait(100);
+				} catch (InterruptedException e) {
+					return;
+				}
+			}
+		} while (!isStarted);
+
 		Operation operation;
 
 		synchronized (eventLoop.threadLock) {
@@ -269,10 +290,6 @@
 		operation.waitForCompletion();
 		operation.releaseReference();
 
-		synchronized (this) {
-			drained = true;
-		}
-
 	}
 
 	@Override
@@ -281,6 +298,9 @@
 			throw new IllegalStateException(
 					"Line must be open before it can be flush()ed");
 		}
+		synchronized (this) {
+			writeInterrupted = true;
+		}
 
 		Operation operation;
 		synchronized (eventLoop.threadLock) {
@@ -290,10 +310,6 @@
 		operation.waitForCompletion();
 		operation.releaseReference();
 
-		synchronized (this) {
-			flushed = true;
-		}
-
 	}
 
 	@Override
@@ -301,6 +317,8 @@
 		if (!isOpen) {
 			throw new IllegalStateException("not open so cant close");
 		}
+		
+		writeInterrupted = true;
 
 		PulseAudioMixer parent = PulseAudioMixer.getInstance();
 		parent.removeSourceLine(this);
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourcePort.java	Wed Oct 08 10:38:29 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourcePort.java	Wed Oct 08 14:27:39 2008 -0400
@@ -37,6 +37,8 @@
 
 package org.classpath.icedtea.pulseaudio;
 
+import java.io.IOException;
+
 import javax.sound.sampled.Port;
 
 public class PulseAudioSourcePort extends PulseAudioPort {
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioTargetPort.java	Wed Oct 08 10:38:29 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioTargetPort.java	Wed Oct 08 14:27:39 2008 -0400
@@ -37,6 +37,8 @@
 
 package org.classpath.icedtea.pulseaudio;
 
+import java.io.IOException;
+
 import javax.sound.sampled.Port;
 
 public class PulseAudioTargetPort extends PulseAudioPort {
--- a/src/java/org/classpath/icedtea/pulseaudio/Stream.java	Wed Oct 08 10:38:29 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/Stream.java	Wed Oct 08 14:27:39 2008 -0400
@@ -37,6 +37,7 @@
 
 package org.classpath.icedtea.pulseaudio;
 
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -86,6 +87,10 @@
 		public void update();
 	}
 
+	public interface UpdateTimingInfoListener {
+		public void update();
+	}
+
 	public interface SuspendedListener {
 		public void update();
 	}
@@ -164,6 +169,10 @@
 
 	private native byte[] native_pa_stream_drain();
 
+	private native byte[] native_pa_stream_updateTimingInfo();
+
+	public native int bytesInBuffer();
+
 	/*
 	 * pa_operation pa_stream_update_timing_info (pa_stream *p,
 	 * pa_stream_success_cb_t cb, void *userdata) Request a timing info
@@ -370,6 +379,7 @@
 			suspendedListeners.remove(listener);
 		}
 	}
+		
 
 	public Stream.State getState() {
 		int state = native_pa_stream_get_state();
@@ -534,6 +544,12 @@
 		return drainOperation;
 	}
 
+	public Operation updateTimingInfo() {
+		Operation updateOperation = new Operation(
+				native_pa_stream_updateTimingInfo());
+		return updateOperation;
+	}
+
 	/**
 	 * this function is called whenever the state changes
 	 */
--- a/src/native/org_classpath_icedtea_pulseaudio_Stream.c	Wed Oct 08 10:38:29 2008 -0400
+++ b/src/native/org_classpath_icedtea_pulseaudio_Stream.c	Wed Oct 08 14:27:39 2008 -0400
@@ -153,6 +153,22 @@
 	}
 }
 
+
+static void update_timing_info_callback(pa_stream* stream, int success, void* userdata) {
+
+	assert(stream);
+	JNIEnv* env = pulse_thread_env;
+	assert(env);
+
+	notifyWaitingOperations(env);
+
+	if (success == 0) {
+		throwByName(env, ILLEGAL_STATE_EXCEPTION, "drain failed");
+	}
+
+}
+
+
 // requires pulseaudio 0.9.11 :(
 static void stream_started_callback(pa_stream *stream, void *userdata) {
 	// printf("stream_started_callback called\n");
@@ -437,7 +453,7 @@
 	/* Set flags to 0 to fix problem with draining before calling start, might need to
 	 be changed back to PA_STREAM_START_CORKED in the future, if we'll be able to implement
 	 synchronization*/
-	int value = pa_stream_connect_playback(stream, dev, &buffer_attr, 0, NULL, sync_stream);
+	int value = pa_stream_connect_playback(stream, dev, &buffer_attr, PA_STREAM_START_CORKED, NULL, sync_stream);
 
 	if (dev != NULL) {
 		(*env)->ReleaseStringUTFChars(env, device, dev);
@@ -577,7 +593,9 @@
 JNIEXPORT jint JNICALL Java_org_classpath_icedtea_pulseaudio_Stream_native_1pa_1stream_1writable_1size
 (JNIEnv* env, jobject obj) {
 	pa_stream* stream = (pa_stream*)getJavaPointer(env, obj, STREAM_POINTER);
-	assert(stream);
+	if(!stream) {
+		return 0;
+	}
 	size_t size = pa_stream_writable_size(stream);
 	return size;
 
@@ -953,9 +971,9 @@
  */
 JNIEXPORT jbyteArray JNICALL Java_org_classpath_icedtea_pulseaudio_Stream_native_1setVolume
 (JNIEnv *env, jobject obj, jfloat new_volume) {
+	
 	pa_stream *stream = getJavaPointer(env, obj, STREAM_POINTER);
 	assert(stream);
-
 	pa_context *context = pa_stream_get_context(stream);
 	assert(context);
 
@@ -970,3 +988,23 @@
 
 }
 
+JNIEXPORT jint JNICALL Java_org_classpath_icedtea_pulseaudio_Stream_bytesInBuffer(JNIEnv *env, jobject obj) {
+	pa_stream *stream = getJavaPointer(env, obj, STREAM_POINTER);
+	assert(stream);
+	const pa_timing_info *timing_info = pa_stream_get_timing_info(stream);
+	int write_index = timing_info->write_index;
+	int read_index = timing_info->read_index;
+	return write_index - read_index;
+}
+
+JNIEXPORT jbyteArray JNICALL Java_org_classpath_icedtea_pulseaudio_Stream_native_1pa_1stream_1updateTimingInfo(JNIEnv* env, jobject obj) {
+	pa_stream *stream = getJavaPointer(env, obj, STREAM_POINTER);
+	assert(stream);
+	pa_operation* o = pa_stream_update_timing_info(stream, update_timing_info_callback, NULL); 
+	assert(o);
+	return convertNativePointerToJava(env, o);
+
+}
+
+
+
--- a/unittests/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLineTest.java	Wed Oct 08 10:38:29 2008 -0400
+++ b/unittests/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLineTest.java	Wed Oct 08 14:27:39 2008 -0400
@@ -1065,8 +1065,9 @@
 	}
 
 	@Test
-	public void testDrainWithoutStart() throws LineUnavailableException,
-			UnsupportedAudioFileException, IOException, InterruptedException {
+	public void testDrainWithoutStartDataOnTheLine()
+			throws LineUnavailableException, UnsupportedAudioFileException,
+			IOException, InterruptedException {
 
 		File soundFile = new File("testsounds/logout.wav");
 		final AudioInputStream audioInputStream = AudioSystem
@@ -1080,22 +1081,16 @@
 		int available = sourceDataLine.available();
 		Assert.assertTrue(available > 1000);
 
-		Thread writer = new Thread() {
-			@Override
-			public void run() {
-				try {
-					final byte[] abData = new byte[100000];
-					int bytesRead = 0;
+		try {
+			final byte[] abData = new byte[2000];
+			int bytesRead = 0;
 
-					bytesRead = audioInputStream.read(abData, 0, abData.length);
-					Assert.assertTrue(bytesRead > 0);
-
-					sourceDataLine.write(abData, 0, bytesRead);
-				} catch (IOException e) {
-
-				}
-			}
-		};
+			bytesRead = audioInputStream.read(abData, 0, abData.length);
+			Assert.assertTrue(bytesRead > 0);
+			sourceDataLine.write(abData, 0, bytesRead);
+		} catch (IOException e) {
+			System.out.println("Error");
+		}
 
 		Thread drainer = new Thread() {
 			@Override
@@ -1104,7 +1099,6 @@
 			}
 		};
 
-		writer.start();
 		drainer.start();
 
 		drainer.join(1000);
@@ -1115,8 +1109,6 @@
 			if (drainer.isAlive()) {
 				Assert
 						.fail("drain() does not return when the line has been closed");
-			} else {
-				sourceDataLine.close();
 			}
 		} else {
 			Assert.fail("drain() does not block when there is data on the "
@@ -1125,6 +1117,39 @@
 
 	}
 
+	@Test
+	public void testDrainWithoutStartNoDataOnTheLine()
+			throws LineUnavailableException, UnsupportedAudioFileException,
+			IOException, InterruptedException {
+
+		File soundFile = new File("testsounds/logout.wav");
+		final AudioInputStream audioInputStream = AudioSystem
+				.getAudioInputStream(soundFile);
+		AudioFormat audioFormat = audioInputStream.getFormat();
+
+		sourceDataLine = (SourceDataLine) mixer.getLine(new DataLine.Info(
+				SourceDataLine.class, audioFormat, 1000));
+		Assert.assertNotNull(sourceDataLine);
+		sourceDataLine.open();
+		int available = sourceDataLine.available();
+		Assert.assertTrue(available > 1000);
+
+		Thread drainer = new Thread() {
+			@Override
+			public void run() {
+				sourceDataLine.drain();
+			}
+		};
+
+		drainer.start();
+		drainer.join(1000);
+
+		if (drainer.isAlive()) {
+			Assert
+					.fail("drain() does not return when there is no data on a line that hasn't been started");
+		}
+	}
+
 	@Test(expected = IllegalStateException.class)
 	public void testDrainWithoutOpen() throws LineUnavailableException {