1 7 package org.jboss.remoting; 8 9 import org.jboss.logging.Logger; 10 import org.jboss.remoting.invocation.InternalInvocation; 11 12 import javax.management.MBeanServer ; 13 import javax.management.MBeanServerInvocationHandler ; 14 import javax.management.ObjectName ; 15 import java.io.IOException ; 16 import java.util.ArrayList ; 17 import java.util.HashMap ; 18 import java.util.List ; 19 import java.util.Map ; 20 21 22 27 public class ServerInvokerCallbackHandler implements InvokerCallbackHandler 28 { 29 private InvocationRequest invocation; 30 private Client callBackClient; 31 private ArrayList callbacks = new ArrayList (); 32 private String sessionId; 33 private InvokerLocator serverLocator; 34 35 private SerializableStore callbackStore = null; 36 37 42 public static final String CALLBACK_STORE_KEY = "callbackStore"; 43 44 48 public static final String CALLBACK_MEM_CEILING = "callbackMemCeiling"; 49 50 55 private double memPercentCeiling = 20; 57 private static final Logger log = Logger.getLogger(ServerInvokerCallbackHandler.class); 58 59 60 public ServerInvokerCallbackHandler(InvocationRequest invocation, InvokerLocator serverLocator, ServerInvoker owner) throws Exception 61 { 62 if(invocation == null) 63 { 64 throw new Exception ("Can not construct ServerInvokerCallbackHandler with null InvocationRequest."); 65 } 66 this.invocation = invocation; 67 this.serverLocator = serverLocator; 68 init(invocation, owner); 69 } 70 71 private void init(InvocationRequest invocation, ServerInvoker owner) throws Exception 72 { 73 sessionId = invocation.getSessionId(); 74 if(invocation.getLocator() != null) 75 { 76 callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem()); 77 callBackClient.connect(); 78 } 79 else 80 { 81 createCallbackStore(owner, sessionId); 82 } 83 84 if(log.isDebugEnabled()) 85 { 86 log.debug("Session id for callback handler is " + sessionId); 87 } 88 } 89 90 91 public void setMemPercentCeiling(Double ceiling) 92 { 93 if(ceiling != null) 94 { 95 memPercentCeiling = ceiling.doubleValue(); 96 } 97 } 98 99 public Double getMemPercentCeiling() 100 { 101 return new Double (memPercentCeiling); 102 } 103 104 private void createCallbackStore(ServerInvoker owner, String sessionId) throws Exception 105 { 106 Map config = owner.getConfiguration(); 107 if(config != null) 108 { 109 String storeName = (String ) config.get(CALLBACK_STORE_KEY); 111 if(storeName != null) 112 { 113 try 115 { 116 MBeanServer server = owner.getMBeanServer(); 117 ObjectName storeObjectName = new ObjectName (storeName); 118 if(server != null) 119 { 120 callbackStore = (SerializableStore) 121 MBeanServerInvocationHandler.newProxyInstance(server, 122 storeObjectName, 123 SerializableStore.class, 124 false); 125 } 126 } 127 catch(Exception ex) 128 { 129 log.debug("Could not create callback store from the configration value given (" + storeName + ") as an MBean."); 130 if(log.isTraceEnabled()) 131 { 132 log.trace("Error is: " + ex.getMessage(), ex); 133 } 134 callbackStore = null; 135 } 136 137 if(callbackStore == null) 139 { 140 try 141 { 142 Class storeClass = Class.forName(storeName); 143 callbackStore = (SerializableStore) storeClass.newInstance(); 144 } 145 catch(Exception e) 146 { 147 log.debug("Could not create callback store from the configuration value given (" + storeName + ") as a fully qualified class name."); 148 if(log.isTraceEnabled()) 149 { 150 log.trace("Error is: " + e.getMessage(), e); 151 } 152 } 153 } 154 } 155 } 156 157 if(callbackStore == null) 159 { 160 callbackStore = new NullCallbackStore(); 161 } 162 else 163 { 164 Map storeConfig = new HashMap (); 166 storeConfig.putAll(owner.getConfiguration()); 167 168 String newFilePath = null; 169 170 String filePath = (String ) storeConfig.get(CallbackStore.FILE_PATH_KEY); 171 if(filePath == null) 172 { 173 newFilePath = System.getProperty("jboss.server.data.dir", "data"); 174 } 175 newFilePath = newFilePath + System.getProperty("file.separator") + "remoting" + 176 System.getProperty("file.separator") + sessionId; 177 178 storeConfig.put(CallbackStore.FILE_PATH_KEY, newFilePath); 179 180 callbackStore.setConfig(storeConfig); 181 } 182 183 callbackStore.create(); 184 callbackStore.start(); 185 186 configureMemCeiling(owner.getConfiguration()); 187 } 188 189 private void configureMemCeiling(Map configuration) 190 { 191 if(configuration != null) 192 { 193 String ceiling = (String ) configuration.get(CALLBACK_MEM_CEILING); 194 if(ceiling != null) 195 { 196 try 197 { 198 double newCeiling = Double.parseDouble(ceiling); 199 setMemPercentCeiling(new Double (newCeiling)); 200 } 201 catch(NumberFormatException e) 202 { 203 log.warn("Found new store memory ceiling seting (" + ceiling + "), but can not convert to type double.", e); 204 } 205 } 206 } 207 } 208 209 public Client getCallbackClient() 210 { 211 return callBackClient; 212 } 213 214 215 223 public static String getId(InvocationRequest invocation) 224 { 225 String sessionId = invocation.getSessionId(); 226 return sessionId; 227 } 228 229 236 public String getId() 237 { 238 return getId(invocation); 239 } 240 241 public List getCallbacks() 242 { 243 List callbackList = null; 244 synchronized(callbacks) 245 { 246 callbackList = (List ) callbacks.clone(); 247 callbacks.clear(); 248 } 249 250 List persistedCallbacks = null; 252 try 253 { 254 persistedCallbacks = getPersistedCallbacks(); 255 } 256 catch(IOException e) 257 { 258 log.error("Can not get persisted callbacks.", e); 259 throw new RuntimeException ("Error getting callbacks", e); 260 } 261 callbackList.addAll(persistedCallbacks); 262 263 return callbackList; 264 } 265 266 private List getPersistedCallbacks() throws IOException 267 { 268 List callbacks = new ArrayList (); 269 270 int size = callbackStore.size(); 271 for(int x = 0; x < size; x++) 272 { 273 callbacks.add(callbackStore.getNext()); 274 if(isMemLow()) 277 { 278 new Thread () 279 { 280 public void run() 281 { 282 System.gc(); 283 } 284 }.start(); 285 break; 286 } 287 } 288 289 return callbacks; 290 } 291 292 public boolean isPullCallbackHandler() 293 { 294 return (callBackClient == null); 295 } 296 297 304 public void handleCallback(InvocationRequest callback) 305 throws HandleCallbackException 306 { 307 try 308 { 309 if(callBackClient == null) 310 { 311 if(shouldPersist()) 313 { 314 try 315 { 316 persistCallback(callback); 317 callback = null; 318 new Thread () 320 { 321 public void run() 322 { 323 System.gc(); 324 } 325 }.start(); 326 } 327 catch(IOException e) 328 { 329 log.error("Unable to persist callback.", e); 330 throw new HandleCallbackException("Unable to persist callback and will not be able to deliver.", e); 331 } 332 } 333 else 334 { 335 synchronized(callbacks) 336 { 337 if(log.isDebugEnabled()) 338 { 339 log.debug("pull callback. adding to callback list"); 340 } 341 callbacks.add(callback); 342 } 343 } 344 } 345 else 346 { 347 try 348 { 349 if(!callBackClient.isConnected()) 351 { 352 callBackClient.connect(); 353 } 354 if(callBackClient.isConnected()) 355 { 356 if(log.isDebugEnabled()) 357 { 358 log.debug("push callback. Calling client now."); 359 } 360 if(callback != null) 361 { 362 Map returnPayload = callback.getReturnPayload(); 363 if(returnPayload == null) 364 { 365 returnPayload = new HashMap (); 366 } 367 returnPayload.put(Callback.SERVER_LOCATOR_KEY, serverLocator); 368 callback.setReturnPayload(returnPayload); 369 } 370 InternalInvocation internalInvocation = new InternalInvocation(InternalInvocation.HANDLECALLBACK, 373 new Object []{callback}); 374 callBackClient.setSessionId(sessionId); 375 callBackClient.invoke(internalInvocation, 376 callback.getRequestPayload()); 377 } 378 else 379 { 380 log.error("Can not handle callback since can not connect to client invoker."); 381 throw new HandleCallbackException("Can not handle callback since can not connect to client invoker."); 382 } 383 } 384 catch(Throwable ex) 385 { 386 log.debug("Error dispatching callback to handler.", ex); 387 throw new HandleCallbackException("Error dispatching callback to handler.", ex); 388 } 389 } 390 } 391 catch(Throwable thr) 392 { 393 log.error("Error handling callback.", thr); 394 throw new HandleCallbackException("Error handling callback.", thr); 395 } 396 } 397 398 private void persistCallback(InvocationRequest callback) throws IOException 399 { 400 callbackStore.add(callback); 401 } 402 403 413 private boolean shouldPersist() 414 { 415 return isMemLow(); 416 } 417 418 private boolean isMemLow() 419 { 420 Runtime runtime = Runtime.getRuntime(); 421 long max = runtime.maxMemory(); 422 long total = runtime.totalMemory(); 423 long free = runtime.freeMemory(); 424 float percentage = 100 * free / total; 425 if(max == total && memPercentCeiling >= percentage) 426 { 427 return true; 428 } 429 else 430 { 431 return false; 432 } 433 } 434 435 440 public String toString() 441 { 442 return getClass().getName() + " - id: " + getId(); 443 } 444 445 450 public void destroy() 451 { 452 if(callBackClient != null) 453 { 454 callBackClient.disconnect(); 455 callBackClient = null; 456 } 457 if(callbackStore != null) 458 { 459 callbackStore.purgeFiles(); 460 } 461 } 462 } 463 | Popular Tags |