KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > maverick > multiplex > channels > EventStreamServerChannel


package com.maverick.multiplex.channels;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.maverick.multiplex.Channel;
import com.maverick.multiplex.ChannelListener;
import com.maverick.multiplex.ChannelOpenException;
import com.maverick.util.ByteArrayReader;
import com.maverick.util.IOUtil;

16 /**
17 NOT YET USED - CONSIDER REMOVING ONCE CONVERSE IS RELEASED
18  */

19 public class EventStreamServerChannel extends Channel implements Runnable JavaDoc, StreamServerChannel {
20     final static Log log = LogFactory.getLog(EventStreamServerChannel.class);
21
22     /**
23      * Channel type identifier
24      */

25     public static final String JavaDoc CHANNEL_TYPE = "stream@3sp.com";
26
27     StreamManager streamManager;
28     EventStreamServerChannel joinedChannel;
29     String JavaDoc id;
30     boolean initiator;
31     Object JavaDoc joinLock = new Object JavaDoc();
32
33     /**
34      * Constructor.
35      *
36      * @param service service
37      *
38      */

39     public EventStreamServerChannel(StreamManager service) {
40         super(CHANNEL_TYPE, 32768, 32768);
41         System.out.println("Creating receiver channel");
42         this.streamManager = service;
43     }
44
45     /**
46      * Get the stream ID.
47      *
48      * @return
49      */

50     public String JavaDoc getId() {
51         return id;
52     }
53
54     /*
55      * (non-Javadoc)
56      *
57      * @see com.maverick.multiplex.Channel#open(byte[])
58      */

59     public byte[] open(byte[] data) throws IOException JavaDoc, ChannelOpenException {
60
61         ByteArrayReader reader = new ByteArrayReader(data);
62         initiator = reader.readBoolean();
63         id = reader.readString();
64
65         if (initiator) {
66             System.out.println("Opening intiator channel");
67             if (streamManager.containsChannel(id, true)) {
68                 throw new ChannelOpenException(ChannelOpenException.CHANNEL_REFUSED,
69                                 "Cannot create channel (as initiator) when channel with same ID already exists.");
70             }
71             streamManager.putChannel(this);
72             System.out.println("Opened intiator channel");
73         } else {
74             System.out.println("Recipient channel");
75             streamManager.putChannel(this);
76             EventStreamServerChannel channelToJoin = (EventStreamServerChannel)streamManager.getChannel(id, true);
77             joinedChannel = channelToJoin;
78             channelToJoin.join(this);
79             System.out.println("Opened recipient channel");
80         }
81         return null;
82
83     }
84
85     /*
86      * (non-Javadoc)
87      *
88      * @see com.maverick.multiplex.Channel#onChannelData(byte[], int, int)
89      */

90     public void onChannelData(byte[] buf, int off, int len) {
91     }
92
93     /*
94      * (non-Javadoc)
95      *
96      * @see com.maverick.multiplex.Channel#create()
97      */

98     public byte[] create() throws IOException JavaDoc {
99         return null;
100     }
101
102     /*
103      * (non-Javadoc)
104      *
105      * @see com.maverick.multiplex.Channel#onChannelOpen(byte[])
106      */

107     public void onChannelOpen(byte[] data) {
108         if(initiator) {
109             System.out.println("Initiator so joining streams");
110             Thread JavaDoc t = new Thread JavaDoc(this, "Stream" + id + "-" + (initiator ? "Initiator" : "Recipient"));
111             t.start();
112         }
113     }
114
115     /*
116      * (non-Javadoc)
117      *
118      * @see com.maverick.multiplex.Channel#onChannelClose()
119      */

120     public void onChannelClose() {
121         streamManager.removeChannel(this);
122     }
123
124     public void run() {
125         waitForJoin();
126         
127         Thread JavaDoc t = new Thread JavaDoc(Thread.currentThread().getName() + "-In-Out") {
128             public void run() {
129                 InputStream JavaDoc in = joinedChannel.getInputStream();
130                 OutputStream JavaDoc out = getOutputStream();
131                 try {
132                     IOUtil.copy(in, out);
133                 } catch (IOException JavaDoc e) {
134                     e.printStackTrace();
135                 }
136                 finally {
137                     IOUtil.closeStream(in);
138                     IOUtil.closeStream(out);
139                 }
140             }
141         };
142         t.start();
143         InputStream JavaDoc in = getInputStream();
144         OutputStream JavaDoc out = joinedChannel.getOutputStream();
145         try {
146             IOUtil.copy(in, out);
147         } catch (IOException JavaDoc e) {
148             log.error("Failed to join streams.", e);
149         }
150         finally {
151             IOUtil.closeStream(in);
152             IOUtil.closeStream(out);
153         }
154     }
155
156     public synchronized void join(EventStreamServerChannel joinedChannel) {
157         System.out.println("Joining recipients " + joinedChannel.getId() + " to this channel (" + getId() + ")");
158         if (this.joinedChannel != null) {
159             throw new IllegalStateException JavaDoc("Already joined.");
160         }
161         this.joinedChannel = joinedChannel;
162         synchronized (joinLock) {
163             joinLock.notifyAll();
164         }
165     }
166
167     public void waitForJoin() {
168         synchronized (joinLock) {
169             System.out.println("Waiting for recipient to join");
170             while (joinedChannel == null) {
171                 try {
172                     joinLock.wait(1000);
173                 } catch (InterruptedException JavaDoc e) {
174                 }
175             }
176             System.out.println("Recipient joined");
177         }
178
179     }
180
181     public boolean isInitiator() {
182         return initiator;
183     }
184     
185     
186     class BridgeListener implements ChannelListener {
187         
188         EventStreamServerChannel bridgedChannel;
189         
190         BridgeListener(EventStreamServerChannel channel) {
191             this.bridgedChannel = bridgedChannel;
192         }
193         
194         public void onChannelClose(Channel channel) {
195         }
196
197         public void onChannelData(Channel channel, byte[] buf, int off, int len) {
198             
199         }
200
201         public void onChannelOpen(Channel channel) {
202         }
203     }
204 }
205
Popular Tags