1 22 package org.jboss.ha.jmx; 23 24 import java.io.Serializable ; 25 import java.util.Iterator ; 26 import java.util.List ; 27 import java.util.Set ; 28 29 import javax.management.AttributeChangeNotification ; 30 import javax.management.InstanceNotFoundException ; 31 import javax.management.Notification ; 32 import javax.management.ObjectInstance ; 33 import javax.management.Query ; 34 import javax.management.QueryExp ; 35 36 import org.jboss.ha.framework.interfaces.DistributedReplicantManager; 37 import org.jboss.ha.framework.interfaces.DistributedState; 38 import org.jboss.ha.framework.interfaces.HAPartition; 39 import org.jboss.ha.framework.server.ClusterPartition; 40 import org.jboss.ha.framework.server.ClusterPartitionMBean; 41 import org.jboss.mx.util.MBeanProxyExt; 42 import org.jboss.system.ServiceMBeanSupport; 43 import org.jboss.system.server.ServerConfigUtil; 44 45 64 public class HAServiceMBeanSupport 65 extends ServiceMBeanSupport 66 implements HAServiceMBean 67 { 68 70 private HAPartition partition_; 72 private ClusterPartitionMBean clusterPartition; 73 private String partitionName = ServerConfigUtil.getDefaultPartitionName(); 74 75 private DistributedReplicantManager.ReplicantListener drmListener = null; 76 77 80 private String REPLICANT_TOKEN = ""; 81 82 private boolean sendLocalLifecycleNotifications = true; 83 private boolean sendRemoteLifecycleNotifications = true; 84 85 87 public HAServiceMBeanSupport() 88 { 89 } 91 92 public ClusterPartitionMBean getClusterPartition() 93 { 94 return clusterPartition; 95 } 96 97 public void setClusterPartition(ClusterPartitionMBean clusterPartition) 98 { 99 if ((getState() != STARTED) && (getState() != STARTING)) 100 { 101 this.clusterPartition = clusterPartition; 102 } 103 } 104 105 public String getPartitionName() 106 { 107 return partitionName; 108 } 109 110 public void setPartitionName(String newPartitionName) 111 { 112 if ((getState() != STARTED) && (getState() != STARTING)) 113 { 114 partitionName = newPartitionName; 115 } 116 } 117 118 119 121 131 public void setDistributedState(String key, Serializable value) 132 throws Exception 133 { 134 DistributedState ds = getPartition().getDistributedStateService(); 135 ds.set(getServiceHAName(), key, value); 136 } 137 138 147 public Serializable getDistributedState(String key) 148 { 149 DistributedState ds = getPartition().getDistributedStateService(); 150 return ds.get(getServiceHAName(), key); 151 } 152 153 168 protected void startService() throws Exception 169 { 170 log.debug("start HAServiceMBeanSupport"); 171 172 setupPartition(); 173 174 registerRPCHandler(); 175 176 registerDRMListener(); 177 } 178 179 185 protected void stopService() throws Exception 186 { 187 log.debug("stop HAServiceMBeanSupport"); 188 189 unregisterDRMListener(); 190 191 unregisterRPCHandler(); 192 } 193 194 protected void setupPartition() throws Exception 195 { 196 if (clusterPartition == null) 198 { 199 String pName = getPartitionName(); 200 partition_ = findHAPartitionWithName(pName); 201 } 202 else 203 { 204 partition_ = clusterPartition.getHAPartition(); 205 partitionName = partition_.getPartitionName(); 206 } 207 } 208 209 protected void registerRPCHandler() 210 { 211 partition_.registerRPCHandler(getServiceHAName(), this); 212 } 213 214 protected void unregisterRPCHandler() 215 { 216 partition_.unregisterRPCHandler(getServiceHAName(), this); 217 } 218 219 protected void registerDRMListener() throws Exception 220 { 221 DistributedReplicantManager drm = 222 this.partition_.getDistributedReplicantManager(); 223 224 drmListener = new DistributedReplicantManager.ReplicantListener() 226 { 227 Object mutex = new Object (); 228 229 public void replicantsChanged( 230 String key, 231 List newReplicants, 232 int newReplicantsViewId) 233 { 234 if (key.equals(getServiceHAName())) 235 { 236 synchronized(mutex) 242 { 243 HAServiceMBeanSupport.this.partitionTopologyChanged(newReplicants, newReplicantsViewId); 245 } 246 } 247 } 248 }; 249 drm.registerListener(getServiceHAName(), drmListener); 250 251 drm.add(getServiceHAName(), REPLICANT_TOKEN); 253 } 254 255 protected void unregisterDRMListener() throws Exception 256 { 257 DistributedReplicantManager drm = 258 this.partition_.getDistributedReplicantManager(); 259 260 drm.remove(getServiceHAName()); 262 263 drm.unregisterListener(getServiceHAName(), drmListener); 265 } 266 267 public void partitionTopologyChanged(List newReplicants, int newReplicantsViewId) 268 { 269 if (log.isDebugEnabled()) 270 { 271 log.debug("partitionTopologyChanged(). cluster view id: " + newReplicantsViewId); 272 } 273 } 274 275 protected boolean isDRMMasterReplica() 276 { 277 DistributedReplicantManager drm = 278 getPartition().getDistributedReplicantManager(); 279 280 return drm.isMasterReplica(getServiceHAName()); 281 } 282 283 284 public HAPartition getPartition() 285 { 286 return partition_; 287 } 288 289 296 public void callMethodOnPartition(String methodName, Object [] args) 297 throws Exception 298 { 299 getPartition().callMethodOnCluster( 300 getServiceHAName(), 301 methodName, 302 args, 303 true); 304 } 305 306 public void callMethodOnPartition(String methodName, Object [] args, Class [] types) 307 throws Exception 308 { 309 getPartition().callMethodOnCluster( 310 getServiceHAName(), 311 methodName, 312 args, types, 313 true); 314 } 315 316 324 public boolean getSendLocalLifecycleNotifications() 325 { 326 return sendLocalLifecycleNotifications; 327 } 328 329 337 public void setSendLocalLifecycleNotifications(boolean sendLocalLifecycleNotifications) 338 { 339 this.sendLocalLifecycleNotifications = sendLocalLifecycleNotifications; 340 } 341 342 355 public boolean getSendRemoteLifecycleNotifications() 356 { 357 return sendRemoteLifecycleNotifications; 358 } 359 360 373 public void setSendRemoteLifecycleNotifications(boolean sendRemoteLifecycleNotifications) 374 { 375 this.sendRemoteLifecycleNotifications = sendRemoteLifecycleNotifications; 376 } 377 378 390 public void sendNotification(Notification notification) 391 { 392 boolean stateChange = isStateChangeNotification(notification); 393 394 if (!stateChange || sendRemoteLifecycleNotifications) 395 { 396 try 397 { 398 notification.setSource(this.getServiceName()); 401 sendNotificationRemote(notification); 402 } 403 404 catch (Throwable th) 405 { 406 boolean debug = log.isDebugEnabled(); 407 if (debug) 408 log.debug("sendNotificationRemote( " + notification + " ) failed ", th); 409 411 } 412 } 413 414 if (!stateChange || sendLocalLifecycleNotifications) 415 { 416 sendNotificationToLocalListeners(notification); 417 } 418 } 419 420 private boolean isStateChangeNotification(Notification notification) 421 { 422 boolean stateChange = false; 423 if (notification instanceof AttributeChangeNotification ) 424 { 425 stateChange = "State".equals(((AttributeChangeNotification ) notification).getAttributeName()); 426 } 427 return stateChange; 428 } 429 430 protected void sendNotificationToLocalListeners(Notification notification) 431 { 432 super.sendNotification(notification); 433 } 434 435 protected void callAsyncMethodOnPartition(String methodName, Object [] args, Class [] types) 436 throws Exception 437 { 438 HAPartition partition = getPartition(); 439 if (partition != null) 440 { 441 partition.callAsynchMethodOnCluster( 442 getServiceHAName(), 443 methodName, 444 args, types, 445 true); 446 } 447 } 448 449 450 456 protected void sendNotificationRemote(Notification notification) 457 throws Exception 458 { 459 callAsyncMethodOnPartition("_receiveRemoteNotification", 460 new Object []{notification}, new Class []{Notification .class}); 461 } 462 463 469 public void _receiveRemoteNotification(Notification notification) 470 { 471 super.sendNotification(notification); 472 } 473 474 475 483 public String getServiceHAName() 484 { 485 return getServiceName().getCanonicalName(); 486 } 487 488 489 protected HAPartition findHAPartitionWithName(String name) throws Exception 490 { 491 log.debug("findHAPartitionWithName, name="+name); 492 HAPartition result = null; 493 QueryExp matchName = Query.match(Query.attr("Name"), 497 Query.value("ClusterPartition")); 498 QueryExp matchPartitionName = Query.match(Query.attr("PartitionName"), 499 Query.value(name)); 500 QueryExp exp = Query.and(matchName, matchPartitionName); 501 Set mbeans = this.getServer().queryMBeans(null, exp); 502 if (mbeans != null && mbeans.size() > 0) 503 { 504 for (Iterator iter = mbeans.iterator(); iter.hasNext();) 505 { 506 ObjectInstance inst = (ObjectInstance ) iter.next(); 507 try 508 { 509 ClusterPartitionMBean cp = 510 (ClusterPartitionMBean) MBeanProxyExt.create( 511 ClusterPartitionMBean.class, 512 inst.getObjectName(), 513 this.getServer()); 514 result = cp.getHAPartition(); 515 break; 516 } 517 catch (Exception e) {} 518 } 519 } 520 521 if( result == null ) 522 { 523 String msg = "Failed to find HAPartition with PartitionName="+name; 524 throw new InstanceNotFoundException (msg); 525 } 526 return result; 527 } 528 529 531 533 } 534 535 | Popular Tags |