1 4 5 9 10 package org.openlaszlo.connection; 11 12 import java.io.IOException; 13 import java.io.OutputStream; 14 import java.io.StringReader; 15 import java.util.Collections; 16 import java.util.Date; 17 import java.util.Enumeration; 18 import java.util.Iterator; 19 import java.util.List; 20 import java.util.Properties; 21 import java.util.Random; 22 import java.util.Vector; 23 import javax.servlet.http.HttpSessionBindingListener; 24 import javax.servlet.http.HttpSessionBindingEvent; 25 import javax.servlet.http.HttpServletRequest; 26 import javax.servlet.http.HttpServletResponse; 27 import javax.servlet.ServletOutputStream; 28 import org.apache.log4j.Logger; 29 import org.jdom.Document; 30 import org.jdom.Element; 31 import org.jdom.input.SAXBuilder; 32 import org.jdom.JDOMException; 33 34 35 36 public class HTTPConnection 37 { 38 private static final String CONNECTED = "__LPSCONNECTED"; 39 private static final String RECONNECTED = "__LPSRECONNECTED"; 40 41 42 43 private List mQueue; 44 45 46 private String mUsername; 47 48 49 private String mSID; 50 51 52 private String mCID; 53 54 55 private HttpServletResponse mRes; 56 57 58 private OutputStream mOut; 59 60 61 private boolean mDoFlushQueue; 62 63 65 private boolean mDoPad; 66 67 68 private long mHeartbeatInterval; 69 70 71 private int mHeartbeatCount; 72 73 74 private int mRequestCount; 75 76 77 private int mSentCount; 78 79 80 private int mFlushCount; 81 82 83 private int mTotalNumOfBytes; 84 85 86 private int mNumOfBytes; 87 88 90 private String mConnectedString; 91 92 private boolean mEmitConnectHeaders = false; 93 94 95 private int mSWFVersion; 96 97 98 private static Logger mLogger = Logger.getLogger(HTTPConnection.class); 102 103 106 private static String mPad; 107 108 109 private static Object mStaticInitLock = new Object(); 110 111 112 private static boolean mDoStaticInit = true; 113 114 115 private static boolean mDoInit = true; 116 117 118 private static byte[] mSWFHeartbeat; 119 120 121 private static byte[] mSWFDoReconnect; 122 123 124 private static int mMaxMessageLen = 2000; 125 126 127 private static int mConnectionLength = 65536; 128 129 130 private boolean mDoDisconnect = false; 131 132 134 private boolean mIsReconnect = false; 135 136 137 private static int mReconnectionWaitInterval = 60000; 138 139 140 private static long mUIDCount = 0; 141 142 static { 147 if (mDoStaticInit) { 148 synchronized (mStaticInitLock) { 149 if (mDoStaticInit) { 150 StringBuffer buf = new StringBuffer(2000); 151 for (int i=0; i < 2000; i++) 152 buf.append(' '); 153 mPad = buf.toString(); 154 155 mSWFHeartbeat = new SwfByte() 156 .actionSetElement 157 (getElement 158 (getConnectionInfoXML("__LPSHB", null))) 159 .setShowFrame() 160 .getBuf(); 161 162 mSWFDoReconnect = new SwfByte() 163 .actionSetElement 164 (getElement 165 (getConnectionInfoXML("__LPSDORECONNECT", null))) 166 .setShowFrame() 167 .getBuf(); 168 169 mDoStaticInit = false; 170 } 171 } 172 } 173 } 174 175 176 synchronized static public void init(Properties properties) 181 { 182 if (! mDoInit) 183 return; 184 185 mLogger.debug("init(properties)"); 186 187 Enumeration enum = properties.propertyNames(); 188 while (enum.hasMoreElements()) { 189 190 String key = (String)enum.nextElement(); 191 String val = properties.getProperty(key); 192 193 try { 194 if (val != null) { 195 if (key.intern() == "maxMessageLen") 196 mMaxMessageLen = Integer.parseInt(val); 197 else if (key.intern() == "connectionLength") 198 mConnectionLength = Integer.parseInt(val); 199 else if (key.intern() == "reconnectionWaitInterval") 200 mReconnectionWaitInterval = Integer.parseInt(val); 201 } 202 } catch (NumberFormatException e) { 203 mLogger.debug(e.getMessage()); 204 } 205 } 206 207 if (mMaxMessageLen < 2000) 209 mMaxMessageLen = 2000; 210 if (mConnectionLength < (5 * mMaxMessageLen)) 211 mConnectionLength = 5 * mMaxMessageLen; 212 if (mReconnectionWaitInterval < 10000) mReconnectionWaitInterval = 60000; 215 mLogger.debug("maxMessageLen:" + mMaxMessageLen); 216 mLogger.debug("connectionLength:" + mConnectionLength); 217 mDoInit = false; 218 } 219 220 221 225 229 private synchronized static String generateUID() 230 { 231 return Long.toHexString( System.currentTimeMillis() ) + 232 Long.toHexString( mUIDCount++ ); 233 } 234 235 236 242 public HTTPConnection(HttpServletResponse res, String username, int swfversion) 243 throws IOException 244 { 245 mRes = res; 246 mOut = res.getOutputStream(); 247 mUsername = username; 248 mSID = generateUID(); 249 mCID = generateUID(); 250 mHeartbeatCount = 0; 251 mRequestCount = 0; 252 mSentCount = 0; 253 mFlushCount = 0; 254 mTotalNumOfBytes = 0; 255 mNumOfBytes = 0; 256 mDoDisconnect = false; 257 mSWFVersion = swfversion; 258 259 mQueue = new Vector(); 262 263 mConnectedString = CONNECTED; 264 265 mDoFlushQueue = true; 268 269 setDoPad(false); 270 setHeartbeatInterval(0); 271 } 272 273 280 public HTTPConnection(HttpServletResponse res, HTTPConnection hc) 281 throws IOException 282 { 283 mRes = res; 284 mOut = res.getOutputStream(); 285 mUsername = hc.mUsername; 286 mSID = hc.mSID; 287 mCID = generateUID(); 288 mHeartbeatCount += hc.mHeartbeatCount; 289 mRequestCount += hc.mRequestCount; 290 mSentCount += hc.mSentCount; 291 mFlushCount += hc.mFlushCount; 292 mTotalNumOfBytes += hc.mTotalNumOfBytes; 293 mEmitConnectHeaders = hc.mEmitConnectHeaders; 294 mDoDisconnect = false; 295 mSWFVersion = hc.mSWFVersion; 296 297 mConnectedString = RECONNECTED; 298 299 mQueue = hc.mQueue; 301 302 mNumOfBytes = 0; 303 304 mDoFlushQueue = true; 307 308 setDoPad(hc.mDoPad); 309 setHeartbeatInterval(hc.mHeartbeatInterval); 310 } 311 312 313 317 324 synchronized public void send(String msg) 325 throws IOException 326 { 327 mLogger.debug("send(msg=" + msg + ")"); 328 329 if (mDoDisconnect) { 330 mLogger.debug("connection is already disconnected, not sending"); 331 return; 332 } 333 334 Element element = getElement(msg); 335 if (element==null) { 336 mLogger.debug("Bad XML for message: " + msg); 337 return; 338 } 339 340 byte[] swfBuf = swfBuf = new SwfByte() 341 .actionSetElement(element) 342 .setShowFrame() 343 .getBuf(); 344 345 if (swfBuf.length > mMaxMessageLen) { 346 String info = "compiled message bytes are too large -- greater than" + mMaxMessageLen + ")"; 347 mLogger.debug(info); 348 throw new IOException(info); 349 } 350 351 synchronized (mQueue) { 352 mRequestCount++; 353 mQueue.add(swfBuf); 354 } 355 356 if (mDoFlushQueue) { 357 notify(); 358 } 359 360 } 361 362 365 public void disconnect() 366 { 367 disconnect(false); 368 } 369 370 376 synchronized public void disconnect(boolean isReconnect) 377 { 378 mLogger.debug("disconnect()"); 379 mIsReconnect = isReconnect; 380 mDoDisconnect = true; 381 notify(); 382 } 383 384 385 388 synchronized public boolean toBeReconnected() 389 { 390 return mIsReconnect; 391 } 392 393 400 public void connect() 401 throws IOException 402 { 403 mLogger.debug("connect()"); 404 405 try { 406 mNumOfBytes += sendHeader(); 407 mNumOfBytes += flushMessageQueue(); 408 409 mTotalNumOfBytes += mNumOfBytes; 410 411 int numOfBytesSent = 0; 412 long waitInterval = mHeartbeatInterval; 413 long reconnectRequestTime = 0; 415 while (true) { 416 417 mLogger.debug("[" + mUsername + "," + mSID + "] " 418 + " request: " + mRequestCount 419 + " sent: " + mSentCount 420 + ", flush: " + mFlushCount 421 + ", heartbeats: " + mHeartbeatCount 422 + ", bytes: " + mTotalNumOfBytes 423 + " (" + mNumOfBytes + "/" + mConnectionLength + ")"); 424 425 synchronized (this) { 426 wait(waitInterval); 427 } 428 429 if (mDoDisconnect) { 430 mLogger.debug("disconnecting..."); 431 if (reconnectRequestTime != 0) { 432 mLogger.debug("reconnect time: " + 433 ( new Date().getTime() - reconnectRequestTime ) + "ms"); 434 } 435 return; 436 } 437 438 if ( doReconnect() ) { 440 441 mLogger.debug("sending reconnect request " + mUsername + "..."); 442 443 if (reconnectRequestTime == 0) { 444 waitInterval = mReconnectionWaitInterval; 446 447 synchronized (this) { 448 mDoFlushQueue = false; 449 } 450 451 mLogger.debug("...for " + ( waitInterval / 1000 )+ " seconds"); 452 453 mOut.write(mSWFDoReconnect); 455 mOut.flush(); 456 457 mNumOfBytes += mSWFDoReconnect.length; 458 mTotalNumOfBytes += mSWFDoReconnect.length; 459 460 reconnectRequestTime = new Date().getTime(); 461 continue; 462 463 } 464 465 long now = new Date().getTime(); 468 long interval = now - reconnectRequestTime; 469 if (interval < waitInterval) { 470 mLogger.debug("still waiting for reconnect..."); 471 continue; 472 } 473 474 mLogger.debug("interval was " + interval + "; done waiting...goodbye!"); 476 return; 477 } 478 479 numOfBytesSent = 0; 480 481 if (mQueue.size()==0) { 483 mHeartbeatCount++; 484 mOut.write(mSWFHeartbeat); 485 mOut.flush(); 486 numOfBytesSent = mSWFHeartbeat.length; 487 } else { 488 numOfBytesSent = flushMessageQueue(); 489 } 490 491 mNumOfBytes += numOfBytesSent; 492 mTotalNumOfBytes += numOfBytesSent; 493 } 494 } catch (InterruptedException e) { 495 mLogger.debug("InterruptedException: " + e.getMessage()); 496 } 497 } 498 499 500 503 private boolean doReconnect() 504 { 505 return doReconnect(0); 506 } 507 508 512 private boolean doReconnect(int n) 513 { 514 return (mConnectionLength - (mMaxMessageLen * 2)) <= (mNumOfBytes + n); 515 } 516 517 518 521 private int sendHeader() 522 throws InterruptedException, IOException 523 { 524 mLogger.debug("sendHeader()"); 525 526 mRes.setContentLength(mConnectionLength); 530 mRes.setContentType("application/x-shockwave-flash"); 531 532 mRes.setHeader("Connection", "close"); 534 mRes.setHeader("Keep-Alive", "close"); 535 536 if (mEmitConnectHeaders) { 537 if (mConnectedString == CONNECTED) 538 mRes.setHeader("X-LPS-C", mCID); 539 else if (mConnectedString == RECONNECTED) 540 mRes.setHeader("X-LPS-R", mCID); 541 } 542 543 mLogger.debug("Sent connected string: " + mConnectedString); 544 545 String info; 546 if (mConnectedString == CONNECTED) { 547 info = getConnectionInfoXML (mConnectedString, 548 "<cid>" + mCID + "</cid>" + 549 "<sid>" + mSID + "</sid>" + 550 "<usr>" + mUsername + "</usr>"); 551 } else { 552 info = getConnectionInfoXML (mConnectedString, 553 "<cid>" + mCID + "</cid>"); 554 555 } 556 SwfByte swf = new SwfByte(); 557 558 swf.setHeader(mSWFVersion); 559 swf.setLength(mConnectionLength); 560 swf.actionSetElement(getElement(info)); 561 if (mDoPad) swf.actionSetVariable("pad", mPad); 562 swf.setShowFrame(); 563 564 byte[] buf = swf.getBuf(); 565 mOut.write(buf, 0, buf.length); 566 mOut.flush(); 567 568 return buf.length; 569 } 570 571 572 578 private int flushMessageQueue() 579 throws IOException, InterruptedException 580 { 581 mLogger.debug("flushMessageQueue()"); 582 583 byte[] swfBuf; 584 int numOfBytes = 0; 585 synchronized (mQueue) { 586 587 Iterator iter = mQueue.iterator(); 588 if (! iter.hasNext()) 589 return 0; 590 591 while (iter.hasNext()) { 592 swfBuf = (byte[])iter.next(); 593 mOut.write(swfBuf); 594 numOfBytes += swfBuf.length; 595 ++mSentCount; 596 iter.remove(); 597 598 if ( doReconnect(numOfBytes) ) 601 break; 602 } 603 604 mOut.flush(); 605 ++mFlushCount; 606 } 607 608 return numOfBytes; 609 } 610 611 612 615 public boolean doPad() 616 { 617 return mDoPad; 618 } 619 620 627 public HTTPConnection setDoPad(boolean doPad) 628 { 629 mDoPad = doPad; 630 return this; 631 } 632 633 636 public long getHeartbeatInterval() 637 { 638 return mHeartbeatInterval; 639 } 640 641 642 645 public int getSWFVersion() { 646 return mSWFVersion; 647 } 648 649 652 public HTTPConnection setHeartbeatInterval(long heartbeatInterval) 653 { 654 mHeartbeatInterval = heartbeatInterval; 655 return this; 656 } 657 658 659 662 public String getUsername() 663 { 664 return mUsername; 665 } 666 667 670 public String getCID() 671 { 672 return mCID; 673 } 674 675 677 public HTTPConnection setEmitConnectHeaders(boolean emitConnectHeaders) 678 { 679 mEmitConnectHeaders = emitConnectHeaders; 680 return this; 681 } 682 683 689 private static Element getElement(String xml) 690 { 691 Element el = null; 692 try { 693 el = new SAXBuilder(false) 694 .build(new StringReader(xml)) 695 .getRootElement(); 696 } catch (JDOMException e) { 697 mLogger.debug(e.getMessage()); 698 } 699 return el; 700 } 701 702 712 public static String getConnectionInfoXML(String type, String msg) 713 { 714 if (msg==null) 715 return "<resultset s=\"0\"><root dset=\""+type+"\" /></resultset>"; 716 else 717 return "<resultset s=\"0\"><root dset=\""+type+"\">" 718 + msg 719 + "</root></resultset>"; 720 } 721 722 public static int getMaxMessageLen() { 723 return mMaxMessageLen; 724 } 725 public static int getConnectionLength() { 726 return mConnectionLength; 727 } 728 public static int getReconnectionWaitInterval() { 729 return mReconnectionWaitInterval; 730 } 731 732 public String toString() { 733 return 734 new StringBuffer() 735 .append("<connection") 736 .append(" sid=\"").append(mSID).append("\"") 737 .append(" cid=\"").append(mCID).append("\"") 738 .append(" user=\"").append(mUsername).append("\"") 739 .append(" heartbeats=\"").append(mHeartbeatCount).append("\"") 740 .append(" flushes=\"").append(mFlushCount).append("\"") 741 .append(" current-bytes=\"").append(mNumOfBytes).append("\"") 742 .append(" total-bytes=\"").append(mTotalNumOfBytes).append("\"") 743 .append(" />").toString(); 744 } 745 746 } 747 | Popular Tags |