KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > remoting > callback > ServerInvokerCallbackHandler


/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */

7 package org.jboss.remoting.callback;
8
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.SerializableStore;
import org.jboss.remoting.ServerInvoker;
import org.jboss.remoting.invocation.InternalInvocation;
24
25
26 /**
27  * Responsible for all callbacks in remoting at invoker level (on the server side).
28  * This is created within the ServerInvoker and passed to the server handler as a
29  * proxy for the client's callback handler.
30  * <p/>
31  * Will determin internally if is using pull or push mechanism for delivering callbacks.
32  * If is push, will create a Client to call back on the callback server.
33  *
34  * @author <a HREF="mailto:telrod@e2technologies.net">Tom Elrod</a>
35  */

36 public class ServerInvokerCallbackHandler implements InvokerCallbackHandler
37 {
38    private InvocationRequest invocation;
39    private Client callBackClient;
40    private ArrayList JavaDoc callbacks = new ArrayList JavaDoc();
41    private String JavaDoc sessionId;
42    private String JavaDoc clientSessionId;
43    private InvokerLocator serverLocator;
44
45    private SerializableStore callbackStore = null;
46
47    /**
48     * The map key to use when looking up any callback store that
49     * should be used. This key should be used when setting up
50     * config in the invoker.
51     */

52    public static final String JavaDoc CALLBACK_STORE_KEY = "callbackStore";
53
54    /**
55     * The map key to use when looking up the percentage of free memory
56     * available before tiggering persistence.
57     */

58    public static final String JavaDoc CALLBACK_MEM_CEILING = "callbackMemCeiling";
59
60    /**
61     * The percentage number of used memory before should persist messages.
62     * For example, if 64MB available and only 30MB free mem and memPercentCeiling
63     * is 50, then would trigger persisting of messages.
64     */

65    private double memPercentCeiling = 20; // 20% by default
66

67    private static final Logger log = Logger.getLogger(ServerInvokerCallbackHandler.class);
68
69
70    public ServerInvokerCallbackHandler(InvocationRequest invocation, InvokerLocator serverLocator, ServerInvoker owner) throws Exception JavaDoc
71    {
72       if(invocation == null)
73       {
74          throw new Exception JavaDoc("Can not construct ServerInvokerCallbackHandler with null InvocationRequest.");
75       }
76       this.invocation = invocation;
77       this.serverLocator = serverLocator;
78       init(invocation, owner);
79    }
80
81    private void init(InvocationRequest invocation, ServerInvoker owner) throws Exception JavaDoc
82    {
83       clientSessionId = invocation.getSessionId();
84       sessionId = getId(invocation);
85       if(invocation.getLocator() != null)
86       {
87          callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem());
88       }
89       else
90       {
91          createCallbackStore(owner, sessionId);
92       }
93
94       log.debug("Session id for callback handler is " + sessionId);
95    }
96
97    public String JavaDoc getCallbackSessionId()
98    {
99       return sessionId;
100    }
101
102    public String JavaDoc getClientSessionId()
103    {
104       return clientSessionId;
105    }
106
107    public void setMemPercentCeiling(Double JavaDoc ceiling)
108    {
109       if(ceiling != null)
110       {
111          memPercentCeiling = ceiling.doubleValue();
112       }
113    }
114
115    public Double JavaDoc getMemPercentCeiling()
116    {
117       return new Double JavaDoc(memPercentCeiling);
118    }
119
120    private void createCallbackStore(ServerInvoker owner, String JavaDoc sessionId) throws Exception JavaDoc
121    {
122       Map JavaDoc config = owner.getConfiguration();
123       if(config != null)
124       {
125          // should either be a fully qualified class name or a mbean object name
126
String JavaDoc storeName = (String JavaDoc) config.get(CALLBACK_STORE_KEY);
127          if(storeName != null)
128          {
129             // will first try as a MBean
130
try
131             {
132                MBeanServer JavaDoc server = owner.getMBeanServer();
133                ObjectName JavaDoc storeObjectName = new ObjectName JavaDoc(storeName);
134                if(server != null)
135                {
136                   callbackStore = (SerializableStore)
137                         MBeanServerInvocationHandler.newProxyInstance(server,
138                                                                       storeObjectName,
139                                                                       SerializableStore.class,
140                                                                       false);
141                }
142             }
143             catch(Exception JavaDoc ex)
144             {
145                log.debug("Could not create callback store from the configration value given (" + storeName + ") as an MBean.");
146                if(log.isTraceEnabled())
147                {
148                   log.trace("Error is: " + ex.getMessage(), ex);
149                }
150                callbackStore = null;
151             }
152
153             // now try by class name
154
if(callbackStore == null)
155             {
156                try
157                {
158                   Class JavaDoc storeClass = Class.forName(storeName);
159                   callbackStore = (SerializableStore) storeClass.newInstance();
160                }
161                catch(Exception JavaDoc e)
162                {
163                   log.debug("Could not create callback store from the configuration value given (" + storeName + ") as a fully qualified class name.");
164                   if(log.isTraceEnabled())
165                   {
166                      log.trace("Error is: " + e.getMessage(), e);
167                   }
168                }
169             }
170          }
171       }
172
173       // if still null, then just use default
174
if(callbackStore == null)
175       {
176          callbackStore = new NullCallbackStore();
177       }
178       else
179       {
180          // need to modify configuration to include session id for the callback client.
181
Map JavaDoc storeConfig = new HashMap JavaDoc();
182          storeConfig.putAll(owner.getConfiguration());
183
184          String JavaDoc newFilePath = null;
185
186          String JavaDoc filePath = (String JavaDoc) storeConfig.get(CallbackStore.FILE_PATH_KEY);
187          if(filePath == null)
188          {
189             newFilePath = System.getProperty("jboss.server.data.dir", "data");
190          }
191          newFilePath = newFilePath + System.getProperty("file.separator") + "remoting" +
192                        System.getProperty("file.separator") + sessionId;
193
194          storeConfig.put(CallbackStore.FILE_PATH_KEY, newFilePath);
195
196          callbackStore.setConfig(storeConfig);
197       }
198
199       callbackStore.create();
200       callbackStore.start();
201
202       configureMemCeiling(owner.getConfiguration());
203    }
204
205    private void configureMemCeiling(Map JavaDoc configuration)
206    {
207       if(configuration != null)
208       {
209          String JavaDoc ceiling = (String JavaDoc) configuration.get(CALLBACK_MEM_CEILING);
210          if(ceiling != null)
211          {
212             try
213             {
214                double newCeiling = Double.parseDouble(ceiling);
215                setMemPercentCeiling(new Double JavaDoc(newCeiling));
216             }
217             catch(NumberFormatException JavaDoc e)
218             {
219                log.warn("Found new store memory ceiling seting (" + ceiling + "), but can not convert to type double.", e);
220             }
221          }
222       }
223    }
224
225    public Client getCallbackClient()
226    {
227       return callBackClient;
228    }
229
230
231    /**
232     * Returns an id that can be used to identify this particular
233     * callback handler, which should be representative of the
234     * client invoker it will make callbacks to. Currently, this
235     * is the session id associated with the invocation request.
236     *
237     * @return
238     */

