changeset 75:a2034200b782

2008-08-15 Omair Majid <omajid@redhat.com> * src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java removed all native functions. uses the stream class to work now * src/java/org/classpath/icedtea/pulseaudio/PulseAudioStreamVolumeContro fixed to use the stream class * src/java/org/classpath/icedtea/pulseaudio/Stream.java: initial implementation * src/native/jni-common.c: added callJavaVoidMethod() * src/native/jni-common.h: same * src/native/org_classpath_icedtea_pulseaudio_Stream.c: implemented callbacks
author Omair Majid <omajid@redhat.com>
date Fri, 15 Aug 2008 10:26:12 -0400
parents 5f856aeca15a
children c7d9beb92d3d
files src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java src/java/org/classpath/icedtea/pulseaudio/PulseAudioStreamVolumeControl.java src/java/org/classpath/icedtea/pulseaudio/Stream.java src/native/jni-common.c src/native/jni-common.h src/native/org_classpath_icedtea_pulseaudio_Stream.c
diffstat 6 files changed, 478 insertions(+), 188 deletions(-) [+]
line wrap: on
line diff
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java	Thu Aug 14 14:24:17 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioSourceDataLine.java	Fri Aug 15 10:26:12 2008 -0400
@@ -37,7 +37,6 @@
 
 package org.classpath.icedtea.pulseaudio;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -62,7 +61,6 @@
 	private static final String PULSEAUDIO_FORMAT_KEY = "PulseAudioFormatKey";
 
 	private String streamName = "Java Stream";
-	private List<StreamListener> streamListeners = new ArrayList<StreamListener>();
 
 	private EventLoop eventLoop = null;
 
@@ -80,50 +78,13 @@
 	private PulseAudioStreamVolumeControl volumeControl;
 	private boolean muted;
 	private float volume;
-	
+
 	private long currentFramePosition = 0;
 
