1 45 package org.exolab.jms.net.multiplexer; 46 47 import java.io.DataInputStream ; 48 import java.io.DataOutputStream ; 49 import java.io.IOException ; 50 import java.io.InterruptedIOException ; 51 import java.net.ProtocolException ; 52 import java.security.Principal ; 53 import java.util.HashMap ; 54 import java.util.LinkedList ; 55 56 import org.apache.commons.logging.Log; 57 import org.apache.commons.logging.LogFactory; 58 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 59 60 import org.exolab.jms.common.security.BasicPrincipal; 61 import org.exolab.jms.net.connector.Authenticator; 62 import org.exolab.jms.net.connector.ResourceException; 63 import org.exolab.jms.net.connector.SecurityException; 64 65 66 72 public class Multiplexer implements Constants, Runnable { 73 74 77 private MultiplexerListener _listener; 78 79 82 private volatile boolean _closed; 83 84 87 private Endpoint _endpoint; 88 89 92 private DataOutputStream _out; 93 94 97 private DataInputStream _in; 98 99 102 private HashMap _channels = new HashMap (); 103 104 107 private LinkedList _free = new LinkedList (); 108 109 114 private boolean _client = false; 115 116 119 private int _seed = 0; 120 121 124 private PooledExecutor _pool; 125 126 130 private Principal _principal; 131 132 135 private static final int BUFFER_SIZE = 2048; 136 137 140 private static final Log _log = LogFactory.getLog(Multiplexer.class); 141 142 152 public Multiplexer(MultiplexerListener listener, Endpoint endpoint, 153 Principal principal, PooledExecutor pool) 154 throws IOException , SecurityException { 155 initialise(listener, endpoint, pool, true); 156 authenticate(principal); 157 } 158 159 169 public Multiplexer(MultiplexerListener listener, Endpoint endpoint, 170 Authenticator authenticator, 171 PooledExecutor pool) 172 throws IOException , ResourceException { 173 initialise(listener, endpoint, pool, false); 174 authenticate(authenticator); 175 } 176 177 183 protected Multiplexer() { 184 } 185 186 189 public void run() { 190 while (!_closed) { 191 multiplex(); 192 } 193 } 194 195 202 public Channel getChannel() throws IOException { 203 Channel channel = null; 204 205 synchronized (_free) { 206 if (!_free.isEmpty()) { 207 channel = (Channel) _free.removeFirst(); 208 } 209 } 210 211 if (channel == null) { 212 channel = open(); 213 } 214 215 return channel; 216 } 217 218 223 public void release(Channel channel) { 224 synchronized (_free) { 225 _free.add(channel); 226 } 227 } 228 229 235 public void close(Channel channel) throws IOException { 236 int channelId = channel.getId(); 237 synchronized (_channels) { 238 _channels.remove(new Integer (channelId)); 239 } 240 241 send(CLOSE, channelId); 242 } 243 244 250 public void send(byte type) throws IOException { 251 synchronized (_out) { 252 _out.writeByte(type); 253 _out.flush(); 254 if (_log.isDebugEnabled()) { 255 _log.debug("send(type=0x" + Integer.toHexString(type) + ")"); 256 } 257 } 258 } 259 260 267 public void send(byte type, int channelId) throws IOException { 268 synchronized (_out) { 269 _out.writeByte(type); 270 _out.writeShort(channelId); 271 _out.flush(); 272 if (_log.isDebugEnabled()) { 273 _log.debug("send(type=0x" + Integer.toHexString(type) 274 + ", channel=" + channelId + ")"); 275 } 276 } 277 } 278 279 287 public void send(byte type, int channelId, int data) throws IOException { 288 synchronized (_out) { 289 _out.writeByte(type); 290 _out.writeShort(channelId); 291 _out.writeInt(data); 292 _out.flush(); 293 if (_log.isDebugEnabled()) { 294 _log.debug("send(type=" + type + ", channel=" + channelId 295 + ", data=" + Integer.toHexString(data) + ")"); 296 } 297 } 298 } 299 300 310 public void send(byte type, int channelId, byte[] data, int offset, 311 int length) throws IOException { 312 synchronized (_out) { 313 _out.writeByte(type); 314 _out.writeShort(channelId); 315 _out.writeInt(length); 316 _out.write(data, offset, length); 317 _out.flush(); 318 } 319 } 320 321 325 public void close() { 326 if (!_closed) { 327 _closed = true; 328 try { 329 send(SHUTDOWN); 330 } catch (IOException exception) { 331 _log.debug(exception); 332 } 333 try { 334 _endpoint.close(); 335 } catch (IOException exception) { 336 _log.debug(exception); 337 } 338 } 342 } 343 344 349 public boolean isClosed() { 350 return _closed; 351 } 352 353 359 public boolean isClient() { 360 return _client; 361 } 362 363 369 public Principal getPrincipal() { 370 return _principal; 371 } 372 373 383 protected void initialise(MultiplexerListener listener, Endpoint endpoint, 384 PooledExecutor pool, boolean client) 385 throws IOException { 386 387 388 if (listener == null) { 389 throw new IllegalArgumentException ("Argument 'listener' is null"); 390 } 391 if (endpoint == null) { 392 throw new IllegalArgumentException ("Argument 'endpoint' is null"); 393 } 394 if (pool == null) { 395 throw new IllegalArgumentException ("Argument 'pool' is null"); 396 } 397 if (_log.isDebugEnabled()) { 398 _log.debug("Multiplexer(uri=" + endpoint.getURI() 399 + ", client=" + client); 400 } 401 _listener = listener; 402 _endpoint = endpoint; 403 _pool = pool; 404 _out = new DataOutputStream (endpoint.getOutputStream()); 405 _in = new DataInputStream (endpoint.getInputStream()); 406 _client = client; 407 handshake(_out, _in); 408 } 409 410 418 protected void handshake(DataOutputStream out, DataInputStream in) 419 throws IOException { 420 out.writeInt(MAGIC); 421 out.writeInt(VERSION); 422 out.flush(); 423 424 int magic = in.readInt(); 425 if (magic != MAGIC) { 426 throw new ProtocolException ("Expected protocol magic=" + MAGIC 427 + ", but received=" + magic); 428 } 429 int version = in.readInt(); 430 if (version != VERSION) { 431 throw new ProtocolException ("Expected protocol version=" + VERSION 432 + ", but received=" + version); 433 } 434 } 435 436 443 protected void authenticate(Principal principal) 444 throws IOException , SecurityException { 445 try { 446 if (principal != null && !(principal instanceof BasicPrincipal)) { 447 throw new IOException ( 448 "Cannot authenticate with principal of type " 449 + principal.getClass().getName()); 450 } 451 if (principal != null) { 452 BasicPrincipal basic = (BasicPrincipal) principal; 453 _out.writeByte(AUTH_BASIC); 454 _out.writeUTF(basic.getName()); 455 _out.writeUTF(basic.getPassword()); 456 } else { 457 _out.writeByte(AUTH_NONE); 458 } 459 _out.flush(); 460 if (_in.readByte() != AUTH_OK) { 461 throw new SecurityException ("Connection refused"); 462 } 463 } catch (IOException exception) { 464 _endpoint.close(); 466 throw exception; 467 } 468 _principal = principal; 469 } 470 471 478 protected void authenticate(Authenticator authenticator) 479 throws IOException , ResourceException { 480 481 try { 482 Principal principal = null; 483 byte type = _in.readByte(); 484 485 switch (type) { 486 case AUTH_BASIC: 487 String name = _in.readUTF(); 488 String password = _in.readUTF(); 489 principal = new BasicPrincipal(name, password); 490 break; 491 case AUTH_NONE: 492 break; 493 default: 494 throw new IOException ("Invalid packet type: " + type); 495 } 496 if (authenticator.authenticate(principal)) { 497 _out.writeByte(AUTH_OK); 498 _out.flush(); 499 } else { 500 _out.writeByte(AUTH_DENIED); 501 _out.flush(); 502 throw new SecurityException ("User " + principal 503 + " unauthorised"); 504 } 505 _principal = principal; 506 } catch (IOException exception) { 507 _endpoint.close(); 509 throw exception; 510 } catch (ResourceException exception) { 511 _endpoint.close(); 513 throw exception; 514 } 515 } 516 517 523 protected Channel open() throws IOException { 524 Channel channel; 525 int channelId; 526 synchronized (_channels) { 527 channelId = getNextChannelId(); 528 channel = addChannel(channelId); 529 } 530 531 send(OPEN, channelId); 532 return channel; 533 } 534 535 538 private void multiplex() { 539 try { 540 byte type = _in.readByte(); 541 switch (type) { 542 case OPEN: 543 handleOpen(); 544 break; 545 case CLOSE: 546 handleClose(); 547 break; 548 case REQUEST: 549 handleRequest(); 550 break; 551 case RESPONSE: 552 handleResponse(); 553 break; 554 case DATA: 555 handleData(); 556 break; 557 case PING_REQUEST: 558 handlePingRequest(); 559 break; 560 case PING_RESPONSE: 561 handlePingResponse(); 562 break; 563 case FLOW_READ: 564 handleFlowRead(); 565 break; 566 case SHUTDOWN: 567 handleShutdown(); 568 break; 569 default: 570 throw new IOException ("Unrecognised message type: " 571 + type); 572 } 573 } catch (Exception exception) { 574 boolean closed = _closed; 575 shutdown(); 576 if (!closed) { 577 _log.debug("Multiplexer shutting down on error", exception); 578 _listener.error(exception); 580 } 581 } 582 } 583 584 587 private void shutdown() { 588 _closed = true; 590 591 Channel[] channels; 593 synchronized (_channels) { 594 channels = (Channel[]) _channels.values().toArray(new Channel[0]); 595 } 596 for (int i = 0; i < channels.length; ++i) { 597 channels[i].disconnected(); 598 } 599 } 600 601 606 private void handleOpen() throws IOException { 607 int channelId = _in.readUnsignedShort(); 608 Integer key = new Integer (channelId); 609 610 synchronized (_channels) { 611 if (_channels.get(key) != null) { 612 throw new IOException ( 613 "A channel already exists with identifier: " + key); 614 } 615 addChannel(channelId); 616 } 617 } 618 619 624 private void handleClose() throws IOException { 625 int channelId = _in.readUnsignedShort(); 626 Integer key = new Integer (channelId); 627 628 synchronized (_channels) { 629 Channel channel = (Channel) _channels.remove(key); 630 if (channel == null) { 631 throw new IOException ( 632 "No channel exists with identifier: " + key); 633 } 634 channel.close(); 635 } 636 } 637 638 644 private void handleRequest() throws IOException { 645 final Channel channel = handleData(); 646 Runnable request = new Runnable () { 647 public void run() { 648 if (_log.isDebugEnabled()) { 649 _log.debug("handleRequest() [channel=" 650 + channel.getId() + "]"); 651 } 652 _listener.request(channel); 654 655 if (_log.isDebugEnabled()) { 656 _log.debug("handleRequest() [channel=" 657 + channel.getId() + "] - end"); 658 } 659 } 660 }; 661 try { 662 _pool.execute(request); 663 } catch (InterruptedException exception) { 664 throw new InterruptedIOException (exception.getMessage()); 665 } 666 } 667 668 674 private void handleResponse() throws IOException { 675 handleData(); 676 } 677 678 683 private void handlePingRequest() throws IOException { 684 Channel channel = readChannel(); 685 channel.handlePingRequest(); 686 } 687 688 693 private void handlePingResponse() throws IOException { 694 Channel channel = readChannel(); 695 channel.handlePingResponse(); 696 } 697 698 705 private Channel handleData() throws IOException { 706 Channel channel = readChannel(); 707 int length = _in.readInt(); 708 channel.getMultiplexInputStream().receive(_in, length); 709 return channel; 710 } 711 712 717 private void handleFlowRead() throws IOException { 718 Channel channel = readChannel(); 719 int read = _in.readInt(); 720 channel.getMultiplexOutputStream().notifyRead(read); 721 } 722 723 726 private void handleShutdown() { 727 shutdown(); 728 _listener.closed(); 729 } 730 731 739 private Channel addChannel(int channelId) { 740 int size = BUFFER_SIZE; 741 MultiplexOutputStream out = 742 new MultiplexOutputStream(channelId, this, size, size); 743 MultiplexInputStream in = 744 new MultiplexInputStream(channelId, this, size); 745 Channel channel = new Channel(channelId, this, in, out); 746 _channels.put(new Integer (channelId), channel); 747 return channel; 748 } 749 750 758 private Channel readChannel() throws IOException { 759 int channelId = _in.readUnsignedShort(); 760 return getChannel(channelId); 761 } 762 763 770 private Channel getChannel(int channelId) throws IOException { 771 Channel channel; 772 Integer key = new Integer (channelId); 773 synchronized (_channels) { 774 channel = (Channel) _channels.get(key); 775 if (channel == null) { 776 throw new IOException ( 777 "No channel exists with identifier: " + channelId); 778 } 779 } 780 return channel; 781 } 782 783 793 private int getNextChannelId() throws IOException { 794 final int mask = 0x7fff; 795 final int serverIdBase = 0x8000; 796 int channelId = 0; 797 while (!_closed) { 798 _seed = (_seed + 1) & mask; 799 channelId = (_client) ? _seed : _seed + serverIdBase; 800 if (!_channels.containsKey(new Integer (channelId))) { 801 break; 802 } 803 } 804 if (_closed) { 805 throw new IOException ("Connection has been closed"); 806 } 807 return channelId; 808 } 809 810 } 811 | Popular Tags |