package org.snmp4j.transport;

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

import org.snmp4j.asn1.*;
import org.snmp4j.asn1.BER.*;
import org.snmp4j.log.*;
import org.snmp4j.smi.*;
import org.snmp4j.SNMP4JSettings;

/**
 * TCP transport mapping based on non-blocking <code>java.nio</code> channels.
 * <p>
 * All channel I/O is performed by a single {@link ServerThread} that owns a
 * {@link Selector}. Callers of {@link #sendMessage} only queue the message on
 * the {@link SocketEntry} of the target address and wake the selector up.
 * Every selection key registered by this class carries its
 * <code>SocketEntry</code> as attachment, so a ready key is always processed
 * with the entry that belongs to its own channel.
 * <p>
 * Idle connections are closed by a {@link Timer} when the connection timeout
 * is greater than zero.
 */
public class DefaultTcpTransportMapping extends TcpTransportMapping {

  private static final LogAdapter logger =
      LogFactory.getLogger(DefaultTcpTransportMapping.class);

  /** Known connections: peer <code>Address</code> to <code>SocketEntry</code>. */
  private Map sockets = new Hashtable();
  /** The selector thread, <code>null</code> while not listening. */
  private volatile ServerThread server;

  /** Closes idle sockets; guarded by <code>this</code>. */
  private Timer socketCleaner;
  /** Idle timeout in milliseconds, values &lt;= 0 disable the cleaner. */
  private long connectionTimeout = 60000;
  private boolean serverEnabled = false;

  private static final int MIN_SNMP_HEADER_LENGTH = 6;
  private MessageLengthDecoder messageLengthDecoder =
      new SnmpMesssageLengthDecoder();

  /**
   * Creates a client-only mapping (no server socket is bound) on the local
   * host address with port 0.
   *
   * @throws UnknownHostException
   *    if the local host address cannot be determined.
   * @throws IOException
   *    declared for compatibility with the super class constructor.
   */
  public DefaultTcpTransportMapping() throws UnknownHostException, IOException {
    super(new TcpAddress(InetAddress.getLocalHost(), 0));
  }

  /**
   * Creates a mapping that accepts incoming connections on the supplied
   * address once {@link #listen()} has been called.
   *
   * @param serverAddress
   *    the address (and port) the server socket is bound to.
   * @throws UnknownHostException
   *    declared for compatibility with the super class constructor.
   * @throws IOException
   *    declared for compatibility with the super class constructor.
   */
  public DefaultTcpTransportMapping(TcpAddress serverAddress)
      throws UnknownHostException, IOException
  {
    super(serverAddress);
    this.serverEnabled = true;
  }

  /**
   * Starts the selector thread and, if a connection timeout is configured,
   * the timer that closes idle sockets.
   *
   * @throws java.io.IOException
   *    if the mapping is already listening (<code>SocketException</code>) or
   *    the selector/server socket cannot be opened.
   */
  public synchronized void listen() throws java.io.IOException {
    if (server != null) {
      throw new SocketException("Port already listening");
    }
    // Construct first: if this throws, no half-initialized state is left.
    ServerThread st = new ServerThread();
    if ((connectionTimeout > 0) && (socketCleaner == null)) {
      socketCleaner = new Timer(true); // run as daemon
    }
    st.setDaemon(true);
    server = st;
    st.start();
  }

  /**
   * Changes the priority of the selector thread. Has no effect while the
   * mapping is not listening.
   *
   * @param newPriority
   *    the new thread priority.
   */
  public void setPriority(int newPriority) {
    ServerThread st = server;
    if (st != null) {
      st.setPriority(newPriority);
    }
  }

  /**
   * Returns the priority of the selector thread.
   *
   * @return
   *    the thread's priority or <code>Thread.NORM_PRIORITY</code> if the
   *    mapping is not listening.
   */
  public int getPriority() {
    ServerThread st = server;
    if (st != null) {
      return st.getPriority();
    }
    else {
      return Thread.NORM_PRIORITY;
    }
  }

  /**
   * Renames the selector thread. Has no effect while the mapping is not
   * listening.
   *
   * @param name
   *    the new thread name.
   */
  public void setThreadName(String name) {
    ServerThread st = server;
    if (st != null) {
      st.setName(name);
    }
  }

