KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > maverick > multiplex > channels > ThreadedStreamServerChannel


1 package com.maverick.multiplex.channels;
2
3 import java.io.IOException JavaDoc;
4 import java.io.InputStream JavaDoc;
5 import java.io.OutputStream JavaDoc;
6
7 import com.maverick.multiplex.Channel;
8 import com.maverick.multiplex.ChannelOpenException;
9 import com.maverick.util.ByteArrayReader;
10 import com.maverick.util.IOUtil;
11
12 /**
13  * Channel implementation used for download files required by the <i>Agent</i>
14  * to launch an <i>Application Shortcut</i>.
15  *
16  * @author Lee David Painter <a HREF="mailto: lee@3sp.com">&lt;lee@3sp.com&gt;</a>
17  */

18 public class ThreadedStreamServerChannel extends Channel implements Runnable JavaDoc, StreamServerChannel {
19
20     // #ifdef DEBUG
21
static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ThreadedStreamServerChannel.class);
22     // #endif
23

24     /**
25      * Channel type identifier
26      */

27     public static final String JavaDoc CHANNEL_TYPE = "stream@3sp.com";
28
29     StreamManager streamManager;
30     ThreadedStreamServerChannel joinedChannel;
31     String JavaDoc id;
32     boolean initiator;
33     Object JavaDoc joinLock = new Object JavaDoc();
34     boolean closed = false;
35
36     /**
37      * Constructor.
38      *
39      * @param service service
40      *
41      */

42     public ThreadedStreamServerChannel(StreamManager service) {
43         super(CHANNEL_TYPE, 32768, 32768);
44         // #ifdef DEBUG
45
log.debug("Creating channel");
46         // #endif
47
this.streamManager = service;
48     }
49
50     /**
51      * Get the stream ID.
52      *
53      * @return
54      */

55     public String JavaDoc getId() {
56         return id;
57     }
58
59     /*
60      * (non-Javadoc)
61      *
62      * @see com.maverick.multiplex.Channel#open(byte[])
63      */

64     public byte[] open(byte[] data) throws IOException JavaDoc, ChannelOpenException {
65
66         ByteArrayReader reader = new ByteArrayReader(data);
67         initiator = reader.readBoolean();
68         id = reader.readString();
69         // #ifdef DEBUG
70
log.debug("Opening. Id is '" + id + "', initiator is '" + initiator + "'");
71         // #endif
72

73         if (initiator) {
74             if (streamManager.containsChannel(id, true)) {
75                 throw new ChannelOpenException(ChannelOpenException.CHANNEL_REFUSED,
76                                 "Cannot create channel (as initiator) when channel with same ID already exists.");
77             }
78             streamManager.putChannel(this);
79         } else {
80             // #ifdef DEBUG
81
log.debug("Waiting for initiator with ID '" + id + "'");
82             // #endif
83
try {
84                 streamManager.waitForInitiator(id, 20000);
85             } catch (IllegalStateException JavaDoc e) {
86                 throw new ChannelOpenException(ChannelOpenException.CHANNEL_REFUSED, "Timeout waiting for initiator");
87             } catch (InterruptedException JavaDoc e) {
88                 throw new ChannelOpenException(ChannelOpenException.CHANNEL_REFUSED, "Interrupted waiting for initiator");
89             }
90             // #ifdef DEBUG
91
log.debug("Got initiator with ID '" + id + "'");
92             // #endif
93
streamManager.putChannel(this);
94             ThreadedStreamServerChannel channelToJoin = (ThreadedStreamServerChannel) streamManager.getChannel(id, true);
95             joinedChannel = channelToJoin;
96             channelToJoin.join(this);
97         }
98         // #ifdef DEBUG
99
log.debug("Opened channel '" + id + "'");
100         // #endif
101
return null;
102
103     }
104
105     /*
106      * (non-Javadoc)
107      *
108      * @see com.maverick.multiplex.Channel#onChannelData(byte[], int, int)
109      */

110     public void onChannelData(byte[] buf, int off, int len) {
111     }
112
113     /*
114      * (non-Javadoc)
115      *
116      * @see com.maverick.multiplex.Channel#create()
117      */

118     public byte[] create() throws IOException JavaDoc {
119         return null;
120     }
121
122     /*
123      * (non-Javadoc)
124      *
125      * @see com.maverick.multiplex.Channel#onChannelOpen(byte[])
126      */

