1 package org.jgroups.protocols.pbcast; 2 3 import org.jgroups.*; 4 import org.jgroups.annotations.GuardedBy; 5 import org.jgroups.stack.IpAddress; 6 import org.jgroups.stack.Protocol; 7 import org.jgroups.stack.StateTransferInfo; 8 import org.jgroups.util.Streamable; 9 import org.jgroups.util.Util; 10 import org.jgroups.util.Digest; 11 12 import java.io.*; 13 import java.net.*; 14 import java.util.*; 15 import java.util.concurrent.ExecutorService ; 16 import java.util.concurrent.LinkedBlockingQueue ; 17 import java.util.concurrent.ThreadFactory ; 18 import java.util.concurrent.ThreadPoolExecutor ; 19 import java.util.concurrent.TimeUnit ; 20 import java.util.concurrent.atomic.AtomicInteger ; 21 import java.util.concurrent.atomic.AtomicLong ; 22 23 64 public class STREAMING_STATE_TRANSFER extends Protocol { 65 66 private final static String NAME = "STREAMING_STATE_TRANSFER"; 67 68 private Address local_addr = null; 69 70 @GuardedBy ("members") 71 private final Vector<Address> members = new Vector<Address>(); 72 73 77 @GuardedBy ("state_requesters") 78 private final Map<String , Set<Address>> state_requesters = new HashMap<String , Set<Address>>(); 79 80 83 private boolean waiting_for_state_response = false; 84 85 89 private AtomicInteger num_state_reqs = new AtomicInteger (0); 90 91 private AtomicLong num_bytes_sent = new AtomicLong (0); 92 93 private volatile double avg_state_size = 0; 94 95 99 private InetAddress bind_addr; 100 101 private int bind_port = 0; 102 103 private int max_pool = 5; 104 105 private long pool_thread_keep_alive; 106 107 private int socket_buffer_size = 8 * 1024; 108 109 private boolean use_reading_thread; 110 111 private volatile boolean flushProtocolInStack = false; 112 113 117 private StateProviderThreadSpawner spawner; 118 119 private final AtomicLong threadCounter = new AtomicLong (0); 120 121 122 123 public STREAMING_STATE_TRANSFER(){} 124 125 public final String getName() { 126 return NAME; 127 } 128 129 public int getNumberOfStateRequests() { 130 return num_state_reqs.get(); 131 } 132 133 public long getNumberOfStateBytesSent() { 134 return num_bytes_sent.get(); 135 } 136 137 public double getAverageStateSize() { 138 return avg_state_size; 139 } 140 141 public Vector<Integer > requiredDownServices() { 142 Vector<Integer > retval = new Vector<Integer >(); 143 retval.addElement(new Integer (Event.GET_DIGEST)); 144 retval.addElement(new Integer (Event.SET_DIGEST)); 145 return retval; 146 } 147 148 public void resetStats() { 149 super.resetStats(); 150 num_state_reqs.set(0); 151 num_bytes_sent.set(0); 152 avg_state_size = 0; 153 } 154 155 public boolean setProperties(Properties props) { 156 super.setProperties(props); 157 158 String str = props.getProperty("use_flush"); 159 if(str != null){ 160 log.warn("use_flush has been deprecated and its value will be ignored"); 161 props.remove("use_flush"); 162 } 163 str = props.getProperty("flush_timeout"); 164 if(str != null){ 165 log.warn("flush_timeout has been deprecated and its value will be ignored"); 166 props.remove("flush_timeout"); 167 } 168 169 try{ 170 bind_addr = Util.parseBindAddress(props, "bind_addr"); 171 }catch(UnknownHostException e){ 172 log.error("(bind_addr): host " + e.getLocalizedMessage() + " not known"); 173 return false; 174 } 175 bind_port = Util.parseInt(props, "start_port", 0); 176 socket_buffer_size = Util.parseInt(props, "socket_buffer_size", 8 * 1024); max_pool = Util.parseInt(props, "max_pool", 5); 178 pool_thread_keep_alive = Util.parseLong(props, "pool_thread_keep_alive", 1000 * 30); use_reading_thread = Util.parseBoolean(props, "use_reading_thread", false); 180 if(!props.isEmpty()){ 181 log.error("the following properties are not recognized: " + props); 182 183 return false; 184 } 185 return true; 186 } 187 188 public void init() throws Exception {} 189 190 public void start() throws Exception { 191 HashMap map = new HashMap(); 192 map.put("state_transfer", Boolean.TRUE); 193 map.put("protocol_class", getClass().getName()); 194 up_prot.up(new Event(Event.CONFIG, map)); 195 } 196 197 public void stop() { 198 super.stop(); 199 waiting_for_state_response = false; 200 if(spawner != null){ 201 spawner.stop(); 202 } 203 } 204 205 public Object up(Event evt) { 206 switch(evt.getType()){ 207 208 case Event.MSG: 209 Message msg = (Message) evt.getArg(); 210 StateHeader hdr = (StateHeader) msg.getHeader(getName()); 211 if(hdr != null){ 212 switch(hdr.type){ 213 case StateHeader.STATE_REQ: 214 handleStateReq(hdr); 215 break; 216 case StateHeader.STATE_RSP: 217 handleStateRsp(hdr); 218 break; 219 case StateHeader.STATE_REMOVE_REQUESTER: 220 removeFromStateRequesters(hdr.sender, hdr.state_id); 221 break; 222 default: 223 if(log.isErrorEnabled()) 224 log.error("type " + hdr.type + " not known in StateHeader"); 225 break; 226 } 227 return null; 228 } 229 break; 230 231 case Event.BECOME_SERVER: 232 break; 233 234 case Event.SET_LOCAL_ADDRESS: 235 local_addr = (Address) evt.getArg(); 236 break; 237 238 case Event.TMP_VIEW: 239 case Event.VIEW_CHANGE: 240 handleViewChange((View) evt.getArg()); 241 break; 242 243 case Event.CONFIG: 244 Map config = (Map) evt.getArg(); 245 if(bind_addr == null && (config != null && config.containsKey("bind_addr"))){ 246 bind_addr = (InetAddress) config.get("bind_addr"); 247 if(log.isDebugEnabled()) 248 log.debug("using bind_addr from CONFIG event " + bind_addr); 249 } 250 if(config != null && config.containsKey("state_transfer")){ 251 log.error("Protocol stack cannot contain two state transfer protocols. Remove either one of them"); 252 } 253 break; 254 } 255 return up_prot.up(evt); 256 } 257 258 public Object down(Event evt) { 259 260 switch(evt.getType()){ 261 262 case Event.TMP_VIEW: 263 case Event.VIEW_CHANGE: 264 handleViewChange((View) evt.getArg()); 265 break; 266 267 case Event.GET_STATE: 268 StateTransferInfo info = (StateTransferInfo) evt.getArg(); 269 Address target; 270 if(info.target == null){ 271 target = determineCoordinator(); 272 }else{ 273 target = info.target; 274 if(target.equals(local_addr)){ 275 if(log.isErrorEnabled()) 276 log.error("GET_STATE: cannot fetch state from myself !"); 277 target = null; 278 } 279 } 280 if(target == null){ 281 if(log.isDebugEnabled()) 282 log.debug("GET_STATE: first member (no state)"); 283 up_prot.up(new Event(Event.GET_STATE_OK, new StateTransferInfo())); 284 }else{ 285 boolean successfulFlush = false; 286 if(flushProtocolInStack){ 287 Map atts = new HashMap(); 288 atts.put("timeout", new Long (4000)); 289 successfulFlush = (Boolean ) up_prot.up(new Event(Event.SUSPEND, atts)); 290 } 291 if(successfulFlush){ 292 if(log.isTraceEnabled()) 293 log.trace("Successful flush at " + local_addr); 294 }else{ 295 if(flushProtocolInStack && log.isWarnEnabled()){ 296 log.warn("Could not get successful flush from " + local_addr); 297 } 298 } 299 Message state_req = new Message(target, null, null); 300 state_req.putHeader(NAME, new StateHeader(StateHeader.STATE_REQ, local_addr, 301 info.state_id)); 302 if(log.isDebugEnabled()) 303 log.debug("GET_STATE: asking " + target + 304 " for state, passing down a SUSPEND_STABLE event, timeout=" + info.timeout); 305 306 down_prot.down(new Event(Event.SUSPEND_STABLE, new Long (info.timeout))); 307 waiting_for_state_response = true; 308 down_prot.down(new Event(Event.MSG, state_req)); 309 } 310 return null; 312 case Event.STATE_TRANSFER_INPUTSTREAM_CLOSED: 313 if(flushProtocolInStack){ 314 up_prot.up(new Event(Event.RESUME)); 315 } 316 317 if(log.isDebugEnabled()) 318 log.debug("STATE_TRANSFER_INPUTSTREAM_CLOSED received,passing down a RESUME_STABLE event"); 319 320 down_prot.down(new Event(Event.RESUME_STABLE)); 321 return null; 322 case Event.CONFIG: 323 Map config = (Map) evt.getArg(); 324 if(config != null && config.containsKey("flush_supported")){ 325 flushProtocolInStack = true; 326 } 327 break; 328 329 } 330 331 return down_prot.down(evt); } 333 334 338 339 347 private boolean isDigestNeeded() { 348 return !flushProtocolInStack; 349 } 350 351 private void respondToStateRequester(boolean open_barrier) { 352 353 if(spawner == null){ 355 ServerSocket serverSocket = Util.createServerSocket(bind_addr, bind_port); 356 spawner = new StateProviderThreadSpawner(setupThreadPool(), serverSocket); 357 new Thread (Util.getGlobalThreadGroup(), spawner, "StateProviderThreadSpawner").start(); 358 } 359 360 List<Message> responses = new LinkedList<Message>(); 361 Digest digest = null; 362 synchronized(state_requesters){ 363 if(state_requesters.isEmpty()){ 364 if(log.isWarnEnabled()) 365 log.warn("Should be responding to state requester, but there are no requesters !"); 366 if(open_barrier) 367 down_prot.down(new Event(Event.OPEN_BARRIER)); 368 return; 369 } 370 371 if(isDigestNeeded()){ 372 if(log.isDebugEnabled()) 373 log.debug("passing down GET_DIGEST"); 374 digest = (Digest) down_prot.down(Event.GET_DIGEST_EVT); 375 } 376 377 for(Map.Entry<String , Set<Address>> entry:state_requesters.entrySet()){ 378 String stateId = entry.getKey(); 379 Set<Address> requesters = entry.getValue(); 380 for(Address requester:requesters){ 381 Message state_rsp = new Message(requester); 382 StateHeader hdr = new StateHeader(StateHeader.STATE_RSP, local_addr, 383 spawner.getServerSocketAddress(), digest, 384 stateId); 385 state_rsp.putHeader(NAME, hdr); 386 responses.add(state_rsp); 387 } 388 } 389 } 390 391 if(open_barrier) 392 down_prot.down(new Event(Event.OPEN_BARRIER)); 393 394 for(Message msg:responses){ 395 if(log.isDebugEnabled()) 396 log.debug("Responding to state requester " + msg.getDest() + " with address " 397 + spawner.getServerSocketAddress() + " and digest " + digest); 398 down_prot.down(new Event(Event.MSG, msg)); 399 if(stats){ 400 num_state_reqs.incrementAndGet(); 401 } 402 } 403 } 404 405 private ThreadPoolExecutor setupThreadPool() { 406 ThreadPoolExecutor threadPool = new ThreadPoolExecutor (1, 407 max_pool, 408 pool_thread_keep_alive, 409 TimeUnit.MILLISECONDS, 410 new LinkedBlockingQueue <Runnable >(10)); 411 ThreadFactory factory = new ThreadFactory () { 412 public Thread newThread(final Runnable command) { 413 Thread thread = new Thread (Util.getGlobalThreadGroup(), command, 414 "STREAMING_STATE_TRANSFER state provider-" 415 + threadCounter.incrementAndGet()); 416 return thread; 417 } 418 }; 419 threadPool.setThreadFactory(factory); 420 return threadPool; 421 } 422 423 private Address determineCoordinator() { 424 synchronized(members){ 425 for(Address member: members){ 426 if(!local_addr.equals(member)){ 427 return member; 428 } 429 } 430 } 431 return null; 432 } 433 434 private void handleViewChange(View v) { 435 Address old_coord; 436 Vector<Address> new_members = v.getMembers(); 437 boolean send_up_null_state_rsp = false; 438 439 synchronized(members){ 440 old_coord = (!members.isEmpty() ? members.firstElement() : null); 441 members.clear(); 442 members.addAll(new_members); 443 444 if(waiting_for_state_response && old_coord != null && !members.contains(old_coord)){ 445 send_up_null_state_rsp = true; 446 } 447 } 448 449 if(send_up_null_state_rsp){ 450 log.warn("discovered that the state provider (" + old_coord 451 + ") crashed; will return null state to application"); 452 } 453 } 454 455 private void handleStateReq(StateHeader hdr) { 456 Address sender = hdr.sender; 457 String id = hdr.state_id; 458 if(sender == null){ 459 if(log.isErrorEnabled()) 460 log.error("sender is null !"); 461 return; 462 } 463 464 synchronized(state_requesters){ 465 Set<Address> requesters = state_requesters.get(id); 466 if(requesters == null){ 467 requesters = new HashSet<Address>(); 468 } 469 requesters.add(sender); 470 state_requesters.put(id, requesters); 471 } 472 473 if(isDigestNeeded()) { 475 down_prot.down(new Event(Event.CLOSE_BARRIER)); } 480 try{ 481 respondToStateRequester(isDigestNeeded()); 482 }catch(Throwable t){ 483 if(log.isErrorEnabled()) 484 log.error("failed fetching state from application", t); 485 if(isDigestNeeded()) 486 down_prot.down(new Event(Event.OPEN_BARRIER)); 487 } 488 } 489 490 void handleStateRsp(StateHeader hdr) { 491 Digest tmp_digest = hdr.my_digest; 492 493 waiting_for_state_response = false; 494 if(isDigestNeeded()){ 495 if(tmp_digest == null){ 496 if(log.isWarnEnabled()) 497 log.warn("digest received from " + hdr.sender 498 + " is null, skipping setting digest !"); 499 }else{ 500 down_prot.down(new Event(Event.SET_DIGEST, tmp_digest)); 501 } 502 } 503 connectToStateProvider(hdr); 504 } 505 506 void removeFromStateRequesters(Address address, String state_id) { 507 synchronized(state_requesters){ 508 Set<Address> requesters = state_requesters.get(state_id); 509 if(requesters != null && !requesters.isEmpty()){ 510 boolean removed = requesters.remove(address); 511 if(log.isDebugEnabled()){ 512 log.debug("Attempted to clear " + address + " from requesters, successful=" 513 + removed); 514 } 515 if(requesters.isEmpty()){ 516 state_requesters.remove(state_id); 517 if(log.isDebugEnabled()){ 518 log.debug("Cleared all requesters for state " + state_id 519 + ",state_requesters=" + state_requesters); 520 } 521 } 522 } 523 } 524 } 525 526 private void connectToStateProvider(StateHeader hdr) { 527 IpAddress address = hdr.bind_addr; 528 String tmp_state_id = hdr.getStateId(); 529 StreamingInputStreamWrapper wrapper = null; 530 StateTransferInfo sti = null; 531 Socket socket = new Socket(); 532 try{ 533 socket.bind(new InetSocketAddress(bind_addr, 0)); 534 int bufferSize = socket.getReceiveBufferSize(); 535 socket.setReceiveBufferSize(socket_buffer_size); 536 if(log.isDebugEnabled()) 537 log.debug("Connecting to state provider " + address.getIpAddress() + ":" 538 + address.getPort() + ", original buffer size was " + bufferSize 539 + " and was reset to " + socket.getReceiveBufferSize()); 540 socket.connect(new InetSocketAddress(address.getIpAddress(), address.getPort())); 541 if(log.isDebugEnabled()) 542 log.debug("Connected to state provider, my end of the socket is " 543 + socket.getLocalAddress() + ":" + socket.getLocalPort() 544 + " passing inputstream up..."); 545 546 ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); 549 out.writeObject(tmp_state_id); 550 out.writeObject(local_addr); 551 552 wrapper = new StreamingInputStreamWrapper(socket); 553 sti = new StateTransferInfo(hdr.sender, wrapper, tmp_state_id); 554 }catch(IOException e){ 555 if(log.isWarnEnabled()){ 556 log.warn("State reader socket thread spawned abnormaly", e); 557 } 558 559 InputStream is = null; 561 sti = new StateTransferInfo(hdr.sender, is, tmp_state_id); 562 }finally{ 563 if(!socket.isConnected()){ 564 if(log.isWarnEnabled()) 565 log.warn("Could not connect to state provider. Closing socket..."); 566 try{ 567 if(wrapper != null){ 568 wrapper.close(); 569 }else{ 570 socket.close(); 571 } 572 573 }catch(IOException e){ 574 } 575 Message m = new Message(hdr.sender); 578 StateHeader mhdr = new StateHeader(StateHeader.STATE_REMOVE_REQUESTER, local_addr, 579 tmp_state_id); 580 m.putHeader(NAME, mhdr); 581 down_prot.down(new Event(Event.MSG, m)); 582 } 583 passStreamUp(sti); 584 } 585 } 586 587 private void passStreamUp(final StateTransferInfo sti) { 588 Runnable readingThread = new Runnable () { 589 public void run() { 590 up_prot.up(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti)); 591 } 592 }; 593 if(use_reading_thread){ 594 new Thread (Util.getGlobalThreadGroup(), readingThread, 595 "STREAMING_STATE_TRANSFER.reader").start(); 596 597 }else{ 598 readingThread.run(); 599 } 600 } 601 602 606 607 private class StateProviderThreadSpawner implements Runnable { 608 ExecutorService pool; 609 610 ServerSocket serverSocket; 611 612 IpAddress address; 613 614 Thread runner; 615 616 volatile boolean running = true; 617 618 public StateProviderThreadSpawner(ExecutorService pool,ServerSocket stateServingSocket){ 619 super(); 620 this.pool = pool; 621 this.serverSocket = stateServingSocket; 622 this.address = new IpAddress(STREAMING_STATE_TRANSFER.this.bind_addr, 623 serverSocket.getLocalPort()); 624 } 625 626 public void run() { 627 runner = Thread.currentThread(); 628 for(;running;){ 629 try{ 630 if(log.isDebugEnabled()) 631 log.debug("StateProviderThreadSpawner listening at " 632 + getServerSocketAddress() + "..."); 633 634 final Socket socket = serverSocket.accept(); 635 pool.execute(new Runnable () { 636 public void run() { 637 if(log.isDebugEnabled()) 638 log.debug("Accepted request for state transfer from " 639 + socket.getInetAddress() + ":" + socket.getPort() 640 + " handing of to PooledExecutor thread"); 641 new StateProviderHandler().process(socket); 642 } 643 }); 644 645 }catch(IOException e){ 646 if(log.isWarnEnabled()){ 647 if(serverSocket != null && !serverSocket.isClosed()){ 650 log.warn("Spawning socket from server socket finished abnormaly", e); 651 } 652 } 653 } 654 } 655 } 656 657 public IpAddress getServerSocketAddress() { 658 return address; 659 } 660 661 public void stop() { 662 running = false; 663 try{ 664 if(serverSocket != null && !serverSocket.isClosed()){ 665 serverSocket.close(); 666 } 667 }catch(IOException e){ 668 }finally{ 669 if(log.isDebugEnabled()) 670 log.debug("Waiting for StateProviderThreadSpawner to die ... "); 671 672 if(runner != null){ 673 try{ 674 runner.join(3000); 675 }catch(InterruptedException ignored){ 676 Thread.currentThread().interrupt(); 677 } 678 } 679 680 if(log.isDebugEnabled()) 681 log.debug("Shutting the thread pool down... "); 682 683 pool.shutdownNow(); 684 try{ 685 pool.awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME, 686 TimeUnit.MILLISECONDS); 687 }catch(InterruptedException ignored){ 688 Thread.currentThread().interrupt(); 689 } 690 } 691 if(log.isDebugEnabled()) 692 log.debug("Thread pool is shutdown. All pool threads are cleaned up."); 693 } 694 } 695 696 private class StateProviderHandler { 697 public void process(Socket socket) { 698 StreamingOutputStreamWrapper wrapper = null; 699 ObjectInputStream ois = null; 700 try{ 701 int bufferSize = socket.getSendBufferSize(); 702 socket.setSendBufferSize(socket_buffer_size); 703 if(log.isDebugEnabled()) 704 log.debug("Running on " + Thread.currentThread() 705 + ". Accepted request for state transfer from " 706 + socket.getInetAddress() + ":" + socket.getPort() 707 + ", original buffer size was " + bufferSize + " and was reset to " 708 + socket.getSendBufferSize() + ", passing outputstream up... "); 709 710 ois = new ObjectInputStream(socket.getInputStream()); 713 String state_id = (String ) ois.readObject(); 714 Address stateRequester = (Address) ois.readObject(); 715 removeFromStateRequesters(stateRequester, state_id); 716 717 wrapper = new StreamingOutputStreamWrapper(socket); 718 StateTransferInfo sti = new StateTransferInfo(stateRequester, wrapper, state_id); 719 up_prot.up(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM, sti)); 720 }catch(IOException e){ 721 if(log.isWarnEnabled()){ 722 log.warn("State writer socket thread spawned abnormaly", e); 723 } 724 }catch(ClassNotFoundException e){ 725 }finally{ 728 if(socket != null && !socket.isConnected()){ 729 if(log.isWarnEnabled()) 730 log.warn("Accepted request for state transfer but socket " + socket 731 + " not connected properly. Closing it..."); 732 try{ 733 if(wrapper != null){ 734 wrapper.close(); 735 }else{ 736 socket.close(); 737 } 738 }catch(IOException e){ 739 } 740 } 741 } 742 } 743 } 744 745 private class StreamingInputStreamWrapper extends InputStream { 746 747 private Socket inputStreamOwner; 748 749 private InputStream delegate; 750 751 private Channel channelOwner; 752 753 public StreamingInputStreamWrapper(Socket inputStreamOwner) throws IOException{ 754 super(); 755 this.inputStreamOwner = inputStreamOwner; 756 this.delegate = new BufferedInputStream(inputStreamOwner.getInputStream()); 757 this.channelOwner = stack.getChannel(); 758 } 759 760 public int available() throws IOException { 761 return delegate.available(); 762 } 763 764 public void close() throws IOException { 765 if(log.isDebugEnabled()){ 766 log.debug("State reader " + inputStreamOwner + " is closing the socket "); 767 } 768 if(channelOwner != null && channelOwner.isConnected()){ 769 channelOwner.down(new Event(Event.STATE_TRANSFER_INPUTSTREAM_CLOSED)); 770 } 771 inputStreamOwner.close(); 772 } 773 774 public synchronized void mark(int readlimit) { 775 delegate.mark(readlimit); 776 } 777 778 public boolean markSupported() { 779 return delegate.markSupported(); 780 } 781 782 public int read() throws IOException { 783 return delegate.read(); 784 } 785 786 public int read(byte[] b, int off, int len) throws IOException { 787 return delegate.read(b, off, len); 788 } 789 790 public int read(byte[] b) throws IOException { 791 return delegate.read(b); 792 } 793 794 public synchronized void reset() throws IOException { 795 delegate.reset(); 796 } 797 798 public long skip(long n) throws IOException { 799 return delegate.skip(n); 800 } 801 } 802 803 private class StreamingOutputStreamWrapper extends OutputStream { 804 private Socket outputStreamOwner; 805 806 private OutputStream delegate; 807 808 private long bytesWrittenCounter = 0; 809 810 private Channel channelOwner; 811 812 public StreamingOutputStreamWrapper(Socket outputStreamOwner) throws IOException{ 813 super(); 814 this.outputStreamOwner = outputStreamOwner; 815 this.delegate = new BufferedOutputStream(outputStreamOwner.getOutputStream()); 816 this.channelOwner = stack.getChannel(); 817 } 818 819 public void close() throws IOException { 820 if(log.isDebugEnabled()){ 821 log.debug("State writer " + outputStreamOwner + " is closing the socket "); 822 } 823 try{ 824 if(channelOwner != null && channelOwner.isConnected()){ 825 channelOwner.down(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM_CLOSED)); 826 } 827 outputStreamOwner.close(); 828 }catch(IOException e){ 829 throw e; 830 }finally{ 831 if(stats){ 832 avg_state_size = num_bytes_sent.addAndGet(bytesWrittenCounter) / num_state_reqs.doubleValue(); 833 } 834 } 835 } 836 837 public void flush() throws IOException { 838 delegate.flush(); 839 } 840 841 public void write(byte[] b, int off, int len) throws IOException { 842 delegate.write(b, off, len); 843 bytesWrittenCounter += len; 844 } 845 846 public void write(byte[] b) throws IOException { 847 delegate.write(b); 848 if(b != null){ 849 bytesWrittenCounter += b.length; 850 } 851 } 852 853 public void write(int b) throws IOException { 854 delegate.write(b); 855 bytesWrittenCounter += 1; 856 } 857 } 858 859 public static class StateHeader extends Header implements Streamable { 860 public static final byte STATE_REQ = 1; 861 862 public static final byte STATE_RSP = 2; 863 864 public static final byte STATE_REMOVE_REQUESTER = 3; 865 866 long id = 0; 869 byte type = 0; 870 871 Address sender; 873 Digest my_digest = null; 875 IpAddress bind_addr = null; 876 877 String state_id = null; 879 public StateHeader(){ } 881 882 public StateHeader(byte type,Address sender,String state_id){ 883 this.type = type; 884 this.sender = sender; 885 this.state_id = state_id; 886 } 887 888 public StateHeader(byte type,Address sender,long id,Digest digest){ 889 this.type = type; 890 this.sender = sender; 891 this.id = id; 892 this.my_digest = digest; 893 } 894 895 public StateHeader( 896 byte type, 897 Address sender, 898 IpAddress bind_addr, 899 Digest digest, 900 String state_id){ 901 this.type = type; 902 this.sender = sender; 903 this.my_digest = digest; 904 this.bind_addr = bind_addr; 905 this.state_id = state_id; 906 } 907 908 public int getType() { 909 return type; 910 } 911 912 public Digest getDigest() { 913 return my_digest; 914 } 915 916 public String getStateId() { 917 return state_id; 918 } 919 920 public boolean equals(Object o) { 921 StateHeader other; 922 923 if(sender != null && o != null){ 924 if(!(o instanceof StateHeader)) 925 return false; 926 other = (StateHeader) o; 927 return sender.equals(other.sender) && id == other.id; 928 } 929 return false; 930 } 931 932 public int hashCode() { 933 if(sender != null) 934 return sender.hashCode() + (int) id; 935 else 936 return (int) id; 937 } 938 939 public String toString() { 940 StringBuilder sb = new StringBuilder (); 941 sb.append("type=").append(type2Str(type)); 942 if(sender != null) 943 sb.append(", sender=").append(sender).append(" id=").append(id); 944 if(my_digest != null) 945 sb.append(", digest=").append(my_digest); 946 return sb.toString(); 947 } 948 949 static String type2Str(int t) { 950 switch(t){ 951 case STATE_REQ: 952 return "STATE_REQ"; 953 case STATE_RSP: 954 return "STATE_RSP"; 955 case STATE_REMOVE_REQUESTER: 956 return "STATE_REMOVE_REQUESTER"; 957 default: 958 return "<unknown>"; 959 } 960 } 961 962 public void writeExternal(ObjectOutput out) throws IOException { 963 out.writeObject(sender); 964 out.writeLong(id); 965 out.writeByte(type); 966 out.writeObject(my_digest); 967 out.writeObject(bind_addr); 968 out.writeUTF(state_id); 969 } 970 971 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 972 sender = (Address) in.readObject(); 973 id = in.readLong(); 974 type = in.readByte(); 975 my_digest = (Digest) in.readObject(); 976 bind_addr = (IpAddress) in.readObject(); 977 state_id = in.readUTF(); 978 } 979 980 public void writeTo(DataOutputStream out) throws IOException { 981 out.writeByte(type); 982 out.writeLong(id); 983 Util.writeAddress(sender, out); 984 Util.writeStreamable(my_digest, out); 985 Util.writeStreamable(bind_addr, out); 986 Util.writeString(state_id, out); 987 } 988 989 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , 990 InstantiationException { 991 type = in.readByte(); 992 id = in.readLong(); 993 sender = Util.readAddress(in); 994 my_digest = (Digest) Util.readStreamable(Digest.class, in); 995 bind_addr = (IpAddress) Util.readStreamable(IpAddress.class, in); 996 state_id = Util.readString(in); 997 } 998 999 public int size() { 1000 int retval = Global.LONG_SIZE + Global.BYTE_SIZE; 1002 retval += Util.size(sender); 1003 1004 retval += Global.BYTE_SIZE; if(my_digest != null) 1006 retval += my_digest.serializedSize(); 1007 1008 retval += Global.BYTE_SIZE; if(state_id != null) 1010 retval += state_id.length() + 2; 1011 return retval; 1012 } 1013 } 1014} 1015 | Popular Tags |