1 3 package org.jgroups.blocks; 4 5 import org.apache.commons.logging.Log; 6 import org.apache.commons.logging.LogFactory; 7 import org.jgroups.*; 8 import org.jgroups.util.Util; 9 10 import java.io.Serializable ; 11 import java.util.*; 12 13 34 public class ReplicatedHashtable extends Hashtable implements MessageListener, MembershipListener { 35 36 public interface Notification { 37 void entrySet(Object key, Object value); 38 39 void entryRemoved(Object key); 40 41 void viewChange(Vector new_mbrs, Vector old_mbrs); 42 43 void contentsSet(Map new_entries); 44 } 45 46 public interface StateTransferListener { 47 void stateTransferStarted(); 48 49 void stateTransferCompleted(boolean success); 50 } 51 52 transient Channel channel; 53 transient PullPushAdapter adapter=null; 54 final transient Vector notifs=new Vector(); 55 final transient Vector members=new Vector(); final transient List state_transfer_listeners=new ArrayList(); 58 transient boolean state_transfer_running=false; 59 60 62 private transient boolean send_message=false; 63 64 protected final transient Log log=LogFactory.getLog(this.getClass()); 65 66 73 public ReplicatedHashtable(String groupname, ChannelFactory factory, StateTransferListener l, String properties, long state_timeout) { 74 if(l != null) 75 addStateTransferListener(l); 76 try { 77 channel=factory != null ? factory.createChannel(properties) : new JChannel(properties); 78 channel.connect(groupname); 79 adapter=new PullPushAdapter(channel, this, this); 80 adapter.setListener(this); 81 channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 82 boolean rc=channel.getState(null, state_timeout); 83 if(rc) 84 if(log.isInfoEnabled()) log.info("state was retrieved successfully"); 85 else 86 if(log.isInfoEnabled()) log.info("state could not be retrieved (first member)"); 87 } 88 catch(Exception e) { 89 if(log.isErrorEnabled()) log.error("exception=" + e); 90 } 91 } 92 93 void getInitState(Channel channel, long state_timeout) throws Exception { 94 try { 95 notifyStateTransferStarted(); 96 boolean rc=channel.getState(null, state_timeout); 97 if(rc) 98 if(log.isInfoEnabled()) log.info("state was retrieved successfully"); 99 else { 100 if(log.isInfoEnabled()) log.info("state could not be retrieved (first member)"); 101 notifyStateTransferCompleted(false); 102 } 103 } 104 catch(Exception ex) { 105 notifyStateTransferCompleted(false); 106 throw ex; 107 } 108 } 109 110 public ReplicatedHashtable(String groupname, ChannelFactory factory, String properties, long state_timeout) { 111 this(groupname, factory, null, properties, state_timeout); 112 } 113 114 public ReplicatedHashtable(JChannel channel, long state_timeout) throws ChannelClosedException, ChannelNotConnectedException { 115 this(channel, null, state_timeout); 116 } 117 118 public ReplicatedHashtable(JChannel channel, StateTransferListener l, long state_timeout) throws ChannelClosedException, ChannelNotConnectedException { 119 this.channel=channel; 120 this.adapter=new PullPushAdapter(channel, this, this); 121 this.adapter.setListener(this); 122 if(l != null) 123 addStateTransferListener(l); 124 this.channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 125 boolean rc=channel.getState(null, state_timeout); 126 if(rc) 127 if(log.isInfoEnabled()) log.info("state was retrieved successfully"); 128 else 129 if(log.isInfoEnabled()) log.info("state could not be retrieved (first member)"); 130 } 131 132 public boolean stateTransferRunning() { 133 return state_transfer_running; 134 } 135 136 public Address getLocalAddress() { 137 return channel != null ? channel.getLocalAddress() : null; 138 } 139 140 public Channel getChannel() { 141 return channel; 142 } 143 144 public void addNotifier(Notification n) { 145 if(!notifs.contains(n)) 146 notifs.addElement(n); 147 } 148 149 public void addStateTransferListener(StateTransferListener l) { 150 if(l != null && !(state_transfer_listeners.contains(l))) 151 state_transfer_listeners.add(l); 152 } 153 154 public void removeStateTransferListener(StateTransferListener l) { 155 if(l != null) 156 state_transfer_listeners.remove(l); 157 } 158 159 165 public Object put(Object key, Object value) { 166 Message msg; 167 Object prev_val=null; 168 prev_val=get(key); 169 170 if(send_message == true) { 173 try { 174 msg=new Message(null, null, new Request(Request.PUT, key, value)); 175 channel.send(msg); 176 } 178 catch(Exception e) { 179 } 181 } 182 else { 183 super.put(key, value); 184 } 186 return prev_val; 187 } 188 189 193 public void putAll(Map m) { 194 Message msg; 195 if(send_message == true) { 198 try { 199 msg=new Message(null, null, new Request(Request.PUT_ALL, null, m)); 200 channel.send(msg); 201 } 202 catch(Exception e) { 203 if(log.isErrorEnabled()) log.error("exception=" + e); 204 } 205 } 206 else { 207 super.putAll(m); 208 } 209 } 210 211 214 public void clear() { 215 Message msg; 216 if(send_message == true) { 219 try { 220 msg=new Message(null, null, new Request(Request.CLEAR, null, null)); 221 channel.send(msg); 222 } 223 catch(Exception e) { 224 if(log.isErrorEnabled()) log.error("exception=" + e); 225 } 226 } 227 else { 228 super.clear(); 229 } 230 } 231 232 237 public Object remove(Object key) { 238 Message msg; 239 Object retval=null; 240 retval=get(key); 241 242 if(send_message == true) { 245 try { 246 msg=new Message(null, null, new Request(Request.REMOVE, key, null)); 247 channel.send(msg); 248 } 250 catch(Exception e) { 251 } 253 } 254 else { 255 super.remove(key); 256 } 258 return retval; 259 } 260 261 262 Object _put(Object key, Object value) { 263 Object retval=super.put(key, value); 264 for(int i=0; i < notifs.size(); i++) 265 ((Notification)notifs.elementAt(i)).entrySet(key, value); 266 return retval; 267 } 268 269 void _clear() { 270 super.clear(); 271 } 272 273 Object _remove(Object key) { 274 Object retval=super.remove(key); 275 for(int i=0; i < notifs.size(); i++) 276 ((Notification)notifs.elementAt(i)).entryRemoved(key); 277 return retval; 278 } 279 280 283 public void _putAll(Map m) { 284 if(m == null) 285 return; 286 Map.Entry entry; 294 for(Iterator it=m.entrySet().iterator(); it.hasNext();) { 295 entry=(Map.Entry)it.next(); 296 super.put(entry.getKey(), entry.getValue()); 297 } 298 299 for(int i=0; i < notifs.size(); i++) 300 ((Notification)notifs.elementAt(i)).contentsSet(m); 301 } 302 303 304 305 306 public void receive(Message msg) { 307 Request req=null; 308 309 if(msg == null) 310 return; 311 req=(Request)msg.getObject(); 312 if(req == null) 313 return; 314 switch(req.req_type) { 315 case Request.PUT: 316 if(req.key != null && req.val != null) 317 _put(req.key, req.val); 318 break; 319 case Request.REMOVE: 320 if(req.key != null) 321 _remove(req.key); 322 break; 323 case Request.CLEAR: 324 _clear(); 325 break; 326 327 case Request.PUT_ALL: 328 if(req.val != null) 329 _putAll((Map)req.val); 330 break; 331 default : 332 } 334 } 335 336 public byte[] getState() { 337 Object key, val; 338 Hashtable copy=new Hashtable(); 339 340 for(Enumeration e=keys(); e.hasMoreElements();) { 341 key=e.nextElement(); 342 val=get(key); 343 copy.put(key, val); 344 } 345 try { 346 return Util.objectToByteBuffer(copy); 347 } 348 catch(Exception ex) { 349 if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex); 350 return null; 351 } 352 } 353 354 public void setState(byte[] new_state) { 355 Hashtable new_copy; 356 Object key; 357 358 try { 359 new_copy=(Hashtable)Util.objectFromByteBuffer(new_state); 360 if(new_copy == null) { 361 notifyStateTransferCompleted(true); 362 return; 363 } 364 } 365 catch(Throwable ex) { 366 if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex); 367 notifyStateTransferCompleted(false); 368 return; 369 } 370 371 _clear(); for(Enumeration e=new_copy.keys(); e.hasMoreElements();) { 373 key=e.nextElement(); 374 _put(key, new_copy.get(key)); 375 } 376 notifyStateTransferCompleted(true); 377 } 378 379 380 381 382 383 public void viewAccepted(View new_view) { 384 Vector new_mbrs=new_view.getMembers(); 385 386 if(new_mbrs != null) { 387 sendViewChangeNotifications(new_mbrs, members); 388 members.removeAllElements(); 390 for(int i=0; i < new_mbrs.size(); i++) 391 members.addElement(new_mbrs.elementAt(i)); 392 } 393 if(members.size() > 1) { 396 send_message=true; 397 } 398 else { 399 send_message=false; 400 } 401 } 402 403 404 public void suspect(Address suspected_mbr) { 405 ; 406 } 407 408 409 public void block() { 410 } 411 412 413 414 void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) { 415 Vector joined, left; 416 Object mbr; 417 Notification n; 418 419 if(notifs.size() == 0 || old_mbrs == null || new_mbrs == null || old_mbrs.size() == 0 || new_mbrs.size() == 0) 420 return; 421 422 joined=new Vector(); 424 for(int i=0; i < new_mbrs.size(); i++) { 425 mbr=new_mbrs.elementAt(i); 426 if(!old_mbrs.contains(mbr)) 427 joined.addElement(mbr); 428 } 429 430 left=new Vector(); 432 for(int i=0; i < old_mbrs.size(); i++) { 433 mbr=old_mbrs.elementAt(i); 434 if(!new_mbrs.contains(mbr)) { 435 left.addElement(mbr); 436 } 437 } 438 439 for(int i=0; i < notifs.size(); i++) { 440 n=(Notification)notifs.elementAt(i); 441 n.viewChange(joined, left); 442 } 443 } 444 445 void notifyStateTransferStarted() { 446 state_transfer_running=true; 447 for(Iterator it=state_transfer_listeners.iterator(); it.hasNext();) { 448 StateTransferListener listener=(StateTransferListener)it.next(); 449 try { 450 listener.stateTransferStarted(); 451 } 452 catch(Throwable t) { 453 } 454 } 455 } 456 457 void notifyStateTransferCompleted(boolean success) { 458 state_transfer_running=false; 459 for(Iterator it=state_transfer_listeners.iterator(); it.hasNext();) { 460 StateTransferListener listener=(StateTransferListener)it.next(); 461 try { 462 listener.stateTransferCompleted(success); 463 } 464 catch(Throwable t) { 465 } 466 } 467 } 468 469 private static class Request implements Serializable { 470 static final int PUT=1; 471 static final int REMOVE=2; 472 static final int CLEAR=3; 473 static final int PUT_ALL=4; 474 475 int req_type=0; 476 Object key=null; 477 Object val=null; 478 479 Request(int req_type, Object key, Object val) { 480 this.req_type=req_type; 481 this.key=key; 482 this.val=val; 483 } 484 485 public String toString() { 486 StringBuffer sb=new StringBuffer (); 487 sb.append(type2String(req_type)); 488 if(key != null) 489 sb.append("\nkey=" + key); 490 if(val != null) 491 sb.append("\nval=" + val); 492 return sb.toString(); 493 } 494 495 String type2String(int t) { 496 switch(t) { 497 case PUT: 498 return "PUT"; 499 case REMOVE: 500 return "REMOVE"; 501 case CLEAR: 502 return "CLEAR"; 503 case PUT_ALL: 504 return "PUT_ALL"; 505 default : 506 return "<unknown>"; 507 } 508 } 509 510 } 511 } 512 | Popular Tags |