  /**
   * Returns the name of the selector thread.
   *
   * @return
   *    the thread name or <code>null</code> if the mapping is not listening.
   */
  public String getThreadName() {
    ServerThread st = server;
    if (st != null) {
      return st.getName();
    }
    else {
      return null;
    }
  }

  /**
   * Stops the selector thread (waiting for it to terminate), closes all known
   * sockets and cancels the idle socket timer.
   */
  public void close() {
    ServerThread st = server;
    if (st != null) {
      st.close();
      // Joining the current thread would never return.
      if (Thread.currentThread() != st) {
        try {
          st.join();
        }
        catch (InterruptedException ex) {
          logger.warn(ex);
          // Preserve the interrupt for the caller.
          Thread.currentThread().interrupt();
        }
      }
      server = null;
    }
    // Work on a snapshot: the timer thread may modify the table concurrently.
    List entries;
    synchronized (sockets) {
      entries = new ArrayList(sockets.values());
      sockets.clear();
    }
    for (Iterator it = entries.iterator(); it.hasNext(); ) {
      SocketEntry entry = (SocketEntry)it.next();
      try {
        synchronized (entry) {
          entry.getSocket().close();
        }
        logger.debug("Socket to "+entry.getPeerAddress()+" closed");
      }
      catch (IOException iox) {
        // An error while closing is not fatal, continue with the next socket.
        logger.debug(iox);
      }
    }
    synchronized (this) {
      if (socketCleaner != null) {
        socketCleaner.cancel();
      }
      socketCleaner = null;
    }
  }

  /**
   * Closes the connection to the supplied peer, if there is one.
   *
   * @param remoteAddress
   *    the address of the peer.
   * @return
   *    <code>true</code> if a socket entry for the address existed and its
   *    socket has been closed, <code>false</code> otherwise.
   * @throws IOException
   *    if closing the socket fails.
   */
  public synchronized boolean close(Address remoteAddress) throws IOException {
    if (logger.isDebugEnabled()) {
      logger.debug("Closing socket for peer address "+remoteAddress);
    }
    SocketEntry entry = (SocketEntry) sockets.remove(remoteAddress);
    if (entry != null) {
      synchronized (entry) {
        entry.getSocket().close();
      }
      logger.info("Socket to "+entry.getPeerAddress()+" closed");
      return true;
    }
    return false;
  }

  /**
   * Queues a message for the supplied address. The selector thread is started
   * first if the mapping is not listening yet.
   *
   * @param address
   *    the target address (cast to <code>TcpAddress</code> when a new
   *    connection has to be opened).
   * @param message
   *    the encoded message.
   * @throws java.io.IOException
   *    if the selector thread cannot be started or a new connection cannot be
   *    initiated.
   */
  public void sendMessage(Address address, byte[] message)
      throws java.io.IOException
  {
    ServerThread st;
    // Check-and-listen must be atomic, otherwise two concurrent senders could
    // both call listen() and one of them would fail with "already listening".
    synchronized (this) {
      if (server == null) {
        listen();
      }
      st = server;
    }
    st.sendMessage(address, message);
  }

  /**
   * Returns the idle timeout for connections.
   *
   * @return
   *    the timeout in milliseconds.
   */
  public long getConnectionTimeout() {
    return connectionTimeout;
  }

  /**
   * Sets the idle timeout for connections.
   *
   * @param connectionTimeout
   *    the timeout in milliseconds; no idle timeout is scheduled for values
   *    less than or equal to zero.
   */
  public void setConnectionTimeout(long connectionTimeout) {
    this.connectionTimeout = connectionTimeout;
  }

  /**
   * Indicates whether a server socket is bound when listening starts.
   *
   * @return
   *    <code>true</code> if incoming connections are accepted.
   */
  public boolean isServerEnabled() {
    return serverEnabled;
  }

  /**
   * Returns the decoder used to determine the length of incoming messages.
   *
   * @return
   *    the current <code>MessageLengthDecoder</code>.
   */
  public MessageLengthDecoder getMessageLengthDecoder() {
    return messageLengthDecoder;
  }

  /**
   * Enables or disables the server socket. The flag is evaluated when the
   * selector thread is created, i.e. on the next {@link #listen()}.
   *
   * @param serverEnabled
   *    <code>true</code> to accept incoming connections.
   */
  public void setServerEnabled(boolean serverEnabled) {
    this.serverEnabled = serverEnabled;
  }