239    public static String JavaDoc getId(InvocationRequest invocation)
240    {
241       String JavaDoc sessionId = invocation.getSessionId();
242       Map JavaDoc metadata = invocation.getRequestPayload();
243       if(metadata != null)
244       {
245          String JavaDoc listenerId = (String JavaDoc) metadata.get(Client.LISTENER_ID_KEY);
246          if(listenerId != null)
247          {
248             sessionId = sessionId + "+" + listenerId;
249          }
250       }
251       return sessionId;
252    }
253
254    /**
255     * Returns an id that can be used to identify this particular
256     * callback handler, which should be representative of the
257     * client invoker it will make callbacks to.
258     *
259     * @return
260     */

261    public String JavaDoc getId()
262    {
263       return getId(invocation);
264    }
265
266    public List JavaDoc getCallbacks()
267    {
268       List JavaDoc callbackList = null;
269       synchronized(callbacks)
270       {
271          callbackList = (List JavaDoc) callbacks.clone();
272          callbacks.clear();
273       }
274
275       // get as many persisted callbacks as possible without over run on memory
276
List JavaDoc persistedCallbacks = null;
277       try
278       {
279          persistedCallbacks = getPersistedCallbacks();
280       }
281       catch(IOException JavaDoc e)
282       {
283          log.error("Can not get persisted callbacks.", e);
284          throw new RuntimeException JavaDoc("Error getting callbacks", e);
285       }
286       callbackList.addAll(persistedCallbacks);
287
288       return callbackList;
289    }
290
291    private List JavaDoc getPersistedCallbacks() throws IOException JavaDoc
292    {
293       List JavaDoc callbacks = new ArrayList JavaDoc();
294
295       int size = callbackStore.size();
296       for(int x = 0; x < size; x++)
297       {
298          callbacks.add(callbackStore.getNext());
299          // check the amount of mem in use as get callbacks out so
300
// don't load so many callbacks from store, that run out of memory.
301
if(isMemLow())
302          {
303             new Thread JavaDoc()
304             {
305                public void run()
306                {
307                   System.gc();
308                }
309             }.start();
310             break;
311          }
312       }
313
314       return callbacks;
315    }
316
317    public boolean isPullCallbackHandler()
318    {
319       return (callBackClient == null);
320    }
321
322    /**
323     * Will take the callback message and send back to client.
324     * If client locator is null, will store them till client polls to get them.
325     *
326     * @param callback
327     * @throws HandleCallbackException
328     */

