KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > maverick > multiplex > Channel


1 /*
2  * Created on 08-May-2005
3  *
4  * TODO To change the template for this generated file go to
5  * Window - Preferences - Java - Code Style - Code Templates
6  */

7 package com.maverick.multiplex;
8
9 import java.io.EOFException JavaDoc;
10 import java.io.IOException JavaDoc;
11 import java.io.InputStream JavaDoc;
12 import java.io.OutputStream JavaDoc;
13 import java.util.Enumeration JavaDoc;
14 import java.util.Vector JavaDoc;
15
16 import com.jcraft.jzlib.ZStream;
17
18 /**
19  *
20  * @author lee
21  *
22  * TODO To change the template for this generated type comment go to Window -
23  * Preferences - Java - Code Style - Code Templates
24  */

25 public abstract class Channel {
26
27     MultiplexedConnection connection;
28     int channelid;
29     int remoteid;
30     String JavaDoc type;
31     int timeout;
32     DataWindow remotewindow;
33     DataWindow localwindow;
34     Vector JavaDoc listeners = new Vector JavaDoc();
35     ChannelInputStream in;
36     ChannelOutputStream out;
37     int windowSequence = 0;
38     boolean isClosed;
39     boolean autoConsumeInput = false;
40     boolean compressionEnabled = false;
41     int compressionLevel = 6;
42     private ZStream compressionIn;
43     private ZStream compressionOut;
44     
45     //#ifdef DEBUG
46
org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(Channel.class);
47     //#endif
48
MessageObserver stickyMessages = new MessageObserver() {
49         public boolean wantsNotification(Message msg) {
50             switch (msg.getMessageId()) {
51                 case MultiplexedConnection.MSG_CHANNEL_CLOSE:
52                     return true;
53                 default:
54                     return false;
55             }
56         }
57     };
58
59     MessageObserver channelRequestMessages = new MessageObserver() {
60         public boolean wantsNotification(Message msg) {
61             switch (msg.getMessageId()) {
62                 case MultiplexedConnection.MSG_CHANNEL_REQUEST_SUCCESS:
63                 case MultiplexedConnection.MSG_CHANNEL_REQUEST_FAILURE:
64                 case MultiplexedConnection.MSG_CHANNEL_CLOSE:
65                     return true;
66                 default:
67                     return false;
68             }
69         }
70     };
71     final MessageObserver WINDOW_ADJUST_MESSAGES = new MessageObserver() {
72         public boolean wantsNotification(Message msg) {
73             switch (msg.getMessageId()) {
74                 case MultiplexedConnection.MSG_CHANNEL_WINDOW_ADJUST:
75                 case MultiplexedConnection.MSG_CHANNEL_CLOSE:
76                     return true;
77                 default:
78                     return false;
79             }
80         }
81     };
82
83     final MessageObserver CHANNEL_CLOSE_MESSAGES = new MessageObserver() {
84         public boolean wantsNotification(Message msg) {
85             switch (msg.getMessageId()) {
86                 case MultiplexedConnection.MSG_CHANNEL_CLOSE:
87                     return true;
88                 default:
89                     return false;
90             }
91         }
92     };
93
94     final MessageObserver CHANNEL_DATA_MESSAGES = new MessageObserver() {
95         public boolean wantsNotification(Message msg) {
96
97             // Access to this observer is synchronized by the ThreadSynchronizer
98
// so we can flag our InputStream as blocking when the method is
99
// called and released once we have found a message
100
switch (msg.getMessageId()) {
101                 case MultiplexedConnection.MSG_CHANNEL_DATA:
102                 case MultiplexedConnection.MSG_CHANNEL_CLOSE:
103                     return true;
104                 default:
105                     return false;
106             }
107         }
108     };
109
110     MessageStore messageStore = new MessageStore(this, stickyMessages);
111
112     public Channel(String JavaDoc type, int localpacket, int localwindow) {
113         this(type, localpacket, localwindow, 0, false, 0);
114     }
115     
116     public Channel(String JavaDoc type, int localpacket, int localwindow, boolean compress, int compressionLevel) {
117         this(type, localpacket, localwindow, 0, compress, compressionLevel);
118     }
119
120     public Channel(String JavaDoc type, int localpacket, int localwindow, int timeout, boolean compress, int compressionLevel) {
121         this.type = type;
122         this.localwindow = new DataWindow(localpacket, localwindow);
123         this.timeout = timeout;
124         this.compressionEnabled = compress;
125         this.compressionLevel = compressionLevel;
126         in = new ChannelInputStream(CHANNEL_DATA_MESSAGES);
127         out = new ChannelOutputStream();
128         
129         compressionIn = new ZStream();
130         compressionOut = new ZStream();
131         
132         compressionIn.inflateInit();
133         compressionOut.deflateInit(compressionLevel);
134         
135     }
136
137     public void setTimeout(int timeout) {
138         this.timeout = timeout;
139     }
140
141     public int getTimeout() {
142         return timeout;
143     }
144
145     public MultiplexedConnection getConnection() {
146         return connection;
147     }
148
149     public void init(MultiplexedConnection connection, int remoteid, int remotepacket, int remotewindow) {
150         this.connection = connection;
151         this.remoteid = remoteid;
152         this.remotewindow = new DataWindow(remotewindow, remotepacket);
153     }
154
155     public synchronized boolean sendChannelRequest(Request request, boolean wantReply) throws IOException JavaDoc {
156         return sendChannelRequest(request, wantReply, 0);
157     }
158
159     public synchronized boolean sendChannelRequest(Request request, boolean wantReply, int timeoutMs) throws IOException JavaDoc {
160
161         Packet msg = new Packet();
162         msg.write(MultiplexedConnection.MSG_CHANNEL_REQUEST);
163         msg.writeInt(channelid);
164         msg.writeString(request.getRequestName());
165         msg.writeBoolean(wantReply);
166         msg.writeBinaryString(request.getRequestData());
167
168         connection.sendMessage(msg);
169
170         if (wantReply) {
171             Message reply = messageStore.nextMessage(channelRequestMessages, timeoutMs);
172
173             switch (reply.getMessageId()) {
174                 case MultiplexedConnection.MSG_CHANNEL_REQUEST_SUCCESS:
175                 case MultiplexedConnection.MSG_CHANNEL_REQUEST_FAILURE:
176
177                     byte[] data = null;
178                     if (reply.available() > 0)
179                         data = reply.readBinaryString();
180
181                     request.setRequestData(data);
182
183                     return reply.getMessageId() == MultiplexedConnection.MSG_CHANNEL_REQUEST_SUCCESS;
184                 case MultiplexedConnection.MSG_CHANNEL_CLOSE:
185                     checkCloseStatus(true);
186                     throw new EOFException JavaDoc("Channel closed before request reply");
187                 default:
188                     throw new IOException JavaDoc("Unexpected reply in channel open procedure");
189             }
190         } else
191             return true;
192     }
193
194     boolean closing = false;
195
196     public void close() {
197         if(connection == null) {
198             // #ifdef DEBUG
199
log.warn("Closing channel of type " + type +" before it was opened");
200             // #endif
201
return;
202         }
203         // #ifdef DEBUG
204
log.debug("Close channel '" + getType() + "'");
205         // #endif
206

207         boolean performClose = false;
208
209         synchronized (this) {
210             if (!closing)
211                 performClose = closing = true;
212         }
213
214         try {
215             
216             if (performClose) {
217
218                 // Close the ChannelOutputStream
219
out.close();
220                 in.close();
221
222                 // Send our close message
223
connection.closeChannel(this);
224
225             }
226         } catch (EOFException JavaDoc eof) {
227             // Ignore this is the message store informing of close/eof
228
} catch (IOException JavaDoc ex) {
229             // IO Error during close so the connection has dropped
230
connection.disconnect("IOException during channel close: " + ex.getMessage());
231
232         } finally {
233             isClosed = true;
234             checkCloseStatus(false);
235         }
236     }
237
238     private void checkCloseStatus(boolean remoteClosed) {
239
240         if(!isClosed) {
241             close();
242             if(!remoteClosed)
243                 remoteClosed = (messageStore.hasMessage(CHANNEL_CLOSE_MESSAGES)!=null);
244         }
245
246         if(remoteClosed) {
247                 if (connection != null)
248                     connection.freeChannel(this);
249
250                 synchronized (listeners) {
251                     
252                     onChannelClose();
253                     
254                     for (Enumeration JavaDoc e = listeners.elements(); e.hasMoreElements();) {
255                         ((ChannelListener) e.nextElement()).onChannelClose(this);
256                     }
257                     
258                 }
259             }
260
261     }
262     public abstract byte[] open(byte[] data) throws IOException JavaDoc, ChannelOpenException;
263
264     public abstract byte[] create() throws IOException JavaDoc;
265
266     void fireChannelOpen(byte[] data) {
267
268         onChannelOpen(data);
269
270         for (Enumeration JavaDoc e = listeners.elements(); e.hasMoreElements();) {
271             ((ChannelListener) e.nextElement()).onChannelOpen(this);
272         }
273
274     }
275
276     public abstract void onChannelOpen(byte[] data);
277
278     public void onChannelData(byte[] buf, int off, int len) {
279     };
280
281     public abstract void onChannelClose();
282
283     public void addListener(ChannelListener listener) {
284         if (listener != null)
285             listeners.addElement(listener);
286     }
287
288     public boolean onChannelRequest(Request request) {
289         return false;
290     }
291
292     public OutputStream JavaDoc getOutputStream() {
293         return out;
294     }
295
296     public InputStream JavaDoc getInputStream() {
297         return in;
298     }
299
300     public String JavaDoc getType() {
301         return type;
302     }
303
304     public int getLocalWindow() {
305         return localwindow.available();
306     }
307
308     public int getLocalPacket() {
309         return localwindow.getPacketSize();
310     }
311
312     public boolean isClosed() {
313         return messageStore.isClosed();
314     }
315
316     protected void adjustWindow(int increment) throws IOException JavaDoc {
317         localwindow.adjust(increment);
318         connection.sendWindowAdjust(this, increment);
319     }
320     
321     private void uncompressMesasge(Message msg) {
322         
323     }
324     
325     private void compressMessage(Message msg) {
326         
327     }
328     
329     protected boolean processChannelMessage(Message msg) throws IOException JavaDoc {
330         
331         boolean addToMessageStore = true;
332
333         switch(msg.getMessageId()) {
334         case MultiplexedConnection.MSG_CHANNEL_CLOSE:
335             checkCloseStatus(true);
336             break;
337         case MultiplexedConnection.MSG_CHANNEL_DATA:
338             
339             if (autoConsumeInput) {
340                 localwindow.consume(msg.available()-4);
341                 if (localwindow.available() <= in.buffer.length / 2) {
342                     adjustWindow(in.buffer.length - localwindow.available());
343                 }
344                 addToMessageStore = false;
345             }
346             
347             if(compressionEnabled) {
348                 uncompressMesasge(msg);
349             }
350             
351             onChannelData(msg.array(), msg.getPosition()+4, msg.available()-4);
352             
353             for (Enumeration JavaDoc e = listeners.elements(); e.hasMoreElements();) {
354                 ((ChannelListener) e.nextElement()).onChannelData(this, msg.array(), msg.getPosition()+4, msg.available()-4);
355             }
356             
357             break;
358         default:
359                 break;
360         }
361         
362         return addToMessageStore;
363     }
364
365     Message processMessages(MessageObserver messagefilter) throws IOException JavaDoc, EOFException JavaDoc {
366
367         Message msg;
368
369         /**
370          * Collect the next channel message from the connection protocol
371          */

372         msg = messageStore.nextMessage(messagefilter, timeout);
373
374         switch (msg.getMessageId()) {
375
376             case MultiplexedConnection.MSG_CHANNEL_WINDOW_ADJUST:
377                 int i = (int) msg.readInt();
378                 remotewindow.adjust(i);
379                 windowSequence++;
380                 break;
381
382             case MultiplexedConnection.MSG_CHANNEL_DATA:
383                 msg.skip(4); // Skip the length
384
in.write(msg.array(), msg.getPosition(), msg.available());
385                 break;
386
387             case MultiplexedConnection.MSG_CHANNEL_CLOSE:
388                 checkCloseStatus(true);
389                 throw new EOFException JavaDoc("The channel is closed");
390
391             default:
392                 break;
393         }
394
395         return msg;
396     }
397
398     class ChannelOutputStream extends OutputStream JavaDoc {
399
400         boolean isEOF = false;
401         boolean closed = false;
402
403         public void write(int b) throws java.io.IOException JavaDoc {
404             write(new byte[] { (byte) b }, 0, 1);
405         }
406
407         public synchronized void write(byte[] buf, int offset, int len) throws IOException JavaDoc {
408
409             int write;
410
411             do {
412
413                 if (remotewindow.available() <= 0) {
414                     Message msg = processMessages(WINDOW_ADJUST_MESSAGES);
415                 }
416
417                 if (closed) {
418                     throw new IOException JavaDoc("The channel stream is closed!");
419                 }
420
421                 write = remotewindow.available() < remotewindow.getPacketSize() ? (remotewindow.available() < len ? remotewindow.available()
422                     : len)
423                     : (remotewindow.getPacketSize() < len ? remotewindow.getPacketSize() : len);
424
425                 if (write > 0) {
426                     
427                     connection.sendChannelData(Channel.this, buf, offset, write);
428                     remotewindow.consume(write);
429                     len -= write;
430                     offset += write;
431                 }
432
433             } while (len > 0);
434
435         }
436
437         public void close() throws IOException JavaDoc {
438             closed = true;
439             Channel.this.close();
440         }
441
442     }
443
444     class ChannelInputStream extends InputStream JavaDoc {
445
446         byte[] buffer;
447         int unread = 0;
448         int position = 0;
449         int base = 0;
450         MessageObserver messagefilter;
451         long transfered = 0;
452         boolean closed = false;
453
454         ChannelInputStream(MessageObserver messagefilter) {
455             buffer = new byte[localwindow.available()];
456             this.messagefilter = messagefilter;
457         }
458
459         public synchronized int available() throws IOException JavaDoc {
460             if(closed && unread==0) {
461                 return -1;
462             }
463
464             try {
465                 if (unread == 0) {
466                     if (messageStore.hasMessage(messagefilter) != null) {
467                         processMessages(messagefilter);
468                     }
469                 }
470                 return unread;
471             } catch (EOFException JavaDoc ex) {
472                 closed = true;
473                 return -1;
474             }
475         }
476
477         public void close() {
478             Channel.this.close();
479         }
480
481         public int read() throws IOException JavaDoc {
482             byte[] b = new byte[1];
483             int ret = read(b, 0, 1);
484             if (ret > 0) {
485                 return b[0] & 0xFF;
486             } else {
487                 return -1;
488             }
489         }
490
491         public long skip(long len) throws IOException JavaDoc {
492
493             int count = unread < len ? unread : (int) len;
494
495             try {
496                 if (count == 0 && isClosed())
497                     throw new EOFException JavaDoc("The inputstream is closed");
498
499                 int index = base;
500                 base = (base + count) % buffer.length;
501                 unread -= count;
502
503                 if ((unread + localwindow.available()) < (buffer.length / 2)) {
504                     adjustWindow(buffer.length - localwindow.available() - unread);
505                 }
506
507             } finally {
508                 transfered += count;
509             }
510             return count;
511         }
512
513         public synchronized int read(byte[] buf, int offset, int len) throws IOException JavaDoc {
514
515             try {
516
517                 if (available() == -1)
518                     return -1;
519
520                 if (unread <= 0 && !isClosed()) {
521                     processMessages(messagefilter);
522                 }
523
524                 int count = unread < len ? unread : len;
525
526                 if (count == 0 && isClosed())
527                     return -1;
528
529                 int index = base;
530                 base = (base + count) % buffer.length;
531                 if (buffer.length - index > count) {
532                     System.arraycopy(buffer, index, buf, offset, count);
533                 } else {
534                     int remaining = buffer.length - index;
535                     System.arraycopy(buffer, index, buf, offset, remaining);
536                     System.arraycopy(buffer, 0, buf, offset + remaining, count - remaining);
537                 }
538
539                 unread -= count;
540
541                 if ((unread + localwindow.available()) < (buffer.length / 2)) {
542                     adjustWindow(buffer.length - localwindow.available() - unread);
543                 }
544
545                 transfered += count;
546
547                 return count;
548             } catch (EOFException JavaDoc ex) {
549                 return -1;
550             }
551         }
552
553         void write(byte[] buf, int offset, int len) throws IOException JavaDoc {
554
555             if (localwindow.available() < len) {
556                 connection.disconnect("Received data exceeding current window space");
557                 throw new IOException JavaDoc("Window space exceeded");
558             }
559
560             int i = 0;
561             int index;
562             int count;
563             while (i < len) {
564                 // Copy data up to the end of the array and start back
565
// at the beginning
566
index = (base + unread) % buffer.length;
567                 count = ((buffer.length - index < len - i) ? buffer.length - index : len - i);
568                 System.arraycopy(buf, offset + i, buffer, index, count);
569                 unread += count;
570                 i += count;
571             }
572
573             localwindow.consume(len);
574
575         }
576     }
577
578     class DataWindow {
579         int windowsize;
580         int packetsize;
581
582         DataWindow(int windowsize, int packetsize) {
583             this.windowsize = windowsize;
584             this.packetsize = packetsize;
585         }
586
587         int getPacketSize() {
588             return packetsize;
589         }
590
591         void adjust(int count) {
592             windowsize += count;
593         }
594
595         void consume(int count) {
596             windowsize -= count;
597         }
598
599         int available() {
600             return windowsize;
601         }
602     }
603
604     public boolean isAutoConsumeInput() {
605         return autoConsumeInput;
606     }
607
608     public void setAutoConsumeInput(boolean autoConsumeInput) {
609         this.autoConsumeInput = autoConsumeInput;
610     }
611     
612     public int getCompressionLevel() {
613         return compressionLevel;
614     }
615
616     public boolean isCompressionEnabled() {
617         return compressionEnabled;
618     }
619
620     ZStream getCompressionIn() {
621         return compressionIn;
622     }
623
624     ZStream getCompressionOut() {
625         return compressionOut;
626     }
627
628 }
629
Popular Tags