KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > jegg > impl > EggShell


1 /*
2  * Copyright (c) 2004, Bruce Lowery
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * - Redistributions of source code must retain the above copyright notice,
9  * this list of conditions and the following disclaimer.
10  * - Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * - Neither the name of JEGG nor the names of its contributors may be used
14  * to endorse or promote products derived from this software without
15  * specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */

29 package jegg.impl;
30 import java.lang.reflect.InvocationTargetException JavaDoc;
31 import java.lang.reflect.Method JavaDoc;
32 import java.util.ArrayList JavaDoc;
33 import java.util.Collection JavaDoc;
34 import java.util.HashMap JavaDoc;
35 import java.util.Iterator JavaDoc;
36 import java.util.Map JavaDoc;
37 import java.util.Properties JavaDoc;
38
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41
42 import jegg.Egg;
43 import jegg.Message;
44 import jegg.Port;
45 import jegg.PortException;
46 import jegg.timer.Timeout;
47 import jegg.timer.Timer;
48 import jegg.timer.TimeoutListener;
49 import jegg.type.Node;
50 import jegg.type.TypeTree;
51
52 /**
53  * Base class for implementing an egg. Concrete classes that extend this
54  * class will be plugged into the JEgg framework and be able to send and
55  * receive arbitrary messages. The Egg base class also provides
56  * convenience methods for the following:
57  * <ul>
58  * <li>Responding to a message (see {@link #respond(Object) respond})</li>
59  * <li>Publishing this egg's message port (see {@link #publishPort(String) publishPort})</li>
60  * <li>Looking up another egg's message port (see {@link #requestPort(String) requestPort})</li>
61  * <li>Broadcasting a message (see {@link #send(Object) send})</li>
62  * <li>'Binding' to another egg's port (see {@link #bindToPort(Port) bindToPort})</li>
63  * <li>Creating an egg timer (see {@link #createRepeatingTimer(long, long) createRepeatingTimer} and {@link #createSingleShotTimer(long) createSingleShotTimer})</li>
64  * <li>Creation of a JEgg message from an application object (see {@link #createMessage(Object)})</li>
65  * </ul>
66  * <p>
67  * The two-argument constructor does not assign a dispatcher. Use the
68  * constructor that takes a {@link jegg.impl.Dispatcher Dispatcher} in order to
69  * assign the egg to a specific thread; passing <code>null</code> there selects the default dispatcher.
70  * <p>
71  * The derived class should implement handlers for the message types that it expects
72  * to receive. A message handler is implemented by overloading the {@link #handle(Object) handle}
73  * method with a version that accepts the expected message type. When the derived
74  * class is loaded, the <code>handle</code> methods are reflected and cached in
75  * a lookup table to improve the message delivery performance.
76  * <p>
77  * The message dispatcher assigned to the egg will only deliver one message at
78  * a time to the Egg subclass. If an egg is expected to take a long time to
79  * process some messages, then it should be assigned its own dispatcher (see
80  * above).
81  * <p>
82  * Messages are dispatched to an egg according to their priority, in the order
83  * they were sent. So, a more recent message with a high priority can be
84  * delivered to the egg before a message that was sent earlier but with a
85  * lower priority.
86  */

