KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > remoting > ServerInvoker


1 /***************************************
2  * *
3  * JBoss: The OpenSource J2EE WebOS *
4  * *
5  * Distributable under LGPL license. *
6  * See terms of license at gnu.org. *
7  * *
8  ***************************************/

9 package org.jboss.remoting;
10
11 import java.io.IOException JavaDoc;
12 import java.net.InetAddress JavaDoc;
13 import java.util.HashMap JavaDoc;
14 import java.util.Iterator JavaDoc;
15 import java.util.Map JavaDoc;
16 import javax.management.MBeanServer JavaDoc;
17 import javax.management.MBeanServerInvocationHandler JavaDoc;
18 import javax.management.MalformedObjectNameException JavaDoc;
19 import javax.management.ObjectName JavaDoc;
20 import org.jboss.remoting.callback.Callback;
21 import org.jboss.remoting.callback.InvokerCallbackHandler;
22 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
23 import org.jboss.remoting.invocation.InternalInvocation;
24 import org.jboss.remoting.invocation.OnewayInvocation;
25 import org.jboss.remoting.loading.ClassBytes;
26 import org.jboss.remoting.stream.StreamHandler;
27 import org.jboss.remoting.stream.StreamInvocationHandler;
28 import org.jboss.remoting.transport.PortUtil;
29 import org.jboss.util.threadpool.BasicThreadPool;
30 import org.jboss.util.threadpool.BlockingMode;
31 import org.jboss.util.threadpool.ThreadPool;
32 import org.jboss.util.threadpool.ThreadPoolMBean;
33
34 /**
35  * ServerInvoker is the server-side part of a remote Invoker. The ServerInvoker implementation is
36  * responsible for calling transport, depending on how the protocol receives the incoming data.
37  *
38  * @author <a HREF="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
39  * @author <a HREF="mailto:tom.elrod@jboss.com">Tom Elrod</a>
40  * @version $Revision: 1.15 $
41  */

