KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > snmp4j > transport > DefaultTcpTransportMapping


1 /*_############################################################################
2   _##
3   _## SNMP4J - DefaultTcpTransportMapping.java
4   _##
5   _## Copyright 2003-2007 Frank Fock and Jochen Katz (SNMP4J.org)
6   _##
7   _## Licensed under the Apache License, Version 2.0 (the "License");
8   _## you may not use this file except in compliance with the License.
9   _## You may obtain a copy of the License at
10   _##
11   _## http://www.apache.org/licenses/LICENSE-2.0
12   _##
13   _## Unless required by applicable law or agreed to in writing, software
14   _## distributed under the License is distributed on an "AS IS" BASIS,
15   _## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   _## See the License for the specific language governing permissions and
17   _## limitations under the License.
18   _##
19   _##########################################################################*/

20
21 package org.snmp4j.transport;
22
23 import java.io.*;
24 import java.net.*;
25 import java.nio.*;
26 import java.nio.channels.*;
27 import java.util.*;
28
29 import org.snmp4j.asn1.*;
30 import org.snmp4j.asn1.BER.*;
31 import org.snmp4j.log.*;
32 import org.snmp4j.smi.*;
33 import org.snmp4j.SNMP4JSettings;
34
35 /**
36  * The <code>DefaultTcpTransportMapping</code> implements a TCP transport
37  * mapping with the Java 1.4 new IO API.
38  * <p>
39  * It uses a single thread for processing incoming and outgoing messages.
40  * The thread is started when the <code>listen</code> method is called, or
41  * when an outgoing request is sent using the <code>sendMessage</code> method.
42  *
43  *
44  * @author Frank Fock
45  * @version 1.7.4a
46  */