87 public class EggShell
88 {
89     /** Logger */
90     private static final Log LOG = LogFactory.getLog(EggShell.class);
91     /** The name of the [overloaded] method that subclasses must implement. */
92     private static final String JavaDoc HANDLE_METHOD_NAME = "handle";
93     
94     /** Message queue */
95     private PriorityQueue _mqueue = new PriorityQueue();
96     /** The message dispatcher for this egg*/
97     private Dispatcher _dispatcher;
98     private Object JavaDoc _dispatcherLock = new Object JavaDoc();
99     /** Lookup table */
100     private Map JavaDoc _mcache = new HashMap JavaDoc();
101     /** Holds this Eggs message types */
102     private TypeTree _typeTable = new TypeTree();
103     /** Cached list of types handled by this egg */
104     private Class JavaDoc[] _cachedTypeList;
105     /** Port */
106     private Port _port;
107     /** Name */
108     private Object JavaDoc _id;
109     /** True if stopped */
110     private boolean _stopped = false;
111     /** Current active message */
112     private Message _currentMessage;
113     /** Message handler */
114     private Egg _handler;
115     /** Properties */
116     private Properties JavaDoc _properties;
117     private Property[] _cachedProperties;
118     
119     /**
120      * Construct an egg that dispatches messages to a user-supplied
121      * message handler using a specific message dispatcher.
122      * @param id a unique object that can be used to identify this egg. The
123      * name should be unique among the eggs instantiated in the JVM.
124      * @param d the message dispatcher to use to dispatch messages to this egg.
125      * @param h a user-supplied message handler.
126      * @throws IllegalArgumentException if any argument is null.
127      */

128     public EggShell(final Object JavaDoc id, final Egg h)
129     {
130         super();
131         if (null == id) throw new IllegalArgumentException JavaDoc("null id");
132         if (null == h) throw new IllegalArgumentException JavaDoc("null handler");
133         _id = id;
134         _handler = h;
135         fillLookupTable(_handler.getClass());
136         _handler.setContext(new EggContextImpl(this));
137     }
138     
139     public EggShell(final Object JavaDoc id, final Egg h, Dispatcher d)
140     {
141         this(id,h);
142         if (null == d)
143             assignTo(Dispatcher.getDefaultScheduler());
144         else
145             assignTo(d);
146     }
147     void setProperties(Properties JavaDoc p)
148     {
149         _properties = p;
150     }
151     
152     String JavaDoc getProperty(String JavaDoc key)
153     {
154         return (null != _properties) ? _properties.getProperty(key) : null;
155     }
156     
157     String JavaDoc getProperty(String JavaDoc key, String JavaDoc defValue)
158     {
159         return (null != _properties) ? _properties.getProperty(key,defValue) : defValue;
160     }
161     
162     Property[] getProperties()
163     {
164         if (null == _properties) {return new Property[0];}
165         if (null != _cachedProperties)
166             return _cachedProperties;
167         _cachedProperties = new Property[_properties.size()];
168         int nxt = 0;
169         for (Iterator JavaDoc it=_properties.entrySet().iterator(); it.hasNext(); )
170         {
171             Map.Entry JavaDoc e = (Map.Entry JavaDoc) it.next();
172             Property p = new Property((String JavaDoc)e.getKey(),(String JavaDoc)e.getValue());
173             _cachedProperties[nxt++] = p;
174         }
175         return _cachedProperties;
176     }
177     
178     public void assignTo(final Dispatcher d)
179     {
180         if (LOG.isDebugEnabled())
181             LOG.debug("Assigning "+_id+" to dispatcher "+d.getName());
182         
183         _dispatcher = d;
184         _dispatcher.add(this);
185     }
186     
187     /**
188      * Register to receive <i>all</i> messages broadcast by the egg that the
189      * specified port belongs to. A egg can broadcast a message using the
190      * {@link #send(Object) send} method.
191      *
192      * @param p the port to register for broadcast messages with.
193      */

194     final void bindToPort(final Port p)
195     {
196         ((PortImpl)p).connect(getPort());
197     }
198     
199     /**
200      * Create JEgg message from an application-defined message. The message
201      * will be created with a medium priority level assigned to it.
202      * Additionally, this egg's port will be saved in the JEgg message.
203      * <p>
204      * This is useful to do when the same message has to be sent to many
205      * different eggs.
206      * @param message the application message to wrap in a JEgg message.
207      * @return a JEgg message wrapping the application message.
208      */

209     final Message createMessage(final Object JavaDoc m)
210     {
211         return createMessage(m,Priority.MEDIUM);
212     }
213
214     /**
215      * Create JEgg message from an application-defined message. The message
216      * will be created with the specified priority level assigned to it.
217      * Additionally, this egg's port will be saved in the JEgg message.
218      * <p>
219      * This is useful to do when the same message has to be sent to many
220      * different eggs.
221      * @param message the application message to wrap in a JEgg message.
222      * @return a JEgg message wrapping the application message.
223      */

224     final Message createMessage(final Object JavaDoc m, final Priority p)
225     {
226         return new MessageImpl(m,getPort(),p);
227     }
228     
229     /**
230      * Create an egg timer. The timer will deliver {@link #jegg.timer.Timeout Timeout}
231      * messages to this egg until {@link #jegg.timer.Timer.cancel() Timer.cancel()}
232      * is called.
233      * <p>
234      * The timer is automatically started before being returned.
235      *
236      * @param interval_msec the number of milliseconds between timeout messages.
237      * @param delay_msec the initial delay that the timer will wait before it
238      * starts delivering timeout messages.
239      * @return A new timer instance.
240      */

241     final Timer createRepeatingTimer(final long interval_msec, final long delay_msec)
242     {
243         TimeoutListener tl = new TimeoutListener()
244         {
245             public void timeout(Timer tt)
246             {
247                 emitTimeout(tt);
248             }
249         };
250         return Timer.createRepeatingTimer(tl, interval_msec, delay_msec);
251     }
252     
253     /**
254      * Create an egg timer. The timer will deliver a single {@link #jegg.timer.Timeout Timeout}
255      * message to this egg.
256      * @param delay_msec the number of milliseconds after which the timeout
257      * message will be delivered.
258      * @return a new timer instance.
259      */

260     final Timer createSingleShotTimer(final long delay_msec)
261     {
262         TimeoutListener tl = new TimeoutListener()
263         {
264             public void timeout(Timer tt)
265             {
266                 emitTimeout(tt);
267             }
268         };
269         return Timer.createSingleShotTimer(tl, delay_msec);
270     }
271     
272     /**
273      * Deliver a message to the derived class handler. The message is
274      * delivered to the most specific handler based on the concrete type
275      * of the message.
276      * <p>
277      * If no handler can be found, the message is dropped.
278      * <p>
279      * This method is used by a message dispatcher to deliver a message to
280      * the egg.
281      * @param message to handle.
282      */

283     final void dispatch(final Message incoming)
284     {
285         if (LOG.isDebugEnabled())
286         {
287             LOG.debug(_id + ": Dispatching: " + incoming.toString());
288         }
289
290         Object JavaDoc message = incoming.getMessage();
291         Class JavaDoc msgType = message.getClass();
292         Method JavaDoc method = lookup(msgType);
293
294         if (null == method)
295         {
296             LOG.error(
297                 "No handler found - dropping message: " + incoming.toString());
298             return;
299         }
300         
301         try
302         {
303             if (LOG.isDebugEnabled())
304             {
305                 LOG.debug(
306                     _id
307                         + ": Invoking handler: "
308                         + method.toString());
309             }
310             
311             _currentMessage = incoming;
312             
313             method.invoke(
314                     _handler,
315                     new Object JavaDoc[] {message}
316             );
317         }
318         catch (InvocationTargetException JavaDoc e)
319         {
320             LOG.error("Invocation error: "+e.getCause());
321             e.printStackTrace();
322         }
323         catch (Throwable JavaDoc t)
324         {
325             LOG.error(
326                 _handler.getClass().getName()+": " +
327                 "Error raised while handling message ["
328                     + incoming.toString()
329                     + "]: "
330                     + t);
331         }
332         finally
333         {
334             _currentMessage = null;
335         }
336     }
337
338     /**
339      * Send a timeout notification to the derived class. This method is
340      * used to deliver timeout messages from a running timer (see {@link
341      * #createTimer(long,long,boolean) createTimer}).
342      *
343      * @param t the timer that is sending the timeout message.
344      */

345     private final void emitTimeout(final Timer t)
346     {
347         try
348         {
349             getPort().send(createMessage(new Timeout(t)));
350         }
351         catch (PortException e)
352         {
353             LOG.error("Failed to emit timeout: ", e);
354         }
355     }
356     
357     /**
358      * Add a message to this egg's message queue. This method is only
359      * used by the egg's port when another egg sends a message to this egg.
360      *
361      * @param message the message to add to the queue.
362      */

363     final void enqueue(final Message message)
364     {
365         if (_stopped)
366         {
367             throw new IllegalStateException JavaDoc("stopped");
368         }
369
370         synchronized (_dispatcherLock)
371         {
372             if (LOG.isDebugEnabled())
373             {
374                 LOG.debug(_id + ": enqueue: " + message.toString());
375             }
376
377             Priority p = message.getPriority();
378             _mqueue.add(p,message);
379
380             if (LOG.isDebugEnabled())
381             {
382                 LOG.debug(
383                     _id
384                         + ": enqueue: has "
385                         + _mqueue.size()
386                         + " messages");
387             }
388
389             if (LOG.isDebugEnabled())
390             {
391                 LOG.debug(_id + ": enqueue: notifying scheduler");
392             }
393
394             if (null != _dispatcher)
395             {
396                synchronized (_dispatcher)
397                {
398                    _dispatcher.notifyAll();
399                }
400             }
401
402             if (LOG.isDebugEnabled())
403             {
404                 LOG.debug(_id + ": enqueue: done");
405             }
406         }
407     }
408     
409     /**
410      * Populate the lookup table used to find message handlers in the
411      * derived class. This method is called once when the egg is created.
412      *
413      * @param fromClass the derived class to search for message handlers.
414      */

415     private final void fillLookupTable(final Class JavaDoc fromClass)
416     {
417         if (LOG.isDebugEnabled())
418         {
419             LOG.debug(
420                 _id
421                     + ": fillLookupTable("
422                     + fromClass.getName()
423                     + ")");
424         }
425
426         Method JavaDoc[] methods = fromClass.getDeclaredMethods();
427         for (int i = 0; i < methods.length; ++i)
428         {
429             Method JavaDoc m = methods[i];
430             String JavaDoc methodName = m.getName();
431             if (!methodName.equals(HANDLE_METHOD_NAME))
432             {
433                 continue;
434             }
435             Class JavaDoc[] methodParameterList = m.getParameterTypes();
436             if (null == methodParameterList || 1 != methodParameterList.length)
437             {
438                 continue;
439             }
440             _typeTable.insert(methodParameterList[0],m);
441         }
442     }
443     
444     /**
445      * Return the current message being processed.
446      * @return the`current message.
447      */

448     final Message getCurrentMessage()
449     {
450         return _currentMessage;
451     }
452
453     /**
454      * Return this egg's message dispatcher.
455      * @return the dispatcher assigned to this egg.
456      */

457     Dispatcher getDispatcher()
458     {
459         return _dispatcher;
460     }
461     
462     /**
463      * Return the port of the egg that sent the current message.
464      * @return the sender's port
465      */

466     final Port getFromPort()
467     {
468         return _currentMessage.getFrom();
469     }
470     
471     final Egg getHandler()
472     {
473         return _handler;
474     }
475     
476     /**
477      * Return the ID for this egg. The egg's ID is arbitrary and not used
478      * by the JEgg framework; this is purely for user convenience.
479      *
480      * @return the id assigned to this egg.
481      */

482     final Object JavaDoc getId()
483     {
484         return _id;
485     }
486
487     /**
488      * Return the message types that this egg will handle.
489      * @return array of class objects. Each element in the
490      * array is a type for which this egg has a specific
491      * handler method.
492      * @return array of class objects.
493      */

494     public final Class JavaDoc[] getInterface()
495     {
496         if (null == _cachedTypeList)
497         {
498             final Collection JavaDoc c = new ArrayList JavaDoc();
499             for (final Iterator JavaDoc it=_typeTable.iterator();it.hasNext(); )
500                 c.add(it.next());
501             _cachedTypeList = (Class JavaDoc[]) c.toArray(new Class JavaDoc[c.size()]);
502         }
503         
504         // Return a copy, not the data member.
505
Class JavaDoc[] copy = new Class JavaDoc[_cachedTypeList.length];
506         System.arraycopy(_cachedTypeList, 0, copy, 0, copy.length);
507         
508         return _cachedTypeList;
509     }
510     
511     /**
512      * Return next message in order of priority.
513      * @return
514      */

515     final MessageImpl getNextMessage()
516     {
517         MessageImpl m = null;
518         try
519         {
520             m = (MessageImpl) _mqueue.next();
521         }
522         catch (Throwable JavaDoc t)
523         {
524             // EMPTY
525
}
526         return m;
527     }
528     
529     /**
530      * Return the number of undelivered messages in this egg's
531      * message queue.
532      * @return the number of undelivered messages.
533      */

534     final long getNumPendingMessages()
535     {
536         long num = _mqueue.size();
537         if (LOG.isDebugEnabled())
538         {
539             LOG.debug(
540                     _id + ": getNumPendingMessages: " + Long.toString(num));
541         }
542         return num;
543     }
544     
545     void setPort(final Port p )
546     {
547         _port = p;
548     }
549     
550     /**
551      * Return the port used to deliver messages to this egg.
552      * @return this egg's message port.
553      */

554     public final Port getPort()
555     {
556         return _port;
557     }
558     
559     /**
560      * Return a reference to this Egg's message queue.
561      * @return message queue.
562      */

563     final PriorityQueue getQueue() {return _mqueue;}
564     
565     /**
566      * Return the derived class handler method for a message of the
567      * type specified by the argument.
568      * @param argType the argument type of the handler to return.
569      * @return the handler for messages of the specified type.
570      */

571     private final Method JavaDoc lookup(final Class JavaDoc argType)
572     {
573         Method JavaDoc m = (Method JavaDoc) _mcache.get(argType);
574         //Method m = null;
575
if (null == m)
576         {
577             Node nd = _typeTable.find(argType);
578             if (null != nd)
579             {
580                 m = (Method JavaDoc) nd.getCookie();
581                 _mcache.put(argType,m);
582             }
583         }
584         
585         if (m == null)
586         {
587             // This should never, ever execute since every Egg should
588
// at least have a handler for java.lang.Object.
589
throw new IllegalStateException JavaDoc("Egg is missing required handler for java.lang.Object");
590         }
591
592         return m;
593     }
594
595     /**
596      * Publish this egg's message port in the port registry. Other eggs
597      * can lookup the port in the registry and use it to send messages to
598      * this egg.
599      * @param name the name to register the port under.
600      * @throws PortException
601      */

602     final void publishPort(final String JavaDoc n)
603     {
604         try
605         {
606             PortRegistry.getInstance().getPort().send(createMessage(new PublishPortMessage(n, getPort())));
607         }
608         catch (PortException e)
609         {
610             LOG.error("Failed to publish port "+n+": ", e);
611         }
612     }
613     
614     /**
615      * Send a lookup request to the port registry. The port will be delivered
616      * to this egg as a {@link jegg.Port Port} message.
617      * <p>
618      * If the port is not registered in the registry when the registry receives
619      * the port request, the request will be saved until the target port is registered.
620      *
621      * @param name the name of the registered port.
622      */

623     final void requestPort(final String JavaDoc n)
624     {
625         try
626         {
627             PortRegistry.getInstance().getPort().send(createMessage(new LocatePortMessage(n)));
628         }
629         catch (PortException e)
630         {
631             LOG.error("Failed to publish port "+n+": ", e);
632         }
633     }
634
635     
636     // TODO implement unbindFromPort(Port p)
637

638     /**
639      * Send a message to the egg that sent the current message.
640      * The message will be sent with a <i>medium</i> priority
641      * level assigned to it.
642      *
643      * @param message the message to send to the sending egg.
644      */

645     final void respond(final Object JavaDoc message)
646     {
647         respond(message,Priority.MEDIUM);
648     }
649     
650     /**
651      * Send a message to the egg that sent the current message.
652      * The message will be sent with the specified priority
653      * level assigned to it.
654      *
655      * @param message the message to send to the sending egg.
656      * @param priority the priority to assign to the message.
657      */

658     final void respond(final Object JavaDoc message, final Priority priority)
659     {
660         respond(getFromPort(),message,priority);
661     }
662     
663     /**
664      * Send a message to the egg that the specified port
665      * belongs to. This method is useful when a message
666      * response has to be delayed until later (so the port
667      * is saved for use later). The message will be sent
668      * with a medium priority level assigned to it.
669      *
670      * @param port the port to send the message on.
671      * @param message the message to send to the egg that
672      * the specified port belongs to.
673      */

674     final void respond(final Port port, final Object JavaDoc message)
675     {
676         respond(port,message,Priority.MEDIUM);
677     }
678     
679     /**
680      * Send a message to the egg that the specified port
681      * belongs to. This method is useful when a message
682      * response has to be delayed until later (so the port
683      * is saved for use later). The message will be sent
684      * with the specified priority level assigned to it.
685      *
686      * @param port the port to send the message on.
687      * @param message the message to send to the egg that
688      * the specified port belongs to.
689      * @param priority the message's priority assignment.
690      * @throws PortException
691      */

692     final void respond(final Port port, final Object JavaDoc message, final Priority priority)
693     {
694         try
695         {
696             port.send(createMessage(message,priority));
697         }
698         catch (PortException e)
699         {
700             LOG.error("Failed to send response "+message+": ", e);
701         }
702     }
703     
704     /**
705      * Broadcast a message to all eggs that have bound to this egg's port
706      * (see {@link #bindToPort(Port) bindToPort}).
707      *
708      * @param message the message to send.
709      */

710     protected final void send(final Object JavaDoc msg)
711     {
712         ((PortImpl)getPort()).broadcast(msg);
713     }
714
715     /**
716      * Stop this egg from execution. Messages will no longer
717      * be delivered to this egg.
718      * <p>
719      * Warning: this is an irreversible step. The egg can not be
720      * restarted after this method is called.
721      */

722     final void stop()
723     {
724         _stopped = true;
725         synchronized (_dispatcher)
726         {
727             _mqueue.clear();
728         }
729         _dispatcher.remove(this);
730     }
731 } // END OF CLASS Egg
732
Popular Tags