1 package com.maverick.multiplex.channels; 2 3 import java.io.IOException ; 4 import java.io.InputStream ; 5 import java.io.OutputStream ; 6 7 import org.apache.commons.logging.Log; 8 import org.apache.commons.logging.LogFactory; 9 10 import com.maverick.multiplex.Channel; 11 import com.maverick.multiplex.ChannelListener; 12 import com.maverick.multiplex.ChannelOpenException; 13 import com.maverick.util.ByteArrayReader; 14 import com.maverick.util.IOUtil; 15 16 19 public class EventStreamServerChannel extends Channel implements Runnable , StreamServerChannel { 20 final static Log log = LogFactory.getLog(EventStreamServerChannel.class); 21 22 25 public static final String CHANNEL_TYPE = "stream@3sp.com"; 26 27 StreamManager streamManager; 28 EventStreamServerChannel joinedChannel; 29 String id; 30 boolean initiator; 31 Object joinLock = new Object (); 32 33 39 public EventStreamServerChannel(StreamManager service) { 40 super(CHANNEL_TYPE, 32768, 32768); 41 System.out.println("Creating receiver channel"); 42 this.streamManager = service; 43 } 44 45 50 public String getId() { 51 return id; 52 } 53 54 59 public byte[] open(byte[] data) throws IOException , ChannelOpenException { 60 61 ByteArrayReader reader = new ByteArrayReader(data); 62 initiator = reader.readBoolean(); 63 id = reader.readString(); 64 65 if (initiator) { 66 System.out.println("Opening intiator channel"); 67 if (streamManager.containsChannel(id, true)) { 68 throw new ChannelOpenException(ChannelOpenException.CHANNEL_REFUSED, 69 "Cannot create channel (as initiator) when channel with same ID already exists."); 70 } 71 streamManager.putChannel(this); 72 System.out.println("Opened intiator channel"); 73 } else { 74 System.out.println("Recipient channel"); 75 streamManager.putChannel(this); 76 EventStreamServerChannel channelToJoin = (EventStreamServerChannel)streamManager.getChannel(id, true); 77 joinedChannel = channelToJoin; 78 channelToJoin.join(this); 79 System.out.println("Opened recipient channel"); 80 } 81 return null; 82 83 } 84 85 90 public void onChannelData(byte[] buf, int off, int len) { 91 } 92 93 98 public byte[] create() throws IOException { 99 return null; 100 } 101 102 107 public void onChannelOpen(byte[] data) { 108 if(initiator) { 109 System.out.println("Initiator so joining streams"); 110 Thread t = new Thread (this, "Stream" + id + "-" + (initiator ? "Initiator" : "Recipient")); 111 t.start(); 112 } 113 } 114 115 120 public void onChannelClose() { 121 streamManager.removeChannel(this); 122 } 123 124 public void run() { 125 waitForJoin(); 126 127 Thread t = new Thread (Thread.currentThread().getName() + "-In-Out") { 128 public void run() { 129 InputStream in = joinedChannel.getInputStream(); 130 OutputStream out = getOutputStream(); 131 try { 132 IOUtil.copy(in, out); 133 } catch (IOException e) { 134 e.printStackTrace(); 135 } 136 finally { 137 IOUtil.closeStream(in); 138 IOUtil.closeStream(out); 139 } 140 } 141 }; 142 t.start(); 143 InputStream in = getInputStream(); 144 OutputStream out = joinedChannel.getOutputStream(); 145 try { 146 IOUtil.copy(in, out); 147 } catch (IOException e) { 148 log.error("Failed to join streams.", e); 149 } 150 finally { 151 IOUtil.closeStream(in); 152 IOUtil.closeStream(out); 153 } 154 } 155 156 public synchronized void join(EventStreamServerChannel joinedChannel) { 157 System.out.println("Joining recipients " + joinedChannel.getId() + " to this channel (" + getId() + ")"); 158 if (this.joinedChannel != null) { 159 throw new IllegalStateException ("Already joined."); 160 } 161 this.joinedChannel = joinedChannel; 162 synchronized (joinLock) { 163 joinLock.notifyAll(); 164 } 165 } 166 167 public void waitForJoin() { 168 synchronized (joinLock) { 169 System.out.println("Waiting for recipient to join"); 170 while (joinedChannel == null) { 171 try { 172 joinLock.wait(1000); 173 } catch (InterruptedException e) { 174 } 175 } 176 System.out.println("Recipient joined"); 177 } 178 179 } 180 181 public boolean isInitiator() { 182 return initiator; 183 } 184 185 186 class BridgeListener implements ChannelListener { 187 188 EventStreamServerChannel bridgedChannel; 189 190 BridgeListener(EventStreamServerChannel channel) { 191 this.bridgedChannel = bridgedChannel; 192 } 193 194 public void onChannelClose(Channel channel) { 195 } 196 197 public void onChannelData(Channel channel, byte[] buf, int off, int len) { 198 199 } 200 201 public void onChannelOpen(Channel channel) { 202 } 203 } 204 } 205 | Popular Tags |