  /**
   * Sets the decoder used to determine the length of incoming messages.
   *
   * @param messageLengthDecoder
   *    the decoder, must not be <code>null</code>.
   * @throws NullPointerException
   *    if <code>messageLengthDecoder</code> is <code>null</code>.
   */
  public void setMessageLengthDecoder(MessageLengthDecoder messageLengthDecoder) {
    if (messageLengthDecoder == null) {
      throw new NullPointerException("messageLengthDecoder must not be null");
    }
    this.messageLengthDecoder = messageLengthDecoder;
  }

  /**
   * Returns the maximum size of an incoming message.
   *
   * @return
   *    the value reported by the super class.
   */
  public int getMaxInboundMessageSize() {
    return super.getMaxInboundMessageSize();
  }

  /**
   * Sets the maximum size of an incoming message. The read buffer of a running
   * selector thread is enlarged on the next read if necessary.
   *
   * @param maxInboundMessageSize
   *    the new maximum message size in bytes.
   */
  public void setMaxInboundMessageSize(int maxInboundMessageSize) {
    this.maxInboundMessageSize = maxInboundMessageSize;
  }

  /**
   * Schedules the idle check for a socket if a connection timeout is set.
   * The timer is created on demand so that a timeout configured after
   * {@link #listen()} still takes effect.
   */
  private synchronized void timeoutSocket(SocketEntry entry) {
    if (connectionTimeout > 0) {
      if (socketCleaner == null) {
        socketCleaner = new Timer(true);
      }
      socketCleaner.schedule(new SocketTimeout(entry), connectionTimeout);
    }
  }

  /**
   * Indicates whether the selector thread exists.
   *
   * @return
   *    <code>true</code> if the mapping is listening.
   */
  public boolean isListening() {
    return (server != null);
  }

  /**
   * Removes the entry from the socket table, but only if the table still maps
   * the entry's address to this very entry (a newer connection to the same
   * peer must not be dropped).
   */
  private void removeSocketEntry(SocketEntry entry) {
    synchronized (sockets) {
      if (sockets.get(entry.getPeerAddress()) == entry) {
        sockets.remove(entry.getPeerAddress());
      }
    }
  }

  /** Creates and fires a transport state event and returns it. */
  private TransportStateEvent fireState(Address peer, int state,
                                        IOException cause) {
    TransportStateEvent e =
        new TransportStateEvent(DefaultTcpTransportMapping.this,
                                peer, state, cause);
    fireConnectionStateChanged(e);
    return e;
  }

  /** Closes a channel, logging (not propagating) any failure. */
  private static void closeQuietly(Channel channel) {
    if (channel != null) {
      try {
        channel.close();
      }
      catch (IOException iox) {
        logger.debug(iox);
      }
    }
  }

  /**
   * State of a single connection: its socket, the queue of outgoing messages
   * and the buffers of a partially read or partially written message.
   */
  class SocketEntry {
    private Socket socket;
    private TcpAddress peerAddress;
    // Written by the selector thread, read by the timer thread.
    private volatile long lastUse;
    private LinkedList message = new LinkedList();
    private ByteBuffer readBuffer = null;
    private ByteBuffer writeBuffer = null;

    public SocketEntry(TcpAddress address, Socket socket) {
      this.peerAddress = address;
      this.socket = socket;
      this.lastUse = System.currentTimeMillis();
    }

    /** Returns the time (milliseconds) of the last recorded use. */
    public long getLastUse() {
      return lastUse;
    }

    /** Records the current time as time of last use. */
    public void used() {
      lastUse = System.currentTimeMillis();
    }

    public Socket getSocket() {
      return socket;
    }

    public TcpAddress getPeerAddress() {
      return peerAddress;
    }

    /** Appends a message to the queue of outgoing messages. */
    public synchronized void addMessage(byte[] message) {
      this.message.add(message);
    }

    /**
     * Removes and returns the oldest queued message.
     *
     * @return
     *    the message or <code>null</code> if the queue is empty.
     */
    public synchronized byte[] nextMessage() {
      if (this.message.size() > 0) {
        return (byte[])this.message.removeFirst();
      }
      return null;
    }

    /** Sets the buffer of a partially received message (or null). */
    public void setReadBuffer(ByteBuffer byteBuffer) {
      this.readBuffer = byteBuffer;
    }

