KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > agent > HttpNetwork


1 /*
2  * Copyright (C) 2003 - 2006 ScalAgent Distributed Technologies
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17  * USA.
18  *
19  * Initial developer(s): ScalAgent Distributed Technologies
20  * Contributor(s):
21  */

22 package fr.dyade.aaa.agent;
23
24 import java.io.*;
25 import java.net.*;
26 import java.util.Vector JavaDoc;
27
28 import org.objectweb.util.monolog.api.BasicLevel;
29 import org.objectweb.util.monolog.api.Logger;
30
31 import fr.dyade.aaa.util.*;
32
33 /**
34  * <tt>HttpNetwork</tt> is a simple implementation of <tt>StreamNetwork</tt>
35  * based on HTTP 1.1 protocol.
36  */

37 public class HttpNetwork extends StreamNetwork implements HttpNetworkMBean {
38   private InetAddress proxy = null;
39   /**
40    * Hostname (or IP dotted address) of proxy host, if not defined there
41    * is a direct connection between the client and the server.
42    * This value can be adjusted for all HttpNetwork components by setting
43    * <code>proxyhost</code> global property or for a particular
44    * network by setting <code>\<DomainName\>.proxyhost</code>
45    * specific property.
46    * <p>
47    * Theses properties can be fixed either from <code>java</code> launching
48    * command, or in <code>a3servers.xml</code> configuration file.
49    */

50   String JavaDoc proxyhost = null;
51   /**
52    * Port number of proxy if any.
53    * This value can be adjusted for all HttpNetwork components by setting
54    * <code>proxyport</code> global property or for a particular
55    * network by setting <code>\<DomainName\>.proxyport</code>
56    * specific property.
57    * <p>
58    * Theses properties can be fixed either from <code>java</code> launching
59    * command, or in <code>a3servers.xml</code> configuration file.
60    */

61   int proxyport = 0;
62
63   /**
64    * Period of time between two activation of NetServerOut, it matchs to the
65    * time between two requests from the client to the server when there is no
66    * message to transmit from client to server.
67    * This value can be adjusted for all HttpNetwork components by setting
68    * <code>ActivationPeriod</code> global property or for a particular
69    * network by setting <code>\<DomainName\>.ActivationPeriod</code>
70    * specific property.
71    * <p>
72    * Theses properties can be fixed either from <code>java</code> launching
73    * command, or in <code>a3servers.xml</code> configuration file. By default,
74    * its value is 10000 (10s).
75    */

76   protected long activationPeriod = 10000L;
77
78   /**
79    * Gets the activationPeriod value.
80    *
81    * @return the activationPeriod value
82    */

83   public long getActivationPeriod() {
84     return activationPeriod;
85   }
86
87   /**
88    * Sets the activationPeriod value.
89    *
90    * @param activationPeriod the activationPeriod value
91    */

92   public void setActivationPeriod(long activationPeriod) {
93     this.activationPeriod = activationPeriod;
94   }
95
96   /**
97    * Number of listening daemon, this value is only valid for the server
98    * part of the HttpNetwork.
99    * This value can be adjusted for all HttpNetwork components by setting
100    * <code>NbDaemon</code> global property or for a particular network by
101    * setting <code>\<DomainName\>.NbDaemon</code> specific property.
102    * <p>
103    * Theses properties can be fixed either from <code>java</code> launching
104    * command, or in <code>a3servers.xml</code> configuration file.
105    */

106   int NbDaemon = 1;
107
108   /**
109    * Gets the NbDaemon value.
110    *
111    * @return the NbDaemon value
112    */

113   public long getNbDaemon() {
114     return NbDaemon;
115   }
116
117   /**
118    * Creates a new network component.
119    */

120   public HttpNetwork() {
121     super();
122   }
123
124   /**
125    * Descriptor of the listen server, it is used only on the client side
126    * (NetServerOut component).
127    */

128   ServerDesc server = null;
129
130   /**
131    * Initializes a new network component. This method is used in order to
132    * easily creates and configure a Network component from a class name.
133    * So we can use the <code>Class.newInstance()</code> method for create
134    * (whitout any parameter) the component, then we can initialize it with
135    * this method.<br>
136    * This method initializes the logical clock for the domain.
137    *
138    * @param name The domain name.
139    * @param port The listen port.
140    * @param servers The list of servers directly accessible from this
141    * network interface.
142    */

143   public void init(String JavaDoc name, int port, short[] servers) throws Exception JavaDoc {
144     super.init(name, port, servers);
145
146     activationPeriod = Long.getLong("ActivationPeriod",
147                                     activationPeriod).longValue();
148     activationPeriod = Long.getLong(name + ".ActivationPeriod",
149                                     activationPeriod).longValue();
150     
151     NbDaemon = Integer.getInteger("NbDaemon", NbDaemon).intValue();
152     NbDaemon = Integer.getInteger(name + ".NbDaemon", NbDaemon).intValue();
153
154     proxyhost = System.getProperty("proxyhost");
155     proxyhost = System.getProperty(name + ".proxyhost", proxyhost);
156     if (proxyhost != null) {
157       proxyport = Integer.getInteger("proxyport", 8080).intValue();
158       proxyport = Integer.getInteger(name + ".proxyport", proxyport).intValue();
159       proxy = InetAddress.getByName(proxyhost);
160     }
161   }
162
163   /** Daemon component */
164   Daemon dmon[] = null;
165
166   /**
167    * Causes this network component to begin execution.
168    */

169   public void start() throws Exception JavaDoc {
170     logmon.log(BasicLevel.DEBUG, getName() + ", starting");
171     try {
172       if (isRunning()) return;
173
174       // AF: May be, we have to verify that there is only one 'listen' network.
175
for (int i=0; i<servers.length; i++) {
176         server = AgentServer.getServerDesc(servers[i]);
177         if ((server.getServerId() != AgentServer.getServerId()) &&
178             (server.getPort() > 0)) {
179           logmon.log(BasicLevel.DEBUG, getName() + ", server=" + server);
180           break;
181         }
182         server = null;
183       }
184
185       if (port != 0) {
186         dmon = new Daemon[NbDaemon];
187         ServerSocket listen = createServerSocket();
188
189         for (int i=0; i<NbDaemon; i++) {
190           dmon[i] = new NetServerIn(getName() + '.' + i, listen, logmon);
191         }
192       } else {
193         dmon = new Daemon[1];
194         dmon[0] = new NetServerOut(getName(), logmon);
195       }
196
197       for (int i=0; i<dmon.length; i++) {
198         dmon[i].start();
199       }
200     } catch (IOException exc) {
201       logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc);
202       throw exc;
203     }
204     logmon.log(BasicLevel.DEBUG, getName() + ", started");
205   }
206
207   /**
208    * Wakes up the watch-dog thread.
209    */

