1 22 package org.jboss.cache.invalidation.bridges; 23 24 import java.io.Serializable ; 25 import java.util.ArrayList ; 26 import java.util.Vector ; 27 import java.util.Collection ; 28 import javax.management.MBeanServerInvocationHandler ; 29 import javax.management.ObjectName ; 30 31 import org.jboss.cache.invalidation.BatchInvalidation; 32 import org.jboss.cache.invalidation.InvalidationManager; 33 import org.jboss.cache.invalidation.InvalidationGroup; 34 import org.jboss.cache.invalidation.InvalidationManagerMBean; 35 import org.jboss.cache.invalidation.BridgeInvalidationSubscription; 36 import org.jboss.cache.invalidation.InvalidationBridgeListener; 37 import org.jboss.ha.framework.interfaces.HAPartition; 38 import org.jboss.ha.framework.interfaces.DistributedState; 39 import org.jboss.ha.framework.interfaces.DistributedReplicantManager; 40 import org.jboss.ha.framework.server.ClusterPartitionMBean; 41 import org.jboss.system.server.ServerConfigUtil; 42 43 58 59 public class JGCacheInvalidationBridge 60 extends org.jboss.system.ServiceMBeanSupport 61 implements JGCacheInvalidationBridgeMBean, 62 DistributedState.DSListenerEx, 63 InvalidationBridgeListener, 64 DistributedReplicantManager.ReplicantListener 65 { 66 67 69 71 protected String partitionName = ServerConfigUtil.getDefaultPartitionName(); 72 75 protected ClusterPartitionMBean clusterPartition; 76 protected String invalidationManagerName = InvalidationManager.DEFAULT_JMX_SERVICE_NAME; 77 protected String bridgeName = "DefaultJGCacheIB"; 78 79 protected HAPartition partition = null; 80 protected DistributedState ds = null; 81 protected DistributedReplicantManager drm = null; 82 protected String RPC_HANLE_NAME = null; 83 protected String nodeName = null; 84 85 protected InvalidationManagerMBean invalMgr = null; 86 protected BridgeInvalidationSubscription invalidationSubscription = null; 87 protected Collection localGroups = null; 88 protected Vector bridgedGroups = new Vector (); 89 90 91 protected final Class [] rpc_invalidate_types=new Class []{String .class, Serializable .class}; 92 protected final Class [] rpc_invalidates_types=new Class []{String .class, Serializable [].class}; 93 protected final Class [] rpc_invalidate_all_types=new Class []{String .class}; 94 protected final Class [] rpc_batch_invalidate_types=new Class []{BatchInvalidation[].class}; 95 96 97 99 101 public JGCacheInvalidationBridge () 102 { 103 } 104 105 107 109 public String getInvalidationManager () 110 { 111 return this.invalidationManagerName; 112 } 113 114 public ClusterPartitionMBean getClusterPartition() 115 { 116 return clusterPartition; 117 } 118 119 public void setClusterPartition(ClusterPartitionMBean clusterPartition) 120 { 121 this.clusterPartition = clusterPartition; 122 } 123 124 public String getPartitionName () 125 { 126 return this.partitionName; 127 } 128 129 public void setInvalidationManager (String objectName) 130 { 131 this.invalidationManagerName = objectName; 132 } 133 134 public void setPartitionName (String partitionName) 135 { 136 this.partitionName = partitionName; 137 } 138 139 public String getBridgeName () 140 { 141 return this.bridgeName; 142 } 143 144 public void setBridgeName (String name) 145 { 146 this.bridgeName = name; 147 } 148 149 151 158 public synchronized void replicantsChanged (String key, java.util.List newReplicants, int newReplicantsViewId) 159 { 160 if (key.equals (this.RPC_HANLE_NAME) && this.drm.isMasterReplica (this.RPC_HANLE_NAME)) 161 { 162 log.debug ("The list of replicant for the JG bridge has changed, computing and updating local info..."); 163 164 java.util.Collection coll = this.ds.getAllKeys (this.RPC_HANLE_NAME); 167 if (coll == null) 168 { 169 log.debug ("... No bridge info was associated to this node"); 170 return; 171 } 172 173 ArrayList collCopy = new java.util.ArrayList (coll); 176 java.util.List newReplicantsNodeNames = this.drm.lookupReplicantsNodeNames (this.RPC_HANLE_NAME); 177 178 179 for (int i = 0; i < collCopy.size(); i++) 180 { 181 String nodeEntry = (String )collCopy.get(i); 182 if (!newReplicantsNodeNames.contains (nodeEntry)) 183 { 184 try 187 { 188 log.debug ("removing bridge information associated to this node from the DS"); 189 this.ds.remove (this.RPC_HANLE_NAME, nodeEntry, true); 190 } 191 catch (Exception e) 192 { 193 log.info ("Unable to remove a node entry from the distributed cache", e); 194 } 195 } 196 } 197 } 198 } 199 200 202 public void valueHasChanged (String category, Serializable key, Serializable value, boolean locallyModified) 203 { 204 this.updatedBridgedInvalidationGroupsInfo (); 205 } 206 207 public void keyHasBeenRemoved (String category, Serializable key, Serializable previousContent, boolean locallyModified) 208 { 209 this.updatedBridgedInvalidationGroupsInfo (); 210 } 211 212 214 public void batchInvalidate (BatchInvalidation[] invalidations, boolean asynchronous) 215 { 216 if (invalidations == null) return; 217 218 ArrayList acceptedGroups = new ArrayList (); 221 222 for (int i=0; i<invalidations.length; i++) 223 { 224 BatchInvalidation currBI = invalidations[i]; 225 if (groupExistsRemotely (currBI.getInvalidationGroupName ())) 226 acceptedGroups.add (currBI); 227 } 228 229 if (acceptedGroups.size () > 0) 230 { 231 BatchInvalidation[] result = new BatchInvalidation[acceptedGroups.size ()]; 232 result = (BatchInvalidation[])acceptedGroups.toArray (result); 233 234 if (log.isTraceEnabled ()) 235 log.trace ("Transmitting batch invalidation: " + result); 236 this._do_rpc_batchInvalidate (result, asynchronous); 237 } 238 } 239 240 public void invalidate (String invalidationGroupName, Serializable [] keys, boolean asynchronous) 241 { 242 if (log.isTraceEnabled ()) 245 log.trace ("Transmitting invalidations for group: " + invalidationGroupName); 246 247 if (groupExistsRemotely (invalidationGroupName)) 248 _do_rpc_invalidates (invalidationGroupName, keys, asynchronous); 249 } 250 251 public void invalidate (String invalidationGroupName, Serializable key, boolean asynchronous) 252 { 253 if (log.isTraceEnabled ()) 256 log.trace ("Transmitting invalidation for group: " + invalidationGroupName); 257 258 if (groupExistsRemotely (invalidationGroupName)) 259 _do_rpc_invalidate (invalidationGroupName, key, asynchronous); 260 } 261 262 public void invalidateAll(String groupName, boolean async) 263 { 264 if (log.isTraceEnabled ()) 265 log.trace ("Transmitting for all entries for invalidation for group: " + groupName); 266 if (groupExistsRemotely (groupName)) 267 _do_rpc_invalidate_all (groupName, async); 268 } 269 270 public void newGroupCreated (String groupInvalidationName) 271 { 272 try 273 { 274 this.publishLocalInvalidationGroups (); 275 } 277 catch (Exception e) 278 { 279 log.info ("Problem while registering a new invalidation group over the cluster", e); 280 } 281 } 282 283 public void groupIsDropped (String groupInvalidationName) 284 { 285 try 286 { 287 this.publishLocalInvalidationGroups (); 288 } 290 catch (Exception e) 291 { 292 log.info ("Problem while un-registering a new invalidation group over the cluster", e); 293 } 294 } 295 296 298 public void startService () throws Exception 299 { 300 RPC_HANLE_NAME = "DCacheBridge-" + this.bridgeName; 301 302 if (this.clusterPartition == null) 306 { 307 javax.naming.Context ctx = new javax.naming.InitialContext (); 308 this.partition = (HAPartition)ctx.lookup("/HAPartition/" + this.partitionName); 309 } 310 else 311 { 312 this.partition = this.clusterPartition.getHAPartition(); 313 this.partitionName = this.partition.getPartitionName(); 314 } 315 316 this.ds = this.partition.getDistributedStateService (); 317 this.drm = this.partition.getDistributedReplicantManager (); 318 this.nodeName = this.partition.getNodeName(); 319 320 this.drm.add (this.RPC_HANLE_NAME, ""); 321 this.drm.registerListener (this.RPC_HANLE_NAME, this); 322 this.ds.registerDSListenerEx (RPC_HANLE_NAME, this); 323 this.partition.registerRPCHandler(RPC_HANLE_NAME, this); 324 325 this.invalMgr = (org.jboss.cache.invalidation.InvalidationManagerMBean) 328 org.jboss.system.Registry.lookup (this.invalidationManagerName); 329 if( invalMgr == null ) 330 throw new IllegalStateException ("Failed to find: "+invalidationManagerName+", check dependency"); 331 332 publishLocalInvalidationGroups (); 333 this.updatedBridgedInvalidationGroupsInfo (); 334 335 this.invalidationSubscription = invalMgr.registerBridgeListener (this); 336 337 } 338 339 public void stopService () 340 { 341 try 342 { 343 this.partition.unregisterRPCHandler (this.RPC_HANLE_NAME, this); 344 this.ds.unregisterDSListenerEx (this.RPC_HANLE_NAME, this); 345 this.drm.unregisterListener (this.RPC_HANLE_NAME, this); 346 this.drm.remove (this.RPC_HANLE_NAME); 347 348 this.invalidationSubscription.unregister (); 349 350 this.ds.remove (this.RPC_HANLE_NAME, this.nodeName, true); 351 352 this.invalMgr = null; 353 this.partition = null; 354 this.drm = null; 355 this.ds = null; 356 this.invalidationSubscription = null; 357 this.RPC_HANLE_NAME = null; 358 this.nodeName = null; 359 this.localGroups = null; 360 this.bridgedGroups = new Vector (); 361 } 362 catch (Exception e) 363 { 364 log.info ("Problem while shuting down invalidation cache bridge", e); 365 } 366 } 367 368 370 public void _rpc_invalidate (String invalidationGroupName, Serializable key) 371 { 372 if (log.isTraceEnabled ()) 373 log.trace ("Received remote invalidation for group: " + invalidationGroupName); 374 375 this.invalidationSubscription.invalidate (invalidationGroupName, key); 376 } 377 378 public void _rpc_invalidates (String invalidationGroupName, Serializable [] keys) 379 { 380 if (log.isTraceEnabled ()) 381 log.trace ("Received remote invalidations for group: " + invalidationGroupName); 382 383 this.invalidationSubscription.invalidate (invalidationGroupName, keys); 384 } 385 386 public void _rpc_invalidate_all (String invalidationGroupName) 387 { 388 if (log.isTraceEnabled ()) 389 log.trace ("Received remote invalidate_all for group: " + invalidationGroupName); 390 391 this.invalidationSubscription.invalidateAll (invalidationGroupName); 392 } 393 394 public void _rpc_batchInvalidate (BatchInvalidation[] invalidations) 395 { 396 if (log.isTraceEnabled () && invalidations != null) 397 log.trace ("Received remote batch invalidation for this number of groups: " + invalidations.length); 398 399 this.invalidationSubscription.batchInvalidate (invalidations); 400 } 401 402 protected void _do_rpc_invalidate (String invalidationGroupName, Serializable key, boolean asynch) 403 { 404 Object [] params = new Object [] {invalidationGroupName, key}; 405 try 406 { 407 if (asynch) 408 this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME, 409 "_rpc_invalidate", 410 params, rpc_invalidate_types, true); 411 else 412 this.partition.callMethodOnCluster (this.RPC_HANLE_NAME, 413 "_rpc_invalidate", 414 params, rpc_invalidate_types, true); 415 } 416 catch (Exception e) 417 { 418 log.debug ("Distributed invalidation (1) has failed for group " + 419 invalidationGroupName + " (Bridge: " + this.bridgeName + ")"); 420 } 421 } 422 423 protected void _do_rpc_invalidates (String invalidationGroupName, Serializable [] keys, boolean asynch) 424 { 425 Object [] params = new Object [] {invalidationGroupName, keys}; 426 try 427 { 428 if (asynch) 429 this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME, 430 "_rpc_invalidates", params, rpc_invalidates_types, true); 431 else 432 this.partition.callMethodOnCluster (this.RPC_HANLE_NAME, 433 "_rpc_invalidates", params, rpc_invalidates_types, true); 434 } 435 catch (Exception e) 436 { 437 log.debug ("Distributed invalidation (2) has failed for group " + 438 invalidationGroupName + " (Bridge: " + this.bridgeName + ")"); 439 } 440 } 441 442 protected void _do_rpc_invalidate_all (String invalidationGroupName, boolean asynch) 443 { 444 Object [] params = new Object [] {invalidationGroupName}; 445 try 446 { 447 if (asynch) 448 this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME, 449 "_rpc_invalidate_all", params, rpc_invalidate_all_types, true); 450 else 451 this.partition.callMethodOnCluster (this.RPC_HANLE_NAME, 452 "_rpc_invalidate_all", params, rpc_invalidate_all_types, true); 453 } 454 catch (Exception e) 455 { 456 log.debug ("Distributed invalidation (2) has failed for group " + 457 invalidationGroupName + " (Bridge: " + this.bridgeName + ")"); 458 } 459 } 460 461 protected void _do_rpc_batchInvalidate (BatchInvalidation[] invalidations, boolean asynch) 462 { 463 Object [] params = new Object [] {invalidations}; 464 try 465 { 466 if (asynch) 467 this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME, 468 "_rpc_batchInvalidate", params, rpc_batch_invalidate_types, true); 469 else 470 this.partition.callMethodOnCluster (this.RPC_HANLE_NAME, 471 "_rpc_batchInvalidate", params, rpc_batch_invalidate_types, true); 472 } 473 catch (Exception e) 474 { 475 log.debug ("Distributed invalidation (3) has failed (Bridge: " + this.bridgeName + ")"); 476 } 477 } 478 479 480 482 484 protected synchronized void publishLocalInvalidationGroups () throws Exception 485 { 486 this.localGroups = invalMgr.getInvalidationGroups (); 487 488 log.debug ("Publishing locally available invalidation groups: " + this.localGroups); 489 490 ArrayList content = new ArrayList (this.localGroups); 491 ArrayList result = new ArrayList (content.size ()); 492 493 494 for (int i = 0; i < content.size(); i++) 495 { 496 String aGroup = ((InvalidationGroup)content.get(i)).getGroupName (); 497 result.add (aGroup); 498 } 499 500 if (result.size () > 0) 501 { 502 NodeInfo info = new NodeInfo (result, this.nodeName); 503 this.ds.set (this.RPC_HANLE_NAME, this.nodeName, info, true); 504 } 505 else 506 this.ds.remove (this.RPC_HANLE_NAME, this.nodeName, true); 507 } 508 509 protected void updatedBridgedInvalidationGroupsInfo () 510 { 511 Collection bridgedByNode = this.ds.getAllValues (this.RPC_HANLE_NAME); 512 513 log.debug ("Updating list of invalidation groups that are bridged..."); 514 515 if (bridgedByNode != null) 516 { 517 ArrayList copy = new ArrayList (bridgedByNode); 520 521 Vector result = new Vector (); 522 523 524 for (int i = 0; i < copy.size(); i++) 525 { 526 NodeInfo infoForNode = (NodeInfo)copy.get(i); 527 log.trace ("InfoForNode: " + infoForNode); 528 529 if (infoForNode != null && !infoForNode.groupName.equals (this.nodeName)) 530 { 531 ArrayList groupsForNode = infoForNode.groups; 532 log.trace ("Groups for node: " + groupsForNode); 533 534 535 for (int j = 0; j < groupsForNode.size(); j++) 536 { 537 String aGroup = (String )groupsForNode.get(j); 538 if (!result.contains (aGroup)) 539 { 540 log.trace ("Adding: " + aGroup); 541 result.add (aGroup); 542 } 543 } 544 545 } 546 547 } 548 this.bridgedGroups = result; 551 552 log.debug ("... computed list of bridged groups: " + result); 553 } 554 else 555 { 556 log.debug ("... nothing needs to be bridged."); 557 } 558 559 } 560 561 protected boolean groupExistsRemotely (String groupName) 562 { 563 return this.bridgedGroups.contains (groupName); 564 } 565 566 568 570 } 571 572 class NodeInfo implements java.io.Serializable 573 { 574 static final long serialVersionUID = -3215712955134929006L; 575 576 public ArrayList groups = null; 577 public String groupName = null; 578 579 public NodeInfo (){} 580 581 public NodeInfo (ArrayList groups, String groupName) 582 { 583 this.groups = groups; 584 this.groupName = groupName; 585 } 586 587 } 588 | Popular Tags |