1 package org.jgroups.blocks; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.jgroups.*; 6 import org.jgroups.persistence.CannotPersistException; 7 import org.jgroups.persistence.CannotRemoveException; 8 import org.jgroups.persistence.PersistenceFactory; 9 import org.jgroups.persistence.PersistenceManager; 10 import org.jgroups.util.Promise; 11 import org.jgroups.util.Util; 12 13 import java.io.*; 14 import java.util.HashMap ; 15 import java.util.Map ; 16 import java.util.Vector ; 17 import java.lang.reflect.Method ; 18 19 20 38 public class ReplicatedHashMap<K extends Serializable,V extends Serializable> extends HashMap <K,V> implements ExtendedMessageListener, ExtendedMembershipListener { 39 40 41 42 43 public interface Notification { 44 void entrySet(Object key, Object value); 45 46 void entryRemoved(Object key); 47 48 void viewChange(View view, Vector <Address> new_mbrs, Vector <Address> old_mbrs); 49 50 void contentsSet(Map new_entries); 51 52 void contentsCleared(); 53 } 54 55 protected static Map <Short , Method > methods; 56 57 static { 58 try { 59 methods=new HashMap <Short ,Method >(10); 60 methods.put(new Short ((short)1), ReplicatedHashMap.class.getMethod("_put", Serializable.class, Serializable.class)); 61 methods.put(new Short ((short)2), ReplicatedHashMap.class.getMethod("_putAll", Map .class)); 62 methods.put(new Short ((short)3), ReplicatedHashMap.class.getMethod("_remove", Object .class)); 63 methods.put(new Short ((short)4), ReplicatedHashMap.class.getMethod("_clear")); 64 } 65 catch(NoSuchMethodException e) { 66 throw new RuntimeException (e); 67 } 68 } 69 70 private transient Channel channel; 71 protected transient RpcDispatcher disp=null; 72 private String cluster_name=null; 73 private final transient Vector <Notification> notifs=new Vector <Notification>(); private final Vector <Address> members=new Vector <Address>(); private transient boolean persistent=false; private transient PersistenceManager persistence_mgr=null; 77 78 82 private transient boolean send_message=false; 83 84 protected final transient Promise state_promise=new Promise(); 85 86 89 protected int update_mode=GroupRequest.GET_NONE; 90 91 94 protected long timeout=5000; 95 96 protected final Log log=LogFactory.getLog(this.getClass()); 97 98 99 107 public ReplicatedHashMap(String clustername, ChannelFactory factory, String properties, long state_timeout) 108 throws ChannelException { 109 this.cluster_name=clustername; 110 if(factory != null) { 111 channel=properties != null? factory.createChannel(properties) : factory.createChannel(); 112 } 113 else { 114 channel=new JChannel(properties); 115 } 116 disp=new RpcDispatcher(channel, this, this, this); 117 disp.setMethodLookup(new MethodLookup() { 118 public Method findMethod(short id) { 119 return methods.get(id); 120 } 121 }); 122 channel.connect(clustername); 123 start(state_timeout); 124 } 125 126 137 public ReplicatedHashMap(String clustername, ChannelFactory factory, String properties, 138 boolean persistent, long state_timeout) 139 throws ChannelException { 140 this.cluster_name=clustername; 141 this.persistent=persistent; 142 if(factory != null) { 143 channel=properties != null? factory.createChannel(properties) : factory.createChannel(); 144 } 145 else { 146 channel=new JChannel(properties); 147 } 148 disp=new RpcDispatcher(channel, this, this, this); 149 disp.setMethodLookup(new MethodLookup() { 150 public Method findMethod(short id) { 151 return methods.get(id); 152 } 153 }); 154 channel.connect(clustername); 155 start(state_timeout); 156 } 157 158 159 public ReplicatedHashMap(Channel channel, long state_timeout) { 160 this(channel, false, state_timeout); 161 } 162 163 164 public ReplicatedHashMap(Channel channel, boolean persistent, long state_timeout) { 165 this.cluster_name=channel.getClusterName(); 166 this.channel=channel; 167 this.persistent=persistent; 168 init(state_timeout); 169 } 170 171 172 protected final void init(long state_timeout) { 173 disp=new RpcDispatcher(channel, this, this, this); 174 disp.setMethodLookup(new MethodLookup() { 175 public Method findMethod(short id) { 176 return methods.get(id); 177 } 178 }); 179 180 } 184 185 186 public boolean isBlockingUpdates() { 187 return update_mode == GroupRequest.GET_ALL; 188 } 189 190 194 public void setBlockingUpdates(boolean blocking_updates) { 195 this.update_mode=blocking_updates? GroupRequest.GET_ALL : GroupRequest.GET_NONE; 196 } 197 198 201 public long getTimeout() { 202 return timeout; 203 } 204 205 209 public void setTimeout(long timeout) { 210 this.timeout=timeout; 211 } 212 213 221 public final void start(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException { 222 boolean rc; 223 if(persistent) { 224 if(log.isInfoEnabled()) log.info("fetching state from database"); 225 try { 226 persistence_mgr=PersistenceFactory.getInstance().createManager(); 227 } 228 catch(Throwable ex) { 229 if(log.isErrorEnabled()) log.error("failed creating PersistenceManager, " + 230 "turning persistency off. Exception: " + Util.printStackTrace(ex)); 231 persistent=false; 232 } 233 } 234 235 state_promise.reset(); 236 rc=channel.getState(null, state_timeout); 237 if(rc) { 238 if(log.isInfoEnabled()) log.info("state was retrieved successfully, waiting for setState()"); 239 Boolean result=(Boolean )state_promise.getResult(state_timeout); 240 if(result == null) { 241 if(log.isErrorEnabled()) log.error("setState() never got called"); 242 } 243 else { 244 if(log.isInfoEnabled()) log.info("setState() was called"); 245 } 246 } 247 else { 248 if(log.isInfoEnabled()) log.info("state could not be retrieved (first member)"); 249 if(persistent) { 250 if(log.isInfoEnabled()) log.info("fetching state from database"); 251 try { 252 Map <K,V> m=persistence_mgr.retrieveAll(); 253 if(m != null) { 254 K key; 255 V val; 256 for(Map.Entry <K,V> entry: m.entrySet()) { 257 key=entry.getKey(); 258 val=entry.getValue(); 259 if(log.isTraceEnabled()) log.trace("inserting " + key + " --> " + val); 260 put(key, val); } 262 } 263 } 264 catch(Throwable ex) { 265 if(log.isErrorEnabled()) log.error("failed creating PersistenceManager, " + 266 "turning persistency off. Exception: " + Util.printStackTrace(ex)); 267 persistent=false; 268 } 269 } 270 } 271 } 272 273 274 public Address getLocalAddress() { 275 return channel != null? channel.getLocalAddress() : null; 276 } 277 278 public String getClusterName() { 279 return cluster_name; 280 } 281 282 public Channel getChannel() { 283 return channel; 284 } 285 286 public boolean getPersistent() { 287 return persistent; 288 } 289 290 public void setPersistent(boolean p) { 291 persistent=p; 292 } 293 294 295 public void setDeadlockDetection(boolean flag) { 296 if(disp != null) 297 disp.setDeadlockDetection(flag); 298 } 299 300 public void addNotifier(Notification n) { 301 if(!notifs.contains(n)) 302 notifs.addElement(n); 303 } 304 305 public void removeNotifier(Notification n) { 306 if(notifs.contains(n)) 307 notifs.removeElement(n); 308 } 309 310 public void stop() { 311 if(disp != null) { 312 disp.stop(); 313 disp=null; 314 } 315 if(channel != null) { 316 channel.close(); 317 channel=null; 318 } 319 } 320 321 322 323 324 330 public V put(K key, V value) { 331 V prev_val=get(key); 332 333 if(send_message == true) { 334 try { 335 MethodCall call=new MethodCall((short)1, new Object []{key, value}); 336 disp.callRemoteMethods(null, call, update_mode, timeout); 337 } 338 catch(Exception e) { 339 throw new RuntimeException ("put(" + key + ", " + value + ") failed", e); 340 } 341 } 342 else { 343 _put(key, value); 344 } 345 return prev_val; 346 } 347 348 349 354 public void putAll(Map <? extends K, ? extends V> m) { 355 if(send_message == true) { 356 try { 357 MethodCall call=new MethodCall((short)2, new Object []{m}); 358 disp.callRemoteMethods(null, call, update_mode, timeout); 359 } 360 catch(Throwable t) { 361 throw new RuntimeException ("putAll() failed", t); 362 } 363 } 364 else { 365 _putAll(m); 366 } 367 } 368 369 372 public void clear() { 373 if(send_message == true) { 376 try { 377 MethodCall call=new MethodCall((short)4, null); 378 disp.callRemoteMethods(null, call, update_mode, timeout); 379 } 380 catch(Exception e) { 381 throw new RuntimeException ("clear() failed", e); 382 } 383 } 384 else { 385 _clear(); 386 } 387 } 388 389 394 public V remove(Object key) { 395 V retval=get(key); 396 397 if(send_message == true) { 400 try { 401 MethodCall call=new MethodCall((short)3, new Object []{key}); 402 disp.callRemoteMethods(null, call, update_mode, timeout); 403 } 404 catch(Exception e) { 405 throw new RuntimeException ("remove(" + key + ") failed", e); 406 } 407 } 408 else { 409 _remove(key); 410 } 412 return retval; 413 } 414 415 416 417 418 419 public V _put(K key, V value) { 420 V retval=super.put(key, value); 421 if(persistent) { 422 try { 423 persistence_mgr.save(key, value); 424 } 425 catch(CannotPersistException cannot_persist_ex) { 426 if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " + 427 value + ", exception=" + cannot_persist_ex); 428 } 429 catch(Throwable t) { 430 if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " + 431 value + ", exception=" + Util.printStackTrace(t)); 432 } 433 } 434 for(int i=0; i < notifs.size(); i++) 435 notifs.elementAt(i).entrySet(key, value); 436 return retval; 437 } 438 439 440 443 public void _putAll(Map <? extends K, ? extends V> map) { 444 if(map == null) 445 return; 446 447 451 453 for(Map.Entry <? extends K,? extends V> entry: map.entrySet()) { 455 super.put(entry.getKey(), entry.getValue()); 456 } 457 458 if(persistent) { 459 try { 460 persistence_mgr.saveAll(map); 461 } 462 catch(CannotPersistException persist_ex) { 463 if(log.isErrorEnabled()) log.error("failed persisting contents: " + persist_ex); 464 } 465 catch(Throwable t) { 466 if(log.isErrorEnabled()) log.error("failed persisting contents: " + t); 467 } 468 } 469 for(int i=0; i < notifs.size(); i++) 470 notifs.elementAt(i).contentsSet(map); 471 } 472 473 474 public void _clear() { 475 super.clear(); 476 if(persistent) { 477 try { 478 persistence_mgr.clear(); 479 } 480 catch(CannotRemoveException cannot_remove_ex) { 481 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + cannot_remove_ex); 482 } 483 catch(Throwable t) { 484 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + t); 485 } 486 } 487 for(int i=0; i < notifs.size(); i++) 488 notifs.elementAt(i).contentsCleared(); 489 } 490 491 492 public V _remove(Object key) { 493 V retval=super.remove(key); 494 if(persistent) { 495 try { 496 persistence_mgr.remove((Serializable)key); 497 } 498 catch(CannotRemoveException cannot_remove_ex) { 499 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + cannot_remove_ex); 500 } 501 catch(Throwable t) { 502 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + t); 503 } 504 } 505 for(int i=0; i < notifs.size(); i++) 506 notifs.elementAt(i).entryRemoved(key); 507 508 return retval; 509 } 510 511 512 513 514 515 public void receive(Message msg) { 516 } 517 518 public byte[] getState() { 519 K key; 520 V val; 521 Map <K,V> copy=new HashMap <K,V>(); 522 523 for(Map.Entry <K,V> entry: entrySet()) { 524 key=entry.getKey(); 525 val=entry.getValue(); 526 copy.put(key, val); 527 } 528 try { 529 return Util.objectToByteBuffer(copy); 530 } 531 catch(Throwable ex) { 532 if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex); 533 return null; 534 } 535 } 536 537 538 public void setState(byte[] new_state) { 539 HashMap <K,V> new_copy; 540 541 try { 542 new_copy=(HashMap <K,V>)Util.objectFromByteBuffer(new_state); 543 if(new_copy == null) 544 return; 545 } 546 catch(Throwable ex) { 547 if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex); 548 return; 549 } 550 _putAll(new_copy); 551 state_promise.setResult(Boolean.TRUE); 552 } 553 554 555 556 public void viewAccepted(View new_view) { 557 Vector <Address> new_mbrs=new_view.getMembers(); 558 559 if(new_mbrs != null) { 560 sendViewChangeNotifications(new_view, new_mbrs, new Vector <Address>(members)); members.clear(); 562 members.addAll(new_mbrs); 563 } 564 send_message=members.size() > 1; 567 } 568 569 570 573 public void suspect(Address suspected_mbr) { 574 ; 575 } 576 577 578 581 public void block() { 582 } 583 584 585 void sendViewChangeNotifications(View view, Vector <Address> new_mbrs, Vector <Address> old_mbrs) { 586 Vector <Address> joined, left; 587 Notification n; 588 589 if((notifs.isEmpty()) || (old_mbrs == null) || (new_mbrs == null) || 590 (old_mbrs.isEmpty()) || (new_mbrs.isEmpty())) 591 return; 592 593 joined=new Vector <Address>(); 595 for(Address mbr: new_mbrs) { 596 if(!old_mbrs.contains(mbr)) 597 joined.addElement(mbr); 598 } 599 600 left=new Vector <Address>(); 602 for(Address mbr: old_mbrs) { 603 if(!new_mbrs.contains(mbr)) { 604 left.addElement(mbr); 605 } 606 } 607 608 for(int i=0; i < notifs.size(); i++) { 609 n=notifs.elementAt(i); 610 n.viewChange(view, joined, left); 611 } 612 } 613 614 615 616 617 public byte[] getState(String state_id) { 618 return null; 620 } 621 622 public void getState(OutputStream ostream) { 623 K key; 624 V val; 625 HashMap <K,V> copy=new HashMap <K,V>(); 626 ObjectOutputStream oos=null; 627 628 for(Map.Entry <K,V> entry: entrySet()) { 629 key=entry.getKey(); 630 val=entry.getValue(); 631 copy.put(key, val); 632 } 633 try { 634 oos=new ObjectOutputStream(ostream); 635 oos.writeObject(copy); 636 } 637 catch(Throwable ex) { 638 if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex); 639 } 640 finally { 641 Util.close(oos); 642 } 643 } 644 645 public void getState(String state_id, OutputStream ostream) { 646 } 647 648 public void setState(String state_id, byte[] state) { 649 } 650 651 public void setState(InputStream istream) { 652 HashMap <K,V> new_copy=null; 653 ObjectInputStream ois=null; 654 try { 655 ois=new ObjectInputStream(istream); 656 new_copy=(HashMap <K,V>)ois.readObject(); 657 ois.close(); 658 } 659 catch(Throwable e) { 660 e.printStackTrace(); 661 if(log.isErrorEnabled()) log.error("exception marshalling state: " + e); 662 } 663 finally { 664 Util.close(ois); 665 } 666 if(new_copy != null) 667 _putAll(new_copy); 668 669 state_promise.setResult(Boolean.TRUE); 670 } 671 672 public void setState(String state_id, InputStream istream) { 673 } 674 675 public void unblock() { 676 } 677 678 } | Popular Tags |