    public ByteBuffer getReadBuffer() {
      return readBuffer;
    }

    /** Sets the buffer of a partially written message (or null). */
    void setWriteBuffer(ByteBuffer byteBuffer) {
      this.writeBuffer = byteBuffer;
    }

    ByteBuffer getWriteBuffer() {
      return writeBuffer;
    }

    public String toString() {
      return "SocketEntry[peerAddress="+peerAddress+
          ",socket="+socket+",lastUse="+new Date(lastUse)+"]";
    }
  }

  /**
   * Determines the message length from the BER header at the start of the
   * buffer.
   */
  public static class SnmpMesssageLengthDecoder implements MessageLengthDecoder {
    public int getMinHeaderLength() {
      return MIN_SNMP_HEADER_LENGTH;
    }
    public MessageLength getMessageLength(ByteBuffer buf) throws IOException {
      MutableByte type = new MutableByte();
      BERInputStream is = new BERInputStream(buf);
      int ml = BER.decodeHeader(is, type);
      // The stream position after decoding is the length of the header.
      int hl = (int)is.getPosition();
      MessageLength messageLength = new MessageLength(hl, ml);
      return messageLength;
    }
  }

  /**
   * Timer task that closes a socket which has been idle for at least the
   * connection timeout, or re-schedules itself for the remaining time.
   */
  class SocketTimeout extends TimerTask {
    private SocketEntry entry;

    public SocketTimeout(SocketEntry entry) {
      this.entry = entry;
    }

    public void run() {
      long now = System.currentTimeMillis();
      long idle = now - entry.getLastUse();
      // Take one consistent snapshot; close() may null the field concurrently.
      Timer cleaner;
      synchronized (DefaultTcpTransportMapping.this) {
        cleaner = socketCleaner;
      }
      if ((cleaner == null) || (idle >= connectionTimeout)) {
        if (logger.isDebugEnabled()) {
          logger.debug("Socket has not been used for "+idle+
                       " milliseconds, closing it");
        }
        removeSocketEntry(entry);
        try {
          synchronized (entry) {
            entry.getSocket().close();
          }
          logger.info("Socket to "+entry.getPeerAddress()+
                      " closed due to timeout");
        }
        catch (IOException ex) {
          logger.error(ex);
        }
      }
      else {
        long delay = connectionTimeout - idle;
        if (logger.isDebugEnabled()) {
          logger.debug("Scheduling " + delay);
        }
        try {
          cleaner.schedule(new SocketTimeout(entry), delay);
        }
        catch (IllegalStateException isex) {
          // The timer has been cancelled in the meantime (mapping closed).
          logger.debug("Socket cleaner already cancelled, not rescheduling");
        }
      }
    }
  }

  /**
   * The selector thread. It accepts connections, completes outgoing
   * connects and reads and writes messages for all sockets of the mapping.
   */
  class ServerThread extends Thread {
    /** Shared array for reading a message that arrives in one piece. */
    private byte[] buf;
    private volatile boolean stop = false;
    private Throwable lastError = null;
    private ServerSocketChannel ssc;
    private Selector selector;

    /** Entries whose channel has to be (re-)registered with the selector. */
    private LinkedList pending = new LinkedList();

    public ServerThread() throws IOException {
      setName("DefaultTCPTransportMapping_"+getAddress());
      buf = new byte[getMaxInboundMessageSize()];
      selector = Selector.open();

      if (serverEnabled) {
        boolean bound = false;
        try {
          ssc = ServerSocketChannel.open();
          ssc.configureBlocking(false);

          InetSocketAddress isa =
              new InetSocketAddress(tcpAddress.getInetAddress(),
                                    tcpAddress.getPort());
          ssc.socket().bind(isa);
          ssc.register(selector, SelectionKey.OP_ACCEPT);
          bound = true;
        }
        finally {
          if (!bound) {
            // Do not leak the channel and the selector if binding failed.
            releaseResources();
          }
        }
      }
    }

    /** Closes the server channel and the selector. */
    private void releaseResources() {
      closeQuietly(ssc);
      if (selector != null) {
        try {
          selector.close();
        }
        catch (IOException iox) {
          logger.error(iox);
          lastError = iox;
        }
      }
    }

