1 package com.maverick.multiplex.channels; 2 3 import java.util.Hashtable ; 4 5 public class StreamManager { 6 7 protected Hashtable channels; 8 9 private Object initiatorWaitLock = new Object (); 10 11 public StreamManager() { 12 super(); 13 channels = new Hashtable (); 14 } 15 16 public StreamServerChannel getChannel(String id, boolean initiator) { 17 return (StreamServerChannel) channels.get(getKey(id, initiator)); 18 } 19 20 String getKey(String id, boolean initiator) { 21 return id + (initiator ? "-initiator" : "-recipient"); 22 } 23 24 public void putChannel(StreamServerChannel streamChannel) { 25 synchronized (channels) { 26 String key = getKey(streamChannel.getId(), streamChannel.isInitiator()); 27 if (channels.containsKey(key)) { 28 throw new IllegalArgumentException ("Channel already exists."); 29 } 30 channels.put(key, streamChannel); 31 synchronized (initiatorWaitLock) { 32 initiatorWaitLock.notifyAll(); 33 } 34 } 35 } 36 37 public boolean containsChannel(String id, boolean initiator) { 38 return channels.containsKey(getKey(id, initiator)); 39 } 40 41 public void removeChannel(StreamServerChannel channel) { 42 if (channels.remove(getKey(channel.getId(), channel.isInitiator())) == null) { 43 throw new IllegalArgumentException ("No such channel."); 44 } 45 } 46 47 public void waitForInitiator(String id, int timeout) throws IllegalStateException , InterruptedException { 48 long expire = timeout == -1 ? Long.MAX_VALUE : ( timeout + System.currentTimeMillis() ); 49 while (System.currentTimeMillis() < expire && !containsChannel(id, true)) { 50 synchronized (initiatorWaitLock) { 51 initiatorWaitLock.wait(1000); 52 } 53 } 54 if(!containsChannel(id, true)) { 55 throw new IllegalStateException ("Timeout waiting for initiator."); 56 } 57 } 58 59 public void interruptInitiatorWait(String id) { 60 62 } 63 64 } 65 | Popular Tags |