1 package org.jgroups.blocks; 3 4 import org.apache.commons.logging.Log; 5 import org.apache.commons.logging.LogFactory; 6 import org.jgroups.*; 7 import org.jgroups.util.RspList; 8 import org.jgroups.util.Util; 9 10 import java.io.Serializable ; 11 import java.util.*; 12 13 14 27 public class DistributedQueue implements MessageListener, MembershipListener, Cloneable 28 { 29 public interface Notification 30 { 31 void entryAdd(Object value); 32 33 void entryRemoved(Object key); 34 35 void viewChange(Vector new_mbrs, Vector old_mbrs); 36 37 void contentsCleared(); 38 39 void contentsSet(Collection new_entries); 40 } 41 42 protected Log logger = LogFactory.getLog(getClass()); 43 private long internal_timeout = 10000; 45 46 protected Object mutex = new Object (); 47 protected transient boolean stopped = false; protected LinkedList internalQueue; 49 protected transient Channel channel; 50 protected transient RpcDispatcher disp = null; 51 protected transient String groupname = null; 52 protected transient Vector notifs = new Vector(); protected transient Vector members = new Vector(); private transient Class [] add_signature = null; 55 private transient Class [] addAtHead_signature = null; 56 private transient Class [] addAll_signature = null; 57 private transient Class [] reset_signature = null; 58 private transient Class [] remove_signature = null; 59 60 67 public DistributedQueue(String groupname, ChannelFactory factory, String properties, long state_timeout) 68 throws ChannelException 69 { 70 if (logger.isDebugEnabled()) 71 { 72 logger.debug("DistributedQueue(" + groupname + ',' + properties + ',' + state_timeout); 73 } 74 75 this.groupname = groupname; 76 initSignatures(); 77 internalQueue = new LinkedList(); 78 channel = (factory != null) ? factory.createChannel(properties) : new JChannel(properties); 79 disp = new RpcDispatcher(channel, this, this, this); 80 disp.setDeadlockDetection(false); channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 82 channel.connect(groupname); 83 start(state_timeout); 84 } 85 86 public DistributedQueue(JChannel channel) 87 { 88 this.groupname = channel.getChannelName(); 89 this.channel = channel; 90 init(); 91 } 92 93 105 public DistributedQueue(PullPushAdapter adapter, Serializable id) 106 { 107 this.channel = (Channel)adapter.getTransport(); 108 this.groupname = this.channel.getChannelName(); 109 110 initSignatures(); 111 internalQueue = new LinkedList(); 112 113 channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 114 disp = new RpcDispatcher(adapter, id, this, this, this); 115 disp.setDeadlockDetection(false); } 117 118 protected void init() 119 { 120 initSignatures(); 121 internalQueue = new LinkedList(); 122 channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 123 disp = new RpcDispatcher(channel, this, this, this); 124 disp.setDeadlockDetection(false); } 126 127 public void start(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException 128 { 129 boolean rc; 130 logger.debug("DistributedQueue.initState(" + groupname + "): starting state retrieval"); 131 132 rc = channel.getState(null, state_timeout); 133 134 if (rc) 135 { 136 logger.info("DistributedQueue.initState(" + groupname + "): state was retrieved successfully"); 137 } 138 else 139 { 140 logger.info("DistributedQueue.initState(" + groupname + "): state could not be retrieved (first member)"); 141 } 142 } 143 144 public Address getLocalAddress() 145 { 146 return (channel != null) ? channel.getLocalAddress() : null; 147 } 148 149 public Channel getChannel() 150 { 151 return channel; 152 } 153 154 public void addNotifier(Notification n) 155 { 156 if (n != null && !notifs.contains(n)) 157 { 158 notifs.addElement(n); 159 } 160 } 161 162 public void removeNotifier(Notification n) 163 { 164 notifs.removeElement(n); 165 } 166 167 public void stop() 168 { 169 170 synchronized (mutex) 171 { 172 internalQueue.clear(); 173 174 if (disp != null) 175 { 176 disp.stop(); 177 disp = null; 178 } 179 180 if (channel != null) 181 { 182 channel.close(); 183 channel = null; 184 } 185 186 stopped = true; 187 } 188 } 189 190 194 public void add(Object value) 195 { 196 try 197 { 198 Object retval = null; 199 200 RspList rsp = disp.callRemoteMethods(null, "_add", new Object []{value}, add_signature, GroupRequest.GET_ALL, 0); 201 Vector results = rsp.getResults(); 202 203 if (results.size() > 0) 204 { 205 retval = results.elementAt(0); 206 207 if (logger.isDebugEnabled()) 208 { 209 checkResult(rsp, retval); 210 } 211 } 212 } 213 catch (Exception e) 214 { 215 logger.error("Unable to add value " + value, e); 216 } 217 218 return; 219 } 220 221 225 public void addAtHead(Object value) 226 { 227 try 228 { 229 disp.callRemoteMethods(null, "_addAtHead", new Object []{value}, addAtHead_signature, GroupRequest.GET_ALL, 0); 230 } 231 catch (Exception e) 232 { 233 logger.error("Unable to addAtHead value " + value, e); 234 } 235 236 return; 237 } 238 239 245 public void addAll(Collection values) 246 { 247 try 248 { 249 disp.callRemoteMethods(null, "_addAll", new Object []{values}, addAll_signature, GroupRequest.GET_ALL, 0); 250 } 251 catch (Exception e) 252 { 253 logger.error("Unable to addAll value: " + values, e); 254 } 255 256 return; 257 } 258 259 public Vector getContents() 260 { 261 Vector result = new Vector(); 262 263 for (Iterator e = internalQueue.iterator(); e.hasNext();) 264 result.add(e.next()); 265 266 return result; 267 } 268 269 public int size() 270 { 271 return internalQueue.size(); 272 } 273 274 280 public Object peek() 281 { 282 Object retval = null; 283 284 try 285 { 286 retval = internalQueue.getFirst(); 287 } 288 catch (NoSuchElementException e) 289 { 290 } 291 292 return retval; 293 } 294 295 public void reset() 296 { 297 try 298 { 299 disp.callRemoteMethods(null, "_reset", null, reset_signature, GroupRequest.GET_ALL, 0); 300 } 301 catch (Exception e) 302 { 303 logger.error("DistributedQueue.reset(" + groupname + ')', e); 304 } 305 } 306 307 protected void checkResult(RspList rsp, Object retval) 308 { 309 if (logger.isDebugEnabled()) 310 { 311 logger.debug("Value updated from " + groupname + " :" + retval); 312 } 313 314 Vector results = rsp.getResults(); 315 316 for (int i = 0; i < results.size(); i++) 317 { 318 Object data = results.elementAt(i); 319 320 if (!data.equals(retval)) 321 { 322 logger.error("Reference value differs from returned value " + retval + " != " + data); 323 } 324 } 325 } 326 327 331 public Object remove() 332 { 333 Object retval = null; 334 RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout); 335 Vector results = rsp.getResults(); 336 337 if (results.size() > 0) 338 { 339 retval = results.elementAt(0); 340 341 if (logger.isDebugEnabled()) 342 { 343 checkResult(rsp, retval); 344 } 345 } 346 347 return retval; 348 } 349 350 354 public Object remove(long timeout) 355 { 356 Object retval = null; 357 long start = System.currentTimeMillis(); 358 359 if (timeout <= 0) 360 { 361 while (!stopped && (retval == null)) 362 { 363 RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout); 364 Vector results = rsp.getResults(); 365 366 if (results.size() > 0) 367 { 368 retval = results.elementAt(0); 369 370 if (logger.isDebugEnabled()) 371 { 372 checkResult(rsp, retval); 373 } 374 } 375 376 if (retval == null) 377 { 378 try 379 { 380 synchronized (mutex) 381 { 382 mutex.wait(); 383 } 384 } 385 catch (InterruptedException e) 386 { 387 } 388 } 389 } 390 } 391 else 392 { 393 while (((System.currentTimeMillis() - start) < timeout) && !stopped && (retval == null)) 394 { 395 RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout); 396 Vector results = rsp.getResults(); 397 398 if (results.size() > 0) 399 { 400 retval = results.elementAt(0); 401 402 if (logger.isDebugEnabled()) 403 { 404 checkResult(rsp, retval); 405 } 406 } 407 408 if (retval == null) 409 { 410 try 411 { 412 long delay = timeout - (System.currentTimeMillis() - start); 413 414 synchronized (mutex) 415 { 416 if (delay > 0) 417 { 418 mutex.wait(delay); 419 } 420 } 421 } 422 catch (InterruptedException e) 423 { 424 } 425 } 426 } 427 } 428 429 return retval; 430 } 431 432 public String toString() 433 { 434 return internalQueue.toString(); 435 } 436 437 438 public void _add(Object value) 439 { 440 if (logger.isDebugEnabled()) 441 { 442 logger.debug(groupname + '@' + getLocalAddress() + " _add(" + value + ')'); 443 } 444 445 446 synchronized (mutex) 447 { 448 internalQueue.add(value); 449 450 451 mutex.notifyAll(); 452 } 453 454 for (int i = 0; i < notifs.size(); i++) 455 ((Notification)notifs.elementAt(i)).entryAdd(value); 456 } 457 458 public void _addAtHead(Object value) 459 { 460 461 synchronized (mutex) 462 { 463 internalQueue.addFirst(value); 464 465 466 mutex.notifyAll(); 467 } 468 469 for (int i = 0; i < notifs.size(); i++) 470 ((Notification)notifs.elementAt(i)).entryAdd(value); 471 } 472 473 public void _reset() 474 { 475 if (logger.isDebugEnabled()) 476 { 477 logger.debug(groupname + '@' + getLocalAddress() + " _reset()"); 478 } 479 480 _private_reset(); 481 482 for (int i = 0; i < notifs.size(); i++) 483 ((Notification)notifs.elementAt(i)).contentsCleared(); 484 } 485 486 protected void _private_reset() 487 { 488 489 synchronized (mutex) 490 { 491 internalQueue.clear(); 492 493 494 mutex.notifyAll(); 495 } 496 } 497 498 public Object _remove() 499 { 500 Object retval = null; 501 502 try 503 { 504 505 synchronized (mutex) 506 { 507 retval = internalQueue.removeFirst(); 508 509 510 mutex.notifyAll(); 511 } 512 513 if (logger.isDebugEnabled()) 514 { 515 logger.debug(groupname + '@' + getLocalAddress() + "_remove(" + retval + ')'); 516 } 517 518 for (int i = 0; i < notifs.size(); i++) 519 ((Notification)notifs.elementAt(i)).entryRemoved(retval); 520 } 521 catch (NoSuchElementException e) 522 { 523 logger.debug(groupname + '@' + getLocalAddress() + "_remove(): nothing to remove"); 524 } 525 526 return retval; 527 } 528 529 public void _addAll(Collection c) 530 { 531 if (logger.isDebugEnabled()) 532 { 533 logger.debug(groupname + '@' + getLocalAddress() + " _addAll(" + c + ')'); 534 } 535 536 537 synchronized (mutex) 538 { 539 internalQueue.addAll(c); 540 541 542 mutex.notifyAll(); 543 } 544 545 for (int i = 0; i < notifs.size(); i++) 546 ((Notification)notifs.elementAt(i)).contentsSet(c); 547 } 548 549 550 551 public void receive(Message msg) 552 { 553 } 554 555 public byte[] getState() 556 { 557 Vector copy = (Vector)getContents().clone(); 558 559 try 560 { 561 return Util.objectToByteBuffer(copy); 562 } 563 catch (Throwable ex) 564 { 565 logger.error("DistributedQueue.getState(): exception marshalling state.", ex); 566 567 return null; 568 } 569 } 570 571 public void setState(byte[] new_state) 572 { 573 Vector new_copy; 574 575 try 576 { 577 new_copy = (Vector)Util.objectFromByteBuffer(new_state); 578 579 if (new_copy == null) 580 { 581 return; 582 } 583 } 584 catch (Throwable ex) 585 { 586 logger.error("DistributedQueue.setState(): exception unmarshalling state.", ex); 587 588 return; 589 } 590 591 _private_reset(); _addAll(new_copy); 593 } 594 595 596 public void viewAccepted(View new_view) 597 { 598 Vector new_mbrs = new_view.getMembers(); 599 600 if (new_mbrs != null) 601 { 602 sendViewChangeNotifications(new_mbrs, members); members.removeAllElements(); 604 605 for (int i = 0; i < new_mbrs.size(); i++) 606 members.addElement(new_mbrs.elementAt(i)); 607 } 608 } 609 610 611 public void suspect(Address suspected_mbr) 612 { 613 ; 614 } 615 616 617 public void block() 618 { 619 } 620 621 void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) 622 { 623 Vector joined; 624 Vector left; 625 Object mbr; 626 Notification n; 627 628 if ((notifs.size() == 0) || (old_mbrs == null) || (new_mbrs == null) || (old_mbrs.size() == 0) || 629 (new_mbrs.size() == 0)) 630 { 631 return; 632 } 633 634 joined = new Vector(); 636 637 for (int i = 0; i < new_mbrs.size(); i++) 638 { 639 mbr = new_mbrs.elementAt(i); 640 641 if (!old_mbrs.contains(mbr)) 642 { 643 joined.addElement(mbr); 644 } 645 } 646 647 left = new Vector(); 649 650 for (int i = 0; i < old_mbrs.size(); i++) 651 { 652 mbr = old_mbrs.elementAt(i); 653 654 if (!new_mbrs.contains(mbr)) 655 { 656 left.addElement(mbr); 657 } 658 } 659 660 for (int i = 0; i < notifs.size(); i++) 661 { 662 n = (Notification)notifs.elementAt(i); 663 n.viewChange(joined, left); 664 } 665 } 666 667 void initSignatures() 668 { 669 try 670 { 671 if (add_signature == null) 672 { 673 add_signature = new Class [] { Object .class }; 674 } 675 676 if (addAtHead_signature == null) 677 { 678 addAtHead_signature = new Class [] { Object .class }; 679 } 680 681 if (addAll_signature == null) 682 { 683 addAll_signature = new Class [] { Collection.class }; 684 } 685 686 if (reset_signature == null) 687 { 688 reset_signature = new Class [0]; 689 } 690 691 if (remove_signature == null) 692 { 693 remove_signature = new Class [0]; 694 } 695 } 696 catch (Throwable ex) 697 { 698 logger.error("DistributedQueue.initMethods()", ex); 699 } 700 } 701 702 public static void main(String [] args) 703 { 704 try 705 { 706 JChannel c = new JChannel("file:/c:/JGroups-2.0/conf/conf/total-token.xml"); 716 c.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 717 718 DistributedQueue ht = new DistributedQueue(c); 719 c.connect("demo"); 720 ht.start(5000); 721 722 ht.add("name"); 723 ht.add("Michelle Ban"); 724 725 Object old_key = ht.remove(); 726 System.out.println("old key was " + old_key); 727 old_key = ht.remove(); 728 System.out.println("old value was " + old_key); 729 730 ht.add("name 'Michelle Ban'"); 731 732 System.out.println("queue is " + ht); 733 } 734 catch (Throwable t) 735 { 736 t.printStackTrace(); 737 } 738 } 739 } 740 | Popular Tags |