KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > ch > ethz > ssh2 > transport > TransportManager


1
2 package ch.ethz.ssh2.transport;
3
4 import java.io.IOException JavaDoc;
5 import java.io.InputStream JavaDoc;
6 import java.io.OutputStream JavaDoc;
7 import java.net.InetAddress JavaDoc;
8 import java.net.InetSocketAddress JavaDoc;
9 import java.net.Socket JavaDoc;
10 import java.net.UnknownHostException JavaDoc;
11 import java.security.SecureRandom JavaDoc;
12 import java.util.Vector JavaDoc;
13
14 import ch.ethz.ssh2.ConnectionInfo;
15 import ch.ethz.ssh2.ConnectionMonitor;
16 import ch.ethz.ssh2.DHGexParameters;
17 import ch.ethz.ssh2.HTTPProxyData;
18 import ch.ethz.ssh2.HTTPProxyException;
19 import ch.ethz.ssh2.ProxyData;
20 import ch.ethz.ssh2.ServerHostKeyVerifier;
21 import ch.ethz.ssh2.crypto.Base64;
22 import ch.ethz.ssh2.crypto.CryptoWishList;
23 import ch.ethz.ssh2.crypto.cipher.BlockCipher;
24 import ch.ethz.ssh2.crypto.digest.MAC;
25 import ch.ethz.ssh2.log.Logger;
26 import ch.ethz.ssh2.packets.PacketDisconnect;
27 import ch.ethz.ssh2.packets.Packets;
28 import ch.ethz.ssh2.packets.TypesReader;
29 import ch.ethz.ssh2.util.Tokenizer;
30
/*
 * Yes, the "standard" is a big mess. On one side, they say that arbitrary channel
 * packets are allowed during the key exchange; on the other side, we need to blindly
 * ignore the next _packet_ if the KEX guess was wrong. How are we supposed to know
 * that the next packet is not a channel data packet? Yes, we could check if it is in
 * the KEX range. But the standard says nothing about this. The OpenSSH guys
 * block local "normal" traffic during KEX. That's fine - however, they assume
 * that the other side is doing the same. During re-key, if they receive traffic
 * other than KEX, they become horribly irritated and kill the connection. Since
 * we are very likely going to communicate with OpenSSH servers, we have to play
 * the same game - even though we could do better.
 *
 * btw: having stdout and stderr on the same channel, with a shared window, is
 * also a VERY good idea... =(
 */

46
47 /**
48  * TransportManager.
49  *
50  * @author Christian Plattner, plattner@inf.ethz.ch
51  * @version $Id: TransportManager.java,v 1.16 2006/08/11 12:24:00 cplattne Exp $
52  */