329    public void handleCallback(Callback callback)
330          throws HandleCallbackException
331    {
332       try
333       {
334          if(callBackClient == null)
335          {
336             // need to check if shoudl persist callback instead of keeping in memory
337
if(shouldPersist())
338             {
339                try
340                {
341                   persistCallback(callback);
342                   callback = null;
343                   // try to help out with the amount of memory usuage
344
new Thread JavaDoc()
345                   {
346                      public void run()
347                      {
348                         System.gc();
349                      }
350                   }.start();
351                }
352                catch(IOException JavaDoc e)
353                {
354                   log.error("Unable to persist callback.", e);
355                   throw new HandleCallbackException("Unable to persist callback and will not be able to deliver.", e);
356                }
357             }
358             else
359             {
360                synchronized(callbacks)
361                {
362                   if(log.isTraceEnabled())
363                   {
364                      log.debug("pull callback. adding to callback list");
365                   }
366                   callbacks.add(callback);
367                }
368             }
369          }
370          else
371          {
372             try
373             {
374                if(log.isTraceEnabled())
375                {
376                   log.debug("push callback. Calling client now.");
377                }
378                if(callback != null)
379                {
380                   Map JavaDoc returnPayload = callback.getReturnPayload();
381                   if(returnPayload == null)
382                   {
383                      returnPayload = new HashMap JavaDoc();
384                   }
385                   returnPayload.put(Callback.SERVER_LOCATOR_KEY, serverLocator);
386                   callback.setReturnPayload(returnPayload);
387                }
388                // sending internal invocation so server invoker we are sending to
389
// will know how pass onto it's client callback handler
390
InternalInvocation internalInvocation = new InternalInvocation(InternalInvocation.HANDLECALLBACK,
391                                                                               new Object JavaDoc[]{callback});
392                callBackClient.setSessionId(sessionId);
393                callBackClient.invoke(internalInvocation,
394                                      callback.getRequestPayload());
395             }
396             catch(HandleCallbackException hcex)
397             {
398                log.debug("Caught HandleCallbackException from calling the client's handleCallback() method.", hcex);
399                throw hcex;
400             }
401             catch(Throwable JavaDoc ex)
402             {
403                log.debug("Error dispatching callback to handler.", ex);
404                throw new HandleCallbackException("Error dispatching callback to handler.", ex);
405             }
406          }
407       }
408       catch(Throwable JavaDoc thr)
409       {
410          log.error("Error handling callback.", thr);
411          throw new HandleCallbackException("Error handling callback.", thr);
412       }
413    }
414
415    private void persistCallback(InvocationRequest callback) throws IOException JavaDoc
416    {
417       callbackStore.add(callback);
418    }
419
420    /**
421     * Calculates the percentage amount of free memory compared to max memory. The calculations for this
422     * is not always acurate. The reason is that total memory used is usually less than the max allowed. Thus,
423     * the amount of free memory is relative to the total amount allocated at that point in time. It is not
424     * until the total amount of memory allocated is equal to the max it will be allowed to allocate. At this point,
425     * the amount of free memory becomes relavent. Therefore, if the memory percentage ceiling is high, it might
426     * not trigger until after free memory percentage is well below the ceiling.
427     *
428     * @return
429     */

430    private boolean shouldPersist()
431    {
432       return isMemLow();
433    }
434
435    private boolean isMemLow()
436    {
437       Runtime JavaDoc runtime = Runtime.getRuntime();
438       long max = runtime.maxMemory();
439       long total = runtime.totalMemory();
440       long free = runtime.freeMemory();
441       float percentage = 100 * free / total;
442       if(max == total && memPercentCeiling >= percentage)
443       {
444          return true;
445       }
446       else
447       {
448          return false;
449       }
450    }
451
452    /**
453     * Returns the id for this handler
454     *
455     * @return
456     */

457    public String JavaDoc toString()
458    {
459       return getClass().getName() + " - id: " + getId();
460    }
461
462    /**
463     * This method is required to be called upon removing a callback listener
464     * so can clean up resources used by the handler. In particular, should
465     * call disconnect on internal Client.
466     */

467    public void destroy()
468    {
469       if(callBackClient != null)
470       {
471          callBackClient.disconnect();
472          callBackClient = null;
473       }
474       if(callbackStore != null)
475       {
476          callbackStore.purgeFiles();
477       }
478    }
479 }
480
Popular Tags