1 package com.ubermq.kernel; 2 3 import java.io.*; 4 import java.nio.*; 5 import java.util.*; 6 7 import org.apache.log4j.*; 8 9 import EDU.oswego.cs.dl.util.concurrent.*; 10 11 import com.ubermq.kernel.event.*; 12 import com.ubermq.util.*; 13 14 21 public abstract class AbstractConnectionInfo 22 implements IConnectionInfo 23 { 24 private static final Logger log = Logger.getLogger(AbstractConnectionInfo.class); 25 26 private String id; 27 28 33 protected boolean shouldProcess; 34 35 private SynchronizedBoolean open; 36 37 private ByteBuffer readBuffer; 38 private ByteBuffer writeBuffer; 39 40 private Sync readMutex, writeMutex; 41 42 private List eventHandlers; 43 44 private static long nextId = 2; 46 47 50 protected static final int MAX_READ = Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_BUFFER_SIZE, 51 "1048576")).intValue(); 52 53 56 protected static final int FLUSH_BUFFER_THRESHOLD = MAX_READ / Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_FLUSH_DIVISOR, "2")).intValue(); 57 58 private final IMessageProcessor proc; 60 61 private final IDatagramFactory factory; 63 64 72 public AbstractConnectionInfo(IMessageProcessor p, 73 IDatagramFactory f) 74 { 75 this(p, 76 f, 77 MAX_READ, 78 MAX_READ); 79 } 80 81 89 public AbstractConnectionInfo(IMessageProcessor p, 90 IDatagramFactory f, 91 int rbuf, 92 int wbuf) 93 { 94 this(p, 95 f, 96 (rbuf > 0) ? ByteBuffer.allocateDirect(rbuf) : null, 97 (wbuf > 0) ? ByteBuffer.allocateDirect(wbuf) : null); 98 } 99 100 108 public AbstractConnectionInfo(IMessageProcessor p, 109 IDatagramFactory f, 110 ByteBuffer r, 111 ByteBuffer w) 112 { 113 readBuffer = r; 114 if (readBuffer != null) 115 { 116 readMutex = new ReentrantLock(); 117 } 118 119 writeBuffer = w; 120 if (writeBuffer != null) 121 { 122 writeMutex = new ReentrantLock(); 123 } 124 125 this.factory = f; 126 id = String.valueOf(allocateProcessUniqueId()); 127 shouldProcess = true; 128 open = new SynchronizedBoolean(true); 129 eventHandlers = new LinkedList(); 130 proc = p; 131 132 sendEvent(ConnectionEvent.CONNECTION_CONNECTED); 134 } 135 136 public void close() 137 { 138 if (open.get()) 139 { 140 open.set(false); 141 proc.remove(this); 142 143 assert writeBuffer.position() == 0 : "buffer not empty"; 146 147 sendEvent(ConnectionEvent.CONNECTION_CLOSED); 149 } 150 } 151 152 public boolean isOpen() 153 { 154 return open.get(); 155 } 156 157 public void addEventListener(ConnectionEventListener l) 158 { 159 eventHandlers.add(l); 160 } 161 162 public void removeEventListener(ConnectionEventListener l) 163 { 164 eventHandlers.remove(l); 165 } 166 167 172 protected void sendEvent(ConnectionEvent event) 173 { 174 log.debug("sending connection event " + event); 175 176 Iterator iter = eventHandlers.iterator(); 177 while (iter.hasNext()) 178 { 179 ConnectionEventListener l = (ConnectionEventListener)iter.next(); 180 try 181 { 182 l.connectionEvent(event); 183 } 184 catch(RuntimeException x) 185 { 186 log.fatal("", x); 189 } 190 } 191 } 192 193 197 void sendEvent(int eventCode) 198 { 199 sendEvent(new ConnectionEvent(this, eventCode)); 200 } 201 202 public static synchronized long allocateProcessUniqueId() 203 { 204 return ++nextId; 205 } 206 207 221 public void output(IDatagram d, IOverflowHandler h) 222 throws IOException, BufferOverflowException 223 { 224 if (!open.get()) 225 throw new IOException("not open"); 226 227 try 228 { 229 while(open.get()) 230 { 231 try 232 { 233 writeMutex.acquire(); 234 235 ByteBuffer output = writeBuffer.slice(); 237 factory.outgoing(output, d); 238 239 writeBuffer.position(writeBuffer.position() + output.position()); 241 break; 242 } 243 catch(BufferOverflowException boe) 244 { 245 } 246 finally 247 { 248 writeMutex.release(); 249 } 250 251 requestWrite(); 254 255 int a = processOverflow(d, h); 257 if (a == IOverflowHandler.ACTION_RETRY) 258 { 259 h = h.getRetryHandler(); 260 } 261 else if (a == IOverflowHandler.ACTION_FAIL) 262 throw new BufferOverflowException(); 263 else 264 break; } 266 267 requestWrite(); 269 } 270 catch(InterruptedException ie) 271 { 272 } 276 } 277 278 283 private int processOverflow(IDatagram d, 284 IOverflowHandler h) 285 { 286 if (h instanceof IConnectionOverflowHandler) 287 { 288 return ((IConnectionOverflowHandler)h).overflow(d, this, proc); 289 } 290 else 291 { 292 return h.overflow(d); 293 } 294 } 295 296 public void flush() 297 throws IOException 298 { 299 try 300 { 301 writeMutex.acquire(); 302 doFlush(); 303 } 304 catch(InterruptedException ie) 305 { 306 } 310 finally 311 { 312 writeMutex.release(); 313 } 314 } 315 316 320 private void doFlush() 321 throws IOException 322 { 323 try 328 { 329 if (writeBuffer.position() > 0) 330 { 331 writeBuffer.flip(); 332 int n = doWrite(writeBuffer); 333 log.debug(this + " flushed " + n + " octets"); 334 efficientCompact(writeBuffer); 335 } 336 337 if (writeBuffer.position() == 0) 339 cancelWriteRequest(); 340 } 341 catch(java.io.IOException iox) 342 { 343 log.debug("Unable to write bytes", iox); 344 345 sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); 347 348 cancelWriteRequest(); 350 close(); 351 352 throw iox; 354 } 355 } 356 357 365 protected abstract int doWrite(ByteBuffer writeBuffer) 366 throws java.io.IOException ; 367 368 protected void requestWrite() 369 throws IOException 370 { 371 doFlush(); 372 } 373 374 protected void cancelWriteRequest() 375 { 376 } 377 378 protected boolean readyToWrite() 379 { 380 try 381 { 382 writeMutex.acquire(); 383 return writeBuffer.hasRemaining(); 384 } 385 catch(InterruptedException ie) 386 { 387 return false; 391 } 392 finally 393 { 394 writeMutex.release(); 395 } 396 } 397 398 400 404 protected ByteBuffer getReadBuffer() 405 throws InterruptedException 406 { 407 readMutex.acquire(); 408 return readBuffer; 409 } 410 411 415 protected void releaseReadBuffer(ByteBuffer rb) 416 { 417 readMutex.release(); 418 } 419 420 424 public void processData() 425 { 426 try 427 { 428 readMutex.acquire(); 429 430 int expecting=0; 432 preProcessData(); 433 434 while(true) 436 { 437 expecting = factory.frame(readBuffer); 441 442 if (readBuffer.remaining() >= expecting) 446 { 447 ByteBuffer process = readBuffer.slice(); 449 process.limit(expecting); 450 451 readBuffer.position(readBuffer.position() + expecting); 456 457 if (shouldProcess) 459 { 460 IDatagram d = factory.incoming(process); 462 463 proc.process(this, d); 465 } 466 } 467 else 468 { 469 break; 470 } 471 } 472 } 473 catch (java.io.IOException ise) 474 { 475 log.debug("Invalid Protocol Detected", ise); 476 477 sendEvent(ConnectionEvent.CONNECTION_INVALID_PROTOCOL); 479 480 close(); 483 } 484 catch (InterruptedException ie) 485 { 486 } 488 finally 489 { 490 postProcessData(); 492 493 readMutex.release(); 495 } 496 } 497 498 503 protected void preProcessData() 504 { 505 readBuffer.flip(); 506 } 507 508 511 protected void postProcessData() 512 { 513 efficientCompact(readBuffer); 515 } 516 517 523 private void efficientCompact(ByteBuffer bb) 524 { 525 if (bb.hasRemaining()) 526 { 527 bb.compact(); 528 } 529 else 530 bb.clear(); 531 } 532 533 public String toString() 534 { 535 return getId(); 536 } 537 538 public final String getId() 539 { 540 return id; 541 } 542 543 public boolean equals(Object o) 544 { 545 try 546 { 547 return (getId().equals( ((ConnectionInfo)o).getId())); 548 } 549 catch (ClassCastException e) 550 { 551 return false; 552 } 553 } 554 555 public int hashCode() 556 { 557 return getId().hashCode(); 558 } 559 } 560 561 | Popular Tags |