    /** Logs an I/O problem; the stack trace is printed in debug mode only. */
    private void logIoProblem(IOException iox) {
      if (logger.isDebugEnabled()) {
        iox.printStackTrace();
      }
      logger.warn(iox);
    }

    /** Returns the remote address of a channel as TcpAddress. */
    private TcpAddress peerOf(SocketChannel channel) {
      return new TcpAddress(channel.socket().getInetAddress(),
                            channel.socket().getPort());
    }

    /**
     * Drains the pending queue and registers each entry's channel for
     * OP_WRITE (connected) or OP_CONNECT (connect in progress). The queue
     * lock is not held while registering or firing events.
     */
    private void processPending() {
      while (true) {
        SocketEntry entry;
        synchronized (pending) {
          if (pending.isEmpty()) {
            return;
          }
          entry = (SocketEntry)pending.removeFirst();
        }
        registerPending(entry);
      }
    }

    /**
     * Registers a single pending entry; on failure the channel is closed, the
     * entry is dropped and a STATE_CLOSED event is fired.
     */
    private void registerPending(SocketEntry entry) {
      SocketChannel channel = entry.getSocket().getChannel();
      try {
        // The entry is attached so that the ready key can be mapped back to
        // its own connection.
        if (entry.getSocket().isConnected()) {
          channel.register(selector, SelectionKey.OP_WRITE, entry);
        }
        else {
          channel.register(selector, SelectionKey.OP_CONNECT, entry);
        }
      }
      catch (CancelledKeyException ckex) {
        // The key has been cancelled but the channel is not deregistered yet.
        logger.warn(ckex);
        closeQuietly(channel);
        removeSocketEntry(entry);
        fireState(entry.getPeerAddress(), TransportStateEvent.STATE_CLOSED,
                  null);
      }
      catch (IOException iox) {
        logger.error(iox);
        // Something went wrong, so close the channel and record the failure.
        closeQuietly(channel);
        removeSocketEntry(entry);
        fireState(entry.getPeerAddress(), TransportStateEvent.STATE_CLOSED,
                  iox);
        lastError = iox;
        if (SNMP4JSettings.isFowardRuntimeExceptions()) {
          throw new RuntimeException(iox);
        }
      }
    }

    /**
     * Returns the last error recorded by this thread.
     *
     * @return
     *    the error or <code>null</code> if none has been recorded.
     */
    public Throwable getLastError() {
      return lastError;
    }

    /** Tells whether a non-blocking connect of the socket is in progress. */
    private boolean isConnectionPending(Socket s) {
      SocketChannel channel = s.getChannel();
      return (channel != null) && channel.isConnectionPending();
    }

    /**
     * Queues the message on the entry for the address and wakes up the
     * selector. A new non-blocking connection is initiated if there is no
     * usable socket for the address.
     *
     * @param address
     *    the target address.
     * @param message
     *    the encoded message.
     * @throws java.io.IOException
     *    if a new connection cannot be initiated.
     */
    public void sendMessage(Address address, byte[] message)
        throws java.io.IOException
    {
      Socket s = null;
      SocketEntry entry = (SocketEntry) sockets.get(address);
      if (logger.isDebugEnabled()) {
        logger.debug("Looking up connection for destination '"+address+
                     "' returned: "+entry);
        logger.debug(sockets.toString());
      }
      if (entry != null) {
        s = entry.getSocket();
      }
      // A socket whose connect is still in progress is reused as well,
      // otherwise every message sent meanwhile would open another connection.
      boolean usable = (s != null) && (!s.isClosed()) &&
          (s.isConnected() || isConnectionPending(s));
      if (!usable) {
        if (logger.isDebugEnabled()) {
          logger.debug("Socket for address '"+address+
                       "' is closed, opening it...");
        }
        SocketChannel sc = null;
        try {
          sc = SocketChannel.open();
          sc.configureBlocking(false);
          sc.connect(new InetSocketAddress(((TcpAddress)address).getInetAddress(),
                                           ((TcpAddress)address).getPort()));
          s = sc.socket();
          entry = new SocketEntry((TcpAddress)address, s);
          entry.addMessage(message);
          sockets.put(address, entry);

          synchronized (pending) {
            pending.add(entry);
          }

          selector.wakeup();
          logger.debug("Trying to connect to "+address);
        }
        catch (IOException iox) {
          logger.error(iox);
          closeQuietly(sc);
          throw iox;
        }
      }
      else {
        entry.addMessage(message);
        synchronized (pending) {
          pending.add(entry);
        }
        selector.wakeup();
      }
    }