210   public void wakeup() {
211 // if (dmon != null) dmon.wakeup();
212
}
213
214   /**
215    * Forces the network component to stop executing.
216    */

217   public void stop() {
218     if (dmon != null) {
219       for (int i=0; i<dmon.length; i++) {
220         dmon[i].stop();
221       }
222     }
223     logmon.log(BasicLevel.DEBUG, getName() + ", stopped");
224   }
225
226   /**
227    * Tests if the network component is alive.
228    *
229    * @return true if this <code>MessageConsumer</code> is alive; false
230    * otherwise.
231    */

232   public boolean isRunning() {
233     if (dmon != null) {
234       for (int i=0; i<dmon.length; i++) {
235         if (dmon[i].isRunning()) return true;
236       }
237     }
238     return false;
239   }
240
241   /**
242    * Returns a string representation of this consumer, including the
243    * daemon's name and status.
244    *
245    * @return A string representation of this consumer.
246    */

247   public String JavaDoc toString() {
248     StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
249
250     strbuf.append(super.toString()).append("\n\t");
251     if (dmon != null) {
252       for (int i=0; i<dmon.length; i++) {
253         strbuf.append(dmon[i].toString()).append("\n\t");
254       }
255     }
256     return strbuf.toString();
257   }
258
259   protected String JavaDoc readLine(InputStream is, byte[] buf) throws IOException {
260     int i = 0;
261     while ((buf[i++] = (byte) is.read()) != -1) {
262       if ((buf[i-1] == '\n') && (buf[i-2] == '\r')) {
263     i -= 2;
264     break;
265       }
266     }
267
268     if (i > 0) return new String JavaDoc(buf, 0, i);
269
270     return null;
271   }
272   
273   protected void sendRequest(Message msg,
274                              OutputStream os,
275                              MessageOutputStream nos,
276                              int ack,
277                              long currentTimeMillis) throws Exception JavaDoc {
278     StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
279
280     strbuf.append("PUT ");
281     if (proxy != null) {
282       strbuf.append("http://").append(server.getHostname()).append(':').append(server.getPort());
283     }
284     strbuf.append("/msg?from=").append(AgentServer.getServerId());
285     strbuf.append("&stamp=");
286     if (msg != null) {
287       strbuf.append(msg.getStamp());
288     } else {
289       strbuf.append("-1");
290     }
291     strbuf.append(" HTTP/1.1");
292     nos.writeMessage(msg, ack, currentTimeMillis);
293
294     if (proxy != null)
295       strbuf.append("\r\nHost: ").append(server.getHostname());
296     strbuf.append("\r\n" +
297                   "User-Agent: ScalAgent 1.0\r\n" +
298                   "Accept: image/jpeg;q=0.2\r\n" +
299                   "Accept-Language: fr, en-us;q=0.50\r\n" +
300                   "Accept-Encoding: gzip;q=0.9\r\n" +
301                   "Accept-Charset: ISO-8859-1, utf-8;q=0.66\r\n" +
302                   "Cache-Control: no-cache\r\n" +
303                   "Cache-Control: no-store\r\n" +
304                   "Keep-Alive: 300\r\n" +
305                   "Connection: keep-alive\r\n" +
306                   "Proxy-Connection: keep-alive\r\n" +
307                   "Pragma: no-cache\r\n");
308     strbuf.append("Content-Length: ").append(nos.size());
309     strbuf.append("\r\n" +
310                   "Content-Type: image/jpeg\r\n");
311     strbuf.append("\r\n");
312
313     os.write(strbuf.toString().getBytes());
314     
315     if (logmon.isLoggable(BasicLevel.DEBUG))
316       logmon.log(BasicLevel.DEBUG, name + ", writes:" + nos.size());
317     nos.writeTo(os);
318     nos.reset();
319
320     os.flush();
321   }
322
323   protected short getRequest(InputStream is,
324                              MessageInputStream nis,
325                              byte[] buf) throws Exception JavaDoc {
326     String JavaDoc line = null;
327
328     line = readLine(is, buf);
329     if ((line == null) ||
330         (! (line.startsWith("GET ") || line.startsWith("PUT ")))) {
331       throw new Exception JavaDoc("Bad request: " + line);
332     }
333
334     int idx1 = line.indexOf("?from=");
335     if (idx1 == -1) throw new Exception JavaDoc("Bad request: " + line);
336     int idx2 = line.indexOf("&", idx1);
337     if (idx2 == -1) throw new Exception JavaDoc("Bad request: " + line);
338     short from = Short.parseShort(line.substring(idx1+6, idx2));
339
340     // Skip all header lines, get length
341
int length = 0;
342     while (line != null) {
343       line = readLine(is, buf);
344       if ((line != null) && line.startsWith("Content-Length: ")) {
345         // get content length
346
length = Integer.parseInt(line.substring(16));
347         if (logmon.isLoggable(BasicLevel.DEBUG))
348           logmon.log(BasicLevel.DEBUG, name + ", length:" + length);
349       }
350     }
351
352     if (nis.readFrom(is) != length)
353       logmon.log(BasicLevel.WARN, name + "Bad request length: " + length);
354
355     return from;
356   }
357
358   protected void sendReply(Message msg,
359                            OutputStream os,
360                            MessageOutputStream nos,
361                            int ack,
362                            long currentTimeMillis) throws Exception JavaDoc {
363     StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
364
365     strbuf.append("HTTP/1.1 200 OK\r\n");
366
367     nos.writeMessage(msg, ack, currentTimeMillis);
368
369     strbuf.append("Date: ").append("Fri, 21 Feb 2003 14:30:51 GMT");
370     strbuf.append("\r\n" +
371                   "Server: ScalAgent 1.0\r\n" +
372                   "Last-Modified: ").append("Wed, 19 Apr 2000 08:16:28 GMT");
373     strbuf.append("\r\n" +
374                   "Cache-Control: no-cache\r\n" +
375                   "Cache-Control: no-store\r\n" +
376                   "Accept-Ranges: bytes\r\n" +
377                   "Keep-Alive: timeout=15, max=100\r\n" +
378                   "Connection: Keep-Alive\r\n" +
379                   "Proxy-Connection: Keep-Alive\r\n" +
380                   "Pragma: no-cache\r\n");
381     strbuf.append("Content-Length: ").append(nos.size());
382     strbuf.append("\r\n" +
383                   "Content-Type: image/gif\r\n");
384     strbuf.append("\r\n");
385
386     os.write(strbuf.toString().getBytes());
387     
388     if (logmon.isLoggable(BasicLevel.DEBUG))
389       logmon.log(BasicLevel.DEBUG, name + ", writes:" + nos.size());
390     nos.writeTo(os);
391     nos.reset();
392
393     os.flush();
394   }
395
396   protected void getReply(InputStream is,
397                           MessageInputStream nis,
398                           byte[] buf) throws Exception JavaDoc {
399     String JavaDoc line = null;
400
401     line = readLine(is, buf);
402     if ((line == null) ||
403         ((! line.equals("HTTP/1.1 200 OK")) &&
404          (! line.equals("HTTP/1.1 204 No Content")))) {
405       throw new Exception JavaDoc("Bad reply: " + line);
406     }
407
408     // Skip all header lines, get length
409
int length = 0;
410     while (line != null) {
411       line = readLine(is, buf);
412       if ((line != null) && line.startsWith("Content-Length: ")) {
413         // get content length
414
length = Integer.parseInt(line.substring(16));
415         if (logmon.isLoggable(BasicLevel.DEBUG))
416           logmon.log(BasicLevel.DEBUG, name + ", length:" + length);
417       }
418     }
419
420     if (nis.readFrom(is) != length)
421       logmon.log(BasicLevel.WARN, name + "Bad reply length: " + length);
422   }
423
424   protected int handle(Message msgout,
425                        MessageInputStream nis) throws Exception JavaDoc {
426     int ack = nis.getAckStamp();
427
428     if (logmon.isLoggable(BasicLevel.DEBUG))
429       logmon.log(BasicLevel.DEBUG,
430                  this.getName() + ", handle: " + msgout + ", ack=" + ack);
431
432     if ((msgout != null) && (msgout.stamp == ack)) {
433       AgentServer.getTransaction().begin();
434       // Suppress the processed notification from message queue,
435
// and deletes it.
436
qout.removeMessage(msgout);
437       msgout.delete();
438       msgout.free();
439       AgentServer.getTransaction().commit();
440       AgentServer.getTransaction().release();
441     }
442
443     Message msg = nis.getMessage();
444     if (logmon.isLoggable(BasicLevel.DEBUG))
445       logmon.log(BasicLevel.DEBUG,
446                  this.getName() + ", get: " + msg);
447
448     if (msg != null) {
449       ack = msg.stamp;
450       testBootTS(msg.getSource(), nis.getBootTS());
451       deliver(msg);
452       return ack;
453     }
454
455     return -1;
456   }
457
458   /**
459    * This method creates a tunnelling socket if a proxy is used.
460    *
461    * @param host the server host.
462    * @param port the server port.
463    * @param proxy the proxy host.
464    * @param proxyport the proxy port.
465    * @return a socket connected to a ServerSocket at the specified
466    * network address and port.
467    *
468    * @exception IOException if the connection can't be established
469    */

