1 7 package com.maverick.multiplex; 8 9 import java.io.EOFException ; 10 import java.io.IOException ; 11 import java.io.InputStream ; 12 import java.io.OutputStream ; 13 import java.util.Enumeration ; 14 import java.util.Vector ; 15 16 import com.jcraft.jzlib.ZStream; 17 18 25 public abstract class Channel { 26 27 MultiplexedConnection connection; 28 int channelid; 29 int remoteid; 30 String type; 31 int timeout; 32 DataWindow remotewindow; 33 DataWindow localwindow; 34 Vector listeners = new Vector (); 35 ChannelInputStream in; 36 ChannelOutputStream out; 37 int windowSequence = 0; 38 boolean isClosed; 39 boolean autoConsumeInput = false; 40 boolean compressionEnabled = false; 41 int compressionLevel = 6; 42 private ZStream compressionIn; 43 private ZStream compressionOut; 44 45 org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(Channel.class); 47 MessageObserver stickyMessages = new MessageObserver() { 49 public boolean wantsNotification(Message msg) { 50 switch (msg.getMessageId()) { 51 case MultiplexedConnection.MSG_CHANNEL_CLOSE: 52 return true; 53 default: 54 return false; 55 } 56 } 57 }; 58 59 MessageObserver channelRequestMessages = new MessageObserver() { 60 public boolean wantsNotification(Message msg) { 61 switch (msg.getMessageId()) { 62 case MultiplexedConnection.MSG_CHANNEL_REQUEST_SUCCESS: 63 case MultiplexedConnection.MSG_CHANNEL_REQUEST_FAILURE: 64 case MultiplexedConnection.MSG_CHANNEL_CLOSE: 65 return true; 66 default: 67 return false; 68 } 69 } 70 }; 71 final MessageObserver WINDOW_ADJUST_MESSAGES = new MessageObserver() { 72 public boolean wantsNotification(Message msg) { 73 switch (msg.getMessageId()) { 74 case MultiplexedConnection.MSG_CHANNEL_WINDOW_ADJUST: 75 case MultiplexedConnection.MSG_CHANNEL_CLOSE: 76 return true; 77 default: 78 return false; 79 } 80 } 81 }; 82 83 final MessageObserver CHANNEL_CLOSE_MESSAGES = new MessageObserver() { 84 public boolean wantsNotification(Message msg) { 85 switch (msg.getMessageId()) { 86 case MultiplexedConnection.MSG_CHANNEL_CLOSE: 87 return true; 88 default: 89 return false; 90 } 91 } 92 }; 93 94 final MessageObserver CHANNEL_DATA_MESSAGES = new MessageObserver() { 95 public boolean wantsNotification(Message msg) { 96 97 switch (msg.getMessageId()) { 101 case MultiplexedConnection.MSG_CHANNEL_DATA: 102 case MultiplexedConnection.MSG_CHANNEL_CLOSE: 103 return true; 104 default: 105 return false; 106 } 107 } 108 }; 109 110 MessageStore messageStore = new MessageStore(this, stickyMessages); 111 112 public Channel(String type, int localpacket, int localwindow) { 113 this(type, localpacket, localwindow, 0, false, 0); 114 } 115 116 public Channel(String type, int localpacket, int localwindow, boolean compress, int compressionLevel) { 117 this(type, localpacket, localwindow, 0, compress, compressionLevel); 118 } 119 120 public Channel(String type, int localpacket, int localwindow, int timeout, boolean compress, int compressionLevel) { 121 this.type = type; 122 this.localwindow = new DataWindow(localpacket, localwindow); 123 this.timeout = timeout; 124 this.compressionEnabled = compress; 125 this.compressionLevel = compressionLevel; 126 in = new ChannelInputStream(CHANNEL_DATA_MESSAGES); 127 out = new ChannelOutputStream(); 128 129 compressionIn = new ZStream(); 130 compressionOut = new ZStream(); 131 132 compressionIn.inflateInit(); 133 compressionOut.deflateInit(compressionLevel); 134 135 } 136 137 public void setTimeout(int timeout) { 138 this.timeout = timeout; 139 } 140 141 public int getTimeout() { 142 return timeout; 143 } 144 145 public MultiplexedConnection getConnection() { 146 return connection; 147 } 148 149 public void init(MultiplexedConnection connection, int remoteid, int remotepacket, int remotewindow) { 150 this.connection = connection; 151 this.remoteid = remoteid; 152 this.remotewindow = new DataWindow(remotewindow, remotepacket); 153 } 154 155 public synchronized boolean sendChannelRequest(Request request, boolean wantReply) throws IOException { 156 return sendChannelRequest(request, wantReply, 0); 157 } 158 159 public synchronized boolean sendChannelRequest(Request request, boolean wantReply, int timeoutMs) throws IOException { 160 161 Packet msg = new Packet(); 162 msg.write(MultiplexedConnection.MSG_CHANNEL_REQUEST); 163 msg.writeInt(channelid); 164 msg.writeString(request.getRequestName()); 165 msg.writeBoolean(wantReply); 166 msg.writeBinaryString(request.getRequestData()); 167 168 connection.sendMessage(msg); 169 170 if (wantReply) { 171 Message reply = messageStore.nextMessage(channelRequestMessages, timeoutMs); 172 173 switch (reply.getMessageId()) { 174 case MultiplexedConnection.MSG_CHANNEL_REQUEST_SUCCESS: 175 case MultiplexedConnection.MSG_CHANNEL_REQUEST_FAILURE: 176 177 byte[] data = null; 178 if (reply.available() > 0) 179 data = reply.readBinaryString(); 180 181 request.setRequestData(data); 182 183 return reply.getMessageId() == MultiplexedConnection.MSG_CHANNEL_REQUEST_SUCCESS; 184 case MultiplexedConnection.MSG_CHANNEL_CLOSE: 185 checkCloseStatus(true); 186 throw new EOFException ("Channel closed before request reply"); 187 default: 188 throw new IOException ("Unexpected reply in channel open procedure"); 189 } 190 } else 191 return true; 192 } 193 194 boolean closing = false; 195 196 public void close() { 197 if(connection == null) { 198 log.warn("Closing channel of type " + type +" before it was opened"); 200 return; 202 } 203 log.debug("Close channel '" + getType() + "'"); 205 207 boolean performClose = false; 208 209 synchronized (this) { 210 if (!closing) 211 performClose = closing = true; 212 } 213 214 try { 215 216 if (performClose) { 217 218 out.close(); 220 in.close(); 221 222 connection.closeChannel(this); 224 225 } 226 } catch (EOFException eof) { 227 } catch (IOException ex) { 229 connection.disconnect("IOException during channel close: " + ex.getMessage()); 231 232 } finally { 233 isClosed = true; 234 checkCloseStatus(false); 235 } 236 } 237 238 private void checkCloseStatus(boolean remoteClosed) { 239 240 if(!isClosed) { 241 close(); 242 if(!remoteClosed) 243 remoteClosed = (messageStore.hasMessage(CHANNEL_CLOSE_MESSAGES)!=null); 244 } 245 246 if(remoteClosed) { 247 if (connection != null) 248 connection.freeChannel(this); 249 250 synchronized (listeners) { 251 252 onChannelClose(); 253 254 for (Enumeration e = listeners.elements(); e.hasMoreElements();) { 255 ((ChannelListener) e.nextElement()).onChannelClose(this); 256 } 257 258 } 259 } 260 261 } 262 public abstract byte[] open(byte[] data) throws IOException , ChannelOpenException; 263 264 public abstract byte[] create() throws IOException ; 265 266 void fireChannelOpen(byte[] data) { 267 268 onChannelOpen(data); 269 270 for (Enumeration e = listeners.elements(); e.hasMoreElements();) { 271 ((ChannelListener) e.nextElement()).onChannelOpen(this); 272 } 273 274 } 275 276 public abstract void onChannelOpen(byte[] data); 277 278 public void onChannelData(byte[] buf, int off, int len) { 279 }; 280 281 public abstract void onChannelClose(); 282 283 public void addListener(ChannelListener listener) { 284 if (listener != null) 285 listeners.addElement(listener); 286 } 287 288 public boolean onChannelRequest(Request request) { 289 return false; 290 } 291 292 public OutputStream getOutputStream() { 293 return out; 294 } 295 296 public InputStream getInputStream() { 297 return in; 298 } 299 300 public String getType() { 301 return type; 302 } 303 304 public int getLocalWindow() { 305 return localwindow.available(); 306 } 307 308 public int getLocalPacket() { 309 return localwindow.getPacketSize(); 310 } 311 312 public boolean isClosed() { 313 return messageStore.isClosed(); 314 } 315 316 protected void adjustWindow(int increment) throws IOException { 317 localwindow.adjust(increment); 318 connection.sendWindowAdjust(this, increment); 319 } 320 321 private void uncompressMesasge(Message msg) { 322 323 } 324 325 private void compressMessage(Message msg) { 326 327 } 328 329 protected boolean processChannelMessage(Message msg) throws IOException { 330 331 boolean addToMessageStore = true; 332 333 switch(msg.getMessageId()) { 334 case MultiplexedConnection.MSG_CHANNEL_CLOSE: 335 checkCloseStatus(true); 336 break; 337 case MultiplexedConnection.MSG_CHANNEL_DATA: 338 339 if (autoConsumeInput) { 340 localwindow.consume(msg.available()-4); 341 if (localwindow.available() <= in.buffer.length / 2) { 342 adjustWindow(in.buffer.length - localwindow.available()); 343 } 344 addToMessageStore = false; 345 } 346 347 if(compressionEnabled) { 348 uncompressMesasge(msg); 349 } 350 351 onChannelData(msg.array(), msg.getPosition()+4, msg.available()-4); 352 353 for (Enumeration e = listeners.elements(); e.hasMoreElements();) { 354 ((ChannelListener) e.nextElement()).onChannelData(this, msg.array(), msg.getPosition()+4, msg.available()-4); 355 } 356 357 break; 358 default: 359 break; 360 } 361 362 return addToMessageStore; 363 } 364 365 Message processMessages(MessageObserver messagefilter) throws IOException , EOFException { 366 367 Message msg; 368 369 372 msg = messageStore.nextMessage(messagefilter, timeout); 373 374 switch (msg.getMessageId()) { 375 376 case MultiplexedConnection.MSG_CHANNEL_WINDOW_ADJUST: 377 int i = (int) msg.readInt(); 378 remotewindow.adjust(i); 379 windowSequence++; 380 break; 381 382 case MultiplexedConnection.MSG_CHANNEL_DATA: 383 msg.skip(4); in.write(msg.array(), msg.getPosition(), msg.available()); 385 break; 386 387 case MultiplexedConnection.MSG_CHANNEL_CLOSE: 388 checkCloseStatus(true); 389 throw new EOFException ("The channel is closed"); 390 391 default: 392 break; 393 } 394 395 return msg; 396 } 397 398 class ChannelOutputStream extends OutputStream { 399 400 boolean isEOF = false; 401 boolean closed = false; 402 403 public void write(int b) throws java.io.IOException { 404 write(new byte[] { (byte) b }, 0, 1); 405 } 406 407 public synchronized void write(byte[] buf, int offset, int len) throws IOException { 408 409 int write; 410 411 do { 412 413 if (remotewindow.available() <= 0) { 414 Message msg = processMessages(WINDOW_ADJUST_MESSAGES); 415 } 416 417 if (closed) { 418 throw new IOException ("The channel stream is closed!"); 419 } 420 421 write = remotewindow.available() < remotewindow.getPacketSize() ? (remotewindow.available() < len ? remotewindow.available() 422 : len) 423 : (remotewindow.getPacketSize() < len ? remotewindow.getPacketSize() : len); 424 425 if (write > 0) { 426 427 connection.sendChannelData(Channel.this, buf, offset, write); 428 remotewindow.consume(write); 429 len -= write; 430 offset += write; 431 } 432 433 } while (len > 0); 434 435 } 436 437 public void close() throws IOException { 438 closed = true; 439 Channel.this.close(); 440 } 441 442 } 443 444 class ChannelInputStream extends InputStream { 445 446 byte[] buffer; 447 int unread = 0; 448 int position = 0; 449 int base = 0; 450 MessageObserver messagefilter; 451 long transfered = 0; 452 boolean closed = false; 453 454 ChannelInputStream(MessageObserver messagefilter) { 455 buffer = new byte[localwindow.available()]; 456 this.messagefilter = messagefilter; 457 } 458 459 public synchronized int available() throws IOException { 460 if(closed && unread==0) { 461 return -1; 462 } 463 464 try { 465 if (unread == 0) { 466 if (messageStore.hasMessage(messagefilter) != null) { 467 processMessages(messagefilter); 468 } 469 } 470 return unread; 471 } catch (EOFException ex) { 472 closed = true; 473 return -1; 474 } 475 } 476 477 public void close() { 478 Channel.this.close(); 479 } 480 481 public int read() throws IOException { 482 byte[] b = new byte[1]; 483 int ret = read(b, 0, 1); 484 if (ret > 0) { 485 return b[0] & 0xFF; 486 } else { 487 return -1; 488 } 489 } 490 491 public long skip(long len) throws IOException { 492 493 int count = unread < len ? unread : (int) len; 494 495 try { 496 if (count == 0 && isClosed()) 497 throw new EOFException ("The inputstream is closed"); 498 499 int index = base; 500 base = (base + count) % buffer.length; 501 unread -= count; 502 503 if ((unread + localwindow.available()) < (buffer.length / 2)) { 504 adjustWindow(buffer.length - localwindow.available() - unread); 505 } 506 507 } finally { 508 transfered += count; 509 } 510 return count; 511 } 512 513 public synchronized int read(byte[] buf, int offset, int len) throws IOException { 514 515 try { 516 517 if (available() == -1) 518 return -1; 519 520 if (unread <= 0 && !isClosed()) { 521 processMessages(messagefilter); 522 } 523 524 int count = unread < len ? unread : len; 525 526 if (count == 0 && isClosed()) 527 return -1; 528 529 int index = base; 530 base = (base + count) % buffer.length; 531 if (buffer.length - index > count) { 532 System.arraycopy(buffer, index, buf, offset, count); 533 } else { 534 int remaining = buffer.length - index; 535 System.arraycopy(buffer, index, buf, offset, remaining); 536 System.arraycopy(buffer, 0, buf, offset + remaining, count - remaining); 537 } 538 539 unread -= count; 540 541 if ((unread + localwindow.available()) < (buffer.length / 2)) { 542 adjustWindow(buffer.length - localwindow.available() - unread); 543 } 544 545 transfered += count; 546 547 return count; 548 } catch (EOFException ex) { 549 return -1; 550 } 551 } 552 553 void write(byte[] buf, int offset, int len) throws IOException { 554 555 if (localwindow.available() < len) { 556 connection.disconnect("Received data exceeding current window space"); 557 throw new IOException ("Window space exceeded"); 558 } 559 560 int i = 0; 561 int index; 562 int count; 563 while (i < len) { 564 index = (base + unread) % buffer.length; 567 count = ((buffer.length - index < len - i) ? buffer.length - index : len - i); 568 System.arraycopy(buf, offset + i, buffer, index, count); 569 unread += count; 570 i += count; 571 } 572 573 localwindow.consume(len); 574 575 } 576 } 577 578 class DataWindow { 579 int windowsize; 580 int packetsize; 581 582 DataWindow(int windowsize, int packetsize) { 583 this.windowsize = windowsize; 584 this.packetsize = packetsize; 585 } 586 587 int getPacketSize() { 588 return packetsize; 589 } 590 591 void adjust(int count) { 592 windowsize += count; 593 } 594 595 void consume(int count) { 596 windowsize -= count; 597 } 598 599 int available() { 600 return windowsize; 601 } 602 } 603 604 public boolean isAutoConsumeInput() { 605 return autoConsumeInput; 606 } 607 608 public void setAutoConsumeInput(boolean autoConsumeInput) { 609 this.autoConsumeInput = autoConsumeInput; 610 } 611 612 public int getCompressionLevel() { 613 return compressionLevel; 614 } 615 616 public boolean isCompressionEnabled() { 617 return compressionEnabled; 618 } 619 620 ZStream getCompressionIn() { 621 return compressionIn; 622 } 623 624 ZStream getCompressionOut() { 625 return compressionOut; 626 } 627 628 } 629 | Popular Tags |