1 29 package jegg.impl; 30 import java.lang.reflect.InvocationTargetException ; 31 import java.lang.reflect.Method ; 32 import java.util.ArrayList ; 33 import java.util.Collection ; 34 import java.util.HashMap ; 35 import java.util.Iterator ; 36 import java.util.Map ; 37 import java.util.Properties ; 38 39 import org.apache.commons.logging.Log; 40 import org.apache.commons.logging.LogFactory; 41 42 import jegg.Egg; 43 import jegg.Message; 44 import jegg.Port; 45 import jegg.PortException; 46 import jegg.timer.Timeout; 47 import jegg.timer.Timer; 48 import jegg.timer.TimeoutListener; 49 import jegg.type.Node; 50 import jegg.type.TypeTree; 51 52 87 public class EggShell 88 { 89 90 private static final Log LOG = LogFactory.getLog(EggShell.class); 91 92 private static final String HANDLE_METHOD_NAME = "handle"; 93 94 95 private PriorityQueue _mqueue = new PriorityQueue(); 96 97 private Dispatcher _dispatcher; 98 private Object _dispatcherLock = new Object (); 99 100 private Map _mcache = new HashMap (); 101 102 private TypeTree _typeTable = new TypeTree(); 103 104 private Class [] _cachedTypeList; 105 106 private Port _port; 107 108 private Object _id; 109 110 private boolean _stopped = false; 111 112 private Message _currentMessage; 113 114 private Egg _handler; 115 116 private Properties _properties; 117 private Property[] _cachedProperties; 118 119 128 public EggShell(final Object id, final Egg h) 129 { 130 super(); 131 if (null == id) throw new IllegalArgumentException ("null id"); 132 if (null == h) throw new IllegalArgumentException ("null handler"); 133 _id = id; 134 _handler = h; 135 fillLookupTable(_handler.getClass()); 136 _handler.setContext(new EggContextImpl(this)); 137 } 138 139 public EggShell(final Object id, final Egg h, Dispatcher d) 140 { 141 this(id,h); 142 if (null == d) 143 assignTo(Dispatcher.getDefaultScheduler()); 144 else 145 assignTo(d); 146 } 147 void setProperties(Properties p) 148 { 149 _properties = p; 150 } 151 152 String getProperty(String key) 153 { 154 return (null != _properties) ? _properties.getProperty(key) : null; 155 } 156 157 String getProperty(String key, String defValue) 158 { 159 return (null != _properties) ? _properties.getProperty(key,defValue) : defValue; 160 } 161 162 Property[] getProperties() 163 { 164 if (null == _properties) {return new Property[0];} 165 if (null != _cachedProperties) 166 return _cachedProperties; 167 _cachedProperties = new Property[_properties.size()]; 168 int nxt = 0; 169 for (Iterator it=_properties.entrySet().iterator(); it.hasNext(); ) 170 { 171 Map.Entry e = (Map.Entry ) it.next(); 172 Property p = new Property((String )e.getKey(),(String )e.getValue()); 173 _cachedProperties[nxt++] = p; 174 } 175 return _cachedProperties; 176 } 177 178 public void assignTo(final Dispatcher d) 179 { 180 if (LOG.isDebugEnabled()) 181 LOG.debug("Assigning "+_id+" to dispatcher "+d.getName()); 182 183 _dispatcher = d; 184 _dispatcher.add(this); 185 } 186 187 194 final void bindToPort(final Port p) 195 { 196 ((PortImpl)p).connect(getPort()); 197 } 198 199 209 final Message createMessage(final Object m) 210 { 211 return createMessage(m,Priority.MEDIUM); 212 } 213 214 224 final Message createMessage(final Object m, final Priority p) 225 { 226 return new MessageImpl(m,getPort(),p); 227 } 228 229 241 final Timer createRepeatingTimer(final long interval_msec, final long delay_msec) 242 { 243 TimeoutListener tl = new TimeoutListener() 244 { 245 public void timeout(Timer tt) 246 { 247 emitTimeout(tt); 248 } 249 }; 250 return Timer.createRepeatingTimer(tl, interval_msec, delay_msec); 251 } 252 253 260 final Timer createSingleShotTimer(final long delay_msec) 261 { 262 TimeoutListener tl = new TimeoutListener() 263 { 264 public void timeout(Timer tt) 265 { 266 emitTimeout(tt); 267 } 268 }; 269 return Timer.createSingleShotTimer(tl, delay_msec); 270 } 271 272 283 final void dispatch(final Message incoming) 284 { 285 if (LOG.isDebugEnabled()) 286 { 287 LOG.debug(_id + ": Dispatching: " + incoming.toString()); 288 } 289 290 Object message = incoming.getMessage(); 291 Class msgType = message.getClass(); 292 Method method = lookup(msgType); 293 294 if (null == method) 295 { 296 LOG.error( 297 "No handler found - dropping message: " + incoming.toString()); 298 return; 299 } 300 301 try 302 { 303 if (LOG.isDebugEnabled()) 304 { 305 LOG.debug( 306 _id 307 + ": Invoking handler: " 308 + method.toString()); 309 } 310 311 _currentMessage = incoming; 312 313 method.invoke( 314 _handler, 315 new Object [] {message} 316 ); 317 } 318 catch (InvocationTargetException e) 319 { 320 LOG.error("Invocation error: "+e.getCause()); 321 e.printStackTrace(); 322 } 323 catch (Throwable t) 324 { 325 LOG.error( 326 _handler.getClass().getName()+": " + 327 "Error raised while handling message [" 328 + incoming.toString() 329 + "]: " 330 + t); 331 } 332 finally 333 { 334 _currentMessage = null; 335 } 336 } 337 338 345 private final void emitTimeout(final Timer t) 346 { 347 try 348 { 349 getPort().send(createMessage(new Timeout(t))); 350 } 351 catch (PortException e) 352 { 353 LOG.error("Failed to emit timeout: ", e); 354 } 355 } 356 357 363 final void enqueue(final Message message) 364 { 365 if (_stopped) 366 { 367 throw new IllegalStateException ("stopped"); 368 } 369 370 synchronized (_dispatcherLock) 371 { 372 if (LOG.isDebugEnabled()) 373 { 374 LOG.debug(_id + ": enqueue: " + message.toString()); 375 } 376 377 Priority p = message.getPriority(); 378 _mqueue.add(p,message); 379 380 if (LOG.isDebugEnabled()) 381 { 382 LOG.debug( 383 _id 384 + ": enqueue: has " 385 + _mqueue.size() 386 + " messages"); 387 } 388 389 if (LOG.isDebugEnabled()) 390 { 391 LOG.debug(_id + ": enqueue: notifying scheduler"); 392 } 393 394 if (null != _dispatcher) 395 { 396 synchronized (_dispatcher) 397 { 398 _dispatcher.notifyAll(); 399 } 400 } 401 402 if (LOG.isDebugEnabled()) 403 { 404 LOG.debug(_id + ": enqueue: done"); 405 } 406 } 407 } 408 409 415 private final void fillLookupTable(final Class fromClass) 416 { 417 if (LOG.isDebugEnabled()) 418 { 419 LOG.debug( 420 _id 421 + ": fillLookupTable(" 422 + fromClass.getName() 423 + ")"); 424 } 425 426 Method [] methods = fromClass.getDeclaredMethods(); 427 for (int i = 0; i < methods.length; ++i) 428 { 429 Method m = methods[i]; 430 String methodName = m.getName(); 431 if (!methodName.equals(HANDLE_METHOD_NAME)) 432 { 433 continue; 434 } 435 Class [] methodParameterList = m.getParameterTypes(); 436 if (null == methodParameterList || 1 != methodParameterList.length) 437 { 438 continue; 439 } 440 _typeTable.insert(methodParameterList[0],m); 441 } 442 } 443 444 448 final Message getCurrentMessage() 449 { 450 return _currentMessage; 451 } 452 453 457 Dispatcher getDispatcher() 458 { 459 return _dispatcher; 460 } 461 462 466 final Port getFromPort() 467 { 468 return _currentMessage.getFrom(); 469 } 470 471 final Egg getHandler() 472 { 473 return _handler; 474 } 475 476 482 final Object getId() 483 { 484 return _id; 485 } 486 487 494 public final Class [] getInterface() 495 { 496 if (null == _cachedTypeList) 497 { 498 final Collection c = new ArrayList (); 499 for (final Iterator it=_typeTable.iterator();it.hasNext(); ) 500 c.add(it.next()); 501 _cachedTypeList = (Class []) c.toArray(new Class [c.size()]); 502 } 503 504 Class [] copy = new Class [_cachedTypeList.length]; 506 System.arraycopy(_cachedTypeList, 0, copy, 0, copy.length); 507 508 return _cachedTypeList; 509 } 510 511 515 final MessageImpl getNextMessage() 516 { 517 MessageImpl m = null; 518 try 519 { 520 m = (MessageImpl) _mqueue.next(); 521 } 522 catch (Throwable t) 523 { 524 } 526 return m; 527 } 528 529 534 final long getNumPendingMessages() 535 { 536 long num = _mqueue.size(); 537 if (LOG.isDebugEnabled()) 538 { 539 LOG.debug( 540 _id + ": getNumPendingMessages: " + Long.toString(num)); 541 } 542 return num; 543 } 544 545 void setPort(final Port p ) 546 { 547 _port = p; 548 } 549 550 554 public final Port getPort() 555 { 556 return _port; 557 } 558 559 563 final PriorityQueue getQueue() {return _mqueue;} 564 565 571 private final Method lookup(final Class argType) 572 { 573 Method m = (Method ) _mcache.get(argType); 574 if (null == m) 576 { 577 Node nd = _typeTable.find(argType); 578 if (null != nd) 579 { 580 m = (Method ) nd.getCookie(); 581 _mcache.put(argType,m); 582 } 583 } 584 585 if (m == null) 586 { 587 throw new IllegalStateException ("Egg is missing required handler for java.lang.Object"); 590 } 591 592 return m; 593 } 594 595 602 final void publishPort(final String n) 603 { 604 try 605 { 606 PortRegistry.getInstance().getPort().send(createMessage(new PublishPortMessage(n, getPort()))); 607 } 608 catch (PortException e) 609 { 610 LOG.error("Failed to publish port "+n+": ", e); 611 } 612 } 613 614 623 final void requestPort(final String n) 624 { 625 try 626 { 627 PortRegistry.getInstance().getPort().send(createMessage(new LocatePortMessage(n))); 628 } 629 catch (PortException e) 630 { 631 LOG.error("Failed to publish port "+n+": ", e); 632 } 633 } 634 635 636 638 645 final void respond(final Object message) 646 { 647 respond(message,Priority.MEDIUM); 648 } 649 650 658 final void respond(final Object message, final Priority priority) 659 { 660 respond(getFromPort(),message,priority); 661 } 662 663 674 final void respond(final Port port, final Object message) 675 { 676 respond(port,message,Priority.MEDIUM); 677 } 678 679 692 final void respond(final Port port, final Object message, final Priority priority) 693 { 694 try 695 { 696 port.send(createMessage(message,priority)); 697 } 698 catch (PortException e) 699 { 700 LOG.error("Failed to send response "+message+": ", e); 701 } 702 } 703 704 710 protected final void send(final Object msg) 711 { 712 ((PortImpl)getPort()).broadcast(msg); 713 } 714 715 722 final void stop() 723 { 724 _stopped = true; 725 synchronized (_dispatcher) 726 { 727 _mqueue.clear(); 728 } 729 _dispatcher.remove(this); 730 } 731 } | Popular Tags |