KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > openlaszlo > connection > HTTPConnection


1 /* *****************************************************************************
2  * HTTPConnection.java
3 * ****************************************************************************/

4
5 /* J_LZ_COPYRIGHT_BEGIN *******************************************************
6 * Copyright 2001-2004 Laszlo Systems, Inc. All Rights Reserved. *
7 * Use is subject to license terms. *
8 * J_LZ_COPYRIGHT_END *********************************************************/

9
10 package org.openlaszlo.connection;
11
12 import java.io.IOException;
13 import java.io.OutputStream;
14 import java.io.StringReader;
15 import java.util.Collections;
16 import java.util.Date;
17 import java.util.Enumeration;
18 import java.util.Iterator;
19 import java.util.List;
20 import java.util.Properties;
21 import java.util.Random;
22 import java.util.Vector;
23 import javax.servlet.http.HttpSessionBindingListener;
24 import javax.servlet.http.HttpSessionBindingEvent;
25 import javax.servlet.http.HttpServletRequest;
26 import javax.servlet.http.HttpServletResponse;
27 import javax.servlet.ServletOutputStream;
28 import org.apache.log4j.Logger;
29 import org.jdom.Document;
30 import org.jdom.Element;
31 import org.jdom.input.SAXBuilder;
32 import org.jdom.JDOMException;
33
34
35 /** Persistent connection for SWF files. */
36 public class HTTPConnection
37 {
38     private static final String CONNECTED = "__LPSCONNECTED";
39     private static final String RECONNECTED = "__LPSRECONNECTED";
40
41
42     /** Queue to store events */
43     private List mQueue;
44
45     /** Username for connection -- this may not be unique */
46     private String mUsername;
47
48     /** Session id */
49     private String mSID;
50
51     /** Connection id -- changes with each reconnection */
52     private String mCID;
53
54     /** Servlet response object */
55     private HttpServletResponse mRes;
56
57     /** Output stream */
58     private OutputStream mOut;
59
60     /** Flag to immediately flush message queue */
61     private boolean mDoFlushQueue;
62
63     /** Flag to pad headers with bytes. Used for IE, which doesn't display
64      * anything until byte 2000. */

65     private boolean mDoPad;
66
67     /** Heartbeat interval */
68     private long mHeartbeatInterval;
69
70     /** Heartbeat count */
71     private int mHeartbeatCount;
72
73     /** Request count */
74     private int mRequestCount;
75
76     /** Sent count */
77     private int mSentCount;
78
79     /** Flush count */
80     private int mFlushCount;
81
82     /** Total number of bytes sent out during application session */
83     private int mTotalNumOfBytes;
84
85     /** Number of bytes sent out */
86     private int mNumOfBytes;
87
88     /** String to send out depending on whether it's a connection or
89      * reconnection */

90     private String mConnectedString;
91
92     private boolean mEmitConnectHeaders = false;
93
94     /** Version of SWF bytes for connection messages. */
95     private int mSWFVersion;
96
97
98     //------------------------------------------------------------
99
// Statics
100
//------------------------------------------------------------
101
private static Logger mLogger = Logger.getLogger(HTTPConnection.class);
102
103     /** Used to send IE a pad of 2000 spaces. IE doesn't display information
104      * until it receives 2000 bytes or the connection is closed by the web
105      * server. */

106     private static String mPad;
107
108     /** Static initializer synchronization lock. */
109     private static Object mStaticInitLock = new Object();
110  
111     /** Check for static initialize block. */
112     private static boolean mDoStaticInit = true;
113
114     /** Check if the class was inited (see doInit()). */
115     private static boolean mDoInit = true;
116
117     /** Compiled heartbeat SWF bytes. */
118     private static byte[] mSWFHeartbeat;
119
120     /** Compiled reconnect command SWF bytes. */
121     private static byte[] mSWFDoReconnect;
122
123     /** Maximum length of a message. */
124     private static int mMaxMessageLen = 2000;
125
126     /** Content length of connection SWF. */
127     private static int mConnectionLength = 65536;
128
129     /** Check if disconnect was requested. */
130     private boolean mDoDisconnect = false;
131
132     /** Flag to check if this connection will be replaced by another
133      * connection. */

134     private boolean mIsReconnect = false;
135
136     /** Interval to wait after client reconnection command is sent. */
137     private static int mReconnectionWaitInterval = 60000;
138
139     /** Count to generate UIDs. */
140     private static long mUIDCount = 0;
141
142     //------------------------------------------------------------
143
// Not clear whether a static initializer needs to be
144
// synchronized, but not taking any chances.
145
//------------------------------------------------------------
146
static {
147         if (mDoStaticInit) {
148             synchronized (mStaticInitLock) {
149                 if (mDoStaticInit) {
150                     StringBuffer buf = new StringBuffer(2000);
151                     for (int i=0; i < 2000; i++)
152                         buf.append(' ');
153                     mPad = buf.toString();
154
155                     mSWFHeartbeat = new SwfByte()
156                         .actionSetElement
157                         (getElement
158                          (getConnectionInfoXML("__LPSHB", null)))
159                         .setShowFrame()
160                         .getBuf();
161
162                     mSWFDoReconnect = new SwfByte()
163                         .actionSetElement
164                         (getElement
165                          (getConnectionInfoXML("__LPSDORECONNECT", null)))
166                         .setShowFrame()
167                         .getBuf();
168
169                     mDoStaticInit = false;
170                 }
171             }
172         }
173     }
174
175
176     //------------------------------------------------------------
177
// Initialize static properties. This should only get called
178
// once.
179
//------------------------------------------------------------
180
synchronized static public void init(Properties properties)
181     {
182         if (! mDoInit)
183             return;
184
185         mLogger.debug("init(properties)");
186
187         Enumeration enum = properties.propertyNames();
188         while (enum.hasMoreElements()) {
189
190             String key = (String)enum.nextElement();
191             String val = properties.getProperty(key);
192
193             try {
194                 if (val != null) {
195                     if (key.intern() == "maxMessageLen")
196                         mMaxMessageLen = Integer.parseInt(val);
197                     else if (key.intern() == "connectionLength")
198                         mConnectionLength = Integer.parseInt(val);
199                     else if (key.intern() == "reconnectionWaitInterval")
200                         mReconnectionWaitInterval = Integer.parseInt(val);
201                 }
202             } catch (NumberFormatException e) {
203                 mLogger.debug(e.getMessage());
204             }
205         }
206
207         // These are the minimum values.
208
if (mMaxMessageLen < 2000)
209             mMaxMessageLen = 2000;
210         if (mConnectionLength < (5 * mMaxMessageLen))
211             mConnectionLength = 5 * mMaxMessageLen;
212         if (mReconnectionWaitInterval < 10000) // wait a minimum of 10 seconds
213
mReconnectionWaitInterval = 60000; // default 60 seconds
214

215         mLogger.debug("maxMessageLen:" + mMaxMessageLen);
216         mLogger.debug("connectionLength:" + mConnectionLength);
217         mDoInit = false;
218     }
219
220
221     //------------------------------------------------------------
222
// Constructor
223
//------------------------------------------------------------
224

225     /**
226      * Generates a unique identifier.
227      * @return hexadecimal unique string identifier.
228      */

229     private synchronized static String generateUID()
230     {
231         return Long.toHexString( System.currentTimeMillis() ) +
232             Long.toHexString( mUIDCount++ );
233     }
234
235
236     /**
237      * Constructor.
238      *
239      * @param res http servlet response to use as persistent connection
240      * @param username username associated with connection
241      */

242     public HTTPConnection(HttpServletResponse res, String username, int swfversion)
243         throws IOException
244     {
245         mRes = res;
246         mOut = res.getOutputStream();
247         mUsername = username;
248         mSID = generateUID();
249         mCID = generateUID();
250         mHeartbeatCount = 0;
251         mRequestCount = 0;
252         mSentCount = 0;
253         mFlushCount = 0;
254         mTotalNumOfBytes = 0;
255         mNumOfBytes = 0;
256         mDoDisconnect = false;
257         mSWFVersion = swfversion;
258
259         // Don't have to make queue thread-safe since synchronization is handled
260
// below.
261
mQueue = new Vector();
262
263         mConnectedString = CONNECTED;
264
265         // This is false when attempting a reconnect with client so messages can
266
// be saved.
267
mDoFlushQueue = true;
268
269         setDoPad(false);
270         setHeartbeatInterval(0);
271     }
272
273     /**
274      * Will update the request count, heartbeat count, total number of bytes
275      * sent.
276      *
277      * @param res http servlet response to user as persistent connect
278      * @param hc HTTPConnection to copy parameters from
279      */

280     public HTTPConnection(HttpServletResponse res, HTTPConnection hc)
281         throws IOException
282     {
283         mRes = res;
284         mOut = res.getOutputStream();
285         mUsername = hc.mUsername;
286         mSID = hc.mSID;
287         mCID = generateUID();
288         mHeartbeatCount += hc.mHeartbeatCount;
289         mRequestCount += hc.mRequestCount;
290         mSentCount += hc.mSentCount;
291         mFlushCount += hc.mFlushCount;
292         mTotalNumOfBytes += hc.mTotalNumOfBytes;
293         mEmitConnectHeaders = hc.mEmitConnectHeaders;
294         mDoDisconnect = false;
295         mSWFVersion = hc.mSWFVersion;
296
297         mConnectedString = RECONNECTED;
298
299         // Easy way to save the queue (or is it too easy? I think this is ok...)
300
mQueue = hc.mQueue;
301
302         mNumOfBytes = 0;
303
304         // This is false when attempting a reconnect with client so messages can
305
// be saved.
306
mDoFlushQueue = true;
307
308         setDoPad(hc.mDoPad);
309         setHeartbeatInterval(hc.mHeartbeatInterval);
310     }
311
312
313     //------------------------------------------------------------
314
// Methods
315
//------------------------------------------------------------
316

317     /**
318      * Queue up event to send.
319      *
320      * @param msg event message
321      * @param doFlushQueue if true, flushes the message event queue
322      * @throws IOException if connection does not exist
323      */

324     synchronized public void send(String msg)
325         throws IOException
326     {
327         mLogger.debug("send(msg=" + msg + ")");
328
329         if (mDoDisconnect) {
330             mLogger.debug("connection is already disconnected, not sending");
331             return;
332         }
333
334         Element element = getElement(msg);
335         if (element==null) {
336             mLogger.debug("Bad XML for message: " + msg);
337             return;
338         }
339
340         byte[] swfBuf = swfBuf = new SwfByte()
341             .actionSetElement(element)
342             .setShowFrame()
343             .getBuf();
344
345         if (swfBuf.length > mMaxMessageLen) {
346             String info = "compiled message bytes are too large -- greater than" + mMaxMessageLen + ")";
347             mLogger.debug(info);
348             throw new IOException(info);
349         }
350
351         synchronized (mQueue) {
352             mRequestCount++;
353             mQueue.add(swfBuf);
354         }
355
356         if (mDoFlushQueue) {
357             notify();
358         }
359
360     }
361
362     /**
363      * Close connection.
364      */

365     public void disconnect()
366     {
367         disconnect(false);
368     }
369
370     /**
371      * Close connection.
372      *
373      * @param isReconnect if this connection will be replaced by another
374      * connection.
375      */

376     synchronized public void disconnect(boolean isReconnect)
377     {
378         mLogger.debug("disconnect()");
379         mIsReconnect = isReconnect;
380         mDoDisconnect = true;
381         notify();
382     }
383
384
385     /**
386      * Check if connection is to be replaced by another connection.
387      */

388     synchronized public boolean toBeReconnected()
389     {
390         return mIsReconnect;
391     }
392
393     /**
394      * This call will block and wait for events to handle. Keeps HTTP
395      * connection alive for an asynchronous event.
396      *
397      * @param res servlet response object
398      * @return true if connection was lost to reconnection, else false
399      */

400     public void connect()
401         throws IOException
402     {
403         mLogger.debug("connect()");
404
405         try {
406             mNumOfBytes += sendHeader();
407             mNumOfBytes += flushMessageQueue();
408
409             mTotalNumOfBytes += mNumOfBytes;
410
411             int numOfBytesSent = 0;
412             long waitInterval = mHeartbeatInterval;
413             long reconnectRequestTime = 0; // time we made reconnect request
414

415             while (true) {
416
417                 mLogger.debug("[" + mUsername + "," + mSID + "] "
418                               + " request: " + mRequestCount
419                               + " sent: " + mSentCount
420                               + ", flush: " + mFlushCount
421                               + ", heartbeats: " + mHeartbeatCount
422                               + ", bytes: " + mTotalNumOfBytes
423                               + " (" + mNumOfBytes + "/" + mConnectionLength + ")");
424
425                 synchronized (this) {
426                     wait(waitInterval);
427                 }
428
429                 if (mDoDisconnect) {
430                     mLogger.debug("disconnecting...");
431                     if (reconnectRequestTime != 0) {
432                         mLogger.debug("reconnect time: " +
433                                       ( new Date().getTime() - reconnectRequestTime ) + "ms");
434                     }
435                     return;
436                 }
437
438                 // Check to see if we're nearing the limit of bytes sent out
439
if ( doReconnect() ) {
440
441                     mLogger.debug("sending reconnect request " + mUsername + "...");
442
443                     if (reconnectRequestTime == 0) {
444                         // ...give client a few seconds to reconnect...
445
waitInterval = mReconnectionWaitInterval;
446                         
447                         synchronized (this) {
448                             mDoFlushQueue = false;
449                         }
450
451                         mLogger.debug("...for " + ( waitInterval / 1000 )+ " seconds");
452
453                         // ...send reconnect request to client
454
mOut.write(mSWFDoReconnect);
455                         mOut.flush();
456
457                         mNumOfBytes += mSWFDoReconnect.length;
458                         mTotalNumOfBytes += mSWFDoReconnect.length;
459
460                         reconnectRequestTime = new Date().getTime();
461                         continue;
462
463                     }
464
465                     // If our wait is less than the wait interval, keep
466
// waiting...
467
long now = new Date().getTime();
468                     long interval = now - reconnectRequestTime;
469                     if (interval < waitInterval) {
470                         mLogger.debug("still waiting for reconnect...");
471                         continue;
472                     }
473                     
474                     // ...else, really quit.
475
mLogger.debug("interval was " + interval + "; done waiting...goodbye!");
476                     return;
477                 }
478
479                 numOfBytesSent = 0;
480
481                 // If queue is 0, just send heartbeat.
482
if (mQueue.size()==0) {
483                     mHeartbeatCount++;
484                     mOut.write(mSWFHeartbeat);
485                     mOut.flush();
486                     numOfBytesSent = mSWFHeartbeat.length;
487                 } else {
488                     numOfBytesSent = flushMessageQueue();
489                 }
490
491                 mNumOfBytes += numOfBytesSent;
492                 mTotalNumOfBytes += numOfBytesSent;
493             }
494         } catch (InterruptedException e) {
495             mLogger.debug("InterruptedException: " + e.getMessage());
496         }
497     }
498
499
500     /**
501      * @return true if reconnect needs to happen.
502      */

503     private boolean doReconnect()
504     {
505         return doReconnect(0);
506     }
507
508     /**
509      * @param n number of potential bytes that will be going out.
510      * @return true if reconnect needs to happen.
511      */

512     private boolean doReconnect(int n)
513     {
514         return (mConnectionLength - (mMaxMessageLen * 2)) <= (mNumOfBytes + n);
515     }
516
517
518     /**
519      * Send a connection header and flush any messages in the queue, if any.
520      */

521     private int sendHeader()
522         throws InterruptedException, IOException
523     {
524         mLogger.debug("sendHeader()");
525
526         // Content-length used to be commented out because w/Tomcat 3.3
527
// connections seemed open to the servlet container even after browser
528
// close.
529
mRes.setContentLength(mConnectionLength);
530         mRes.setContentType("application/x-shockwave-flash");
531
532         // This is to keep browsers from doing connection keep-alives.
533
mRes.setHeader("Connection", "close");
534         mRes.setHeader("Keep-Alive", "close");
535
536         if (mEmitConnectHeaders) {
537             if (mConnectedString == CONNECTED)
538                 mRes.setHeader("X-LPS-C", mCID);
539             else if (mConnectedString == RECONNECTED)
540                 mRes.setHeader("X-LPS-R", mCID);
541         }
542
543         mLogger.debug("Sent connected string: " + mConnectedString);
544
545         String info;
546         if (mConnectedString == CONNECTED) {
547             info = getConnectionInfoXML (mConnectedString,
548                                          "<cid>" + mCID + "</cid>" +
549                                          "<sid>" + mSID + "</sid>" +
550                                          "<usr>" + mUsername + "</usr>");
551         } else {
552             info = getConnectionInfoXML (mConnectedString,
553                                          "<cid>" + mCID + "</cid>");
554
555         }
556         SwfByte swf = new SwfByte();
557
558         swf.setHeader(mSWFVersion);
559         swf.setLength(mConnectionLength);
560         swf.actionSetElement(getElement(info));
561         if (mDoPad) swf.actionSetVariable("pad", mPad);
562         swf.setShowFrame();
563
564         byte[] buf = swf.getBuf();
565         mOut.write(buf, 0, buf.length);
566         mOut.flush();
567
568         return buf.length;
569     }
570
571
572     /**
573      * Flush out messages in queue for the client.
574      *
575      * @return number of messages sent
576      * @throws IOException if there's a problem with the output stream
577      */

578     private int flushMessageQueue()
579         throws IOException, InterruptedException
580     {
581         mLogger.debug("flushMessageQueue()");
582
583         byte[] swfBuf;
584         int numOfBytes = 0;
585         synchronized (mQueue) {
586
587             Iterator iter = mQueue.iterator();
588             if (! iter.hasNext())
589                 return 0;
590
591             while (iter.hasNext()) {
592                 swfBuf = (byte[])iter.next();
593                 mOut.write(swfBuf);
594                 numOfBytes += swfBuf.length;
595                 ++mSentCount;
596                 iter.remove();
597
598                 // If data sent is above the byte limit, reconnect before
599
// flushing the rest of the queue.
600
if ( doReconnect(numOfBytes) )
601                     break;
602             }
603
604             mOut.flush();
605             ++mFlushCount;
606         }
607
608         return numOfBytes;
609     }
610
611
612     /**
613      * Check if padding is required.
614      */

615     public boolean doPad()
616     {
617         return mDoPad;
618     }
619
620     /**
621      * Set if you want padding of bytes to headers. Used for browser like IE
622      * that don't display anything until the Nth byte.
623      *
624      * @param doPad flag for padding
625      * @return this object
626      */

627     public HTTPConnection setDoPad(boolean doPad)
628     {
629         mDoPad = doPad;
630         return this;
631     }
632
633     /**
634      * Get heartbeat interval in milliseconds.
635      */

636     public long getHeartbeatInterval()
637     {
638         return mHeartbeatInterval;
639     }
640
641
642     /**
643      * @return version for SWF bytes.
644      */

645     public int getSWFVersion() {
646         return mSWFVersion;
647     }
648
649     /**
650      * Set heartbeat in milliseconds.
651      */

652     public HTTPConnection setHeartbeatInterval(long heartbeatInterval)
653     {
654         mHeartbeatInterval = heartbeatInterval;
655         return this;
656     }
657
658
659     /**
660      * Get username.
661      */

662     public String getUsername()
663     {
664         return mUsername;
665     }
666
667     /**
668      * Get unique id.
669      */

670     public String getCID()
671     {
672         return mCID;
673     }
674
675     /**
676      */

677     public HTTPConnection setEmitConnectHeaders(boolean emitConnectHeaders)
678     {
679         mEmitConnectHeaders = emitConnectHeaders;
680         return this;
681     }
682
683     /**
684      * Turn xml into a JDOM element.
685      *
686      * @param xml xml string
687      * @return JDOM element
688      */

689     private static Element getElement(String xml)
690     {
691         Element el = null;
692         try {
693             el = new SAXBuilder(false)
694             .build(new StringReader(xml))
695             .getRootElement();
696         } catch (JDOMException e) {
697             mLogger.debug(e.getMessage());
698         }
699         return el;
700     }
701
702     /**
703      * XML message structure used to push server information to the
704      * client. Used for heartbeats, sending connection startup and informing
705      * users someone's been disconnected.
706      *
707      * @param type type of information. This will be received by the client as a
708      * dataset.
709      * @param msg message of type, if any
710      * @return an xml string with connection information
711      */

712     public static String getConnectionInfoXML(String type, String msg)
713     {
714         if (msg==null)
715             return "<resultset s=\"0\"><root dset=\""+type+"\" /></resultset>";
716         else
717             return "<resultset s=\"0\"><root dset=\""+type+"\">"
718                 + msg
719                 + "</root></resultset>";
720     }
721
722     public static int getMaxMessageLen() {
723         return mMaxMessageLen;
724     }
725     public static int getConnectionLength() {
726         return mConnectionLength;
727     }
728     public static int getReconnectionWaitInterval() {
729         return mReconnectionWaitInterval;
730     }
731
732     public String toString() {
733         return
734             new StringBuffer()
735             .append("<connection")
736             .append(" sid=\"").append(mSID).append("\"")
737             .append(" cid=\"").append(mCID).append("\"")
738             .append(" user=\"").append(mUsername).append("\"")
739             .append(" heartbeats=\"").append(mHeartbeatCount).append("\"")
740             .append(" flushes=\"").append(mFlushCount).append("\"")
741             .append(" current-bytes=\"").append(mNumOfBytes).append("\"")
742             .append(" total-bytes=\"").append(mTotalNumOfBytes).append("\"")
743             .append(" />").toString();
744     }
745
746 }
747
Popular Tags