    /**
     * Selector loop: processes ready keys, then registers pending entries,
     * until {@link #close()} is called or a fatal error occurs. The server
     * channel and the selector are always released on exit.
     */
    public void run() {
      try {
        while (!stop) {
          try {
            if (selector.select() > 0) {
              if (stop) {
                break;
              }
              Iterator it = selector.selectedKeys().iterator();
              while (it.hasNext()) {
                SelectionKey sk = (SelectionKey) it.next();
                it.remove();
                try {
                  handleKey(sk);
                }
                catch (CancelledKeyException ckex) {
                  // Channel closed by another thread while being processed.
                  logger.debug(ckex);
                }
              }
            }
          }
          catch (NullPointerException npex) {
            npex.printStackTrace();
            logger.warn("NullPointerException within select()?");
          }
          processPending();
        }
      }
      catch (IOException iox) {
        logger.error(iox);
        lastError = iox;
      }
      finally {
        releaseResources();
        if (!stop) {
          // The thread terminates on its own: reflect that in the mapping.
          stop = true;
          synchronized (DefaultTcpTransportMapping.this) {
            if (server == this) {
              server = null;
            }
          }
        }
      }
    }

    /** Dispatches a ready key to the handler of its ready operation. */
    private void handleKey(SelectionKey sk) throws IOException {
      if (!sk.isValid()) {
        return;
      }
      if (sk.isAcceptable()) {
        acceptConnection(sk);
      }
      else if (sk.isReadable()) {
        readFromChannel(sk);
      }
      else if (sk.isWritable()) {
        flushPendingWrites(sk);
      }
      else if (sk.isConnectable()) {
        completeConnect(sk);
      }
    }

    /**
     * Accepts an incoming connection and registers it for reading. Data that
     * is already available is reported by the next select().
     */
    private void acceptConnection(SelectionKey sk) throws IOException {
      ServerSocketChannel nextReady = (ServerSocketChannel) sk.channel();
      SocketChannel accepted = nextReady.accept();
      if (accepted == null) {
        // Non-blocking accept: no connection was actually pending.
        return;
      }
      Socket s = accepted.socket();
      TcpAddress incomingAddress = null;
      SocketEntry entry = null;
      boolean registered = false;
      try {
        accepted.configureBlocking(false);
        incomingAddress = new TcpAddress(s.getInetAddress(), s.getPort());
        entry = new SocketEntry(incomingAddress, s);
        accepted.register(selector, SelectionKey.OP_READ, entry);
        sockets.put(incomingAddress, entry);
        registered = true;
      }
      finally {
        if (!registered) {
          closeQuietly(accepted);
        }
      }
      timeoutSocket(entry);
      TransportStateEvent e =
          fireState(incomingAddress, TransportStateEvent.STATE_CONNECTED, null);
      if (e.isCancelled()) {
        logger.warn("Incoming connection cancelled");
        s.close();
        removeSocketEntry(entry);
      }
    }

    /** Reads from a readable channel; a failure closes the connection. */
    private void readFromChannel(SelectionKey sk) {
      SocketChannel readChannel = (SocketChannel) sk.channel();
      TcpAddress incomingAddress = peerOf(readChannel);
      try {
        readMessage(sk, readChannel, incomingAddress);
      }
      catch (IOException iox) {
        logIoProblem(iox);
        sk.cancel();
        closeQuietly(readChannel);
        fireState(incomingAddress,
                  TransportStateEvent.STATE_DISCONNECTED_REMOTELY, iox);
      }
    }

    /** Writes queued messages of the entry attached to a writable key. */
    private void flushPendingWrites(SelectionKey sk) {
      SocketChannel sc = (SocketChannel) sk.channel();
      SocketEntry entry = (SocketEntry) sk.attachment();
      try {
        if (entry != null) {
          writeMessage(entry, sc);
        }
        else {
          // Nothing to write: go back to reading instead of spinning on a
          // permanently writable channel.
          sc.register(selector, SelectionKey.OP_READ);
        }
      }
      catch (IOException iox) {
        logIoProblem(iox);
        sk.cancel();
        closeQuietly(sc);
        Address peer;
        if (entry != null) {
          removeSocketEntry(entry);
          peer = entry.getPeerAddress();
        }
        else {
          peer = peerOf(sc);
        }
        fireState(peer, TransportStateEvent.STATE_DISCONNECTED_REMOTELY, iox);
      }
    }

