1 3 package org.jgroups.blocks; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.*; 9 import org.jgroups.util.Promise; 10 import org.jgroups.util.Util; 11 12 import java.io.Serializable ; 13 import java.util.Vector ; 14 15 16 21 public class NotificationBus implements MessageListener, MembershipListener { 22 final Vector members=new Vector (); 23 JChannel channel=null; 24 Address local_addr=null; 25 PullPushAdapter ad=null; 26 Consumer consumer=null; String bus_name="notification_bus"; 28 final Promise get_cache_promise=new Promise(); 29 final Object cache_mutex=new Object (); 30 31 protected final Log log=LogFactory.getLog(getClass()); 32 33 34 String props=null; 35 36 37 public interface Consumer { 38 void handleNotification(Serializable n); 39 40 41 Serializable getCache(); 42 43 void memberJoined(Address mbr); 44 45 void memberLeft(Address mbr); 46 } 47 48 49 public NotificationBus() throws Exception { 50 this(null, null); 51 } 52 53 54 public NotificationBus(String bus_name) throws Exception { 55 this(bus_name, null); 56 } 57 58 59 public NotificationBus(String bus_name, String properties) throws Exception { 60 if(bus_name != null) this.bus_name=bus_name; 61 if(properties != null) props=properties; 62 channel=new JChannel(props); 63 } 64 65 66 public void setConsumer(Consumer c) { 67 consumer=c; 68 } 69 70 71 public Address getLocalAddress() { 72 if(local_addr != null) return local_addr; 73 if(channel != null) 74 local_addr=channel.getLocalAddress(); 75 return local_addr; 76 } 77 78 79 82 public Vector getMembership() { 83 return members; 84 } 85 86 87 89 public Channel getChannel() { 90 return channel; 91 } 92 93 94 public boolean isCoordinator() { 95 Object first_mbr=null; 96 97 synchronized(members) { 98 first_mbr=members.size() > 0 ? members.elementAt(0) : null; 99 if(first_mbr == null) 100 return true; 101 } 102 if(getLocalAddress() != null) 103 return getLocalAddress().equals(first_mbr); 104 return false; 105 } 106 107 108 public void start() throws Exception { 109 channel.connect(bus_name); 110 ad=new PullPushAdapter(channel, this, this); 111 } 112 113 114 public void stop() { 115 if(ad != null) { 116 ad.stop(); 117 ad=null; 118 } 119 if(channel != null) { 120 channel.close(); channel=null; 122 } 123 } 124 125 126 127 public void sendNotification(Serializable n) { 128 Message msg=null; 129 byte[] data=null; 130 Info info; 131 132 try { 133 if(n == null) return; 134 info=new Info(Info.NOTIFICATION, n); 135 data=Util.objectToByteBuffer(info); 136 msg=new Message(null, null, data); 137 if(channel == null) { 138 if(log.isErrorEnabled()) log.error("channel is null. " + 139 " Won't send notification"); 140 return; 141 } 142 channel.send(msg); 143 } 144 catch(Throwable ex) { 145 146 if(log.isErrorEnabled()) log.error("exception is " + ex); 147 } 148 } 149 150 151 157 public Serializable getCacheFromCoordinator(long timeout, int max_tries) { 158 return getCacheFromMember(null, timeout, max_tries); 159 } 160 161 162 171 public Serializable getCacheFromMember(Address mbr, long timeout, int max_tries) { 172 Serializable cache=null; 173 int num_tries=0; 174 Info info=new Info(Info.GET_CACHE_REQ); 175 Message msg; 176 Address dst=mbr; 178 long start, stop; 180 181 if(max_tries < 1) max_tries=1; 182 183 get_cache_promise.reset(); 184 while(num_tries <= max_tries) { 185 if(mbr == null) { dst=determineCoordinator(); 187 if(dst == null || dst.equals(getLocalAddress())) { if(log.isInfoEnabled()) log.info("[" + getLocalAddress() + 189 "] no coordinator found --> first member (cache is empty)"); 190 return null; 191 } 192 } 193 194 if(log.isInfoEnabled()) log.info("[" + getLocalAddress() + "] dst=" + dst + 196 ", timeout=" + timeout + ", max_tries=" + max_tries + ", num_tries=" + num_tries); 197 198 info=new Info(Info.GET_CACHE_REQ); 199 msg=new Message(dst, null, info); 200 channel.down(new Event(Event.MSG, msg)); 201 202 start=System.currentTimeMillis(); 203 cache=(Serializable ) get_cache_promise.getResult(timeout); 204 stop=System.currentTimeMillis(); 205 if(cache != null) { 206 if(log.isInfoEnabled()) log.info("got cache from " + 207 dst + ": cache is valid (waited " + (stop - start) + " msecs on get_cache_promise)"); 208 return cache; 209 } 210 else { 211 if(log.isErrorEnabled()) log.error("received null cache; retrying (waited " + 212 (stop - start) + " msecs on get_cache_promise)"); 213 } 214 215 Util.sleep(500); 216 ++num_tries; 217 } 218 if(cache == null) 219 if(log.isErrorEnabled()) log.error("[" + getLocalAddress() + 220 "] cache is null (num_tries=" + num_tries + ')'); 221 return cache; 222 } 223 224 225 228 public void notifyConsumer(Serializable n) { 229 if(consumer != null && n != null) 230 consumer.handleNotification(n); 231 } 232 233 234 235 public void receive(Message msg) { 236 Info info=null; 237 Object obj; 238 239 if(msg == null || msg.getLength() == 0) return; 240 try { 241 obj=msg.getObject(); 242 if(!(obj instanceof Info)) { 243 244 if(log.isErrorEnabled()) log.error("expected an instance of Info (received " + 245 obj.getClass().getName() + ')'); 246 return; 247 } 248 info=(Info) obj; 249 switch(info.type) { 250 case Info.NOTIFICATION: 251 notifyConsumer(info.data); 252 break; 253 254 case Info.GET_CACHE_REQ: 255 handleCacheRequest(msg.getSrc()); 256 break; 257 258 case Info.GET_CACHE_RSP: 259 if(log.isDebugEnabled()) log.debug("[GET_CACHE_RSP] cache was received from " + msg.getSrc()); 261 get_cache_promise.setResult(info.data); 262 break; 263 264 default: 265 if(log.isErrorEnabled()) log.error("type " + info.type + " unknown"); 266 break; 267 } 268 } 269 catch(Throwable ex) { 270 271 if(log.isErrorEnabled()) log.error("exception=" + ex); 272 } 273 } 274 275 public byte[] getState() { 276 return null; 277 } 278 279 public void setState(byte[] state) { 280 } 281 282 283 284 285 286 287 288 289 public synchronized void viewAccepted(View new_view) { 290 Vector joined_mbrs, left_mbrs, tmp; 291 Object tmp_mbr; 292 293 if(new_view == null) return; 294 tmp=new_view.getMembers(); 295 296 synchronized(members) { 297 joined_mbrs=new Vector (); 299 for(int i=0; i < tmp.size(); i++) { 300 tmp_mbr=tmp.elementAt(i); 301 if(!members.contains(tmp_mbr)) 302 joined_mbrs.addElement(tmp_mbr); 303 } 304 305 left_mbrs=new Vector (); 307 for(int i=0; i < members.size(); i++) { 308 tmp_mbr=members.elementAt(i); 309 if(!tmp.contains(tmp_mbr)) 310 left_mbrs.addElement(tmp_mbr); 311 } 312 313 members.removeAllElements(); 315 members.addAll(tmp); 316 } 317 318 if(consumer != null) { 319 if(joined_mbrs.size() > 0) 320 for(int i=0; i < joined_mbrs.size(); i++) 321 consumer.memberJoined((Address) joined_mbrs.elementAt(i)); 322 if(left_mbrs.size() > 0) 323 for(int i=0; i < left_mbrs.size(); i++) 324 consumer.memberLeft((Address) left_mbrs.elementAt(i)); 325 } 326 } 327 328 329 public void suspect(Address suspected_mbr) { 330 } 331 332 public void block() { 333 } 334 335 336 337 338 339 340 341 342 343 344 345 346 Address determineCoordinator() { 347 Vector v=channel != null ? channel.getView().getMembers() : null; 348 return v != null ? (Address) v.elementAt(0) : null; 349 } 350 351 352 void handleCacheRequest(Address sender) { 353 Serializable cache=null; 354 Message msg; 355 Info info; 356 357 if(sender == null) { 358 if(log.isErrorEnabled()) log.error("sender is null"); 361 return; 362 } 363 364 synchronized(cache_mutex) { 365 cache=getCache(); info=new Info(Info.GET_CACHE_RSP, cache); 367 msg=new Message(sender, null, info); 368 if(log.isInfoEnabled()) log.info("[" + getLocalAddress() + "] returning cache to " + sender); 369 channel.down(new Event(Event.MSG, msg)); 370 } 371 } 372 373 public Serializable getCache() { 374 return consumer != null ? consumer.getCache() : null; 375 } 376 377 378 379 380 381 382 383 384 385 private static class Info implements Serializable { 386 public final static int NOTIFICATION=1; 387 public final static int GET_CACHE_REQ=2; 388 public final static int GET_CACHE_RSP=3; 389 390 391 int type=0; 392 Serializable data=null; 394 395 public Info(int type) { 396 this.type=type; 397 } 398 399 public Info(int type, Serializable data) { 400 this.type=type; 401 this.data=data; 402 } 403 404 405 public String toString() { 406 StringBuffer sb=new StringBuffer (); 407 sb.append("type= "); 408 if(type == NOTIFICATION) 409 sb.append("NOTIFICATION"); 410 else if(type == GET_CACHE_REQ) 411 sb.append("GET_CACHE_REQ"); 412 else if(type == GET_CACHE_RSP) 413 sb.append("GET_CACHE_RSP"); 414 else 415 sb.append("<unknown>"); 416 if(data != null) { 417 if(type == NOTIFICATION) 418 sb.append(", notification=" + data); 419 else if(type == GET_CACHE_RSP) sb.append(", cache=" + data); 420 } 421 return sb.toString(); 422 } 423 } 424 425 426 } 427 428 429 430 | Popular Tags |