-	/*
-	 * When moving from 32bit platform to 64 bit platform, these variables
-	 * should make life fun
-	 * 
-	 */
-	@SuppressWarnings("unused")
-	private long streamPointer;
-
-	private native void native_open(long contextPointer, String streamName,
-			String encoding, int sampleRate, int channels, int bufferSize);
-
-	private native void native_write(byte[] data, int offset, int length);
-
-	private native int native_get_writable_size();
-
-	private native long native_flush();
-
-	private native void native_start();
-
-	private native void native_pause();
-
-	private native void native_resume();
-
-	private native long native_drain();
-
-	private native void native_close();
-
-	static {
-		try {
-			String library = new java.io.File(".").getCanonicalPath()
-					+ java.io.File.separatorChar
-					+ System.mapLibraryName("pulse-java");
-			System.out.println(library);
-			System.load(library);
-		} catch (IOException e) {
-			assert ("Loading failed".endsWith("library"));
-		}
-
-	}
+	private Stream stream;
 
 	public PulseAudioSourceDataLine(EventLoop eventLoop) {
+
 		this.eventLoop = eventLoop;
 		this.lineListeners = new ArrayList<LineListener>();
 		this.volume = PulseAudioVolumeControl.MAX_VOLUME;
@@ -338,10 +299,10 @@
 
 		for (AudioFormat myFormat : supportedFormats) {
 			if (format.matches(myFormat)) {
-				native_open(eventLoop.getContextPointer(), streamName,
-						(String) myFormat.getProperty(PULSEAUDIO_FORMAT_KEY),
-						(int) format.getSampleRate(), format.getChannels(),
-						bufferSize);
+				stream = new Stream(eventLoop.getContextPointer(), streamName,
+						Stream.Format.valueOf((String) myFormat
+								.getProperty(PULSEAUDIO_FORMAT_KEY)),
+						(int) format.getSampleRate(), format.getChannels());
 				currentFormat = format;
 				isOpen = true;
 			}
@@ -351,15 +312,15 @@
 			throw new IllegalArgumentException("Invalid format");
 		}
 
-		StreamListener openCloseListener = new StreamListener() {
+		Stream.StateListener openCloseListener = new Stream.StateListener() {
 
 			@Override
-			public void update(StreamEvent e) {
-				if (e.getType() == StreamEvent.Type.READY) {
+			public void update() {
+				if (stream.getState() == Stream.State.READY) {
 					fireLineEvent(new LineEvent(PulseAudioSourceDataLine.this,
 							LineEvent.Type.OPEN, AudioSystem.NOT_SPECIFIED));
-				} else if (e.getType() == StreamEvent.Type.TERMINATED
-						|| e.getType() == StreamEvent.Type.FAILED) {
+				} else if (stream.getState() == Stream.State.TERMINATED
+						|| stream.getState() == Stream.State.FAILED) {
 					fireLineEvent((new LineEvent(PulseAudioSourceDataLine.this,
 							LineEvent.Type.CLOSE, AudioSystem.NOT_SPECIFIED)));
 				}
@@ -367,22 +328,22 @@
 
 		};
 
-		addStreamListener(openCloseListener);
+		stream.addStateListener(openCloseListener);
 
 		final Semaphore semaphore = new Semaphore(0);
 
 		synchronized (eventLoop.threadLock) {
 
-			this.addStreamListener(new StreamListener() {
+			stream.addStateListener(new Stream.StateListener() {
 				@Override
-				public void update(StreamEvent e) {
-					if (e.getType() == StreamEvent.Type.READY) {
+				public void update() {
+					if (stream.getState() == Stream.State.READY) {
 						semaphore.release();
 					}
 				}
 			});
 
-			native_start();
+			stream.connectForPlayback(null);
 		}
 
 		try {
@@ -440,7 +401,7 @@
 		while (remainingLength != 0) {
 
 			synchronized (eventLoop.threadLock) {
-				availableSize = native_get_writable_size();
+				availableSize = stream.getWritableSize();
 				if (availableSize < 0) {
 					return sizeWritten;
 				}
@@ -448,19 +409,18 @@
 					availableSize = remainingLength;
 				}
 				/* write a little bit of the buffer */
-				native_write(data, position, availableSize);
+				stream.write(data, position, availableSize);
 
 				sizeWritten += availableSize;
 				position += availableSize;
 				remainingLength -= availableSize;
-				
 
 			}
 		}
 
 		// all the data should have been played by now
 		assert (sizeWritten == length);
-		currentFramePosition += (sizeWritten/getFormat().getFrameSize());
+		currentFramePosition += (sizeWritten / getFormat().getFrameSize());
 		/*
 		 * FIXME when the stream is flushed() etc, instead of returning length
 		 * this should unblock and return the the size of data written so far
@@ -470,7 +430,7 @@
 
 	public void start() {
 		if (isPaused) {
-			native_resume();
+			stream.cork(false);
 			isPaused = false;
 		}
 
@@ -483,7 +443,7 @@
 
 	public void stop() {
 		synchronized (eventLoop.threadLock) {
-			native_pause();
+			stream.cork(true);
 		}
 		isPaused = true;
 
@@ -497,25 +457,13 @@
 		this.lineListeners.remove(listener);
 	}
 
-	private void addStreamListener(StreamListener listener) {
-		synchronized (streamListeners) {
-			this.streamListeners.add(listener);
-		}
-	}
-
-	private void removeStreamListener(StreamListener listener) {
-		synchronized (streamListeners) {
-			this.streamListeners.remove(listener);
-		}
-	}
-
 	public boolean isOpen() {
 		return isOpen;
 	}
 
 	public int available() {
 		synchronized (eventLoop.threadLock) {
-			return native_get_writable_size();
+			return stream.getWritableSize();
 		}
 	};
 
@@ -524,26 +472,27 @@
 
 		final Semaphore semaphore = new Semaphore(0);
 
-		StreamListener closeListener = new StreamListener() {
+		Stream.StateListener closeListener = new Stream.StateListener() {
 			@Override
-			public void update(StreamEvent e) {
-				if (e.getType() == StreamEvent.Type.TERMINATED
-						|| e.getType() == StreamEvent.Type.FAILED) {
+			public void update() {
+				if (stream.getState() == Stream.State.TERMINATED
+						|| stream.getState() == Stream.State.FAILED) {
 					semaphore.release();
 				}
 			}
 		};
 
-		addStreamListener(closeListener);
+		stream.addStateListener(closeListener);
 
 		synchronized (eventLoop.threadLock) {
-			native_close();
+			stream.drain();
+			stream.disconnect();
 		}
 
 		try {
 			semaphore.acquire();
 			System.out.println("stream closed");
-			removeStreamListener(closeListener);
+			stream.removeStateListener(closeListener);
 		} catch (InterruptedException e) {
 			// throw new LineUnavailableException("unable to prepare
 			// stream");
@@ -569,7 +518,7 @@
 	}
 
 	public int getFramePosition() {
-		return (int)currentFramePosition;
+		return (int) currentFramePosition;
 	}
 
 	public float getLevel() {
@@ -583,8 +532,8 @@
 	public long getMicrosecondPosition() {
 
 		float frameRate = currentFormat.getFrameRate();
-		float time = currentFramePosition/frameRate; // seconds
-		long microseconds = (long)(time * 1000);
+		float time = currentFramePosition / frameRate; // seconds
+		long microseconds = (long) (time * 1000);
 		return microseconds;
 	}
 
@@ -640,7 +589,7 @@
 		Operation operation;
 		Operation.State operationState;
 		synchronized (eventLoop.threadLock) {
-			operation = new Operation(native_drain());
+			operation = stream.drain();
 			operationState = operation.getState();
 		}
 
@@ -659,7 +608,7 @@
 		Operation operation;
 		Operation.State operationState;
 		synchronized (eventLoop.threadLock) {
-			operation = new Operation(native_flush());
+			operation = stream.flush();
 			operationState = operation.getState();
 		}
 		// FIXME need to find a way to do a wait than a busy loop
@@ -673,50 +622,18 @@
 
 	}
 
-	public void update(int status) {
-		synchronized (eventLoop.threadLock) {
-			switch (status) {
-			case 0:
-				fireStreamEvent(new StreamEvent(StreamEvent.Type.UNCONNECTED));
-				break;
-			case 1:
-				fireStreamEvent(new StreamEvent(StreamEvent.Type.CREATING));
-				break;
-			case 2:
-				fireStreamEvent(new StreamEvent(StreamEvent.Type.READY));
-				break;
-			case 3:
-				fireStreamEvent(new StreamEvent(StreamEvent.Type.FAILED));
-				break;
-			case 4:
-				fireStreamEvent(new StreamEvent(StreamEvent.Type.TERMINATED));
-				break;
-			default:
-				assert ("not supposed to happen".indexOf("false") >= 0);
-			}
-		}
-	}
-
 	private void fireLineEvent(LineEvent e) {
 		for (LineListener lineListener : lineListeners) {
 			lineListener.update(e);
 		}
 	}
 
-	private void fireStreamEvent(StreamEvent e) {
-		synchronized (streamListeners) {
-			for (StreamListener streamListener : streamListeners) {
-				streamListener.update(e);
-			}
-		}
-	}
-
 	protected EventLoop getEventLoop() {
 		return this.eventLoop;
 	}
 
-	public long getStreamPointer() {
-		return streamPointer;
+	public Stream getStream() {
+		return stream;
 	}
 
 }
--- a/src/java/org/classpath/icedtea/pulseaudio/PulseAudioStreamVolumeControl.java	Thu Aug 14 14:24:17 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/PulseAudioStreamVolumeControl.java	Fri Aug 15 10:26:12 2008 -0400
@@ -28,7 +28,7 @@
 
 	protected PulseAudioStreamVolumeControl(PulseAudioSourceDataLine line) {
 		super(line);
-		this.streamPointer = line.getStreamPointer();
+		this.streamPointer = line.getStream().getStreamPointer();
 		this.eventLoop = line.getEventLoop();
 		this.line = line;
 	}
--- a/src/java/org/classpath/icedtea/pulseaudio/Stream.java	Thu Aug 14 14:24:17 2008 -0400
+++ b/src/java/org/classpath/icedtea/pulseaudio/Stream.java	Fri Aug 15 10:26:12 2008 -0400
@@ -1,6 +1,7 @@
 package org.classpath.icedtea.pulseaudio;
 
-import javax.sound.sampled.AudioFormat;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * 
@@ -10,17 +11,65 @@
  */
 public class Stream {
 
+	public interface StateListener {
+		public void update();
+	}
+
+	public interface WriteListener {
+		public void update();
+	}
+
+	public interface ReadListener {
+		public void update();
+	}
+
+	public interface OverflowListener {
+		public void update();
+	}
+
+	public interface UnderflowListener {
+		public void update();
+	}
+
+	public interface PlaybackStartedListener {
+		public void update();
+	}
+
+	public interface LatencyUpdateListener {
+		public void update();
+	}
+
+	public interface MovedListener {
+		public void update();
+	}
+
+	public interface SuspendedListener {
+		public void update();
+	}
+
 	public static enum State {
 		UNCONNECTED, CREATING, READY, FAILED, TERMINATED,
 	}
 
 	public static enum Format {
-		PA_SAMPLE_U8, PA_SAMPLE_ULAW, PA_SAMPLE_ALAW, PA_SAMPLE_S16LE, PA_SAMPLE_S16BE, PA_SAMPLE_FLOAT32LE, PA_SAMPLE_FLOAT32BE, PA_SAMPLE_S32LE, PA_SAMPLE_S32BE,
+		PA_SAMPLE_U8, PA_SAMPLE_ULAW, PA_SAMPLE_ALAW, PA_SAMPLE_S16LE, PA_SAMPLE_S16BE, PA_SAMPLE_FLOAT32LE, PA_SAMPLE_FLOAT32BE, PA_SAMPLE_S32LE, PA_SAMPLE_S32BE
 	}
 
 	@SuppressWarnings("unused")
 	private long streamPointer;
 
+	private Format format;
+
+	private List<StateListener> stateListeners;
+	private List<WriteListener> writeListeners;
+	private List<ReadListener> readListeners;
+	private List<OverflowListener> overflowListeners;
+	private List<UnderflowListener> underflowListeners;
+	private List<PlaybackStartedListener> playbackStartedListeners;
+	private List<LatencyUpdateListener> latencyUpdateListeners;
+	private List<MovedListener> movedListeners;
+	private List<SuspendedListener> suspendedListeners;
+
 	private native void native_pa_stream_new(long contextPointer, String name,
 			String format, int sampleRate, int channels);
 
@@ -65,32 +114,7 @@
 	/*
 	 * Drain a playback stream. pa_operation pa_stream_update_timing_info
 	 * (pa_stream *p, pa_stream_success_cb_t cb, void *userdata) Request a
-	 * timing info structure update for a stream. void
-	 * pa_stream_set_state_callback (pa_stream *s, pa_stream_notify_cb_t cb,
-	 * void *userdata) Set the callback function that is called whenever the
-	 * state of the stream changes. void pa_stream_set_write_callback (pa_stream
-	 * *p, pa_stream_request_cb_t cb, void *userdata) Set the callback function
-	 * that is called when new data may be written to the stream. void
-	 * pa_stream_set_read_callback (pa_stream *p, pa_stream_request_cb_t cb,
-	 * void *userdata) Set the callback function that is called when new data is
-	 * available from the stream. void pa_stream_set_overflow_callback
-	 * (pa_stream *p, pa_stream_notify_cb_t cb, void *userdata) Set the callback
-	 * function that is called when a buffer overflow happens. void
-	 * pa_stream_set_underflow_callback (pa_stream *p, pa_stream_notify_cb_t cb,
-	 * void *userdata) Set the callback function that is called when a buffer
-	 * underflow happens. void pa_stream_set_started_callback (pa_stream *p,
-	 * pa_stream_notify_cb_t cb, void *userdata) Set the callback function that
-	 * is called when a the server starts playback after an underrun or on
-	 * initial startup. void pa_stream_set_latency_update_callback (pa_stream
-	 * *p, pa_stream_notify_cb_t cb, void *userdata) Set the callback function
-	 * that is called whenever a latency information update happens. void
-	 * pa_stream_set_moved_callback (pa_stream *p, pa_stream_notify_cb_t cb,
-	 * void *userdata) Set the callback function that is called whenever the
-	 * stream is moved to a different sink/source. void
-	 * pa_stream_set_suspended_callback (pa_stream *p, pa_stream_notify_cb_t cb,
-	 * void *userdata) Set the callback function that is called whenever the
-	 * sink/source this stream is connected to is suspended or resumed.
-	 * 
+	 * timing info structure update for a stream.
 	 */
 
 	private native long native_pa_stream_cork(int b);
@@ -146,11 +170,109 @@
 
 	public Stream(long contextPointer, String name, Format format,
 			int sampleRate, int channels) {
-		System.out.println("format: "+ format.toString());
+		System.out.println("format: " + format.toString());
+
+		stateListeners = new LinkedList<StateListener>();
+		writeListeners = new LinkedList<WriteListener>();
+		readListeners = new LinkedList<ReadListener>();
+		overflowListeners = new LinkedList<OverflowListener>();
+		underflowListeners = new LinkedList<UnderflowListener>();
+		playbackStartedListeners = new LinkedList<PlaybackStartedListener>();
+		latencyUpdateListeners = new LinkedList<LatencyUpdateListener>();
+		movedListeners = new LinkedList<MovedListener>();
+		suspendedListeners = new LinkedList<SuspendedListener>();
+
+		this.format = format;
+
 		native_pa_stream_new(contextPointer, name, format.toString(),
 				sampleRate, channels);
 	}
 
+	public void addStateListener(StateListener listener) {
+		synchronized (stateListeners) {
+			stateListeners.add(listener);
+		}
+	}
+
+	public void removeStateListener(StateListener listener) {
+		synchronized (stateListeners) {
+			stateListeners.remove(listener);
+		}
+
+	}
+
+	public void addWriteListener(WriteListener listener) {
+		synchronized (writeListeners) {
+			writeListeners.add(listener);
+		}
+	}
+
+	public void removeWriteListener(WriteListener listener) {
+		synchronized (writeListeners) {
+			writeListeners.remove(listener);
+		}
+	}
+
+	public void addReadListener(ReadListener listener) {
+		synchronized (readListeners) {
+			readListeners.add(listener);
+		}
+	}
+
+	public void removeReadListener(ReadListener listener) {
+		synchronized (readListeners) {
+			readListeners.remove(listener);
+		}
+	}
+
+	public void addOverflowListener(OverflowListener listener) {
+		overflowListeners.add(listener);
+	}
+
+	public void removeOverflowListener(OverflowListener listener) {
+		overflowListeners.remove(listener);
+	}
+
+	public void addUnderflowListener(UnderflowListener listener) {
+		underflowListeners.add(listener);
+	}
+
+	public void removeUnderflowListener(UnderflowListener listener) {
+		underflowListeners.remove(listener);
+	}
+
+	public void addPlaybackStartedListener(PlaybackStartedListener listener) {
+		playbackStartedListeners.add(listener);
+	}
+
+	public void removePlaybackStartedListener(PlaybackStartedListener listener) {
+		playbackStartedListeners.remove(listener);
+	}
+
+	public void addLatencyUpdateListener(LatencyUpdateListener listener) {
+		latencyUpdateListeners.add(listener);
+	}
+
+	public void removeLatencyUpdateListener(LatencyUpdateListener listener) {
+		latencyUpdateListeners.remove(listener);
+	}
+
+	public void addMovedListener(MovedListener listener) {
+		movedListeners.add(listener);
+	}
+
+	public void removeMovedListener(MovedListener listener) {
+		movedListeners.remove(listener);
+	}
+
+	public void addSuspendedListener(SuspendedListener listener) {
+		suspendedListeners.add(listener);
+	}
+
+	public void removeSuspendedListener(SuspendedListener listener) {
+		suspendedListeners.remove(listener);
+	}
+
 	public Stream.State getState() {
 		int state = native_pa_stream_get_state();
 		switch (state) {
@@ -214,22 +336,25 @@
 
 		int returnValue = native_pa_stream_connect_playback(deviceName, 0, 0,
 				0, 0);
+		assert(returnValue == 0);
 	}
 
 	/**
 	 * Connect the stream to a source.
 	 * 
 	 */
-	void connectForRecording(String deviceName) {
+	public void connectForRecording(String deviceName) {
 		int returnValue = native_pa_stream_connect_record(deviceName, 0, 0);
+		assert(returnValue == 0);
 	}
 
 	/**
 	 * Disconnect a stream from a source/sink.
 	 */
-	void disconnect() {
+	public void disconnect() {
 		int returnValue = native_pa_stream_disconnect();
-	}
+		assert(returnValue == 0);
+	}	
 
 	/**
 	 * Write data to the server
@@ -238,7 +363,7 @@
 	 * @param length
 	 * @return
 	 */
-	int write(byte[] data, int offset, int length) {
+	public int write(byte[] data, int offset, int length) {
 		return native_pa_stream_write(data, offset, length);
 	}
 
@@ -256,7 +381,7 @@
 	 * 
 	 * Remove the current fragment on record streams.
 	 */
-	void drop() {
+	public void drop() {
 
 	}
 
@@ -283,7 +408,7 @@
 	 * 
 	 * @return
 	 */
-	Operation drain() {
+	public Operation drain() {
 		Operation drainOperation = new Operation(native_pa_stream_drain());
 		return drainOperation;
 	}
@@ -291,54 +416,100 @@
 	/**
 	 * this function is called whenever the state changes
 	 */
-	void stateCallback() {
-
+	@SuppressWarnings("unused")
+	private void stateCallback() {
+		synchronized (stateListeners) {
+			for (StateListener listener : stateListeners) {
+				listener.update();
+			}
+		}
 	}
 
-	void writeCallback() {
-
+	@SuppressWarnings("unused")
+	private void writeCallback() {
+		synchronized (writeListeners) {
+			for (WriteListener listener : writeListeners) {
+				listener.update();
+			}
+		}
 	}
 
-	void readCallback() {
-
+	@SuppressWarnings("unused")
+	private void readCallback() {
+		synchronized (readListeners) {
+			for (ReadListener listener : readListeners) {
+				listener.update();
+			}
+		}
 	}
 
-	void overflowCallback() {
-
+	@SuppressWarnings("unused")
+	private void overflowCallback() {
+		synchronized (overflowListeners) {
+			for (OverflowListener listener : overflowListeners) {
+				listener.update();
+			}
+		}
 	}
 
-	void underflowCallback() {
-
+	@SuppressWarnings("unused")
+	private void underflowCallback() {
+		synchronized (underflowListeners) {
+			for (UnderflowListener listener : underflowListeners) {
+				listener.update();
+			}
+		}
 	}
 
 	/**
 	 * callback function that is called when a the server starts playback after
 	 * an underrun or on initial startup
 	 */
-	void playbackStartedCallback() {
-
+	@SuppressWarnings("unused")
+	private void playbackStartedCallback() {
+		assert(false);
+		synchronized (playbackStartedListeners) {
+			for (PlaybackStartedListener listener : playbackStartedListeners) {
+				listener.update();
+			}
+		}
 	}
 
 	/**
 	 * called whenever a latency information update happens
 	 */
-	void latencyUpdateCallback() {
-
+	@SuppressWarnings("unused")
+	private void latencyUpdateCallback() {
+		synchronized (latencyUpdateListeners) {
+			for (LatencyUpdateListener listener : latencyUpdateListeners) {
+				listener.update();
+			}
+		}
 	}
 
 	/**
 	 * whenever the stream is moved to a different sink/source
 	 */
-	void movedCallback() {
-
+	@SuppressWarnings("unused")
+	private void movedCallback() {
+		synchronized (movedListeners) {
+			for (MovedListener listener : movedListeners) {
+				listener.update();
+			}
+		}
 	}
 
 	/**
 	 * whenever the sink/source this stream is connected to is suspended or
 	 * resumed
 	 */
-	void suspendedCallback() {
-
+	@SuppressWarnings("unused")
+	private void suspendedCallback() {
+		synchronized (suspendedListeners) {
+			for (SuspendedListener listener : suspendedListeners) {
+				listener.update();
+			}
+		}
 	}
 
 	/**
@@ -347,18 +518,26 @@
 	 * @param cork
 	 * @return
 	 */
-	Operation cork(boolean cork) {
+	public Operation cork(boolean cork) {
 		int yes = cork ? 1 : 0;
 		Operation corkOperation = new Operation(native_pa_stream_cork(yes));
 		return corkOperation;
 	}
 
+	public Operation cork() {
+		return cork(true);
+	}
+
+	public Operation unCork() {
+		return cork(false);
+	}
+
 	/**
 	 * Flush the playback buffer of this stream.
 	 * 
 	 * @return
 	 */
-	Operation flush() {
+	public Operation flush() {
 		Operation flushOperation = new Operation(native_pa_stream_flush());
 		return flushOperation;
 	}
@@ -373,7 +552,7 @@
 	/**
 	 * Request immediate start of playback on this stream.
 	 */
-	Operation triggerStart() {
+	public Operation triggerStart() {
 		Operation triggerOperation = new Operation(native_pa_stream_trigger());
 		return triggerOperation;
 	}
@@ -384,13 +563,13 @@
 	 * @param name
 	 * @return
 	 */
-	Operation setName(String name) {
+	public Operation setName(String name) {
 		Operation setNameOperation = new Operation(
 				native_pa_stream_set_name(name));
 		return setNameOperation;
 	}
 
-	long getTimeInMicroseconds() {
+	public long getTimeInMicroseconds() {
 		return -1;
 	}
 
@@ -398,7 +577,7 @@
 	/**
 	 * @returns the total stream latency
 	 */
-	int getLatency() {
+	public int getLatency() {
 		return -1;
 	}
 
@@ -408,8 +587,8 @@
 	 * 
 	 */
 
-	public AudioFormat getSampleSpec() {
-		return null;
+	public Format getFormat() {
+		return format;
 	}
 
 	/*
@@ -438,4 +617,9 @@
 	 * Operation updateSampleRate(int rate) { Operation updateOp = new
 	 * Operation(native_pa_stream_) }
 	 */
+
+	public long getStreamPointer() {
+		return streamPointer;
+	}
+
 }
--- a/src/native/jni-common.c	Thu Aug 14 14:24:17 2008 -0400
+++ b/src/native/jni-common.c	Fri Aug 15 10:26:12 2008 -0400
@@ -85,6 +85,24 @@
 	(*env)->SetLongField(env, obj, fid, value);
 }
 
+void callJavaVoidMethod(JNIEnv* env, jobject obj, const char* method_name) {
+
+	jclass cls = (*env)->GetObjectClass(env, obj);
+	if (cls == NULL) {
+		printf("unable to get class of object");
+		return;
+	}
+	jmethodID mid = (*env)->GetMethodID(env, cls, method_name, "()V");
+	if (mid == NULL) {
+		printf("unable to get method %s\n", method_name);
+		return;
+
+	}
+	(*env)->CallVoidMethod(env, obj, mid);
+
+	return;
+
+}
 
 /*
  * 
@@ -94,7 +112,6 @@
  * 
  */
 
-
 void* getJavaPointer(JNIEnv* env, jobject obj, char* name) {
 
 	/*
@@ -109,7 +126,7 @@
 	 long a_long = 0x102030;
 	 long long a_long_long = 0x1020304150607080;
 	 printf("size of int: %d\nsize of long: %d\nsize of long long: %d\n", 
-	         sizeof(int), sizeof(long), sizeof(long long));
+	 sizeof(int), sizeof(long), sizeof(long long));
 	 printf("int to long long: %X -> %llX\n", a_int, (long long) a_int);
 	 printf("long long to int: %llX -> %X\n", a_long_long, (int)a_long_long); 
 
@@ -175,5 +192,5 @@
 
 jlong convertPointerToJavaLong(void* pointer) {
 	return (long) pointer;
-	
+
 }
--- a/src/native/jni-common.h	Thu Aug 14 14:24:17 2008 -0400
+++ b/src/native/jni-common.h	Fri Aug 15 10:26:12 2008 -0400
@@ -65,10 +65,13 @@
 jlong getJavaLongField(JNIEnv* env, jobject obj, char* name);
 void setJavaLongField(JNIEnv* env, jobject, char* name, jlong value);
 
+void callJavaVoidMethod(JNIEnv* env, jobject obj, const char* method_name);
+
 void throwByName(JNIEnv* const env, const char* const name,
 		const char* const msg);
 
 
+
 void* getJavaPointer(JNIEnv* env, jobject obj, char* name);
 void setJavaPointer(JNIEnv* env, jobject obj, char*name, void* pointer_value);
 
--- a/src/native/org_classpath_icedtea_pulseaudio_Stream.c	Thu Aug 14 14:24:17 2008 -0400
+++ b/src/native/org_classpath_icedtea_pulseaudio_Stream.c	Fri Aug 15 10:26:12 2008 -0400
@@ -4,6 +4,153 @@
 #include <pulse/pulseaudio.h>
 #include <string.h>
 
+typedef struct java_context {
+	JNIEnv* env;
+	jobject obj;
+} java_context;
+
+extern JNIEnv* pulse_thread_env;
+
+static void stream_state_callback(pa_stream* stream, void *userdata) {
+	printf("stream_state_callback called\n");
+
+	java_context* context = userdata;
+	assert(stream);
+	assert(context);
+	assert(context->env);
+	assert(context->obj);
+
+	if (pa_stream_get_state(stream) == PA_STREAM_CREATING) {
+		callJavaVoidMethod(context->env, context->obj, "stateCallback");
+	} else {
+		callJavaVoidMethod(pulse_thread_env, context->obj, "stateCallback");
+	}
+
+}
+
+static void stream_write_callback(pa_stream *stream, size_t length,
+		void *userdata) {
+	//	printf("stream_write_callback called\n");
+
+	java_context* context = userdata;
+	assert(stream);
+	assert(context);
+	assert(context->env);
+	assert(context->obj);
+
+	if (pa_stream_get_state(stream) == PA_STREAM_CREATING) {
+		callJavaVoidMethod(context->env, context->obj, "writeCallback");
+	} else {
+		callJavaVoidMethod(pulse_thread_env, context->obj, "writeCallback");
+	}
+}
+
+static void stream_read_callback(pa_stream *stream, size_t length,
+		void *userdata) {
+	printf("stream_read_callback called\n");
+
+	java_context* context = userdata;
+	assert(stream);
+	assert(context);
+	assert(context->env);
+	assert(context->obj);
+
+	if (pa_stream_get_state(stream) == PA_STREAM_CREATING) {
+		callJavaVoidMethod(context->env, context->obj, "readCallback");
+	} else {
+		callJavaVoidMethod(pulse_thread_env, context->obj, "readCallback");
+	}
+
+}
+
+static void stream_overflow_callback(pa_stream *stream, void *userdata) {
+	printf("stream_overflow_callback called\n");
+
+	java_context* context = userdata;
+	assert(stream);
+	assert(context);
+	assert(context->env);
+	assert(context->obj);
+
+	if (pa_stream_get_state(stream) == PA_STREAM_CREATING) {
+		callJavaVoidMethod(context->env, context->obj, "overflowCallback");
+	} else {
+		callJavaVoidMethod(pulse_thread_env, context->obj, "overflowCallback");
+	}
+}
+
+static void stream_underflow_callback(pa_stream *stream, void *userdata) {
+	printf("stream_underflow_callback called\n");
+
+	java_context* context = userdata;
+	assert(stream);
+	assert(context);
+	assert(context->env);
+	assert(context->obj);
+
+	if (pa_stream_get_state(stream) == PA_STREAM_CREATING) {
+		callJavaVoidMethod(context->env, context->obj, "underflowCallback");
+	} else {
+		callJavaVoidMethod(pulse_thread_env, context->obj, "underflowCallback");
+	}
+}
+
+// requires pulseaudio 0.9.11 :(
+//static void stream_started_callback(pa_stream *p, void *userdata) {
+//	printf("stream_started_callback called\n");
+//}
+
+static void stream_latency_update_callback(pa_stream *stream, void *userdata) {
+	printf("stream_latency_update_callback called\n");
+
+	java_context* context = userdata;
+	assert(stream);
+	assert(context);
+	assert(context->env);
+	assert(context->obj);
+
+	if (pa_stream_get_state(stream) == PA_STREAM_CREATING) {
+		callJavaVoidMethod(context->env, context->obj, "latencyUpdateCallback");
+	} else {
+		callJavaVoidMethod(pulse_thread_env, context->obj,
+				"latencyUpdateCallback");
+	}
+}
+
+static void stream_moved_callback(pa_stream *stream, void *userdata) {
+	printf("stream_moved_callback called\n");
+
+	java_context* context = userdata;
+	assert(stream);
+	assert(context);
+	assert(context->env);
+	assert(context->obj);
+
+	if (pa_stream_get_state(stream) == PA_STREAM_CREATING) {
+		callJavaVoidMethod(context->env, context->obj, "movedCallback");
+	} else {
+		callJavaVoidMethod(pulse_thread_env, context->obj, "movedCallback");
+	}
+
+}
+
+static void stream_suspended_callback(pa_stream *stream, void *userdata) {
+	printf("stream_suspended_callback called\n");
+
+	java_context* context = userdata;
+	assert(stream);
+	assert(context);
+	assert(context->env);
+	assert(context->obj);
+
+	if (pa_stream_get_state(stream) == PA_STREAM_CREATING) {
+		callJavaVoidMethod(context->env, context->obj, "suspendedCallback");
+	} else {
+		callJavaVoidMethod(pulse_thread_env, context->obj, "suspendedCallback");
+	}
+
+}
+
 /*
  * Class:     org_classpath_icedtea_pulseaudio_Stream
  * Method:    native_pa_stream_new
@@ -12,6 +159,10 @@
 JNIEXPORT void JNICALL Java_org_classpath_icedtea_pulseaudio_Stream_native_1pa_1stream_1new
 (JNIEnv* env, jobject obj, jlong contextPointer, jstring nameString, jstring encodingString, jint sampleRate, jint channels) {
 
+	java_context* j_context = malloc(sizeof(java_context));
+	j_context->env = env;
+	j_context->obj = (*env)->NewGlobalRef(env, obj);
+
 	pa_context* context = convertJavaLongToPointer(contextPointer);
 	assert(context);
 	const char* name = NULL;
@@ -24,11 +175,11 @@
 
 	const char *encoding = (*env)->GetStringUTFChars(env, encodingString, NULL);
 	if( encoding == NULL) {
-		return;		//oome thrown
+		return; //oome thrown
 	}
 
 	pa_sample_spec sample_spec;
-	
+
 	if (strcmp(encoding, "PA_SAMPLE_U8") == 0) {
 		sample_spec.format = PA_SAMPLE_U8;
 	} else if (strcmp(encoding, "PA_SAMPLE_ALAW") == 0) {
@@ -68,8 +219,26 @@
 	if (name) {
 		(*env)->ReleaseStringUTFChars(env, nameString,name);
 	}
+
 	setJavaPointer(env, obj, "streamPointer", stream);
 
+	/*
+	 * 
+	 * The stream has been created; now setup the callbacks 
+	 * so we can do somethig about them
+	 * 
+	 */
+
+	pa_stream_set_state_callback (stream, stream_state_callback, j_context);
+	pa_stream_set_write_callback (stream, stream_write_callback, j_context);
+	pa_stream_set_read_callback (stream, stream_read_callback, j_context);
+	pa_stream_set_overflow_callback (stream, stream_overflow_callback, j_context);
+	pa_stream_set_underflow_callback (stream, stream_underflow_callback, j_context);
+	//	pa_stream_set_started_callback (stream, stream_started_callback, j_context);
+	pa_stream_set_latency_update_callback (stream, stream_latency_update_callback, j_context);
+	pa_stream_set_moved_callback (stream, stream_moved_callback, j_context);
+	pa_stream_set_suspended_callback (stream, stream_suspended_callback, j_context);
+
 }
 
 /*