47 public class DefaultTcpTransportMapping extends TcpTransportMapping {
48
49   private static final LogAdapter logger =
50       LogFactory.getLogger(DefaultTcpTransportMapping.class);
51
52   private Map sockets = new Hashtable();
53   private ServerThread server;
54
55   private Timer socketCleaner;
56   // 1 minute default timeout
57
private long connectionTimeout = 60000;
58   private boolean serverEnabled = false;
59
60   private static final int MIN_SNMP_HEADER_LENGTH = 6;
61   private MessageLengthDecoder messageLengthDecoder =
62       new SnmpMesssageLengthDecoder();
63
64   /**
65    * Creates a default TCP transport mapping with the server for incoming
66    * messages disabled.
67    * @throws UnknownHostException
68    * @throws IOException
69    * on failure of binding a local port.
70    */

71   public DefaultTcpTransportMapping() throws UnknownHostException, IOException {
72     super(new TcpAddress(InetAddress.getLocalHost(), 0));
73   }
74
75   /**
76    * Creates a default TCP transport mapping that binds to the given address
77    * (interface) on the local host.
78    *
79    * @param serverAddress
80    * the TcpAddress instance that describes the server address to listen
81    * on incoming connection requests.
82    * @throws UnknownHostException
83    * if the specified interface does not exist.
84    * @throws IOException
85    * if the given address cannot be bound.
86    */

87   public DefaultTcpTransportMapping(TcpAddress serverAddress)
88       throws UnknownHostException, IOException
89   {
90     super(serverAddress);
91     this.serverEnabled = true;
92   }
93
94   /**
95    * Listen for incoming and outgoing requests. If the <code>serverEnabled</code>
96    * member is <code>false</code> the server for incoming requests is not
97    * started. This starts the internal server thread that processes messages.
98    * @throws SocketException
99    * when the transport is already listening for incoming/outgoing messages.
100    * @throws IOException
101    */

102   public synchronized void listen() throws java.io.IOException JavaDoc {
103     if (server != null) {
104       throw new SocketException("Port already listening");
105     }
106     server = new ServerThread();
107     if (connectionTimeout > 0) {
108       socketCleaner = new Timer(true); // run as daemon
109
}
110     server.setDaemon(true);
111     server.start();
112   }
113
114   /**
115    * Changes the priority of the server thread for this TCP transport mapping.
116    * This method has no effect, if called before {@link #listen()} has been
117    * called for this transport mapping.
118    *
119    * @param newPriority
120    * the new priority.
121    * @see Thread#setPriority
122    * @since 1.2.2
123    */

124   public void setPriority(int newPriority) {
125     ServerThread st = server;
126     if (st != null) {
127       st.setPriority(newPriority);
128     }
129   }
130
131   /**
132    * Returns the priority of the internal listen thread.
133    * @return
134    * a value between {@link Thread#MIN_PRIORITY} and
135    * {@link Thread#MAX_PRIORITY}.
136    * @since 1.2.2
137    */

138   public int getPriority() {
139     ServerThread st = server;
140     if (st != null) {
141       return st.getPriority();
142     }
143     else {
144       return Thread.NORM_PRIORITY;
145     }
146   }
147
148   /**
149    * Sets the name of the listen thread for this UDP transport mapping.
150    * This method has no effect, if called before {@link #listen()} has been
151    * called for this transport mapping.
152    *
153    * @param name
154    * the new thread name.
155    * @since 1.6
156    */

157   public void setThreadName(String JavaDoc name) {
158     ServerThread st = server;
159     if (st != null) {
160       st.setName(name);
161     }
162   }
163
164   /**
165    * Returns the name of the listen thread.
166    * @return
167    * the thread name if in listening mode, otherwise <code>null</code>.
168    * @since 1.6
169    */

170   public String JavaDoc getThreadName() {
171     ServerThread st = server;
172     if (st != null) {
173       return st.getName();
174     }
175     else {
176       return null;
177     }
178   }
179
180   /**
181    * Closes all open sockets and stops the internal server thread that
182    * processes messages.
183    */

184   public void close() {
185     ServerThread st = server;
186     if (st != null) {
187       st.close();
188       try {
189         st.join();
190       }
191       catch (InterruptedException JavaDoc ex) {
192         logger.warn(ex);
193       }
194       server = null;
195       for (Iterator it = sockets.values().iterator(); it.hasNext(); ) {
196         SocketEntry entry = (SocketEntry)it.next();
197         try {
198           synchronized (entry) {
199              entry.getSocket().close();
200           }
201           logger.debug("Socket to "+entry.getPeerAddress()+" closed");
202         }
203         catch (IOException iox) {
204           // ingore
205
logger.debug(iox);
206         }
207       }
208       if (socketCleaner != null) {
209         socketCleaner.cancel();
210       }
211       socketCleaner = null;
212     }
213   }
214
215   /**
216    * Closes a connection to the supplied remote address, if it is open. This
217    * method is particularly useful when not using a timeout for remote
218    * connections.
219    *
220    * @param remoteAddress
221    * the address of the peer socket.
222    * @return
223    * <code>true</code> if the connection has been closed and
224    * <code>false</code> if there was nothing to close.
225    * @throws IOException
226    * if the remote address cannot be closed due to an IO exception.
227    * @since 1.7.1
228    */

229   public synchronized boolean close(Address remoteAddress) throws IOException {
230     if (logger.isDebugEnabled()) {
231       logger.debug("Closing socket for peer address "+remoteAddress);
232     }
233     SocketEntry entry = (SocketEntry) sockets.remove(remoteAddress);
234     if (entry != null) {
235       synchronized (entry) {
236         entry.getSocket().close();
237       }
238       logger.info("Socket to "+entry.getPeerAddress()+" closed");
239       return true;
240     }
241     return false;
242   }
243
244   /**
245    * Sends a SNMP message to the supplied address.
246    * @param address
247    * an <code>TcpAddress</code>. A <code>ClassCastException</code> is thrown
248    * if <code>address</code> is not a <code>TcpAddress</code> instance.
249    * @param message byte[]
250    * the message to sent.
251    * @throws IOException
252    */

253   public void sendMessage(Address address, byte[] message)
254       throws java.io.IOException JavaDoc
255   {
256     if (server == null) {
257       listen();
258     }
259     server.sendMessage(address, message);
260   }
261
262   /**
263    * Gets the connection timeout. This timeout specifies the time a connection
264    * may be idle before it is closed.
265    * @return long
266    * the idle timeout in milliseconds.
267    */

268   public long getConnectionTimeout() {
269     return connectionTimeout;
270   }
271
272   /**
273    * Sets the connection timeout. This timeout specifies the time a connection
274    * may be idle before it is closed.
275    * @param connectionTimeout
276    * the idle timeout in milliseconds. A zero or negative value will disable
277    * any timeout and connections opened by this transport mapping will stay
278    * opened until they are explicitly closed.
279    */

280   public void setConnectionTimeout(long connectionTimeout) {
281     this.connectionTimeout = connectionTimeout;
282   }
283
284   /**
285    * Checks whether a server for incoming requests is enabled.
286    * @return boolean
287    */

288   public boolean isServerEnabled() {
289     return serverEnabled;
290   }
291
292   public MessageLengthDecoder getMessageLengthDecoder() {
293     return messageLengthDecoder;
294   }
295
296   /**
297    * Sets whether a server for incoming requests should be created when
298    * the transport is set into listen state. Setting this value has no effect
299    * until the {@link #listen()} method is called (if the transport is already
300    * listening, {@link #close()} has to be called before).
301    * @param serverEnabled
302    * if <code>true</code> if the transport will listens for incoming
303    * requests after {@link #listen()} has been called.
304    */

305   public void setServerEnabled(boolean serverEnabled) {
306     this.serverEnabled = serverEnabled;
307   }
308
309   /**
310    * Sets the message length decoder. Default message length decoder is the
311    * {@link SnmpMesssageLengthDecoder}. The message length decoder must be
312    * able to decode the total length of a message for this transport mapping
313    * protocol(s).
314    * @param messageLengthDecoder
315    * a <code>MessageLengthDecoder</code> instance.
316    */

317   public void setMessageLengthDecoder(MessageLengthDecoder messageLengthDecoder) {
318     if (messageLengthDecoder == null) {
319       throw new NullPointerException JavaDoc();
320     }
321     this.messageLengthDecoder = messageLengthDecoder;
322   }
323
324   /**
325    * Gets the inbound buffer size for incoming requests. When SNMP packets are
326    * received that are longer than this maximum size, the messages will be
327    * silently dropped and the connection will be closed.
328    * @return
329    * the maximum inbound buffer size in bytes.
330    */

331   public int getMaxInboundMessageSize() {
332     return super.getMaxInboundMessageSize();
333   }
334
335   /**
336    * Sets the maximum buffer size for incoming requests. When SNMP packets are
337    * received that are longer than this maximum size, the messages will be
338    * silently dropped and the connection will be closed.
339    * @param maxInboundMessageSize
340    * the length of the inbound buffer in bytes.
341    */

342   public void setMaxInboundMessageSize(int maxInboundMessageSize) {
343     this.maxInboundMessageSize = maxInboundMessageSize;
344   }
345
346
347   private synchronized void timeoutSocket(SocketEntry entry) {
348     if (connectionTimeout > 0) {
349       socketCleaner.schedule(new SocketTimeout(entry), connectionTimeout);
350     }
351   }
352
353   public boolean isListening() {
354     return (server != null);
355   }
356
357   class SocketEntry {
358     private Socket socket;
359     private TcpAddress peerAddress;
360     private long lastUse;
361     private LinkedList message = new LinkedList();
362     private ByteBuffer readBuffer = null;
363
364     public SocketEntry(TcpAddress address, Socket socket) {
365       this.peerAddress = address;
366       this.socket = socket;
367       this.lastUse = System.currentTimeMillis();
368     }
369
370     public long getLastUse() {
371       return lastUse;
372     }
373
374     public void used() {
375       lastUse = System.currentTimeMillis();
376     }
377
378     public Socket getSocket() {
379       return socket;
380     }
381
382     public TcpAddress getPeerAddress() {
383       return peerAddress;
384     }
385
386     public synchronized void addMessage(byte[] message) {
387       this.message.add(message);
388     }
389
390     public byte[] nextMessage() {
391       if (this.message.size() > 0) {
392         return (byte[])this.message.removeFirst();
393       }
394       return null;
395     }
396
397     public void setReadBuffer(ByteBuffer byteBuffer) {
398       this.readBuffer = byteBuffer;
399     }
400
401     public ByteBuffer getReadBuffer() {
402       return readBuffer;
403     }
404
405     public String JavaDoc toString() {
406       return "SocketEntry[peerAddress="+peerAddress+
407           ",socket="+socket+",lastUse="+new Date(lastUse)+"]";
408     }
409   }
410
411   public static class SnmpMesssageLengthDecoder implements MessageLengthDecoder {
412     public int getMinHeaderLength() {
413       return MIN_SNMP_HEADER_LENGTH;
414     }
415     public MessageLength getMessageLength(ByteBuffer buf) throws IOException {
416       MutableByte type = new MutableByte();
417       BERInputStream is = new BERInputStream(buf);
418       int ml = BER.decodeHeader(is, type);
419       int hl = (int)is.getPosition();
420       MessageLength messageLength = new MessageLength(hl, ml);
421       return messageLength;
422     }
423   }
424
425   class SocketTimeout extends TimerTask {
426     private SocketEntry entry;
427
428     public SocketTimeout(SocketEntry entry) {
429       this.entry = entry;
430     }
431
432     /**
433      * run
434      */

435     public void run() {
436       long now = System.currentTimeMillis();
437       if ((socketCleaner == null) ||
438           (now - entry.getLastUse() >= connectionTimeout)) {
439         if (logger.isDebugEnabled()) {
440           logger.debug("Socket has not been used for "+
441                        (now - entry.getLastUse())+
442                        " micro seconds, closing it");
443         }
444         sockets.remove(entry.getPeerAddress());
445         try {
446           synchronized (entry) {
447             entry.getSocket().close();
448           }
449           logger.info("Socket to "+entry.getPeerAddress()+
450                       " closed due to timeout");
451         }
452         catch (IOException ex) {
453           logger.error(ex);
454         }
455       }
456       else {
457         if (logger.isDebugEnabled()) {
458           logger.debug("Scheduling " +
459                        ((entry.getLastUse() + connectionTimeout) - now));
460         }
461         socketCleaner.schedule(new SocketTimeout(entry),
462                                (entry.getLastUse() + connectionTimeout) - now);
463       }
464     }
465   }
466
467   class ServerThread extends Thread JavaDoc {
468     private byte[] buf;
469     private volatile boolean stop = false;
470     private Throwable JavaDoc lastError = null;
471     private ServerSocketChannel ssc;
472     private Selector selector;
473
474     private LinkedList pending = new LinkedList();
475
476     public ServerThread() throws IOException {
477       setName("DefaultTCPTransportMapping_"+getAddress());
478       buf = new byte[getMaxInboundMessageSize()];
479       // Selector for incoming requests
480
selector = Selector.open();
481
482       if (serverEnabled) {
483         // Create a new server socket and set to non blocking mode
484
ssc = ServerSocketChannel.open();
485         ssc.configureBlocking(false);
486
487         // Bind the server socket
488
InetSocketAddress isa = new InetSocketAddress(tcpAddress.getInetAddress(),
489             tcpAddress.getPort());
490         ssc.socket().bind(isa);
491         // Register accepts on the server socket with the selector. This
492
// step tells the selector that the socket wants to be put on the
493
// ready list when accept operations occur, so allowing multiplexed
494
// non-blocking I/O to take place.
495
ssc.register(selector, SelectionKey.OP_ACCEPT);
496       }
497     }
498
499     private void processPending() {
500       synchronized (pending) {
501         for (int i=0; i<pending.size(); i++) {
502           SocketEntry entry = (SocketEntry)pending.getFirst();
503           try {
504             // Register the channel with the selector, indicating
505
// interest in connection completion and attaching the
506
// target object so that we can get the target back
507
// after the key is added to the selector's
508
// selected-key set
509
if (entry.getSocket().isConnected()) {
510               entry.getSocket().getChannel().register(selector,
511                   SelectionKey.OP_WRITE);
512             }
513             else {
514               entry.getSocket().getChannel().register(selector,
515                                                       SelectionKey.OP_CONNECT);
516             }
517
518           }
519           catch (IOException iox) {
520             logger.error(iox);
521             // Something went wrong, so close the channel and
522
// record the failure
523
try {
524               entry.getSocket().getChannel().close();
525               TransportStateEvent e =
526                   new TransportStateEvent(DefaultTcpTransportMapping.this,
527                                           entry.getPeerAddress(),
528                                           TransportStateEvent.STATE_CLOSED,
529                                           iox);
530               fireConnectionStateChanged(e);
531             }
532             catch (IOException ex) {
533               logger.error(ex);
534             }
535             lastError = iox;
536             if (SNMP4JSettings.isFowardRuntimeExceptions()) {
537               throw new RuntimeException JavaDoc(iox);
538             }
539           }
540         }
541       }
542     }
543
544     public Throwable JavaDoc getLastError() {
545       return lastError;
546     }
547
548     public void sendMessage(Address address, byte[] message)
549         throws java.io.IOException JavaDoc
550     {
551       Socket s = null;
552       SocketEntry entry = (SocketEntry) sockets.get(address);
553       if (logger.isDebugEnabled()) {
554         logger.debug("Looking up connection for destination '"+address+
555                      "' returned: "+entry);
556         logger.debug(sockets.toString());
557       }
558       if (entry != null) {
559         s = entry.getSocket();
560       }
561       if ((s == null) || (s.isClosed()) || (!s.isConnected())) {
562         if (logger.isDebugEnabled()) {
563           logger.debug("Socket for address '"+address+
564                        "' is closed, opening it...");
565         }
566         SocketChannel sc = null;
567         try {
568             // Open the channel, set it to non-blocking, initiate connect
569
sc = SocketChannel.open();
570             sc.configureBlocking(false);
571             sc.connect(new InetSocketAddress(((TcpAddress)address).getInetAddress(),
572                                              ((TcpAddress)address).getPort()));
573             s = sc.socket();
574             entry = new SocketEntry((TcpAddress)address, s);
575             entry.addMessage(message);
576             sockets.put(address, entry);
577
578             synchronized (pending) {
579               pending.add(entry);
580             }
581
582             selector.wakeup();
583             logger.debug("Trying to connect to "+address);
584         }
585         catch (IOException iox) {
586           logger.error(iox);
587           throw iox;
588         }
589       }
590       else {
591         entry.addMessage(message);
592         synchronized (pending) {
593           pending.add(entry);
594         }
595         selector.wakeup();
596       }
597     }
598
599
600     public void run() {
601       // Here's where everything happens. The select method will
602
// return when any operations registered above have occurred, the
603
// thread has been interrupted, etc.
604
try {
605         while (!stop) {
606           try {
607             if (selector.select() > 0) {
608               if (stop) {
609                 break;
610               }
611               // Someone is ready for I/O, get the ready keys
612
Set readyKeys = selector.selectedKeys();
613               Iterator it = readyKeys.iterator();
614
615               // Walk through the ready keys collection and process date requests.
616
while (it.hasNext()) {
617                 SelectionKey sk = (SelectionKey) it.next();
618                 it.remove();
619                 SocketChannel readChannel = null;
620                 TcpAddress incomingAddress = null;
621                 if (sk.isAcceptable()) {
622                   // The key indexes into the selector so you
623
// can retrieve the socket that's ready for I/O
624
ServerSocketChannel nextReady =
625                       (ServerSocketChannel) sk.channel();
626                   Socket s = nextReady.accept().socket();
627                   readChannel = s.getChannel();
628                   readChannel.configureBlocking(false);
629                   readChannel.register(selector,
630                                        SelectionKey.OP_READ);
631
632                   incomingAddress = new TcpAddress(s.getInetAddress(),
633                       s.getPort());
634                   SocketEntry entry = new SocketEntry(incomingAddress, s);
635                   sockets.put(incomingAddress, entry);
636                   timeoutSocket(entry);
637                   TransportStateEvent e =
638                       new TransportStateEvent(DefaultTcpTransportMapping.this,
639                                               incomingAddress,
640                                               TransportStateEvent.
641                                               STATE_CONNECTED,
642                                               null);
643                   fireConnectionStateChanged(e);
644                   if (e.isCancelled()) {
645                     logger.warn("Incoming connection cancelled");
646                     s.close();
647                     sockets.remove(incomingAddress);
648                     readChannel = null;
649                   }
650                 }
651                 else if (sk.isReadable()) {
652                   readChannel = (SocketChannel) sk.channel();
653                   incomingAddress =
654                       new TcpAddress(readChannel.socket().getInetAddress(),
655                                      readChannel.socket().getPort());
656                 }
657                 else if (sk.isWritable()) {
658                   try {
659                     SocketChannel sc = (SocketChannel) sk.channel();
660                     SocketEntry entry;
661                     synchronized (pending) {
662                       try {
663                         entry = (SocketEntry) pending.removeFirst();
664                       }
665                       catch (NoSuchElementException nsex) {
666                         // ignore
667
entry = null;
668                       }
669                     }
670                     if (entry != null) {
671                       writeMessage(entry, sc);
672                     }
673                   }
674                   catch (IOException iox) {
675                     if (logger.isDebugEnabled()) {
676                       iox.printStackTrace();
677                     }
678                     logger.warn(iox);
679                     TransportStateEvent e =
680                         new TransportStateEvent(DefaultTcpTransportMapping.this,
681                                                 incomingAddress,
682                                                 TransportStateEvent.
683                                                 STATE_DISCONNECTED_REMOTELY,
684                                                 iox);
685                     fireConnectionStateChanged(e);
686                     sk.cancel();
687                   }
688                 }
689                 else if (sk.isConnectable()) {
690                   try {
691                     SocketEntry entry;
692                     synchronized (pending) {
693                       try {
694                         entry = (SocketEntry) pending.getFirst();
695                         if (entry != null) {
696                           SocketChannel sc = (SocketChannel) sk.channel();
697                           if ((!sc.isConnected()) && (sc.finishConnect())) {
698                             sc.configureBlocking(false);
699                             logger.debug("Connected to " + entry.getPeerAddress());
700                             // make sure conncetion is closed if not used for timeout
701
// micro seconds
702
timeoutSocket(entry);
703                             sc.register(selector,
704                                         SelectionKey.OP_WRITE);
705                           }
706                         }
707                       }
708                       catch (NoSuchElementException nsex) {
709                         // ignore
710
entry = null;
711                       }
712                     }
713                     if (entry != null) {
714                       TransportStateEvent e =
715                           new TransportStateEvent(DefaultTcpTransportMapping.this,
716                                                   incomingAddress,
717                                                   TransportStateEvent.
718                                                   STATE_CONNECTED,
719                                                   null);
720                       fireConnectionStateChanged(e);
721                     }
722                     else {
723                       logger.warn("Message not found on finish connection");
724                     }
725                   }
726                   catch (IOException iox) {
727                     if (logger.isDebugEnabled()) {
728                       iox.printStackTrace();
729                     }
730                     logger.warn(iox);
731                     sk.cancel();
732                   }
733                 }
734
735                 if (readChannel != null) {
736                   try {
737                     readMessage(sk, readChannel, incomingAddress);
738                   }
739                   catch (IOException iox) {
740                     // IO exception -> channel closed remotely
741
if (logger.isDebugEnabled()) {
742                       iox.printStackTrace();
743                     }
744                     logger.warn(iox);
745                     sk.cancel();
746                     readChannel.close();
747                     TransportStateEvent e =
748                         new TransportStateEvent(DefaultTcpTransportMapping.this,
749                                                 incomingAddress,
750                                                 TransportStateEvent.
751                                                 STATE_DISCONNECTED_REMOTELY,
752                                                 iox);
753                     fireConnectionStateChanged(e);
754                   }
755                 }
756               }
757             }
758           }
759           catch (NullPointerException JavaDoc npex) {
760             // There seems to happen a NullPointerException within the select()
761
npex.printStackTrace();
762             logger.warn("NullPointerException within select()?");
763           }
764           processPending();
765         }
766         if (ssc != null) {
767           ssc.close();
768         }
769         if (selector != null) {
770           selector.close();
771         }
772       }
773       catch (IOException iox) {
774         logger.error(iox);
775         lastError = iox;
776       }
777       if (!stop) {
778         stop = true;
779         synchronized (DefaultTcpTransportMapping.this) {
780           server = null;
781         }
782       }
783     }
784
785     private void readMessage(SelectionKey sk, SocketChannel readChannel,
786                              TcpAddress incomingAddress) throws IOException {
787       // note that socket has been used
788
SocketEntry entry = (SocketEntry) sockets.get(incomingAddress);
789       if (entry != null) {
790         entry.used();
791         ByteBuffer readBuffer = entry.getReadBuffer();
792         if (readBuffer != null) {
793           readChannel.read(readBuffer);
794           if (readBuffer.hasRemaining()) {
795             readChannel.register(selector,
796                                  SelectionKey.OP_READ,
797                                  entry);
798           }
799           else {
800             dispatchMessage(incomingAddress, readBuffer, readBuffer.capacity());
801           }
802           return;
803         }
804       }
805       ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
806       byteBuffer.limit(messageLengthDecoder.getMinHeaderLength());
807       long bytesRead = readChannel.read(byteBuffer);
808       if (logger.isDebugEnabled()) {
809         logger.debug("Reading header "+bytesRead+" bytes from " +
810                      incomingAddress);
811       }
812       MessageLength messageLength = new MessageLength(0, Integer.MIN_VALUE);
813       if (bytesRead == messageLengthDecoder.getMinHeaderLength()) {
814         messageLength =
815             messageLengthDecoder.getMessageLength(ByteBuffer.wrap(buf));
816         if (logger.isDebugEnabled()) {
817           logger.debug("Message length is "+messageLength);
818         }
819         if ((messageLength.getMessageLength() > getMaxInboundMessageSize()) ||
820             (messageLength.getMessageLength() <= 0)) {
821           logger.error("Received message length "+messageLength+
822                        " is greater than inboundBufferSize "+
823                        getMaxInboundMessageSize());
824           synchronized(entry) {
825             entry.getSocket().close();
826             logger.info("Socket to "+entry.getPeerAddress()+
827                         " closed due to an error");
828           }
829         }
830         else {
831           byteBuffer.limit(messageLength.getMessageLength());
832           bytesRead += readChannel.read(byteBuffer);
833           if (bytesRead == messageLength.getMessageLength()) {
834             dispatchMessage(incomingAddress, byteBuffer, bytesRead);
835           }
836           else {
837             byte[] message = new byte[byteBuffer.limit()];
838             byteBuffer.flip();
839             byteBuffer.get(message, 0,
840                            byteBuffer.limit() - byteBuffer.remaining());
841             entry.setReadBuffer(ByteBuffer.wrap(message));
842           }
843           readChannel.register(selector,
844                                SelectionKey.OP_READ,
845                                entry);
846         }
847       }
848       else if (bytesRead < 0) {
849         logger.debug("Socket closed remotely");
850         sk.cancel();
851         readChannel.close();
852         TransportStateEvent e =
853             new TransportStateEvent(DefaultTcpTransportMapping.this,
854                                     incomingAddress,
855                                     TransportStateEvent.
856                                     STATE_DISCONNECTED_REMOTELY,
857                                     null);
858         fireConnectionStateChanged(e);
859       }
860     }
861
862     private void dispatchMessage(TcpAddress incomingAddress,
863                                  ByteBuffer byteBuffer, long bytesRead) {
864       byteBuffer.flip();
865       if (logger.isDebugEnabled()) {
866         logger.debug("Received message from " + incomingAddress +
867                      " with length " + bytesRead + ": " +
868                      new OctetString(byteBuffer.array(), 0,
869                                      (int)bytesRead).toHexString());
870       }
871       ByteBuffer bis;
872       if (isAsyncMsgProcessingSupported()) {
873         byte[] bytes = new byte[(int)bytesRead];
874         System.arraycopy(byteBuffer.array(), 0, bytes, 0, (int)bytesRead);
875         bis = ByteBuffer.wrap(bytes);
876       }
877       else {
878         bis = ByteBuffer.wrap(byteBuffer.array(),
879                               0, (int) bytesRead);
880       }
881       fireProcessMessage(incomingAddress, bis);
882     }
883
884     private void writeMessage(SocketEntry entry, SocketChannel sc) throws
885         IOException {
886       byte[] message = entry.nextMessage();
887       if (message != null) {
888         ByteBuffer buffer = ByteBuffer.wrap(message);
889         sc.write(buffer);
890         if (logger.isDebugEnabled()) {
891           logger.debug("Send message with length " +
892                        message.length + " to " +
893                        entry.getPeerAddress() + ": " +
894                        new OctetString(message).toHexString());
895         }
896         sc.register(selector, SelectionKey.OP_READ);
897       }
898     }
899
900     public void close() {
901       stop = true;
902       ServerThread st = server;
903       if (st != null) {
904         st.interrupt();
905       }
906     }
907   }
908
909 }
910
Popular Tags