    /**
     * Completes an outgoing connect and switches the channel to OP_WRITE so
     * that the queued messages are sent.
     */
    private void completeConnect(SelectionKey sk) {
      SocketChannel sc = (SocketChannel) sk.channel();
      SocketEntry entry = (SocketEntry) sk.attachment();
      if (entry == null) {
        logger.warn("Message not found on finish connection");
        // Without an entry the connect can never be used; do not spin on it.
        sk.cancel();
        closeQuietly(sc);
        return;
      }
      try {
        if ((!sc.isConnected()) && (sc.finishConnect())) {
          sc.configureBlocking(false);
          logger.debug("Connected to " + entry.getPeerAddress());
          // make sure the connection is closed if not used for timeout
          timeoutSocket(entry);
          sc.register(selector, SelectionKey.OP_WRITE, entry);
          fireState(entry.getPeerAddress(),
                    TransportStateEvent.STATE_CONNECTED, null);
        }
      }
      catch (IOException iox) {
        logIoProblem(iox);
        sk.cancel();
        closeQuietly(sc);
        // Drop the entry, otherwise the failed connection would stay around.
        removeSocketEntry(entry);
        fireState(entry.getPeerAddress(), TransportStateEvent.STATE_CLOSED,
                  iox);
      }
    }

    /** Handles end-of-stream on a channel: closes it and notifies listeners. */
    private void peerClosed(SelectionKey sk, SocketChannel readChannel,
                            TcpAddress incomingAddress, SocketEntry entry)
        throws IOException
    {
      logger.debug("Socket closed remotely");
      sk.cancel();
      readChannel.close();
      if (entry != null) {
        entry.setReadBuffer(null);
        removeSocketEntry(entry);
      }
      fireState(incomingAddress,
                TransportStateEvent.STATE_DISCONNECTED_REMOTELY, null);
    }

    /**
     * Reads (part of) a message from the channel. A message that does not
     * arrive in one piece is collected in the entry's read buffer and
     * dispatched when it is complete.
     * <p>
     * Known limitation: if fewer bytes than the minimum header length are
     * available, these bytes are discarded (a warning is logged).
     */
    private void readMessage(SelectionKey sk, SocketChannel readChannel,
                             TcpAddress incomingAddress) throws IOException {
      // Prefer the entry attached to the key (it belongs to this channel);
      // fall back to the address lookup.
      SocketEntry entry = (SocketEntry) sk.attachment();
      if (entry == null) {
        entry = (SocketEntry) sockets.get(incomingAddress);
      }
      if (entry != null) {
        // note that socket has been used
        entry.used();
        ByteBuffer readBuffer = entry.getReadBuffer();
        if (readBuffer != null) {
          if (readChannel.read(readBuffer) < 0) {
            peerClosed(sk, readChannel, incomingAddress, entry);
            return;
          }
          if (readBuffer.hasRemaining()) {
            readChannel.register(selector,
                                 SelectionKey.OP_READ,
                                 entry);
          }
          else {
            // Message complete: reset the buffer so that the next read starts
            // a new message instead of dispatching this one again.
            entry.setReadBuffer(null);
            dispatchMessage(incomingAddress, readBuffer, readBuffer.capacity());
          }
          return;
        }
      }
      if (buf.length < getMaxInboundMessageSize()) {
        // The maximum message size has been increased after start up.
        buf = new byte[getMaxInboundMessageSize()];
      }
      int headerLength = messageLengthDecoder.getMinHeaderLength();
      ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
      byteBuffer.limit(headerLength);
      long bytesRead = readChannel.read(byteBuffer);
      if (logger.isDebugEnabled()) {
        logger.debug("Reading header "+bytesRead+" bytes from " +
                     incomingAddress);
      }
      if (bytesRead == headerLength) {
        MessageLength messageLength =
            messageLengthDecoder.getMessageLength(ByteBuffer.wrap(buf));
        if (logger.isDebugEnabled()) {
          logger.debug("Message length is "+messageLength);
        }
        int total = messageLength.getMessageLength();
        boolean tooLarge = (total > getMaxInboundMessageSize());
        // A total length below the bytes already consumed cannot be valid.
        boolean invalid = (total <= 0) || (total < bytesRead);
        if (tooLarge || invalid) {
          if (tooLarge) {
            logger.error("Received message length "+messageLength+
                         " is greater than inboundBufferSize "+
                         getMaxInboundMessageSize());
          }
          else {
            logger.error("Received invalid message length "+messageLength);
          }
          sk.cancel();
          readChannel.close();
          if (entry != null) {
            removeSocketEntry(entry);
          }
          logger.info("Socket to "+incomingAddress+
                      " closed due to an error");
        }
        else {
          byteBuffer.limit(total);
          long more = readChannel.read(byteBuffer);
          if (more < 0) {
            peerClosed(sk, readChannel, incomingAddress, entry);
            return;
          }
          bytesRead += more;
          if (bytesRead == total) {
            dispatchMessage(incomingAddress, byteBuffer, bytesRead);
          }
          else if (entry != null) {
            // Keep what has been received so far; the buffer's position is
            // placed behind it so that the next read appends.
            int received = byteBuffer.position();
            byte[] message = new byte[total];
            System.arraycopy(buf, 0, message, 0, received);
            ByteBuffer partial = ByteBuffer.wrap(message);
            partial.position(received);
            entry.setReadBuffer(partial);
          }
          else {
            // Without an entry the rest of the message cannot be collected.
            logger.warn("No socket entry for "+incomingAddress+
                        ", closing connection after incomplete message");
            sk.cancel();
            readChannel.close();
            return;
          }
          readChannel.register(selector,
                               SelectionKey.OP_READ,
                               entry);
        }
      }
      else if (bytesRead < 0) {
        peerClosed(sk, readChannel, incomingAddress, entry);
      }
      else if (bytesRead > 0) {
        logger.warn("Incomplete message header ("+bytesRead+" bytes) from "+
                    incomingAddress+" discarded");
      }
    }