470   Socket createTunnelSocket(InetAddress host, int port,
471                             InetAddress proxy, int proxyport) throws IOException {
472     return createSocket(proxy, proxyport);
473   }
474
475   final class NetServerOut extends Daemon {
476     Socket socket = null;
477
478     InputStream is = null;
479     OutputStream os = null;
480
481     MessageInputStream nis = null;
482     MessageOutputStream nos = null;
483
484     NetServerOut(String JavaDoc name, Logger logmon) throws IOException {
485       super(name + ".NetServerOut");
486       // Overload logmon definition in Daemon
487
this.logmon = logmon;
488
489       nis = new MessageInputStream();
490       nos = new MessageOutputStream();
491     }
492
493     protected void open() throws IOException {
494       // Open the connection.
495
socket = null;
496
497       if (proxy == null) {
498         socket = createSocket(server);
499       } else {
500         try {
501           socket = createTunnelSocket(server.getAddr(), server.getPort(),
502                                       proxy, proxyport);
503         } catch (IOException exc) {
504           logmon.log(BasicLevel.WARN,
505                      this.getName() + ", connection refused, reset addr");
506           server.resetAddr();
507           proxy = InetAddress.getByName(proxyhost);
508           socket = createTunnelSocket(server.getAddr(), server.getPort(),
509                                       proxy, proxyport);
510         }
511       }
512       setSocketOption(socket);
513
514       os = socket.getOutputStream();
515       is = socket.getInputStream();
516     }
517
518     protected void close() {
519       if (socket != null) {
520         try {
521           os.close();
522         } catch (Exception JavaDoc exc) {}
523         try {
524           is.close();
525         } catch (Exception JavaDoc exc) {}
526         try {
527           socket.close();
528         } catch (Exception JavaDoc exc) {}
529       }
530     }
531
532     protected void shutdown() {
533       thread.interrupt();
534       close();
535     }
536
537     public void run() {
538       Message msgout = null;
539       int ack = -1;
540
541       byte[] buf = new byte[120];
542
543       try {
544     while (running) {
545           canStop = true;
546       try {
547         try {
548               if (logmon.isLoggable(BasicLevel.DEBUG))
549                 logmon.log(BasicLevel.DEBUG,
550                            this.getName() + ", waiting message");
551
552               msgout = qout.get(activationPeriod);
553         } catch (InterruptedException JavaDoc exc) {
554               if (logmon.isLoggable(BasicLevel.DEBUG))
555                 logmon.log(BasicLevel.DEBUG,
556                            this.getName() + ", interrupted");
557         }
558             open();
559
560             do {
561               if (logmon.isLoggable(BasicLevel.DEBUG))
562                 logmon.log(BasicLevel.DEBUG,
563                            this.getName() + ", sendRequest: " + msgout + ", ack=" + ack);
564
565               if ((msgout != null) &&(msgout.not.expiration != -1))
566                 logmon.log(BasicLevel.FATAL,
567                            getName() + ": AF YYY " + msgout.not);
568
569               long currentTimeMillis = System.currentTimeMillis();
570               do {
571                 if ((msgout != null) &&
572                     (msgout.not.expiration > 0) &&
573                     (msgout.not.expiration < currentTimeMillis)) {
574                   if (logmon.isLoggable(BasicLevel.DEBUG))
575                     logmon.log(BasicLevel.DEBUG,
576                                getName() + ": AF removes expired notification XXX " +
577                                msgout.from + ", " + msgout.not);
578                   // Suppress the processed notification from message queue,
579
// and deletes it. It can be done outside of a transaction
580
// and commited later (on next handle).
581
qout.removeMessage(msgout);
582                   msgout.delete();
583                   msgout.free();
584
585                   msgout = qout.get(0L);
586                   continue;
587                 }
588                 break;
589               } while (true);
590
591               sendRequest(msgout, os, nos, ack, currentTimeMillis);
592               getReply(is, nis, buf);
593
594               canStop = false;
595               ack = handle(msgout, nis);
596               canStop = true;
597               // Get next message to send if any
598
msgout = qout.get(0);
599             } while (running && ((msgout != null) || (ack != -1)));
600           } catch (Exception JavaDoc exc) {
601             if (logmon.isLoggable(BasicLevel.DEBUG))
602               logmon.log(BasicLevel.DEBUG,
603                          this.getName() + ", connection closed", exc);
604           } finally {
605             if (logmon.isLoggable(BasicLevel.DEBUG))
606               logmon.log(BasicLevel.DEBUG,
607                          this.getName() + ", connection ends");
608             try {
609               os.close();
610             } catch (Exception JavaDoc exc) {}
611             os = null;
612             try {
613               is.close();
614             } catch (Exception JavaDoc exc) {}
615             is = null;
616             try {
617               socket.close();
618             } catch (Exception JavaDoc exc) {}
619             socket = null;
620           }
621         }
622       } finally {
623         logmon.log(BasicLevel.WARN, ", exited");
624     finish();
625       }
626     }
627   }
628
629   final class NetServerIn extends Daemon {
630     ServerSocket listen = null;
631     
632     Socket socket = null;
633
634     InputStream is = null;
635     OutputStream os = null;
636
637     MessageInputStream nis = null;
638     MessageOutputStream nos = null;
639
640     NetServerIn(String JavaDoc name, ServerSocket listen, Logger logmon) throws IOException {
641       super(name + ".NetServerIn");
642       this.listen = listen;
643       // Overload logmon definition in Daemon
644
this.logmon = logmon;
645
646       nis = new MessageInputStream();
647       nos = new MessageOutputStream();
648     }
649
650     protected void open(Socket socket) throws IOException {
651       setSocketOption(socket);
652
653       os = socket.getOutputStream();
654       is = socket.getInputStream();
655
656       if (logmon.isLoggable(BasicLevel.DEBUG))
657         logmon.log(BasicLevel.DEBUG, this.getName() + ", connected");
658     }
659
660     protected void close() {
661       if (socket != null) {
662         try {
663           os.close();
664         } catch (Exception JavaDoc exc) {}
665         try {
666           is.close();
667         } catch (Exception JavaDoc exc) {}
668         try {
669           socket.close();
670         } catch (Exception JavaDoc exc) {}
671       }
672       try {
673     listen.close();
674       } catch (Exception JavaDoc exc) {}
675     }
676
677     protected void shutdown() {
678       close();
679     }
680
681     public void run() {
682       Message msgout= null;
683       int ack = -1;
684
685       byte[] buf = new byte[120];
686
687       try {
688     while (running) {
689           canStop = true;
690
691           // Get the connection
692
try {
693             if (logmon.isLoggable(BasicLevel.DEBUG))
694               logmon.log(BasicLevel.DEBUG,
695                          this.getName() + ", waiting connection");
696             socket = listen.accept();
697             open(socket);
698             
699             short from = getRequest(is, nis, buf);
700             long currentTimeMillis = System.currentTimeMillis();
701             do {
702               canStop = false;
703               ack = handle(msgout, nis);
704               canStop = true;
705
706               do {
707                 msgout = qout.getMessageTo(from);
708
709                 if ((msgout != null) &&(msgout.not.expiration != -1))
710                   logmon.log(BasicLevel.FATAL,
711                              getName() + ": AF YYY " + msgout.not);
712
713                 if ((msgout != null) &&
714                     (msgout.not.expiration > 0) &&
715                     (msgout.not.expiration < currentTimeMillis)) {
716                   if (logmon.isLoggable(BasicLevel.DEBUG))
717                     logmon.log(BasicLevel.DEBUG,
718                                getName() + ": AF removes expired notification " +
719                                msgout.from + ", " + msgout.not);
720                   // Suppress the processed notification from message queue,
721
// and deletes it. It can be done outside of a transaction
722
// and commited later (on next handle).
723
qout.removeMessage(msgout);
724                   msgout.delete();
725                   msgout.free();
726
727                   continue;
728                 }
729                 break;
730               } while (true);
731  
732               if (logmon.isLoggable(BasicLevel.DEBUG))
733                 logmon.log(BasicLevel.DEBUG,
734                            this.getName() + ", sendReply: " + msgout);
735
736               sendReply(msgout, os, nos, ack, currentTimeMillis);
737
738               logmon.log(BasicLevel.DEBUG,
739                          getName() + ": AF WWW " + msgout);
740
741               getRequest(is, nis, buf);
742             } while (running);
743           } catch (Exception JavaDoc exc) {
744             if (logmon.isLoggable(BasicLevel.DEBUG))
745               logmon.log(BasicLevel.DEBUG, ", connection closed", exc);
746           } finally {
747             if (logmon.isLoggable(BasicLevel.DEBUG))
748               logmon.log(BasicLevel.DEBUG, ", connection ends");
749             try {
750               os.close();
751             } catch (Exception JavaDoc exc) {}
752             os = null;
753             try {
754               is.close();
755             } catch (Exception JavaDoc exc) {}
756             is = null;
757             try {
758               socket.close();
759             } catch (Exception JavaDoc exc) {}
760             socket = null;
761           }
762         }
763       } finally {
764         logmon.log(BasicLevel.WARN, ", exited");
765         finish();
766       }
767     }
768   }
769
770   /**
771    * Class used to read messages through a stream.
772    */

