1 package com.maverick.multiplex.channels; 2 3 import java.io.IOException ; 4 import java.io.InputStream ; 5 import java.io.OutputStream ; 6 7 import com.maverick.multiplex.Channel; 8 import com.maverick.multiplex.ChannelOpenException; 9 import com.maverick.util.ByteArrayReader; 10 import com.maverick.util.IOUtil; 11 12 18 public class ThreadedStreamServerChannel extends Channel implements Runnable , StreamServerChannel { 19 20 static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ThreadedStreamServerChannel.class); 22 24 27 public static final String CHANNEL_TYPE = "stream@3sp.com"; 28 29 StreamManager streamManager; 30 ThreadedStreamServerChannel joinedChannel; 31 String id; 32 boolean initiator; 33 Object joinLock = new Object (); 34 boolean closed = false; 35 36 42 public ThreadedStreamServerChannel(StreamManager service) { 43 super(CHANNEL_TYPE, 32768, 32768); 44 log.debug("Creating channel"); 46 this.streamManager = service; 48 } 49 50 55 public String getId() { 56 return id; 57 } 58 59 64 public byte[] open(byte[] data) throws IOException , ChannelOpenException { 65 66 ByteArrayReader reader = new ByteArrayReader(data); 67 initiator = reader.readBoolean(); 68 id = reader.readString(); 69 log.debug("Opening. Id is '" + id + "', initiator is '" + initiator + "'"); 71 73 if (initiator) { 74 if (streamManager.containsChannel(id, true)) { 75 throw new ChannelOpenException(ChannelOpenException.CHANNEL_REFUSED, 76 "Cannot create channel (as initiator) when channel with same ID already exists."); 77 } 78 streamManager.putChannel(this); 79 } else { 80 log.debug("Waiting for initiator with ID '" + id + "'"); 82 try { 84 streamManager.waitForInitiator(id, 20000); 85 } catch (IllegalStateException e) { 86 throw new ChannelOpenException(ChannelOpenException.CHANNEL_REFUSED, "Timeout waiting for initiator"); 87 } catch (InterruptedException e) { 88 throw new ChannelOpenException(ChannelOpenException.CHANNEL_REFUSED, "Interrupted waiting for initiator"); 89 } 90 log.debug("Got initiator with ID '" + id + "'"); 92 streamManager.putChannel(this); 94 ThreadedStreamServerChannel channelToJoin = (ThreadedStreamServerChannel) streamManager.getChannel(id, true); 95 joinedChannel = channelToJoin; 96 channelToJoin.join(this); 97 } 98 log.debug("Opened channel '" + id + "'"); 100 return null; 102 103 } 104 105 110 public void onChannelData(byte[] buf, int off, int len) { 111 } 112 113 118 public byte[] create() throws IOException { 119 return null; 120 } 121 122 127 public void onChannelOpen(byte[] data) { 128 if (initiator) { 129 log.debug("Stream '" + id + "' is initiator so joining streams"); 131 Thread t = new Thread (this, "Stream" + id + "-" + (initiator ? "Initiator" : "Recipient")); 133 t.start(); 134 } else { 135 log.debug("Stream '" + id + "' is NOT initiator so doing nothing"); 137 } 139 } 140 141 146 public synchronized void onChannelClose() { 147 log.debug("Stream channel " + getId() + " closed"); 149 closed = true; 150 synchronized (joinLock) { 151 joinLock.notifyAll(); 152 } 153 } 155 156 public void run() { 157 try { 158 waitForJoin(); 159 if (closed) { 160 return; 161 } 162 163 Thread t = new Thread (Thread.currentThread().getName() + "-In-Out") { 164 public void run() { 165 InputStream in = joinedChannel.getInputStream(); 166 OutputStream out = getOutputStream(); 167 try { 168 IOUtil.copy(in, out); 169 } catch (IOException e) { 170 } finally { 171 if (!ThreadedStreamServerChannel.this.isClosed()) { 172 ThreadedStreamServerChannel.this.close(); 173 } 174 if (!joinedChannel.isClosed()) { 175 ThreadedStreamServerChannel.this.close(); 176 } 177 } 189 } 190 }; 191 t.start(); 192 InputStream in = getInputStream(); 193 OutputStream out = joinedChannel.getOutputStream(); 194 IOUtil.copy(in, out); 195 } catch (IOException e) { 196 log.error("Failed to join streams.", e); 198 } finally { 200 if (!ThreadedStreamServerChannel.this.isClosed()) { 201 ThreadedStreamServerChannel.this.close(); 202 } 203 if (joinedChannel != null && !joinedChannel.isClosed()) { 204 ThreadedStreamServerChannel.this.close(); 205 } 206 if (streamManager.containsChannel(this.getId(), initiator)) { 218 log.info("Removing stream channel " + getId() + "/" + initiator); 220 streamManager.removeChannel(this); 221 } 223 } 224 } 225 226 public synchronized void join(ThreadedStreamServerChannel joinedChannel) { 227 if (initiator == joinedChannel.initiator) { 228 throw new IllegalArgumentException ("Cannot both be initiators"); 229 } 230 231 log.debug("Joining this channel ('" + id + "') to '" + joinedChannel.getId()); 233 if (this.joinedChannel != null) { 235 throw new IllegalStateException ("Already joined."); 236 } 237 this.joinedChannel = joinedChannel; 238 synchronized (joinLock) { 239 log.debug("Notifying channel '" + id + "' joined"); 241 joinLock.notifyAll(); 243 log.debug("Notified channel '" + id + "' joined"); 245 } 247 } 248 249 public void waitForJoin() { 250 synchronized (joinLock) { 251 log.debug("Channel '" + id + "' waiting to be joined"); 253 while (joinedChannel == null && !closed) { 255 try { 256 joinLock.wait(1000); 257 } catch (InterruptedException e) { 258 } 259 } 260 if(joinedChannel == null) { 262 log.warn("Channel '" + id + "' closed before joined."); 263 } 264 else { 265 log.debug("Channel '" + id + "' now joined to " + joinedChannel.getId()); 266 } 267 } 269 270 } 271 272 public boolean isInitiator() { 273 return initiator; 274 } 275 } 276 | Popular Tags |