1 7 package org.jboss.remoting.callback; 8 9 import java.io.IOException ; 10 import java.util.ArrayList ; 11 import java.util.HashMap ; 12 import java.util.List ; 13 import java.util.Map ; 14 import javax.management.MBeanServer ; 15 import javax.management.MBeanServerInvocationHandler ; 16 import javax.management.ObjectName ; 17 import org.jboss.logging.Logger; 18 import org.jboss.remoting.Client; 19 import org.jboss.remoting.InvocationRequest; 20 import org.jboss.remoting.InvokerLocator; 21 import org.jboss.remoting.SerializableStore; 22 import org.jboss.remoting.ServerInvoker; 23 import org.jboss.remoting.invocation.InternalInvocation; 24 25 26 36 public class ServerInvokerCallbackHandler implements InvokerCallbackHandler 37 { 38 private InvocationRequest invocation; 39 private Client callBackClient; 40 private ArrayList callbacks = new ArrayList (); 41 private String sessionId; 42 private String clientSessionId; 43 private InvokerLocator serverLocator; 44 45 private SerializableStore callbackStore = null; 46 47 52 public static final String CALLBACK_STORE_KEY = "callbackStore"; 53 54 58 public static final String CALLBACK_MEM_CEILING = "callbackMemCeiling"; 59 60 65 private double memPercentCeiling = 20; 67 private static final Logger log = Logger.getLogger(ServerInvokerCallbackHandler.class); 68 69 70 public ServerInvokerCallbackHandler(InvocationRequest invocation, InvokerLocator serverLocator, ServerInvoker owner) throws Exception 71 { 72 if(invocation == null) 73 { 74 throw new Exception ("Can not construct ServerInvokerCallbackHandler with null InvocationRequest."); 75 } 76 this.invocation = invocation; 77 this.serverLocator = serverLocator; 78 init(invocation, owner); 79 } 80 81 private void init(InvocationRequest invocation, ServerInvoker owner) throws Exception 82 { 83 clientSessionId = invocation.getSessionId(); 84 sessionId = getId(invocation); 85 if(invocation.getLocator() != null) 86 { 87 callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem()); 88 } 89 else 90 { 91 createCallbackStore(owner, sessionId); 92 } 93 94 log.debug("Session id for callback handler is " + sessionId); 95 } 96 97 public String getCallbackSessionId() 98 { 99 return sessionId; 100 } 101 102 public String getClientSessionId() 103 { 104 return clientSessionId; 105 } 106 107 public void setMemPercentCeiling(Double ceiling) 108 { 109 if(ceiling != null) 110 { 111 memPercentCeiling = ceiling.doubleValue(); 112 } 113 } 114 115 public Double getMemPercentCeiling() 116 { 117 return new Double (memPercentCeiling); 118 } 119 120 private void createCallbackStore(ServerInvoker owner, String sessionId) throws Exception 121 { 122 Map config = owner.getConfiguration(); 123 if(config != null) 124 { 125 String storeName = (String ) config.get(CALLBACK_STORE_KEY); 127 if(storeName != null) 128 { 129 try 131 { 132 MBeanServer server = owner.getMBeanServer(); 133 ObjectName storeObjectName = new ObjectName (storeName); 134 if(server != null) 135 { 136 callbackStore = (SerializableStore) 137 MBeanServerInvocationHandler.newProxyInstance(server, 138 storeObjectName, 139 SerializableStore.class, 140 false); 141 } 142 } 143 catch(Exception ex) 144 { 145 log.debug("Could not create callback store from the configration value given (" + storeName + ") as an MBean."); 146 if(log.isTraceEnabled()) 147 { 148 log.trace("Error is: " + ex.getMessage(), ex); 149 } 150 callbackStore = null; 151 } 152 153 if(callbackStore == null) 155 { 156 try 157 { 158 Class storeClass = Class.forName(storeName); 159 callbackStore = (SerializableStore) storeClass.newInstance(); 160 } 161 catch(Exception e) 162 { 163 log.debug("Could not create callback store from the configuration value given (" + storeName + ") as a fully qualified class name."); 164 if(log.isTraceEnabled()) 165 { 166 log.trace("Error is: " + e.getMessage(), e); 167 } 168 } 169 } 170 } 171 } 172 173 if(callbackStore == null) 175 { 176 callbackStore = new NullCallbackStore(); 177 } 178 else 179 { 180 Map storeConfig = new HashMap (); 182 storeConfig.putAll(owner.getConfiguration()); 183 184 String newFilePath = null; 185 186 String filePath = (String ) storeConfig.get(CallbackStore.FILE_PATH_KEY); 187 if(filePath == null) 188 { 189 newFilePath = System.getProperty("jboss.server.data.dir", "data"); 190 } 191 newFilePath = newFilePath + System.getProperty("file.separator") + "remoting" + 192 System.getProperty("file.separator") + sessionId; 193 194 storeConfig.put(CallbackStore.FILE_PATH_KEY, newFilePath); 195 196 callbackStore.setConfig(storeConfig); 197 } 198 199 callbackStore.create(); 200 callbackStore.start(); 201 202 configureMemCeiling(owner.getConfiguration()); 203 } 204 205 private void configureMemCeiling(Map configuration) 206 { 207 if(configuration != null) 208 { 209 String ceiling = (String ) configuration.get(CALLBACK_MEM_CEILING); 210 if(ceiling != null) 211 { 212 try 213 { 214 double newCeiling = Double.parseDouble(ceiling); 215 setMemPercentCeiling(new Double (newCeiling)); 216 } 217 catch(NumberFormatException e) 218 { 219 log.warn("Found new store memory ceiling seting (" + ceiling + "), but can not convert to type double.", e); 220 } 221 } 222 } 223 } 224 225 public Client getCallbackClient() 226 { 227 return callBackClient; 228 } 229 230 231 239 public static String getId(InvocationRequest invocation) 240 { 241 String sessionId = invocation.getSessionId(); 242 Map metadata = invocation.getRequestPayload(); 243 if(metadata != null) 244 { 245 String listenerId = (String ) metadata.get(Client.LISTENER_ID_KEY); 246 if(listenerId != null) 247 { 248 sessionId = sessionId + "+" + listenerId; 249 } 250 } 251 return sessionId; 252 } 253 254 261 public String getId() 262 { 263 return getId(invocation); 264 } 265 266 public List getCallbacks() 267 { 268 List callbackList = null; 269 synchronized(callbacks) 270 { 271 callbackList = (List ) callbacks.clone(); 272 callbacks.clear(); 273 } 274 275 List persistedCallbacks = null; 277 try 278 { 279 persistedCallbacks = getPersistedCallbacks(); 280 } 281 catch(IOException e) 282 { 283 log.error("Can not get persisted callbacks.", e); 284 throw new RuntimeException ("Error getting callbacks", e); 285 } 286 callbackList.addAll(persistedCallbacks); 287 288 return callbackList; 289 } 290 291 private List getPersistedCallbacks() throws IOException 292 { 293 List callbacks = new ArrayList (); 294 295 int size = callbackStore.size(); 296 for(int x = 0; x < size; x++) 297 { 298 callbacks.add(callbackStore.getNext()); 299 if(isMemLow()) 302 { 303 new Thread () 304 { 305 public void run() 306 { 307 System.gc(); 308 } 309 }.start(); 310 break; 311 } 312 } 313 314 return callbacks; 315 } 316 317 public boolean isPullCallbackHandler() 318 { 319 return (callBackClient == null); 320 } 321 322 329 public void handleCallback(Callback callback) 330 throws HandleCallbackException 331 { 332 try 333 { 334 if(callBackClient == null) 335 { 336 if(shouldPersist()) 338 { 339 try 340 { 341 persistCallback(callback); 342 callback = null; 343 new Thread () 345 { 346 public void run() 347 { 348 System.gc(); 349 } 350 }.start(); 351 } 352 catch(IOException e) 353 { 354 log.error("Unable to persist callback.", e); 355 throw new HandleCallbackException("Unable to persist callback and will not be able to deliver.", e); 356 } 357 } 358 else 359 { 360 synchronized(callbacks) 361 { 362 if(log.isTraceEnabled()) 363 { 364 log.debug("pull callback. adding to callback list"); 365 } 366 callbacks.add(callback); 367 } 368 } 369 } 370 else 371 { 372 try 373 { 374 if(log.isTraceEnabled()) 375 { 376 log.debug("push callback. Calling client now."); 377 } 378 if(callback != null) 379 { 380 Map returnPayload = callback.getReturnPayload(); 381 if(returnPayload == null) 382 { 383 returnPayload = new HashMap (); 384 } 385 returnPayload.put(Callback.SERVER_LOCATOR_KEY, serverLocator); 386 callback.setReturnPayload(returnPayload); 387 } 388 InternalInvocation internalInvocation = new InternalInvocation(InternalInvocation.HANDLECALLBACK, 391 new Object []{callback}); 392 callBackClient.setSessionId(sessionId); 393 callBackClient.invoke(internalInvocation, 394 callback.getRequestPayload()); 395 } 396 catch(HandleCallbackException hcex) 397 { 398 log.debug("Caught HandleCallbackException from calling the client's handleCallback() method.", hcex); 399 throw hcex; 400 } 401 catch(Throwable ex) 402 { 403 log.debug("Error dispatching callback to handler.", ex); 404 throw new HandleCallbackException("Error dispatching callback to handler.", ex); 405 } 406 } 407 } 408 catch(Throwable thr) 409 { 410 log.error("Error handling callback.", thr); 411 throw new HandleCallbackException("Error handling callback.", thr); 412 } 413 } 414 415 private void persistCallback(InvocationRequest callback) throws IOException 416 { 417 callbackStore.add(callback); 418 } 419 420 430 private boolean shouldPersist() 431 { 432 return isMemLow(); 433 } 434 435 private boolean isMemLow() 436 { 437 Runtime runtime = Runtime.getRuntime(); 438 long max = runtime.maxMemory(); 439 long total = runtime.totalMemory(); 440 long free = runtime.freeMemory(); 441 float percentage = 100 * free / total; 442 if(max == total && memPercentCeiling >= percentage) 443 { 444 return true; 445 } 446 else 447 { 448 return false; 449 } 450 } 451 452 457 public String toString() 458 { 459 return getClass().getName() + " - id: " + getId(); 460 } 461 462 467 public void destroy() 468 { 469 if(callBackClient != null) 470 { 471 callBackClient.disconnect(); 472 callBackClient = null; 473 } 474 if(callbackStore != null) 475 { 476 callbackStore.purgeFiles(); 477 } 478 } 479 } 480 | Popular Tags |