127     public void onChannelOpen(byte[] data) {
128         if (initiator) {
129             // #ifdef DEBUG
130
log.debug("Stream '" + id + "' is initiator so joining streams");
131             // #endif
132
Thread JavaDoc t = new Thread JavaDoc(this, "Stream" + id + "-" + (initiator ? "Initiator" : "Recipient"));
133             t.start();
134         } else {
135             // #ifdef DEBUG
136
log.debug("Stream '" + id + "' is NOT initiator so doing nothing");
137             // #endif
138
}
139     }
140
141     /*
142      * (non-Javadoc)
143      *
144      * @see com.maverick.multiplex.Channel#onChannelClose()
145      */

146     public synchronized void onChannelClose() {
147         // #ifdef DEBUG
148
log.debug("Stream channel " + getId() + " closed");
149         closed = true;
150         synchronized (joinLock) {
151             joinLock.notifyAll();
152         }
153         // #endif
154
}
155
156     public void run() {
157         try {
158             waitForJoin();
159             if (closed) {
160                 return;
161             }
162
163             Thread JavaDoc t = new Thread JavaDoc(Thread.currentThread().getName() + "-In-Out") {
164                 public void run() {
165                     InputStream JavaDoc in = joinedChannel.getInputStream();
166                     OutputStream JavaDoc out = getOutputStream();
167                     try {
168                         IOUtil.copy(in, out);
169                     } catch (IOException JavaDoc e) {
170                     } finally {
171                         if (!ThreadedStreamServerChannel.this.isClosed()) {
172                             ThreadedStreamServerChannel.this.close();
173                         }
174                         if (!joinedChannel.isClosed()) {
175                             ThreadedStreamServerChannel.this.close();
176                         }
177                         // // #ifdef DEBUG
178
// log.debug("Closing input stream");
179
// // #endif
180
// IOUtil.closeStream(in);
181
// // #ifdef DEBUG
182
// log.debug("Closing out stream");
183
// // #endif
184
// IOUtil.closeStream(out);
185
// // #ifdef DEBUG
186
// log.debug("Closed streams");
187
// // #endif
188
}
189                 }
190             };
191             t.start();
192             InputStream JavaDoc in = getInputStream();
193             OutputStream JavaDoc out = joinedChannel.getOutputStream();
194             IOUtil.copy(in, out);
195         } catch (IOException JavaDoc e) {
196             // #ifdef DEBUG
197
log.error("Failed to join streams.", e);
198             // #endif
199
} finally {
200             if (!ThreadedStreamServerChannel.this.isClosed()) {
201                 ThreadedStreamServerChannel.this.close();
202             }
203             if (joinedChannel != null && !joinedChannel.isClosed()) {
204                 ThreadedStreamServerChannel.this.close();
205             }
206             // // #ifdef DEBUG
207
// log.debug("Closing input stream");
208
// // #endif
209
// IOUtil.closeStream(in);
210
// // #ifdef DEBUG
211
// log.debug("Closing out stream");
212
// // #endif
213
// IOUtil.closeStream(out);
214
// // #ifdef DEBUG
215
// log.debug("Closed streams");
216
// // #endif
217
if (streamManager.containsChannel(this.getId(), initiator)) {
218                 // #ifdef DEBUG
219
log.info("Removing stream channel " + getId() + "/" + initiator);
220                 streamManager.removeChannel(this);
221                 // #endif
222
}
223         }
224     }
225
226     public synchronized void join(ThreadedStreamServerChannel joinedChannel) {
227         if (initiator == joinedChannel.initiator) {
228             throw new IllegalArgumentException JavaDoc("Cannot both be initiators");
229         }
230
231         // #ifdef DEBUG
232
log.debug("Joining this channel ('" + id + "') to '" + joinedChannel.getId());
233         // #endif
234
if (this.joinedChannel != null) {
235             throw new IllegalStateException JavaDoc("Already joined.");
236         }
237         this.joinedChannel = joinedChannel;
238         synchronized (joinLock) {
239             // #ifdef DEBUG
240
log.debug("Notifying channel '" + id + "' joined");
241             // #endif
242
joinLock.notifyAll();
243             // #ifdef DEBUG
244
log.debug("Notified channel '" + id + "' joined");
245             // #endif
246
}
247     }
248
249     public void waitForJoin() {
250         synchronized (joinLock) {
251             // #ifdef DEBUG
252
log.debug("Channel '" + id + "' waiting to be joined");
253             // #endif
254
while (joinedChannel == null && !closed) {
255                 try {
256                     joinLock.wait(1000);
257                 } catch (InterruptedException JavaDoc e) {
258                 }
259             }
260             // #ifdef DEBUG
261
if(joinedChannel == null) {
262                 log.warn("Channel '" + id + "' closed before joined.");
263             }
264             else {
265                 log.debug("Channel '" + id + "' now joined to " + joinedChannel.getId());
266             }
267             // #endif
268
}
269
270     }
271
272     public boolean isInitiator() {
273         return initiator;
274     }
275 }
276
Popular Tags