773   final class MessageInputStream extends ByteArrayInputStream {
774     MessageInputStream() {
775       super(new byte[256]);
776     }
777
778     private void readFully(InputStream is, int length) throws IOException {
779       count = 0;
780       if (length > buf.length) buf = new byte[length];
781       
782       int nb = -1;
783       do {
784         nb = is.read(buf, count, length-count);
785         if (logmon.isLoggable(BasicLevel.DEBUG))
786           logmon.log(BasicLevel.DEBUG, getName() + ", reads:" + nb);
787         if (nb < 0) throw new EOFException();
788         count += nb;
789       } while (count != length);
790       pos = 0;
791     }
792
793     int msgLen;
794     int msgBoot;
795     int msgAck;
796
797     Message msg = null;
798
799     int readFrom(InputStream is) throws Exception JavaDoc {
800       readFully(is, 12);
801       // Reads message length
802
msgLen = ((buf[0] & 0xFF) << 24) + ((buf[1] & 0xFF) << 16) +
803         ((buf[2] & 0xFF) << 8) + ((buf[3] & 0xFF) << 0);
804       // Reads boot timestamp of source server
805
msgBoot = ((buf[4] & 0xFF) << 24) + ((buf[5] & 0xFF) << 16) +
806         ((buf[6] & 0xFF) << 8) + ((buf[7] & 0xFF) << 0);
807       msgAck = ((buf[8] & 0xFF) << 24) + ((buf[9] & 0xFF) << 16) +
808         ((buf[10] & 0xFF) << 8) + ((buf[11] & 0xFF) << 0);
809
810       if (msgLen > (Message.LENGTH +11)) {
811         msg = Message.alloc();
812         readFully(is, Message.LENGTH);
813
814         int idx = msg.readFromBuf(buf, 0);
815         // Reads notification attributes
816
boolean persistent = ((buf[idx] & Message.PERSISTENT) == 0)?false:true;
817         boolean detachable = ((buf[idx] & Message.DETACHABLE) == 0)?false:true;
818
819         readFully(is, msgLen - (Message.LENGTH +12));
820         // Reads notification object
821
ObjectInputStream ois = new ObjectInputStream(this);
822         msg.not = (Notification) ois.readObject();
823         if (msg.not.expiration > 0) {
824           msg.not.expiration += System.currentTimeMillis();
825         }
826         msg.not.persistent = persistent;
827         msg.not.detachable = detachable;
828         msg.not.detached = false;
829
830         return msgLen;
831       }
832       msg = null;
833       return 12;
834     }
835
836     int getLength() {
837       return msgLen;
838     }
839
840     int getBootTS() {
841       return msgBoot;
842     }
843
844     int getAckStamp() {
845       return msgAck;
846     }
847
848     Message getMessage() {
849       return msg;
850     }
851   }
852
853   /**
854    * Class used to send messages through a stream.
855    */

