1 3 package org.jgroups.blocks; 4 5 import org.apache.commons.logging.Log; 6 import org.apache.commons.logging.LogFactory; 7 import org.jgroups.*; 8 import org.jgroups.persistence.CannotPersistException; 9 import org.jgroups.persistence.CannotRemoveException; 10 import org.jgroups.persistence.PersistenceFactory; 11 import org.jgroups.persistence.PersistenceManager; 12 import org.jgroups.util.Promise; 13 import org.jgroups.util.Util; 14 15 import java.io.Serializable ; 16 import java.util.*; 17 18 19 20 21 22 41 public class DistributedHashtable extends Hashtable implements MessageListener, MembershipListener { 42 43 44 45 public interface Notification { 46 void entrySet(Object key, Object value); 47 void entryRemoved(Object key); 48 void viewChange(Vector new_mbrs, Vector old_mbrs); 49 void contentsSet(Map new_entries); 50 void contentsCleared(); 51 } 52 53 54 private transient Channel channel; 55 protected transient RpcDispatcher disp=null; 56 private transient String groupname=null; 57 private final transient Vector notifs=new Vector(); private final transient Vector members=new Vector(); private transient Class [] put_signature=null; 60 private transient Class [] putAll_signature=null; 61 private transient Class [] clear_signature=null; 62 private transient Class [] remove_signature=null; 63 private transient boolean persistent=false; private transient PersistenceManager persistence_mgr=null; 65 66 68 private transient boolean send_message = false; 69 70 protected final transient Promise state_promise=new Promise(); 71 72 protected final Log log=LogFactory.getLog(this.getClass()); 73 74 75 76 77 84 public DistributedHashtable(String groupname, ChannelFactory factory, 85 String properties, long state_timeout) 86 throws ChannelException { 87 this.groupname=groupname; 88 initSignatures(); 89 channel=factory != null ? factory.createChannel(properties) : new JChannel(properties); 90 disp=new RpcDispatcher(channel, this, this, this); 91 channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 92 channel.connect(groupname); 93 start(state_timeout); 94 } 95 96 106 public DistributedHashtable(String groupname, ChannelFactory factory, String properties, 107 boolean persistent, long state_timeout) 108 throws ChannelException { 109 this.groupname=groupname; 110 this.persistent=persistent; 111 initSignatures(); 112 channel=factory != null ? factory.createChannel(properties) : new JChannel(properties); 113 disp=new RpcDispatcher(channel, this, this, this); 114 channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 115 channel.connect(groupname); 116 start(state_timeout); 117 } 118 119 120 public DistributedHashtable(JChannel channel, long state_timeout) 121 throws ChannelNotConnectedException, ChannelClosedException { 122 this(channel, false, state_timeout); 123 } 124 125 126 public DistributedHashtable(JChannel channel, boolean persistent, long state_timeout) 127 throws ChannelNotConnectedException, ChannelClosedException { 128 this.groupname = channel.getChannelName(); 129 this.channel = channel; 130 this.persistent=persistent; 131 init(state_timeout); 132 } 133 134 146 public DistributedHashtable(PullPushAdapter adapter, Serializable id, long state_timeout) 147 throws ChannelNotConnectedException, ChannelClosedException { 148 initSignatures(); 149 this.channel = (Channel)adapter.getTransport(); 150 this.groupname = this.channel.getChannelName(); 151 disp=new RpcDispatcher(adapter, id, this, this, this); 152 channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 153 start(state_timeout); 154 } 155 156 public DistributedHashtable(PullPushAdapter adapter, Serializable id) { 157 initSignatures(); 158 this.channel = (Channel)adapter.getTransport(); 159 this.groupname = this.channel.getChannelName(); 160 disp=new RpcDispatcher(adapter, id, this, this, this); 161 channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 162 } 163 164 protected void init(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException { 165 initSignatures(); 166 channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 167 disp = new RpcDispatcher(channel, this, this, this); 168 169 } 173 174 175 181 public void start(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException { 182 boolean rc; 183 if(persistent) { 184 if(log.isInfoEnabled()) log.info("fetching state from database"); 185 try { 186 persistence_mgr=PersistenceFactory.getInstance().createManager(); 187 } 188 catch(Throwable ex) { 189 if(log.isErrorEnabled()) log.error("failed creating PersistenceManager, " + 190 "turning persistency off. Exception: " + Util.printStackTrace(ex)); 191 persistent=false; 192 } 193 } 194 195 state_promise.reset(); 196 rc=channel.getState(null, state_timeout); 197 if(rc) { 198 if(log.isInfoEnabled()) log.info("state was retrieved successfully, waiting for setState()"); 199 Boolean result=(Boolean )state_promise.getResult(state_timeout); 200 if(result == null) { 201 if(log.isErrorEnabled()) log.error("setState() never got called"); 202 } 203 else { 204 if(log.isInfoEnabled()) log.info("setState() was called"); 205 } 206 } 207 else { 208 if(log.isInfoEnabled()) log.info("state could not be retrieved (first member)"); 209 if(persistent) { 210 if(log.isInfoEnabled()) log.info("fetching state from database"); 211 try { 212 Map m=persistence_mgr.retrieveAll(); 213 if(m != null) { 214 Map.Entry entry; 215 Object key, val; 216 for(Iterator it=m.entrySet().iterator(); it.hasNext();) { 217 entry=(Map.Entry)it.next(); 218 key=entry.getKey(); 219 val=entry.getValue(); 220 221 if(log.isInfoEnabled()) log.info("inserting " + key + 222 " --> " + val); 223 put(key, val); } 225 } 226 } 227 catch(Throwable ex) { 228 if(log.isErrorEnabled()) log.error("failed creating PersistenceManager, " + 229 "turning persistency off. Exception: " + Util.printStackTrace(ex)); 230 persistent=false; 231 } 232 } 233 } 234 } 235 236 237 public Address getLocalAddress() {return channel != null ? channel.getLocalAddress() : null;} 238 public String getGroupName() {return groupname;} 239 public Channel getChannel() {return channel;} 240 public boolean getPersistent() {return persistent;} 241 public void setPersistent(boolean p) {persistent=p;} 242 243 public void addNotifier(Notification n) { 244 if(!notifs.contains(n)) 245 notifs.addElement(n); 246 } 247 248 public void removeNotifier(Notification n) { 249 if(notifs.contains(n)) 250 notifs.removeElement(n); 251 } 252 253 public void stop() { 254 if(disp != null) { 255 disp.stop(); 256 disp=null; 257 } 258 if(channel != null) { 259 channel.close(); 260 channel=null; 261 } 262 } 263 264 265 271 public Object put(Object key, Object value) { 272 Object prev_val=get(key); 273 274 if(send_message == true) { 277 try { 278 disp.callRemoteMethods( 279 null, "_put", new Object []{key,value}, 280 put_signature, 281 GroupRequest.GET_ALL, 282 0); 283 } 284 catch(Exception e) { 285 } 287 } 288 else { 289 _put(key, value); 290 } 292 return prev_val; 293 } 294 295 299 public void putAll(Map m) { 300 if(send_message == true) { 303 try { 304 disp.callRemoteMethods( 305 null, "_putAll", new Object []{m}, 306 putAll_signature, 307 GroupRequest.GET_ALL, 308 0); 309 } 310 catch(Throwable t) { 311 } 312 } 313 else { 314 _putAll(m); 315 } 316 } 317 318 321 public synchronized void clear() { 322 if(send_message == true) { 325 try { 326 disp.callRemoteMethods( 327 null, "_clear", null, 328 clear_signature, 329 GroupRequest.GET_ALL, 330 0); 331 } 332 catch(Exception e) { 333 if(log.isErrorEnabled()) log.error("exception=" + e); 334 } 335 } 336 else { 337 _clear(); 338 } 339 } 340 341 346 public Object remove(Object key) { 347 Object retval = get(key); 348 349 if(send_message == true) { 352 try { 353 disp.callRemoteMethods( 354 null, "_remove", new Object []{key}, 355 remove_signature, 356 GroupRequest.GET_ALL, 357 0); 358 } 360 catch(Exception e) { 361 } 363 } 364 else { 365 _remove(key); 366 } 368 return retval; 369 } 370 371 372 373 374 375 public Object _put(Object key, Object value) { 376 Object retval=super.put(key, value); 377 if(persistent) { 378 try { 379 persistence_mgr.save((Serializable )key, (Serializable )value); 380 } 381 catch(CannotPersistException cannot_persist_ex) { 382 if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " + 383 value + ", exception=" + cannot_persist_ex); 384 } 385 catch(Throwable t) { 386 if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " + 387 value + ", exception=" + Util.printStackTrace(t)); 388 } 389 } 390 for(int i=0; i < notifs.size(); i++) 391 ((Notification)notifs.elementAt(i)).entrySet(key, value); 392 return retval; 393 } 394 395 396 399 public void _putAll(Map m) { 400 if (m == null) 401 return; 402 403 407 409 Map.Entry entry; 411 for(Iterator it=m.entrySet().iterator(); it.hasNext();) { 412 entry=(Map.Entry)it.next(); 413 super.put(entry.getKey(), entry.getValue()); 414 } 415 416 if (persistent) { 417 try { 418 persistence_mgr.saveAll(m); 419 } 420 catch (CannotPersistException persist_ex) { 421 if(log.isErrorEnabled()) log.error("failed persisting contents: " + persist_ex); 422 } 423 catch (Throwable t) { 424 if(log.isErrorEnabled()) log.error("failed persisting contents: " + t); 425 } 426 } 427 for(int i=0; i < notifs.size(); i++) 428 ((Notification)notifs.elementAt(i)).contentsSet(m); 429 } 430 431 432 public void _clear() { 433 super.clear(); 434 if(persistent) { 435 try { 436 persistence_mgr.clear(); 437 } 438 catch(CannotRemoveException cannot_remove_ex) { 439 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + cannot_remove_ex); 440 } 441 catch(Throwable t) { 442 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + t); 443 } 444 } 445 for(int i=0; i < notifs.size(); i++) 446 ((Notification)notifs.elementAt(i)).contentsCleared(); 447 } 448 449 450 public Object _remove(Object key) { 451 Object retval=super.remove(key); 452 if(persistent) { 453 try { 454 persistence_mgr.remove((Serializable )key); 455 } 456 catch(CannotRemoveException cannot_remove_ex) { 457 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + cannot_remove_ex); 458 } 459 catch(Throwable t) { 460 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + t); 461 } 462 } 463 for(int i=0; i < notifs.size(); i++) 464 ((Notification)notifs.elementAt(i)).entryRemoved(key); 465 466 return retval; 467 } 468 469 470 471 472 473 474 475 public void receive(Message msg) { } 476 477 public byte[] getState() { 478 Object key, val; 479 Hashtable copy=new Hashtable(); 480 481 for(Enumeration e=keys(); e.hasMoreElements();) { 482 key=e.nextElement(); 483 val=get(key); 484 copy.put(key, val); 485 } 486 try { 487 return Util.objectToByteBuffer(copy); 488 } 489 catch(Throwable ex) { 490 if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex); 491 return null; 492 } 493 } 494 495 496 public void setState(byte[] new_state) { 497 Hashtable new_copy; 498 499 try { 500 new_copy=(Hashtable)Util.objectFromByteBuffer(new_state); 501 if(new_copy == null) 502 return; 503 } 504 catch(Throwable ex) { 505 if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex); 506 return; 507 } 508 _putAll(new_copy); 509 state_promise.setResult(Boolean.TRUE); 510 } 511 512 513 514 515 516 public void viewAccepted(View new_view) { 517 Vector new_mbrs=new_view.getMembers(); 518 519 if(new_mbrs != null) { 520 sendViewChangeNotifications(new_mbrs, members); members.removeAllElements(); 522 for(int i=0; i < new_mbrs.size(); i++) 523 members.addElement(new_mbrs.elementAt(i)); 524 } 525 if(members.size() > 1) { 528 send_message=true; 529 } 530 else { 531 send_message=false; 532 } 533 } 534 535 536 537 public void suspect(Address suspected_mbr) { 538 ; 539 } 540 541 542 543 public void block() {} 544 545 546 547 void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) { 548 Vector joined, left; 549 Object mbr; 550 Notification n; 551 552 if(notifs.size() == 0 || old_mbrs == null || new_mbrs == null || 553 old_mbrs.size() == 0 || new_mbrs.size() == 0) 554 return; 555 556 557 joined=new Vector(); 559 for(int i=0; i < new_mbrs.size(); i++) { 560 mbr=new_mbrs.elementAt(i); 561 if(!old_mbrs.contains(mbr)) 562 joined.addElement(mbr); 563 } 564 565 566 left=new Vector(); 568 for(int i=0; i < old_mbrs.size(); i++) { 569 mbr=old_mbrs.elementAt(i); 570 if(!new_mbrs.contains(mbr)) { 571 left.addElement(mbr); 572 } 573 } 574 575 for(int i=0; i < notifs.size(); i++) { 576 n=(Notification)notifs.elementAt(i); 577 n.viewChange(joined, left); 578 } 579 } 580 581 582 void initSignatures() { 583 try { 584 if(put_signature == null) { 585 put_signature=new Class [] {Object .class,Object .class}; 586 } 587 588 if(putAll_signature == null) { 589 putAll_signature=new Class [] {Map.class}; 590 } 591 592 if(clear_signature == null) 593 clear_signature=new Class [0]; 594 595 if(remove_signature == null) { 596 remove_signature=new Class [] {Object .class}; 597 } 598 } 599 catch(Throwable ex) { 600 if(log.isErrorEnabled()) log.error("exception=" + ex); 601 } 602 } 603 604 public static void main(String [] args) { 605 try { 606 616 JChannel c = new JChannel("file:/c:/JGroups-2.0/conf/state_transfer.xml"); 617 c.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 618 DistributedHashtable ht = new DistributedHashtable(c, false, 5000); 619 c.connect("demo"); 620 ht.start(5000); 621 622 623 624 ht.put("name", "Michelle Ban"); 625 Object old_key = ht.remove("name"); 626 System.out.println("old key was " + old_key); 627 ht.put("newkey", "newvalue"); 628 629 Map m = new HashMap(); 630 m.put("k1", "v1"); 631 m.put("k2", "v2"); 632 633 ht.putAll(m); 634 635 System.out.println("hashmap is " + ht); 636 } 637 catch (Throwable t) { 638 t.printStackTrace(); 639 } 640 } 641 642 } 643 644 | Popular Tags |