    /**
     * Passes a completely received message to the message listeners. The
     * bytes are copied if asynchronous message processing is supported,
     * because the backing array may be reused for the next read.
     */
    private void dispatchMessage(TcpAddress incomingAddress,
                                 ByteBuffer byteBuffer, long bytesRead) {
      byteBuffer.flip();
      if (logger.isDebugEnabled()) {
        logger.debug("Received message from " + incomingAddress +
                     " with length " + bytesRead + ": " +
                     new OctetString(byteBuffer.array(), 0,
                                     (int)bytesRead).toHexString());
      }
      ByteBuffer bis;
      if (isAsyncMsgProcessingSupported()) {
        byte[] bytes = new byte[(int)bytesRead];
        System.arraycopy(byteBuffer.array(), 0, bytes, 0, (int)bytesRead);
        bis = ByteBuffer.wrap(bytes);
      }
      else {
        bis = ByteBuffer.wrap(byteBuffer.array(),
                              0, (int) bytesRead);
      }
      fireProcessMessage(incomingAddress, bis);
    }

    /**
     * Writes all queued messages of the entry. If the channel does not take
     * a message completely, the remainder is kept in the entry and the
     * channel stays registered for OP_WRITE; otherwise the channel is
     * switched back to OP_READ.
     */
    private void writeMessage(SocketEntry entry, SocketChannel sc) throws
        IOException {
      ByteBuffer buffer = entry.getWriteBuffer();
      while (true) {
        if (buffer == null) {
          byte[] message = entry.nextMessage();
          if (message == null) {
            break;
          }
          buffer = ByteBuffer.wrap(message);
          if (logger.isDebugEnabled()) {
            logger.debug("Send message with length " +
                         message.length + " to " +
                         entry.getPeerAddress() + ": " +
                         new OctetString(message).toHexString());
          }
        }
        sc.write(buffer);
        if (buffer.hasRemaining()) {
          // Send buffer full: continue when the channel is writable again.
          entry.setWriteBuffer(buffer);
          return;
        }
        buffer = null;
        entry.setWriteBuffer(null);
      }
      sc.register(selector, SelectionKey.OP_READ, entry);
    }

    /**
     * Requests termination of this thread: sets the stop flag and interrupts
     * the thread so that a blocking select() returns.
     */
    public void close() {
      stop = true;
      interrupt();
    }
  }

}