856   final class MessageOutputStream extends ByteArrayOutputStream {
857     private ObjectOutputStream oos = null;
858
859     MessageOutputStream() throws IOException {
860       super(256);
861       oos = new ObjectOutputStream(this);
862       count = 0;
863         buf[Message.LENGTH +12] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF);
864         buf[Message.LENGTH +13] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF);
865         buf[Message.LENGTH +14] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF);
866         buf[Message.LENGTH +15] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF);
867     }
868
869     void writeMessage(Message msg, int ack,
870                       long currentTimeMillis) throws IOException {
871       // Writes boot timestamp of source server
872
buf[4] = (byte) (getBootTS() >>> 24);
873       buf[5] = (byte) (getBootTS() >>> 16);
874       buf[6] = (byte) (getBootTS() >>> 8);
875       buf[7] = (byte) (getBootTS() >>> 0);
876
877       // Writes stamp of last received message
878
buf[8] = (byte) (ack >>> 24);
879       buf[9] = (byte) (ack >>> 16);
880       buf[10] = (byte) (ack >>> 8);
881       buf[11] = (byte) (ack >>> 0);
882
883       count = 12;
884       if (msg != null) {
885         int idx = msg.writeToBuf(buf, 12);
886         // Writes notification attributes
887
buf[idx++] = (byte) ((msg.not.persistent?Message.PERSISTENT:0) |
888                              (msg.not.detachable?Message.DETACHABLE:0));
889         // Be careful, the stream header is hard-written in buf
890
count = (Message.LENGTH + 12 +4);
891
892         try {
893           if (msg.not.expiration > 0) {
894             msg.not.expiration -= currentTimeMillis;
895           }
896           oos.writeObject(msg.not);
897           oos.reset();
898           oos.flush();
899         } finally {
900           if ((msg.not != null) && (msg.not.expiration > 0)) {
901             msg.not.expiration += currentTimeMillis;
902           }
903         }
904       }
905       // Writes boot timestamp of source server
906
buf[0] = (byte) (count >>> 24);
907       buf[1] = (byte) (count >>> 16);
908       buf[2] = (byte) (count >>> 8);
909       buf[3] = (byte) (count >>> 0);
910     }
911   }
912 }
913
Popular Tags