42 public abstract class ServerInvoker extends AbstractInvoker implements ServerInvokerMBean
43 {
44    public static final String JavaDoc MAX_NUM_ONEWAY_THREADS_KEY = "maxNumThreadsOneway";
45    public static final String JavaDoc ONEWAY_THREAD_POOL_CLASS_KEY = "onewayThreadPool";
46    public static final String JavaDoc SERVER_BIND_ADDRESS_KEY = "serverBindAddress";
47    public static final String JavaDoc CLIENT_CONNECT_ADDRESS_KEY = "clientConnectAddress";
48    public static final String JavaDoc SERVER_BIND_PORT_KEY = "serverBindPort";
49    public static final String JavaDoc CLIENT_CONNECT_PORT_KEY = "clientConnectPort";
50
51    /**
52     * The max number of worker threads to be used in the
53     * pool for processing one way calls on the server side.
54     * Value is is 100.
55     */

56    public static final int MAX_NUM_ONEWAY_THREADS = 100;
57
58    /**
59     * Indicated the max number of threads used within oneway thread pool.
60     */

61    private int maxNumberThreads = MAX_NUM_ONEWAY_THREADS;
62    private String JavaDoc onewayThreadPoolClass = null;
63    private ThreadPool onewayThreadPool;
64
65    protected Map JavaDoc handlers = new HashMap JavaDoc();
66    protected Map JavaDoc callbackHandlers = new HashMap JavaDoc();
67    private Map JavaDoc clientCallbackListener = new HashMap JavaDoc();
68    private boolean started = false;
69    private boolean created = false;
70
71    private MBeanServer JavaDoc mbeanServer = null;
72
73    private Map JavaDoc configuration = new HashMap JavaDoc();
74
75    private String JavaDoc dataType;
76
77    private String JavaDoc serverBindAddress = null;
78    private int serverBindPort = 0;
79    private String JavaDoc clientConnectAddress = null;
80    private int clientConnectPort = -1;
81
82
83    public ServerInvoker(InvokerLocator locator)
84    {
85       super(locator);
86       Map JavaDoc params = locator.getParameters();
87       if(configuration != null && params != null)
88       {
89          configuration.putAll(locator.getParameters());
90       }
91    }
92
93    public ServerInvoker(InvokerLocator locator, Map JavaDoc configuration)
94    {
95       super(locator);
96       this.configuration = configuration;
97       Map JavaDoc locatorParams = locator.getParameters();
98       if(configuration != null && locatorParams != null)
99       {
100          configuration.putAll(locator.getParameters());
101       }
102    }
103
104    protected void setup() throws Exception JavaDoc
105    {
106       Map JavaDoc config = getConfiguration();
107       String JavaDoc maxNumOfThreads = (String JavaDoc) config.get(MAX_NUM_ONEWAY_THREADS_KEY);
108       if(maxNumOfThreads != null && maxNumOfThreads.length() > 0)
109       {
110          try
111          {
112             maxNumberThreads = Integer.parseInt(maxNumOfThreads);
113          }
114          catch(NumberFormatException JavaDoc e)
115          {
116             log.error("Can not convert max number of threads value (" + maxNumOfThreads + ") into a number.");
117          }
118       }
119       onewayThreadPoolClass = (String JavaDoc) config.get(ONEWAY_THREAD_POOL_CLASS_KEY);
120
121       String JavaDoc locatorHost = locator.getHost();
122       InetAddress JavaDoc addr = null;
123       if(locatorHost != null)
124       {
125          addr = InetAddress.getByName(locator.getHost());
126       }
127       else
128       {
129          addr = InetAddress.getLocalHost();
130       }
131       int port = locator.getPort();
132       if(port <= 0)
133       {
134          port = PortUtil.findFreePort();
135          // re-write locator since the port is different
136
InvokerLocator newLocator = new InvokerLocator(locator.getProtocol(), locator.getHost(), port, locator.getPath(), locator.getParameters());
137          // need to update the locator key used in the invoker registry
138
InvokerRegistry.updateServerInvokerLocator(locator, newLocator);
139          this.locator = newLocator;
140       }
141
142       // set the bind address
143
serverBindAddress = (String JavaDoc) config.get(SERVER_BIND_ADDRESS_KEY);
144       clientConnectAddress = (String JavaDoc) config.get(CLIENT_CONNECT_ADDRESS_KEY);
145       if(serverBindAddress == null)
146       {
147          if(clientConnectAddress != null)
148          {
149             // can't use uri address, as is for client only
150
serverBindAddress = InetAddress.getLocalHost().getHostAddress();
151          }
152          else
153          {
154             serverBindAddress = addr.getHostAddress();
155          }
156       }
157
158       // set the bind port
159
String JavaDoc serverBindPortString = (String JavaDoc) config.get(SERVER_BIND_PORT_KEY);
160       String JavaDoc clientConnectPortString = (String JavaDoc) config.get(CLIENT_CONNECT_PORT_KEY);
161       if(clientConnectPortString != null)
162       {
163          try
164          {
165             clientConnectPort = Integer.parseInt(clientConnectPortString);
166          }
167          catch(NumberFormatException JavaDoc e)
168          {
169             throw new InvalidConfigurationException("Can not set client bind port because can not convert given value (" + clientConnectPortString + ") to a number.");
170          }
171       }
172       if(serverBindPortString != null)
173       {
174          try
175          {
176             serverBindPort = Integer.parseInt(serverBindPortString);
177             if(serverBindPort <= 0)
178             {
179                serverBindPort = PortUtil.findFreePort();
180                // re-write locator since the port is different
181
InvokerLocator newLocator = new InvokerLocator(locator.getProtocol(), locator.getHost(), serverBindPort, locator.getPath(), locator.getParameters());
182                // need to update the locator key used in the invoker registry
183
InvokerRegistry.updateServerInvokerLocator(locator, newLocator);
184                this.locator = newLocator;
185             }
186
187          }
188          catch(NumberFormatException JavaDoc e)
189          {
190             throw new InvalidConfigurationException("Can not set server bind port because can not convert given value (" + serverBindPortString + ") to a number.");
191          }
192       }
193       else
194       {
195          if(clientConnectPort > 0)
196          {
197             // can't use uri port, as is for client only
198
serverBindPort = PortUtil.findFreePort();
199          }
200          else
201          {
202             serverBindPort = port;
203          }
204       }
205    }
206
207    /**
208     * @jmx:managed-attribute
209     */

210    public String JavaDoc getClientConnectAddress()
211    {
212       return clientConnectAddress;
213    }
214
215    public int getClientConnectPort()
216    {
217       return clientConnectPort;
218    }
219
220    public void setClientConnectPort(int clientConnectPort)
221    {
222       this.clientConnectPort = clientConnectPort;
223    }
224
225    /**
226     * This method should only be called by the service controller when this invoker is
227     * specified within the Connector configuration of a service xml. Calling this directly
228     * will have no effect, as will be used in building the locator uri that is published
229     * for detection and this happens when the invoker is first created and started (after that, no one
230     * will be aware of a change).
231     *
232     * @jmx:managed-attribute
233     */

234    public void setClientConnectAddress(String JavaDoc clientConnectAddress)
235    {
236       this.clientConnectAddress = clientConnectAddress;
237    }
238
239
240    public String JavaDoc getServerBindAddress()
241    {
242       return serverBindAddress;
243    }
244
245    public int getServerBindPort()
246    {
247       return serverBindPort;
248    }
249
250
251    public void setMaxNumberOfOnewayThreads(int numOfThreads)
252    {
253       this.maxNumberThreads = numOfThreads;
254    }
255
256    public int getMaxNumberOfOnewayThreads()
257    {
258       return this.maxNumberThreads;
259    }
260
261    public ThreadPool getOnewayThreadPool()
262    {
263       if(onewayThreadPool == null)
264       {
265          // if no thread pool class set, then use default BasicThreadPool
266
if(onewayThreadPoolClass == null || onewayThreadPoolClass.length() == 0)
267          {
268             BasicThreadPool pool = new BasicThreadPool("JBossRemoting Client Oneway");
269             pool.setMaximumPoolSize(maxNumberThreads);
270             pool.setBlockingMode(BlockingMode.WAIT);
271             onewayThreadPool = pool;
272          }
273          else
274          {
275             //first check to see if this is an ObjectName
276
boolean isObjName = false;
277             try
278             {
279                ObjectName JavaDoc objName = new ObjectName JavaDoc(onewayThreadPoolClass);
280                onewayThreadPool = createThreadPoolProxy(objName);
281                isObjName = true;
282             }
283             catch(MalformedObjectNameException JavaDoc e)
284             {
285                log.debug("Thread pool class supplied is not an object name.");
286             }
287
288             if(!isObjName)
289             {
290                try
291                {
292                   onewayThreadPool = (ThreadPool) getClassLoader().loadClass(onewayThreadPoolClass).newInstance();
293                }
294                catch(Exception JavaDoc e)
295                {
296                   throw new RuntimeException JavaDoc("Error loading instance of ThreadPool based on class name: " + onewayThreadPoolClass);
297                }
298             }
299
300          }
301
302       }
303       return onewayThreadPool;
304    }
305
306    public void setOnewayThreadPool(ThreadPool pool)
307    {
308       this.onewayThreadPool = pool;
309    }
310
311    private ThreadPool createThreadPoolProxy(ObjectName JavaDoc objName)
312    {
313       ThreadPool pool;
314       MBeanServer JavaDoc server = getMBeanServer();
315       if(server != null)
316       {
317          ThreadPoolMBean poolMBean = (ThreadPoolMBean)
318                MBeanServerInvocationHandler.newProxyInstance(server,
319                                                              objName,
320                                                              ThreadPoolMBean.class,
321                                                              false);
322          pool = poolMBean.getInstance();
323       }
324       else
325       {
326          throw new RuntimeException JavaDoc("Can not register MBean ThreadPool as the ServerInvoker has not been registered with a MBeanServer.");
327       }
328       return pool;
329    }
330
331
332    public MBeanServer JavaDoc getMBeanServer()
333    {
334       return mbeanServer;
335    }
336
337    public void setMBeanServer(MBeanServer JavaDoc server)
338    {
339       /**
340        * This has been added in order to support mbean service configuration.
341        * Now supporting classes, such as the ServerInvokerCallbackHandler can find and use
342        * resources such as CallbackStore, which can be run as a service mbean (and specified
343        * via object name within config). The use of JMX throughout remoting is a problem as
344        * now have to tie it in all throughout the code for service configuration as is being
345        * done here. When migrate to use under new server model, which does not depend on JMX,
346        * can rip out code such as this.
347        */

348       this.mbeanServer = server;
349    }
350
351    /**
352     * return true if a server invocation handler has been registered for this subsystem
353     *
354     * @param subsystem
355     * @return
356     */

357    public synchronized boolean hasInvocationHandler(String JavaDoc subsystem)
358    {
359       return handlers.containsKey(subsystem);
360    }
361
362    /**
363     * return array of keys for each subsystem this invoker can handle
364     *
365     * @return
366     */

367    public synchronized String JavaDoc[] getSupportedSubsystems()
368    {
369       String JavaDoc subsystems [] = new String JavaDoc[handlers.size()];
370       return (String JavaDoc[]) handlers.keySet().toArray(subsystems);
371    }
372
373    /**
374     * return an array of the server invocation handlers
375     *
376     * @return
377     */

378    public synchronized ServerInvocationHandler[] getInvocationHandlers()
379    {
380       ServerInvocationHandler ih [] = new ServerInvocationHandler[handlers.size()];
381       return (ServerInvocationHandler[]) handlers.values().toArray(ih);
382    }
383
384    /**
385     * add a server invocation handler for a particular subsystem. Typically, subsystems are defined
386     * in org.jboss.remoting.Subsystem, however, this can be any string that the caller knows about.
387     *
388     * @param subsystem
389     * @param handler
390     * @return previous ServerInvocationHandler with the same sybsystem value (case insensitive) or null
391     * if a previous one did not exist.
392     */

393    public synchronized ServerInvocationHandler addInvocationHandler(String JavaDoc subsystem, ServerInvocationHandler handler)
394    {
395       handler.setInvoker(this);
396       return (ServerInvocationHandler) handlers.put(subsystem.toUpperCase(), handler);
397    }
398
399    /**
400     * remove a subsystem invocation handler
401     *
402     * @param subsystem
403     */

404    public synchronized ServerInvocationHandler removeInvocationHandler(String JavaDoc subsystem)
405    {
406       return (ServerInvocationHandler) handlers.remove(subsystem.toUpperCase());
407    }
408
409    /**
410     * get a ServerInvocationHandler for a given subsystem type
411     *
412     * @param subsystem
413     * @return
414     */

415    public synchronized ServerInvocationHandler getInvocationHandler(String JavaDoc subsystem)
416    {
417       return (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
418    }
419
420    public Object JavaDoc invoke(Object JavaDoc invoke) throws IOException JavaDoc
421    {
422       InvocationRequest request = null;
423       InvocationResponse response = null;
424
425       if(log.isTraceEnabled())
426       {
427          log.trace("server received invocation =>" + invoke);
428       }
429
430       if(invoke != null && invoke instanceof InvocationRequest)
431       {
432          request = (InvocationRequest) invoke;
433          try
434          {
435
436             Object JavaDoc result = invoke(request);
437
438             response = new InvocationResponse(request.getSessionId(),
439                                               result, false, request.getReturnPayload());
440
441          }
442          catch(Throwable JavaDoc throwable)
443          {
444             if(log.isTraceEnabled())
445             {
446                throwable.printStackTrace();
447             }
448             response = new InvocationResponse(request.getSessionId(),
449                                               throwable, true, request.getReturnPayload());
450          }
451       }
452       else
453       {
454          log.error("server invoker received " + invoke + " as invocation. Must not be null and must be of type InvocationRequest.");
455          response = new InvocationResponse(request.getSessionId(),
456                                            new Exception JavaDoc("Error processing invocation request on " + getLocator() + ". Either invocation was null or of wrong type."),
457                                            true, request.getReturnPayload());
458       }
459       return response;
460
461    }
462
463    /**
464     * Will get the data type for the marshaller factory so know which marshaller to
465     * get to marshal the data. Will first check the locator uri for a 'datatype'
466     * parameter and take that value if it exists. Otherwise, will use the
467     * default datatype for the client invoker, based on transport.
468     *
469     * @return
470     */

471    public String JavaDoc getDataType()
472    {
473       if(dataType == null)
474       {
475          dataType = getDataType(getLocator());
476          if(dataType == null)
477          {
478             dataType = getDefaultDataType();
479          }
480       }
481       return dataType;
482    }
483
484    //TODO: -TME getting of datatype is duplicated in both the RemoteClientInvoker and the ServerInvoker
485
private String JavaDoc getDataType(InvokerLocator locator)
486    {
487       String JavaDoc type = null;
488
489       if(locator != null)
490       {
491          Map JavaDoc params = locator.getParameters();
492          if(params != null)
493          {
494             type = (String JavaDoc) params.get(InvokerLocator.DATATYPE);
495          }
496       }
497       return type;
498    }
499
500    protected abstract String JavaDoc getDefaultDataType();
501
502    /**
503     * Processes invocation request depending on the invocation type (internal, name based, oneway, etc).
504     * Can be called on directly when client and server are local to one another (by-passing serialization)
505     *
506     * @param invocation
507     * @return
508     * @throws Throwable
509     */

510    public Object JavaDoc invoke(InvocationRequest invocation) throws Throwable JavaDoc
511    {
512       Object JavaDoc param = invocation.getParameter();
513       Object JavaDoc result = null;
514
515       // first check to see if this is a is alive ping
516
if("$PING$".equals(param))
517       {
518          // if this is an invocation ping, just pong back
519
return new InvocationResponse(invocation.getSessionId(), Boolean.TRUE, false, null);
520       }
521
522       //TODO: -TME both oneway and internal invocation will be broken since have not
523
// deserialized the para yet (removed ClassUtil.deserialize() so would let handler do it).
524
if(param instanceof OnewayInvocation)
525       {
526          handleOnewayInvocation((OnewayInvocation) param, invocation);
527       }
528       else // no point in delaying return to client if oneway
529
{
530          String JavaDoc subsystem = invocation.getSubsystem();
531          String JavaDoc clientId = invocation.getSessionId();
532
533          // too bad we can't optimize this a little better, since we take a lookup hit for
534
// each invocation -JGH
535
ServerInvocationHandler handler = null;
536          if(subsystem != null)
537          {
538             handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
539          }
540          else
541          {
542             // subsystem not specified, so will hope for a default one being set
543
if(!handlers.isEmpty())
544             {
545                handler = (ServerInvocationHandler) handlers.values().iterator().next();
546             }
547          }
548          if(param instanceof InternalInvocation)
549          {
550             result = handleInternalInvocation((InternalInvocation) param, invocation, handler);
551
552          }
553          else
554          {
555             if(log.isTraceEnabled())
556             {
557                log.trace("dispatching invocation: " + invocation + " to subsystem: " + subsystem + " from client: " + clientId);
558             }
559
560             if(handler == null)
561             {
562                throw new InvalidConfigurationException("Can not handle invocation request because there are no " +
563                                                        "ServerInvocationHandlers registered. Please add via xml configuration " +
564                                                        "or via the Connector's addInvocationHandler() method.");
565             }
566             result = handler.invoke(invocation);
567          }
568          if(log.isTraceEnabled())
569          {
570             log.trace("dispatch invocation, returning back: " + result + " from subsystem: " + subsystem +
571                       " to client: " + clientId);
572          }
573       }
574
575
576       return result;
577    }
578
579    /**
580     * Takes the real invocation from the client out of the OnewayInvocation and then executes the invoke()
581     * with the real invocation on a seperate thread.
582     *
583     * @param onewayInvocation
584     * @param invocation
585     * @throws Throwable
586     */

587    private void handleOnewayInvocation(OnewayInvocation onewayInvocation, InvocationRequest invocation) throws Throwable JavaDoc
588    {
589       Object JavaDoc[] objs = onewayInvocation.getParameters();
590       // The oneway invocation should contain the real param as it's only param in parameter array
591
Object JavaDoc realParam = objs[0];
592       invocation.setParameter(realParam);
593       final InvocationRequest newInvocation = invocation;
594
595       ThreadPool executor = getOnewayThreadPool();
596       Runnable JavaDoc onewayRun = new Runnable JavaDoc()
597       {
598          public void run()
599          {
600             try
601             {
602                invoke(newInvocation);
603             }
604             catch(Throwable JavaDoc e)
605             {
606                // throw away exception since can't get it back to original caller
607
log.error("Error executing server oneway invocation request: " + newInvocation, e);
608             }
609          }
610       };
611       executor.run(onewayRun);
612    }
613
614    /**
615     * Handles both internal and external invocations (internal meaning only
616     * to be used within remoting and external for ones that go to handlers.
617     *
618     * @param param
619     * @param invocation
620     * @param handler
621     * @return
622     * @throws Throwable
623     */

624    private Object JavaDoc handleInternalInvocation(InternalInvocation param,
625                                            InvocationRequest invocation,
626                                            ServerInvocationHandler handler)
627          throws Throwable JavaDoc
628    {
629       Object JavaDoc result = null;
630       String JavaDoc methodName = param.getMethodName();
631       if(log.isTraceEnabled())
632       {
633          log.trace("handling InternalInvocation where method name = " + methodName);
634       }
635       // check if the invocation is for callback handling
636
if(InternalInvocation.ADDLISTENER.equals(methodName))
637       {
638          if(handler == null)
639          {
640             throw new InvalidConfigurationException("Can not accept a callback listener since there are no " +
641                                                     "ServerInvocationHandlers registered. Please add via xml configuration " +
642                                                     "or via the Connector's addInvocationHandler() method.");
643
644          }
645          InvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
646          handler.addListener(callbackHandler);
647       }
648       else if(InternalInvocation.REMOVELISTENER.equals(methodName))
649       {
650          ServerInvokerCallbackHandler callbackHandler = removeCallbackHandler(invocation);
651          if(callbackHandler != null)
652          {
653             if(handler == null)
654             {
655                throw new InvalidConfigurationException("Can not remove a callback listener since there are no " +
656                                                        "ServerInvocationHandlers registered. Please add via xml configuration " +
657                                                        "or via the Connector's addInvocationHandler() method.");
658             }
659             handler.removeListener(callbackHandler);
660             if(log.isTraceEnabled())
661             {
662                log.trace("ServerInvoker (" + this + ") removing server callback handler " + callbackHandler + ".");
663             }
664
665             callbackHandler.destroy();
666          }
667          else
668          {
669             String JavaDoc sessionId = ServerInvokerCallbackHandler.getId(invocation);
670             throw new RuntimeException JavaDoc("Can not remove callback listener from target server with id of " + sessionId + " as it does not exist as a registered callback listener.");
671          }
672       }
673       else if(InternalInvocation.GETCALLBACKS.equals(methodName))
674       {
675          ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
676          if(log.isTraceEnabled())
677          {
678             log.trace("ServerInvoker (" + this + ") getting callbacks for callback handler " + callbackHandler + ".");
679          }
680
681          result = callbackHandler.getCallbacks();
682       }
683       else if(InternalInvocation.ADDCLIENTLISTENER.equals(methodName))
684       {
685          String JavaDoc sessionId = ServerInvokerCallbackHandler.getId(invocation);
686          Object JavaDoc[] params = param.getParameters();
687          // the only elements should be the callback handler and possibly the callback handle object
688
if(params == null || params.length < 0 || params.length > 3)
689          {
690             log.error("Recieved addClientListener InternalInvocation, but getParameters() " +
691                       "returned: " + params);
692             throw new RuntimeException JavaDoc("InvokerCallbackHandler and callback handle object (optional) must be supplied as the only " +
693                                        "parameter objects within the InternalInvocation when " +
694                                        "calling addClientListener.");
695          }
696          InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler) params[0];
697          Object JavaDoc callbackHandleObject = params[1];
698          CallbackContainer callbackContainer = new CallbackContainer(callbackHandler, callbackHandleObject);
699          clientCallbackListener.put(sessionId, callbackContainer);
700
701          log.debug("ServerInvoker (" + this + ") added client callback handler " + callbackHandler + " with session id of " + sessionId +
702                    " and callback handle object of " + callbackHandleObject + ".");
703
704       }
705       else if(InternalInvocation.REMOVECLIENTLISTENER.equals(methodName))
706       {
707          String JavaDoc sessionId = ServerInvokerCallbackHandler.getId(invocation);
708          log.debug("ServerInvoker (" + this + ") removing client callback handler with session id of " + sessionId + ".");
709          Object JavaDoc cbo = clientCallbackListener.remove(sessionId);
710          if(cbo == null)
711          {
712             throw new RuntimeException JavaDoc("Can not remove callback listener from callback server with id of " + sessionId + " as it does not exist as a registered callback listener.");
713          }
714
715       }
716       else if(InternalInvocation.HANDLECALLBACK.equals(methodName))
717       {
718          String JavaDoc sessionId = ServerInvokerCallbackHandler.getId(invocation);
719          if(log.isTraceEnabled())
720          {
721             log.trace("ServerInvoker (" + this + ") is being asked to deliver callback on client callback handler with session id of " + sessionId + ".");
722          }
723          CallbackContainer callbackContainer = (CallbackContainer) clientCallbackListener.get(sessionId);
724          if(callbackContainer != null && callbackContainer.getCallbackHandler() != null)
725          {
726             Object JavaDoc[] params = param.getParameters();
727             Callback callbackRequest = (Callback) params[0];
728             Map JavaDoc callbackHandleObject = callbackRequest.getReturnPayload();
729             if(callbackHandleObject == null)
730             {
731                callbackHandleObject = new HashMap JavaDoc();
732             }
733             callbackHandleObject.put(Callback.CALLBACK_HANDLE_OBJECT_KEY, callbackContainer.getCallbackHandleObject());
734             callbackRequest.setReturnPayload(callbackHandleObject);
735             InvokerCallbackHandler callbackHandler = callbackContainer.getCallbackHandler();
736             callbackHandler.handleCallback(callbackRequest);
737          }
738          else
739          {
740             log.error("Could not find callback handler to call upon for handleCallback " +
741                       "where session id equals " + sessionId);
742          }
743
744
745       }
746       else if(InternalInvocation.ADDSTREAMCALLBACK.equals(methodName))
747       {
748          StreamHandler streamHandler = getStreamHandler(invocation);
749          if(handler instanceof StreamInvocationHandler)
750          {
751             InternalInvocation inv = (InternalInvocation) invocation.getParameter();
752             // second parameter should be the param payload
753
result = ((StreamInvocationHandler) handler).handleStream(streamHandler, inv.getParameters()[1]);
754          }
755          else
756          {
757             log.error("Client request is an InputStream, but the registered handlers do not " +
758                       "implement the StreamInvocationHandler interface, so could not process call.");
759             throw new RuntimeException JavaDoc("No handler registered of proper type (StreamInvocationHandler).");
760          }
761       }
762       else
763       {
764          log.error("Error processing InternalInvocation. Unable to process method " +
765                    methodName + ". Please make sure this should be an InternalInvocation.");
766          throw new RuntimeException JavaDoc("Error processing InternalInvocation. Unable to process method " +
767                                     methodName);
768       }
769       return result;
770    }
771
772    private StreamHandler getStreamHandler(InvocationRequest invocation) throws Exception JavaDoc
773    {
774       InternalInvocation inv = (InternalInvocation) invocation.getParameter();
775       String JavaDoc locator = (String JavaDoc) inv.getParameters()[0];
776       StreamHandler streamHandler = new StreamHandler(locator);
777       //StreamHandler streamHandler = StreamHandler.createStreamHandler(locator);
778
return streamHandler;
779    }
780
781    private ServerInvokerCallbackHandler getCallbackHandler(InvocationRequest invocation) throws Exception JavaDoc
782    {
783       ServerInvokerCallbackHandler callbackHandler = null;
784       String JavaDoc id = ServerInvokerCallbackHandler.getId(invocation);
785
786       synchronized(callbackHandlers)
787       {
788          callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.get(id);
789          // if does not exist, create it
790
if(callbackHandler == null)
791          {
792             callbackHandler = new ServerInvokerCallbackHandler(invocation, getLocator(), this);
793             callbackHandlers.put(id, callbackHandler);
794          }
795       }
796       if(log.isTraceEnabled())
797       {
798          log.trace("ServerInvoker (" + this + ") adding server callback handler " + callbackHandler + " with id of " + id + ".");
799       }
800
801       return callbackHandler;
802    }
803
804    private ServerInvokerCallbackHandler removeCallbackHandler(InvocationRequest invocation)
805    {
806       String JavaDoc id = ServerInvokerCallbackHandler.getId(invocation);
807       ServerInvokerCallbackHandler callbackHandler = null;
808
809       synchronized(callbackHandlers)
810       {
811          callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.remove(id);
812       }
813       return callbackHandler;
814    }
815
816
817    /**
818     * called prior to an invocation
819     *
820     * @param sessionId
821     * @param payload
822     * @param locator
823     * @todo is sending in the arg appropriate???
824     */

825    protected void preProcess(String JavaDoc sessionId, ClassBytes arg, Map JavaDoc payload, InvokerLocator locator)
826    {
827    }
828
829    /**
830     * called after an invocation
831     *
832     * @param sessionId
833     * @param payload
834     * @param locator
835     * @todo is sending in the arg appropriate???
836     */

837    protected void postProcess(String JavaDoc sessionId, Object JavaDoc param, Map JavaDoc payload, InvokerLocator locator)
838    {
839    }
840
841    public void create()
842    {
843       if(!created)
844       {
845          try
846          {
847             setup();
848          }
849          catch(Exception JavaDoc e)
850          {
851             throw new RuntimeException JavaDoc("Error setting up server invoker " + this, e);
852          }
853          created = true;
854       }
855    }
856
857    /**
858     * subclasses should override to provide any specific start logic
859     *
860     * @throws IOException
861     */

862    public void start() throws IOException JavaDoc
863    {
864       started = true;
865       log.info("Invoker started for locator: " + getLocator());
866    }
867
868    /**
869     * return true if the server invoker is started, false if not
870     *
871     * @return
872     */

873    public boolean isStarted()
874    {
875       return started;
876    }
877
878    /**
879     * subclasses should override to provide any specific stop logic
880     */

881    public void stop()
882    {
883       started = false;
884    }
885
886    /**
887     * destory the invoker permanently
888     */

889    public void destroy()
890    {
891       if(classbyteloader != null)
892       {
893          classbyteloader.destroy();
894       }
895    }
896
897    /**
898     * Sets the server invoker's transport specific configuration. Will need to set before calling
899     * start() method (or at least stop() and start() again) before configurations will take affect.
900     *
901     * @param configuration
902     */

903    public void setConfigration(Map JavaDoc configuration)
904    {
905       this.configuration = configuration;
906    }
907
908    /**
909     * Gets teh server invoker's transport specific configuration.
910     *
911     * @return
912     */

913    public Map JavaDoc getConfiguration()
914    {
915       return configuration;
916    }
917
918    /**
919     * Returns the String for the object name to be used for the invoker.
920     *
921     * @return
922     */

923    public String JavaDoc getMBeanObjectName()
924    {
925       InvokerLocator locator = getLocator();
926       StringBuffer JavaDoc buffer = new StringBuffer JavaDoc("jboss.remoting:service=invoker,transport= " + locator.getProtocol());
927       buffer.append(",host=" + locator.getHost());
928       buffer.append(",port=" + locator.getPort());
929       Map JavaDoc param = locator.getParameters();
930       if(param != null)
931       {
932          Iterator JavaDoc itr = param.keySet().iterator();
933          while(itr.hasNext())
934          {
935             buffer.append(",");
936             String JavaDoc key = (String JavaDoc) itr.next();
937             String JavaDoc value = (String JavaDoc) param.get(key);
938             buffer.append(key);
939             buffer.append("=");
940             buffer.append(value);
941          }
942       }
943       return buffer.toString();
944    }
945
946    private class CallbackContainer
947    {
948       private InvokerCallbackHandler handler;
949       private Object JavaDoc handleObject;
950
951       public CallbackContainer(InvokerCallbackHandler handler, Object JavaDoc handleObject)
952       {
953          this.handler = handler;
954          this.handleObject = handleObject;
955       }
956
957       public InvokerCallbackHandler getCallbackHandler()
958       {
959          return handler;
960       }
961
962       public Object JavaDoc getCallbackHandleObject()
963       {
964          return handleObject;
965       }
966    }
967 }
968
Popular Tags