53 public class TransportManager
54 {
55     private static final Logger log = Logger.getLogger(TransportManager.class);
56
57     class HandlerEntry
58     {
59         MessageHandler mh;
60         int low;
61         int high;
62     }
63
64     private final Vector JavaDoc asynchronousQueue = new Vector JavaDoc();
65     private Thread JavaDoc asynchronousThread = null;
66
67     class AsynchronousWorker extends Thread JavaDoc
68     {
69         public void run()
70         {
71             while (true)
72             {
73                 byte[] msg = null;
74
75                 synchronized (asynchronousQueue)
76                 {
77                     if (asynchronousQueue.size() == 0)
78                     {
79                         /* After the queue is empty for about 2 seconds, stop this thread */
80
81                         try
82                         {
83                             asynchronousQueue.wait(2000);
84                         }
85                         catch (InterruptedException JavaDoc e)
86                         {
87                             /* OKOK, if somebody interrupts us, then we may die earlier. */
88                         }
89
90                         if (asynchronousQueue.size() == 0)
91                         {
92                             asynchronousThread = null;
93                             return;
94                         }
95                     }
96
97                     msg = (byte[]) asynchronousQueue.remove(0);
98                 }
99
100                 /* The following invocation may throw an IOException.
101                  * There is no point in handling it - it simply means
102                  * that the connection has a problem and we should stop
103                  * sending asynchronously messages. We do not need to signal that
104                  * we have exited (asynchronousThread = null): further
105                  * messages in the queue cannot be sent by this or any
106                  * other thread.
107                  * Other threads will sooner or later (when receiving or
108                  * sending the next message) get the same IOException and
109                  * get to the same conclusion.
110                  */

111
112                 try
113                 {
114                     sendMessage(msg);
115                 }
116                 catch (IOException JavaDoc e)
117                 {
118                     return;
119                 }
120             }
121         }
122     }
123
124     String JavaDoc hostname;
125     int port;
126     final Socket JavaDoc sock = new Socket JavaDoc();
127
128     Object JavaDoc connectionSemaphore = new Object JavaDoc();
129
130     boolean flagKexOngoing = false;
131     boolean connectionClosed = false;
132
133     Throwable JavaDoc reasonClosedCause = null;
134
135     TransportConnection tc;
136     KexManager km;
137
138     Vector JavaDoc messageHandlers = new Vector JavaDoc();
139
140     Thread JavaDoc receiveThread;
141
142     Vector JavaDoc connectionMonitors = new Vector JavaDoc();
143     boolean monitorsWereInformed = false;
144
145     /**
146      * There were reports that there are JDKs which use
147      * the resolver even though one supplies a dotted IP
148      * address in the Socket constructor. That is why we
149      * try to generate the InetAdress "by hand".
150      *
151      * @param host
152      * @return the InetAddress
153      * @throws UnknownHostException
154      */

155     private InetAddress JavaDoc createInetAddress(String JavaDoc host) throws UnknownHostException JavaDoc
156     {
157         /* Check if it is a dotted IP4 address */
158
159         InetAddress JavaDoc addr = parseIPv4Address(host);
160
161         if (addr != null)
162             return addr;
163
164         return InetAddress.getByName(host);
165     }
166
167     private InetAddress JavaDoc parseIPv4Address(String JavaDoc host) throws UnknownHostException JavaDoc
168     {
169         if (host == null)
170             return null;
171
172         String JavaDoc[] quad = Tokenizer.parseTokens(host, '.');
173
174         if ((quad == null) || (quad.length != 4))
175             return null;
176
177         byte[] addr = new byte[4];
178
179         for (int i = 0; i < 4; i++)
180         {
181             int part = 0;
182
183             if ((quad[i].length() == 0) || (quad[i].length() > 3))
184                 return null;
185
186             for (int k = 0; k < quad[i].length(); k++)
187             {
188                 char c = quad[i].charAt(k);
189
190                 /* No, Character.isDigit is not the same */
191                 if ((c < '0') || (c > '9'))
192                     return null;
193
194                 part = part * 10 + (c - '0');
195             }
196
197             if (part > 255) /* 300.1.2.3 is invalid =) */
198                 return null;
199
200             addr[i] = (byte) part;
201         }
202
203         return InetAddress.getByAddress(host, addr);
204     }
205
206     public TransportManager(String JavaDoc host, int port) throws IOException JavaDoc
207     {
208         this.hostname = host;
209         this.port = port;
210     }
211
212     public int getPacketOverheadEstimate()
213     {
214         return tc.getPacketOverheadEstimate();
215     }
216
217     public void setTcpNoDelay(boolean state) throws IOException JavaDoc
218     {
219         sock.setTcpNoDelay(state);
220     }
221
222     public void setSoTimeout(int timeout) throws IOException JavaDoc
223     {
224         sock.setSoTimeout(timeout);
225     }
226
227     public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException JavaDoc
228     {
229         return km.getOrWaitForConnectionInfo(kexNumber);
230     }
231
232     public Throwable JavaDoc getReasonClosedCause()
233     {
234         synchronized (connectionSemaphore)
235         {
236             return reasonClosedCause;
237         }
238     }
239
240     public byte[] getSessionIdentifier()
241     {
242         return km.sessionId;
243     }
244
245     public void close(Throwable JavaDoc cause, boolean useDisconnectPacket)
246     {
247         if (useDisconnectPacket == false)
248         {
249             /* OK, hard shutdown - do not aquire the semaphore,
250              * perhaps somebody is inside (and waits until the remote
251              * side is ready to accept new data). */

252
253             try
254             {
255                 sock.close();
256             }
257             catch (IOException JavaDoc ignore)
258             {
259             }
260
261             /* OK, whoever tried to send data, should now agree that
262              * there is no point in further waiting =)
263              * It is safe now to aquire the semaphore.
264              */

265         }
266
267         synchronized (connectionSemaphore)
268         {
269             if (connectionClosed == false)
270             {
271                 if (useDisconnectPacket == true)
272                 {
273                     try
274                     {
275                         byte[] msg = new PacketDisconnect(Packets.SSH_DISCONNECT_BY_APPLICATION, cause.getMessage(), "")
276                                 .getPayload();
277                         if (tc != null)
278                             tc.sendMessage(msg);
279                     }
280                     catch (IOException JavaDoc ignore)
281                     {
282                     }
283
284                     try
285                     {
286                         sock.close();
287                     }
288                     catch (IOException JavaDoc ignore)
289                     {
290                     }
291                 }
292
293                 connectionClosed = true;
294                 reasonClosedCause = cause; /* may be null */
295             }
296             connectionSemaphore.notifyAll();
297         }
298
299         /* No check if we need to inform the monitors */
300
301         Vector JavaDoc monitors = null;
302
303         synchronized (this)
304         {
305             /* Short term lock to protect "connectionMonitors"
306              * and "monitorsWereInformed"
307              * (they may be modified concurrently)
308              */

309
310             if (monitorsWereInformed == false)
311             {
312                 monitorsWereInformed = true;
313                 monitors = (Vector JavaDoc) connectionMonitors.clone();
314             }
315         }
316
317         if (monitors != null)
318         {
319             for (int i = 0; i < monitors.size(); i++)
320             {
321                 try
322                 {
323                     ConnectionMonitor cmon = (ConnectionMonitor) monitors.elementAt(i);
324                     cmon.connectionLost(reasonClosedCause);
325                 }
326                 catch (Exception JavaDoc ignore)
327                 {
328                 }
329             }
330         }
331     }
332
333     private void establishConnection(ProxyData proxyData, int connectTimeout) throws IOException JavaDoc
334     {
335         /* See the comment for createInetAddress() */
336
337         if (proxyData == null)
338         {
339             InetAddress JavaDoc addr = createInetAddress(hostname);
340             sock.connect(new InetSocketAddress JavaDoc(addr, port), connectTimeout);
341             sock.setSoTimeout(0);
342             return;
343         }
344
345         if (proxyData instanceof HTTPProxyData)
346         {
347             HTTPProxyData pd = (HTTPProxyData) proxyData;
348
349             /* At the moment, we only support HTTP proxies */
350
351             InetAddress JavaDoc addr = createInetAddress(pd.proxyHost);
352             sock.connect(new InetSocketAddress JavaDoc(addr, pd.proxyPort), connectTimeout);
353             sock.setSoTimeout(0);
354
355             /* OK, now tell the proxy where we actually want to connect to */
356
357             StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
358
359             sb.append("CONNECT ");
360             sb.append(hostname);
361             sb.append(':');
362             sb.append(port);
363             sb.append(" HTTP/1.0\r\n");
364
365             if ((pd.proxyUser != null) && (pd.proxyPass != null))
366             {
367                 String JavaDoc credentials = pd.proxyUser + ":" + pd.proxyPass;
368                 char[] encoded = Base64.encode(credentials.getBytes());
369                 sb.append("Proxy-Authorization: Basic ");
370                 sb.append(encoded);
371                 sb.append("\r\n");
372             }
373
374             if (pd.requestHeaderLines != null)
375             {
376                 for (int i = 0; i < pd.requestHeaderLines.length; i++)
377                 {
378                     if (pd.requestHeaderLines[i] != null)
379                     {
380                         sb.append(pd.requestHeaderLines[i]);
381                         sb.append("\r\n");
382                     }
383                 }
384             }
385
386             sb.append("\r\n");
387
388             OutputStream JavaDoc out = sock.getOutputStream();
389
390             out.write(sb.toString().getBytes());
391             out.flush();
392
393             /* Now parse the HTTP response */
394
395             byte[] buffer = new byte[1024];
396             InputStream JavaDoc in = sock.getInputStream();
397
398             int len = ClientServerHello.readLineRN(in, buffer);
399
400             String JavaDoc httpReponse = new String JavaDoc(buffer, 0, len);
401
402             if (httpReponse.startsWith("HTTP/") == false)
403                 throw new IOException JavaDoc("The proxy did not send back a valid HTTP response.");
404
405             /* "HTTP/1.X XYZ X" => 14 characters minimum */
406
407             if ((httpReponse.length() < 14) || (httpReponse.charAt(8) != ' ') || (httpReponse.charAt(12) != ' '))
408                 throw new IOException JavaDoc("The proxy did not send back a valid HTTP response.");
409
410             int errorCode = 0;
411
412             try
413             {
414                 errorCode = Integer.parseInt(httpReponse.substring(9, 12));
415             }
416             catch (NumberFormatException JavaDoc ignore)
417             {
418                 throw new IOException JavaDoc("The proxy did not send back a valid HTTP response.");
419             }
420
421             if ((errorCode < 0) || (errorCode > 999))
422                 throw new IOException JavaDoc("The proxy did not send back a valid HTTP response.");
423
424             if (errorCode != 200)
425             {
426                 throw new HTTPProxyException(httpReponse.substring(13), errorCode);
427             }
428
429             /* OK, read until empty line */
430
431             while (true)
432             {
433                 len = ClientServerHello.readLineRN(in, buffer);
434                 if (len == 0)
435                     break;
436             }
437             return;
438         }
439
440         throw new IOException JavaDoc("Unsupported ProxyData");
441     }
442
443     public void initialize(CryptoWishList cwl, ServerHostKeyVerifier verifier, DHGexParameters dhgex,
444             int connectTimeout, SecureRandom JavaDoc rnd, ProxyData proxyData) throws IOException JavaDoc
445     {
446         /* First, establish the TCP connection to the SSH-2 server */
447
448         establishConnection(proxyData, connectTimeout);
449
450         /* Parse the server line and say hello - important: this information is later needed for the
451          * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object
452          * for later use.
453          */

454
455         ClientServerHello csh = new ClientServerHello(sock.getInputStream(), sock.getOutputStream());
456
457         tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), rnd);
458
459         km = new KexManager(this, csh, cwl, hostname, port, verifier, rnd);
460         km.initiateKEX(cwl, dhgex);
461
462         receiveThread = new Thread JavaDoc(new Runnable JavaDoc()
463         {
464             public void run()
465             {
466                 try
467                 {
468                     receiveLoop();
469                 }
470                 catch (IOException JavaDoc e)
471                 {
472                     close(e, false);
473
474                     if (log.isEnabled())
475                         log.log(10, "Receive thread: error in receiveLoop: " + e.getMessage());
476                 }
477
478                 if (log.isEnabled())
479                     log.log(50, "Receive thread: back from receiveLoop");
480
481                 /* Tell all handlers that it is time to say goodbye */
482
483                 if (km != null)
484                 {
485                     try
486                     {
487                         km.handleMessage(null, 0);
488                     }
489                     catch (IOException JavaDoc e)
490                     {
491                     }
492                 }
493
494                 for (int i = 0; i < messageHandlers.size(); i++)
495                 {
496                     HandlerEntry he = (HandlerEntry) messageHandlers.elementAt(i);
497                     try
498                     {
499                         he.mh.handleMessage(null, 0);
500                     }
501                     catch (Exception JavaDoc ignore)
502                     {
503                     }
504                 }
505             }
506         });
507
508         receiveThread.setDaemon(true);
509         receiveThread.start();
510     }
511
512     public void registerMessageHandler(MessageHandler mh, int low, int high)
513     {
514         HandlerEntry he = new HandlerEntry();
515         he.mh = mh;
516         he.low = low;
517         he.high = high;
518
519         synchronized (messageHandlers)
520         {
521             messageHandlers.addElement(he);
522         }
523     }
524
525     public void removeMessageHandler(MessageHandler mh, int low, int high)
526     {
527         synchronized (messageHandlers)
528         {
529             for (int i = 0; i < messageHandlers.size(); i++)
530             {
531                 HandlerEntry he = (HandlerEntry) messageHandlers.elementAt(i);
532                 if ((he.mh == mh) && (he.low == low) && (he.high == high))
533                 {
534                     messageHandlers.removeElementAt(i);
535                     break;
536                 }
537             }
538         }
539     }
540
541     public void sendKexMessage(byte[] msg) throws IOException JavaDoc
542     {
543         synchronized (connectionSemaphore)
544         {
545             if (connectionClosed)
546             {
547                 throw (IOException JavaDoc) new IOException JavaDoc("Sorry, this connection is closed.").initCause(reasonClosedCause);
548             }
549
550             flagKexOngoing = true;
551
552             try
553             {
554                 tc.sendMessage(msg);
555             }
556             catch (IOException JavaDoc e)
557             {
558                 close(e, false);
559                 throw e;
560             }
561         }
562     }
563
564     public void kexFinished() throws IOException JavaDoc
565     {
566         synchronized (connectionSemaphore)
567         {
568             flagKexOngoing = false;
569             connectionSemaphore.notifyAll();
570         }
571     }
572
573     public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex) throws IOException JavaDoc
574     {
575         km.initiateKEX(cwl, dhgex);
576     }
577
578     public void changeRecvCipher(BlockCipher bc, MAC mac)
579     {
580         tc.changeRecvCipher(bc, mac);
581     }
582
583     public void changeSendCipher(BlockCipher bc, MAC mac)
584     {
585         tc.changeSendCipher(bc, mac);
586     }
587
588     public void sendAsynchronousMessage(byte[] msg) throws IOException JavaDoc
589     {
590         synchronized (asynchronousQueue)
591         {
592             asynchronousQueue.addElement(msg);
593
594             /* This limit should be flexible enough. We need this, otherwise the peer
595              * can flood us with global requests (and other stuff where we have to reply
596              * with an asynchronous message) and (if the server just sends data and does not
597              * read what we send) this will probably put us in a low memory situation
598              * (our send queue would grow and grow and...) */

599
600             if (asynchronousQueue.size() > 100)
601                 throw new IOException JavaDoc("Error: the peer is not consuming our asynchronous replies.");
602
603             /* Check if we have an asynchronous sending thread */
604
605             if (asynchronousThread == null)
606             {
607                 asynchronousThread = new AsynchronousWorker();
608                 asynchronousThread.setDaemon(true);
609                 asynchronousThread.start();
610
611                 /* The thread will stop after 2 seconds of inactivity (i.e., empty queue) */
612             }
613         }
614     }
615
616     public void setConnectionMonitors(Vector JavaDoc monitors)
617     {
618         synchronized (this)
619         {
620             connectionMonitors = (Vector JavaDoc) monitors.clone();
621         }
622     }
623
624     public void sendMessage(byte[] msg) throws IOException JavaDoc
625     {
626         if (Thread.currentThread() == receiveThread)
627             throw new IOException JavaDoc("Assertion error: sendMessage may never be invoked by the receiver thread!");
628
629         synchronized (connectionSemaphore)
630         {
631             while (true)
632             {
633                 if (connectionClosed)
634                 {
635                     throw (IOException JavaDoc) new IOException JavaDoc("Sorry, this connection is closed.")
636                             .initCause(reasonClosedCause);
637                 }
638
639                 if (flagKexOngoing == false)
640                     break;
641
642                 try
643                 {
644                     connectionSemaphore.wait();
645                 }
646                 catch (InterruptedException JavaDoc e)
647                 {
648                 }
649             }
650
651             try
652             {
653                 tc.sendMessage(msg);
654             }
655             catch (IOException JavaDoc e)
656             {
657                 close(e, false);
658                 throw e;
659             }
660         }
661     }
662
663     public void receiveLoop() throws IOException JavaDoc
664     {
665         byte[] msg = new byte[35000];
666
667         while (true)
668         {
669             int msglen = tc.receiveMessage(msg, 0, msg.length);
670
671             int type = msg[0] & 0xff;
672
673             if (type == Packets.SSH_MSG_IGNORE)
674                 continue;
675
676             if (type == Packets.SSH_MSG_DEBUG)
677             {
678                 if (log.isEnabled())
679                 {
680                     TypesReader tr = new TypesReader(msg, 0, msglen);
681                     tr.readByte();
682                     tr.readBoolean();
683                     StringBuffer JavaDoc debugMessageBuffer = new StringBuffer JavaDoc();
684                     debugMessageBuffer.append(tr.readString("UTF-8"));
685
686                     for (int i = 0; i < debugMessageBuffer.length(); i++)
687                     {
688                         char c = debugMessageBuffer.charAt(i);
689
690                         if ((c >= 32) && (c <= 126))
691                             continue;
692                         debugMessageBuffer.setCharAt(i, '\uFFFD');
693                     }
694
695                     log.log(50, "DEBUG Message from remote: '" + debugMessageBuffer.toString() + "'");
696                 }
697                 continue;
698             }
699
700             if (type == Packets.SSH_MSG_UNIMPLEMENTED)
701             {
702                 throw new IOException JavaDoc("Peer sent UNIMPLEMENTED message, that should not happen.");
703             }
704
705             if (type == Packets.SSH_MSG_DISCONNECT)
706             {
707                 TypesReader tr = new TypesReader(msg, 0, msglen);
708                 tr.readByte();
709                 int reason_code = tr.readUINT32();
710                 StringBuffer JavaDoc reasonBuffer = new StringBuffer JavaDoc();
711                 reasonBuffer.append(tr.readString("UTF-8"));
712
713                 /*
714                  * Do not get fooled by servers that send abnormal long error
715                  * messages
716                  */

717
718                 if (reasonBuffer.length() > 255)
719                 {
720                     reasonBuffer.setLength(255);
721                     reasonBuffer.setCharAt(254, '.');
722                     reasonBuffer.setCharAt(253, '.');
723                     reasonBuffer.setCharAt(252, '.');
724                 }
725
726                 /*
727                  * Also, check that the server did not send charcaters that may
728                  * screw up the receiver -> restrict to reasonable US-ASCII
729                  * subset -> "printable characters" (ASCII 32 - 126). Replace
730                  * all others with 0xFFFD (UNICODE replacement character).
731                  */

732
733                 for (int i = 0; i < reasonBuffer.length(); i++)
734                 {
735                     char c = reasonBuffer.charAt(i);
736
737                     if ((c >= 32) && (c <= 126))
738                         continue;
739                     reasonBuffer.setCharAt(i, '\uFFFD');
740                 }
741
742                 throw new IOException JavaDoc("Peer sent DISCONNECT message (reason code " + reason_code + "): "
743                         + reasonBuffer.toString());
744             }
745
746             /*
747              * Is it a KEX Packet?
748              */

749
750             if ((type == Packets.SSH_MSG_KEXINIT) || (type == Packets.SSH_MSG_NEWKEYS)
751                     || ((type >= 30) && (type <= 49)))
752             {
753                 km.handleMessage(msg, msglen);
754                 continue;
755             }
756
757             MessageHandler mh = null;
758
759             for (int i = 0; i < messageHandlers.size(); i++)
760             {
761                 HandlerEntry he = (HandlerEntry) messageHandlers.elementAt(i);
762                 if ((he.low <= type) && (type <= he.high))
763                 {
764                     mh = he.mh;
765                     break;
766                 }
767             }
768
769             if (mh == null)
770                 throw new IOException JavaDoc("Unexpected SSH message (type " + type + ")");
771
772             mh.handleMessage(msg, msglen);
773         }
774     }
775 }
776
Popular Tags