KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > pbcast > STREAMING_STATE_TRANSFER


1 package org.jgroups.protocols.pbcast;
2
import org.jgroups.*;
import org.jgroups.annotations.GuardedBy;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;
import org.jgroups.util.Digest;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
22
23 /**
24  * <code>STREAMING_STATE_TRANSFER</code>, as its name implies, allows a
25  * streaming state transfer between two channel instances.
26  *
27  * <p>
28  *
29  * Major advantage of this approach is that transferring application state to a
30  * joining member of a group does not entail loading of the complete application
31  * state into memory. Application state, for example, might be located entirely
32  * on some form of disk based storage. The default <code>STATE_TRANSFER</code>
33  * requires this state to be loaded entirely into memory before being
34  * transferred to a group member while <code>STREAMING_STATE_TRANSFER</code>
35  * does not. Thus <code>STREAMING_STATE_TRANSFER</code> protocol is able to
36  * transfer application state that is very large (>1Gb) without a likelihood of
37  * such transfer resulting in OutOfMemoryError.
38  *
39  * <p>
40  *
41  * Channel instance can be configured with either
42  * <code>STREAMING_STATE_TRANSFER</code> or <code>STATE_TRANSFER</code> but
43  * not both protocols at the same time.
44  *
45  * <p>
46  *
47  * In order to process streaming state transfer an application has to implement
48  * <code>ExtendedMessageListener</code> if it is using channel in a push style
49  * mode or it has to process <code>StreamingSetStateEvent</code> and
50  * <code>StreamingGetStateEvent</code> if it is using channel in a pull style
51  * mode.
52  *
53  *
54  * @author Vladimir Blagojevic
55  * @see org.jgroups.ExtendedMessageListener
56  * @see org.jgroups.StreamingGetStateEvent
57  * @see org.jgroups.StreamingSetStateEvent
58  * @see org.jgroups.protocols.pbcast.STATE_TRANSFER
59  * @since 2.4
60  *
61  * @version $Id$
62  *
63  */

64 public class STREAMING_STATE_TRANSFER extends Protocol {
65     
/** Protocol name; also the key under which this protocol's headers are put on messages. */
private final static String NAME = "STREAMING_STATE_TRANSFER";

/** Address of this member, taken from the SET_LOCAL_ADDRESS event. */
private Address local_addr = null;

/** Current membership, replaced on every (temporary) view change. */
@GuardedBy("members")
private final Vector<Address> members = new Vector<Address>();

/*
 * key is state id and value is set of state requesters for that state id
 * has contents only for state provider member
 */
@GuardedBy("state_requesters")
private final Map<String, Set<Address>> state_requesters = new HashMap<String, Set<Address>>();

/*
 * set to true while waiting for a STATE_RSP. Volatile because it is written
 * when GET_STATE travels down the stack and in stop(), and read when views
 * and state responses are handled, without any common lock.
 */
private volatile boolean waiting_for_state_response = false;

/*
 * JMX statistics
 */
private AtomicInteger num_state_reqs = new AtomicInteger(0);

private AtomicLong num_bytes_sent = new AtomicLong(0);

private volatile double avg_state_size = 0;

/*
 * properties
 */

/** Address the state sockets are bound to ("bind_addr" property or CONFIG event). */
private InetAddress bind_addr;

/** Port handed to the server socket factory ("start_port" property). */
private int bind_port = 0;

/** Maximum number of threads in the state provider pool ("max_pool" property). */
private int max_pool = 5;

/** Keep-alive time of pool threads in milliseconds ("pool_thread_keep_alive" property). */
private long pool_thread_keep_alive;

/** Send/receive buffer size requested on state transfer sockets ("socket_buffer_size" property). */
private int socket_buffer_size = 8 * 1024;

/** If true, the state input stream is passed up on a separate thread ("use_reading_thread" property). */
private boolean use_reading_thread;

/** Set once a CONFIG event containing "flush_supported" has travelled down through this protocol. */
private volatile boolean flushProtocolInStack = false;

/*
 * plumbing to provide state
 */

/** Accept loop serving state to requesters; created lazily on the first state request. */
private StateProviderThreadSpawner spawner;

/** Source of the numeric suffix in state provider thread names. */
private final AtomicLong threadCounter = new AtomicLong(0);

public STREAMING_STATE_TRANSFER(){}
124
125     public final String JavaDoc getName() {
126         return NAME;
127     }
128
129     public int getNumberOfStateRequests() {
130         return num_state_reqs.get();
131     }
132
133     public long getNumberOfStateBytesSent() {
134         return num_bytes_sent.get();
135     }
136
137     public double getAverageStateSize() {
138         return avg_state_size;
139     }
140
141     public Vector<Integer JavaDoc> requiredDownServices() {
142         Vector<Integer JavaDoc> retval = new Vector<Integer JavaDoc>();
143         retval.addElement(new Integer JavaDoc(Event.GET_DIGEST));
144         retval.addElement(new Integer JavaDoc(Event.SET_DIGEST));
145         return retval;
146     }
147
148     public void resetStats() {
149         super.resetStats();
150         num_state_reqs.set(0);
151         num_bytes_sent.set(0);
152         avg_state_size = 0;
153     }
154
155     public boolean setProperties(Properties props) {
156         super.setProperties(props);
157
158         String JavaDoc str = props.getProperty("use_flush");
159         if(str != null){
160             log.warn("use_flush has been deprecated and its value will be ignored");
161             props.remove("use_flush");
162         }
163         str = props.getProperty("flush_timeout");
164         if(str != null){
165             log.warn("flush_timeout has been deprecated and its value will be ignored");
166             props.remove("flush_timeout");
167         }
168
169         try{
170             bind_addr = Util.parseBindAddress(props, "bind_addr");
171         }catch(UnknownHostException e){
172             log.error("(bind_addr): host " + e.getLocalizedMessage() + " not known");
173             return false;
174         }
175         bind_port = Util.parseInt(props, "start_port", 0);
176         socket_buffer_size = Util.parseInt(props, "socket_buffer_size", 8 * 1024); // 8K
177
max_pool = Util.parseInt(props, "max_pool", 5);
178         pool_thread_keep_alive = Util.parseLong(props, "pool_thread_keep_alive", 1000 * 30); //30sec
179
use_reading_thread = Util.parseBoolean(props, "use_reading_thread", false);
180         if(!props.isEmpty()){
181             log.error("the following properties are not recognized: " + props);
182
183             return false;
184         }
185         return true;
186     }
187
188     public void init() throws Exception JavaDoc {}
189
190     public void start() throws Exception JavaDoc {
191         HashMap map = new HashMap();
192         map.put("state_transfer", Boolean.TRUE);
193         map.put("protocol_class", getClass().getName());
194         up_prot.up(new Event(Event.CONFIG, map));
195     }
196
197     public void stop() {
198         super.stop();
199         waiting_for_state_response = false;
200         if(spawner != null){
201             spawner.stop();
202         }
203     }
204
205     public Object JavaDoc up(Event evt) {
206         switch(evt.getType()){
207         
208         case Event.MSG:
209             Message msg = (Message) evt.getArg();
210             StateHeader hdr = (StateHeader) msg.getHeader(getName());
211             if(hdr != null){
212                 switch(hdr.type){
213                 case StateHeader.STATE_REQ:
214                     handleStateReq(hdr);
215                     break;
216                 case StateHeader.STATE_RSP:
217                     handleStateRsp(hdr);
218                     break;
219                 case StateHeader.STATE_REMOVE_REQUESTER:
220                     removeFromStateRequesters(hdr.sender, hdr.state_id);
221                     break;
222                 default:
223                     if(log.isErrorEnabled())
224                         log.error("type " + hdr.type + " not known in StateHeader");
225                     break;
226                 }
227                 return null;
228             }
229             break;
230             
231         case Event.BECOME_SERVER:
232             break;
233
234         case Event.SET_LOCAL_ADDRESS:
235             local_addr = (Address) evt.getArg();
236             break;
237
238         case Event.TMP_VIEW:
239         case Event.VIEW_CHANGE:
240             handleViewChange((View) evt.getArg());
241             break;
242
243         case Event.CONFIG:
244             Map config = (Map) evt.getArg();
245             if(bind_addr == null && (config != null && config.containsKey("bind_addr"))){
246                 bind_addr = (InetAddress) config.get("bind_addr");
247                 if(log.isDebugEnabled())
248                     log.debug("using bind_addr from CONFIG event " + bind_addr);
249             }
250             if(config != null && config.containsKey("state_transfer")){
251                 log.error("Protocol stack cannot contain two state transfer protocols. Remove either one of them");
252             }
253             break;
254         }
255         return up_prot.up(evt);
256     }
257
258     public Object JavaDoc down(Event evt) {
259
260         switch(evt.getType()){
261
262         case Event.TMP_VIEW:
263         case Event.VIEW_CHANGE:
264             handleViewChange((View) evt.getArg());
265             break;
266
267         case Event.GET_STATE:
268             StateTransferInfo info = (StateTransferInfo) evt.getArg();
269             Address target;
270             if(info.target == null){
271                 target = determineCoordinator();
272             }else{
273                 target = info.target;
274                 if(target.equals(local_addr)){
275                     if(log.isErrorEnabled())
276                         log.error("GET_STATE: cannot fetch state from myself !");
277                     target = null;
278                 }
279             }
280             if(target == null){
281                 if(log.isDebugEnabled())
282                     log.debug("GET_STATE: first member (no state)");
283                 up_prot.up(new Event(Event.GET_STATE_OK, new StateTransferInfo()));
284             }else{
285                 boolean successfulFlush = false;
286                 if(flushProtocolInStack){
287                     Map atts = new HashMap();
288                     atts.put("timeout", new Long JavaDoc(4000));
289                     successfulFlush = (Boolean JavaDoc) up_prot.up(new Event(Event.SUSPEND, atts));
290                 }
291                 if(successfulFlush){
292                     if(log.isTraceEnabled())
293                         log.trace("Successful flush at " + local_addr);
294                 }else{
295                     if(flushProtocolInStack && log.isWarnEnabled()){
296                         log.warn("Could not get successful flush from " + local_addr);
297                     }
298                 }
299                 Message state_req = new Message(target, null, null);
300                 state_req.putHeader(NAME, new StateHeader(StateHeader.STATE_REQ, local_addr,
301                                                             info.state_id));
302                 if(log.isDebugEnabled())
303                     log.debug("GET_STATE: asking " + target +
304                               " for state, passing down a SUSPEND_STABLE event, timeout=" + info.timeout);
305                     
306                 down_prot.down(new Event(Event.SUSPEND_STABLE, new Long JavaDoc(info.timeout)));
307                 waiting_for_state_response = true;
308                 down_prot.down(new Event(Event.MSG, state_req));
309             }
310             return null; // don't pass down any further !
311

312         case Event.STATE_TRANSFER_INPUTSTREAM_CLOSED:
313             if(flushProtocolInStack){
314                 up_prot.up(new Event(Event.RESUME));
315             }
316
317             if(log.isDebugEnabled())
318                 log.debug("STATE_TRANSFER_INPUTSTREAM_CLOSED received,passing down a RESUME_STABLE event");
319         
320             down_prot.down(new Event(Event.RESUME_STABLE));
321             return null;
322         case Event.CONFIG:
323             Map config = (Map) evt.getArg();
324             if(config != null && config.containsKey("flush_supported")){
325                 flushProtocolInStack = true;
326             }
327             break;
328
329         }
330
331         return down_prot.down(evt); // pass on to the layer below us
332
}
333
334     /*
335      * --------------------------- Private Methods
336      * --------------------------------
337      */

338
339     /**
340      * When FLUSH is used we do not need to pass digests between members
341      *
342      * see JGroups/doc/design/PArtialStateTransfer.txt see
343      * JGroups/doc/design/FLUSH.txt
344      *
345      * @return true if use of digests is required, false otherwise
346      */

347     private boolean isDigestNeeded() {
348         return !flushProtocolInStack;
349     }
350
351     private void respondToStateRequester(boolean open_barrier) {
352
353         // setup the plumbing if needed
354
if(spawner == null){
355             ServerSocket serverSocket = Util.createServerSocket(bind_addr, bind_port);
356             spawner = new StateProviderThreadSpawner(setupThreadPool(), serverSocket);
357             new Thread JavaDoc(Util.getGlobalThreadGroup(), spawner, "StateProviderThreadSpawner").start();
358         }
359
360         List<Message> responses = new LinkedList<Message>();
361         Digest digest = null;
362         synchronized(state_requesters){
363             if(state_requesters.isEmpty()){
364                 if(log.isWarnEnabled())
365                     log.warn("Should be responding to state requester, but there are no requesters !");
366                 if(open_barrier)
367                     down_prot.down(new Event(Event.OPEN_BARRIER));
368                 return;
369             }
370
371             if(isDigestNeeded()){
372                 if(log.isDebugEnabled())
373                     log.debug("passing down GET_DIGEST");
374                 digest = (Digest) down_prot.down(Event.GET_DIGEST_EVT);
375             }
376
377             for(Map.Entry<String JavaDoc, Set<Address>> entry:state_requesters.entrySet()){
378                 String JavaDoc stateId = entry.getKey();
379                 Set<Address> requesters = entry.getValue();
380                 for(Address requester:requesters){
381                     Message state_rsp = new Message(requester);
382                     StateHeader hdr = new StateHeader(StateHeader.STATE_RSP, local_addr,
383                                                         spawner.getServerSocketAddress(), digest,
384                                                         stateId);
385                     state_rsp.putHeader(NAME, hdr);
386                     responses.add(state_rsp);
387                 }
388             }
389         }
390
391         if(open_barrier)
392             down_prot.down(new Event(Event.OPEN_BARRIER));
393
394         for(Message msg:responses){
395             if(log.isDebugEnabled())
396                 log.debug("Responding to state requester " + msg.getDest() + " with address "
397                         + spawner.getServerSocketAddress() + " and digest " + digest);
398             down_prot.down(new Event(Event.MSG, msg));
399             if(stats){
400                 num_state_reqs.incrementAndGet();
401             }
402         }
403     }
404
405     private ThreadPoolExecutor JavaDoc setupThreadPool() {
406         ThreadPoolExecutor JavaDoc threadPool = new ThreadPoolExecutor JavaDoc(1,
407                                                                max_pool,
408                                                                pool_thread_keep_alive,
409                                                                TimeUnit.MILLISECONDS,
410                                                                new LinkedBlockingQueue JavaDoc<Runnable JavaDoc>(10));
411         ThreadFactory JavaDoc factory = new ThreadFactory JavaDoc() {
412             public Thread JavaDoc newThread(final Runnable JavaDoc command) {
413                 Thread JavaDoc thread = new Thread JavaDoc(Util.getGlobalThreadGroup(), command,
414                                             "STREAMING_STATE_TRANSFER state provider-"
415                                                     + threadCounter.incrementAndGet());
416                 return thread;
417             }
418         };
419         threadPool.setThreadFactory(factory);
420         return threadPool;
421     }
422
423     private Address determineCoordinator() {
424         synchronized(members){
425             for(Address member: members){
426                 if(!local_addr.equals(member)){
427                     return member;
428                 }
429             }
430         }
431         return null;
432     }
433
434     private void handleViewChange(View v) {
435         Address old_coord;
436         Vector<Address> new_members = v.getMembers();
437         boolean send_up_null_state_rsp = false;
438
439         synchronized(members){
440             old_coord = (!members.isEmpty() ? members.firstElement() : null);
441             members.clear();
442             members.addAll(new_members);
443             
444             if(waiting_for_state_response && old_coord != null && !members.contains(old_coord)){
445                 send_up_null_state_rsp = true;
446             }
447         }
448
449         if(send_up_null_state_rsp){
450             log.warn("discovered that the state provider (" + old_coord
451                     + ") crashed; will return null state to application");
452         }
453     }
454
455     private void handleStateReq(StateHeader hdr) {
456         Address sender = hdr.sender;
457         String JavaDoc id = hdr.state_id;
458         if(sender == null){
459             if(log.isErrorEnabled())
460                 log.error("sender is null !");
461             return;
462         }
463
464         synchronized(state_requesters){
465             Set<Address> requesters = state_requesters.get(id);
466             if(requesters == null){
467                 requesters = new HashSet<Address>();
468             }
469             requesters.add(sender);
470             state_requesters.put(id, requesters);
471         }
472
473         if(isDigestNeeded()) // FLUSH protocol is not present
474
{
475             down_prot.down(new Event(Event.CLOSE_BARRIER)); // drain (and block)
476
// incoming msgs
477
// until after state
478
// has been returned
479
}
480         try{
481             respondToStateRequester(isDigestNeeded());
482         }catch(Throwable JavaDoc t){
483             if(log.isErrorEnabled())
484                 log.error("failed fetching state from application", t);
485             if(isDigestNeeded())
486                 down_prot.down(new Event(Event.OPEN_BARRIER));
487         }
488     }
489
490     void handleStateRsp(StateHeader hdr) {
491         Digest tmp_digest = hdr.my_digest;
492
493         waiting_for_state_response = false;
494         if(isDigestNeeded()){
495             if(tmp_digest == null){
496                 if(log.isWarnEnabled())
497                     log.warn("digest received from " + hdr.sender
498                             + " is null, skipping setting digest !");
499             }else{
500                 down_prot.down(new Event(Event.SET_DIGEST, tmp_digest));
501             }
502         }
503         connectToStateProvider(hdr);
504     }
505
506     void removeFromStateRequesters(Address address, String JavaDoc state_id) {
507         synchronized(state_requesters){
508             Set<Address> requesters = state_requesters.get(state_id);
509             if(requesters != null && !requesters.isEmpty()){
510                 boolean removed = requesters.remove(address);
511                 if(log.isDebugEnabled()){
512                     log.debug("Attempted to clear " + address + " from requesters, successful="
513                             + removed);
514                 }
515                 if(requesters.isEmpty()){
516                     state_requesters.remove(state_id);
517                     if(log.isDebugEnabled()){
518                         log.debug("Cleared all requesters for state " + state_id
519                                 + ",state_requesters=" + state_requesters);
520                     }
521                 }
522             }
523         }
524     }
525
526     private void connectToStateProvider(StateHeader hdr) {
527         IpAddress address = hdr.bind_addr;
528         String JavaDoc tmp_state_id = hdr.getStateId();
529         StreamingInputStreamWrapper wrapper = null;
530         StateTransferInfo sti = null;
531         Socket socket = new Socket();
532         try{
533             socket.bind(new InetSocketAddress(bind_addr, 0));
534             int bufferSize = socket.getReceiveBufferSize();
535             socket.setReceiveBufferSize(socket_buffer_size);
536             if(log.isDebugEnabled())
537                 log.debug("Connecting to state provider " + address.getIpAddress() + ":"
538                         + address.getPort() + ", original buffer size was " + bufferSize
539                         + " and was reset to " + socket.getReceiveBufferSize());
540             socket.connect(new InetSocketAddress(address.getIpAddress(), address.getPort()));
541             if(log.isDebugEnabled())
542                 log.debug("Connected to state provider, my end of the socket is "
543                         + socket.getLocalAddress() + ":" + socket.getLocalPort()
544                         + " passing inputstream up...");
545
546             // write out our state_id and address so state provider can clear
547
// this request
548
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
549             out.writeObject(tmp_state_id);
550             out.writeObject(local_addr);
551
552             wrapper = new StreamingInputStreamWrapper(socket);
553             sti = new StateTransferInfo(hdr.sender, wrapper, tmp_state_id);
554         }catch(IOException e){
555             if(log.isWarnEnabled()){
556                 log.warn("State reader socket thread spawned abnormaly", e);
557             }
558
559             // pass null stream up so that JChannel.getState() returns false
560
InputStream is = null;
561             sti = new StateTransferInfo(hdr.sender, is, tmp_state_id);
562         }finally{
563             if(!socket.isConnected()){
564                 if(log.isWarnEnabled())
565                     log.warn("Could not connect to state provider. Closing socket...");
566                 try{
567                     if(wrapper != null){
568                         wrapper.close();
569                     }else{
570                         socket.close();
571                     }
572
573                 }catch(IOException e){
574                 }
575                 // since socket did not connect properly we have to
576
// clear our entry in state providers hashmap "manually"
577
Message m = new Message(hdr.sender);
578                 StateHeader mhdr = new StateHeader(StateHeader.STATE_REMOVE_REQUESTER, local_addr,
579                                                     tmp_state_id);
580                 m.putHeader(NAME, mhdr);
581                 down_prot.down(new Event(Event.MSG, m));
582             }
583             passStreamUp(sti);
584         }
585     }
586
587     private void passStreamUp(final StateTransferInfo sti) {
588         Runnable JavaDoc readingThread = new Runnable JavaDoc() {
589             public void run() {
590                 up_prot.up(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti));
591             }
592         };
593         if(use_reading_thread){
594             new Thread JavaDoc(Util.getGlobalThreadGroup(), readingThread,
595                         "STREAMING_STATE_TRANSFER.reader").start();
596
597         }else{
598             readingThread.run();
599         }
600     }
601
602     /*
603      * ------------------------ End of Private Methods
604      * ------------------------------
605      */

606
607     private class StateProviderThreadSpawner implements Runnable JavaDoc {
608         ExecutorService JavaDoc pool;
609
610         ServerSocket serverSocket;
611
612         IpAddress address;
613
614         Thread JavaDoc runner;
615
616         volatile boolean running = true;
617
618         public StateProviderThreadSpawner(ExecutorService JavaDoc pool,ServerSocket stateServingSocket){
619             super();
620             this.pool = pool;
621             this.serverSocket = stateServingSocket;
622             this.address = new IpAddress(STREAMING_STATE_TRANSFER.this.bind_addr,
623                                             serverSocket.getLocalPort());
624         }
625
626         public void run() {
627             runner = Thread.currentThread();
628             for(;running;){
629                 try{
630                     if(log.isDebugEnabled())
631                         log.debug("StateProviderThreadSpawner listening at "
632                                 + getServerSocketAddress() + "...");
633
634                     final Socket socket = serverSocket.accept();
635                     pool.execute(new Runnable JavaDoc() {
636                         public void run() {
637                             if(log.isDebugEnabled())
638                                 log.debug("Accepted request for state transfer from "
639                                         + socket.getInetAddress() + ":" + socket.getPort()
640                                         + " handing of to PooledExecutor thread");
641                             new StateProviderHandler().process(socket);
642                         }
643                     });
644
645                 }catch(IOException e){
646                     if(log.isWarnEnabled()){
647                         // we get this exception when we close server socket
648
// exclude that case
649
if(serverSocket != null && !serverSocket.isClosed()){
650                             log.warn("Spawning socket from server socket finished abnormaly", e);
651                         }
652                     }
653                 }
654             }
655         }
656
657         public IpAddress getServerSocketAddress() {
658             return address;
659         }
660
661         public void stop() {
662             running = false;
663             try{
664                 if(serverSocket != null && !serverSocket.isClosed()){
665                     serverSocket.close();
666                 }
667             }catch(IOException e){
668             }finally{
669                 if(log.isDebugEnabled())
670                     log.debug("Waiting for StateProviderThreadSpawner to die ... ");
671
672                 if(runner != null){
673                     try{
674                         runner.join(3000);
675                     }catch(InterruptedException JavaDoc ignored){
676                         Thread.currentThread().interrupt();
677                     }
678                 }
679
680                 if(log.isDebugEnabled())
681                     log.debug("Shutting the thread pool down... ");
682
683                 pool.shutdownNow();
684                 try{
685                     pool.awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME,
686                                           TimeUnit.MILLISECONDS);
687                 }catch(InterruptedException JavaDoc ignored){
688                     Thread.currentThread().interrupt();
689                 }
690             }
691             if(log.isDebugEnabled())
692                 log.debug("Thread pool is shutdown. All pool threads are cleaned up.");
693         }
694     }
695
696     private class StateProviderHandler {
697         public void process(Socket socket) {
698             StreamingOutputStreamWrapper wrapper = null;
699             ObjectInputStream ois = null;
700             try{
701                 int bufferSize = socket.getSendBufferSize();
702                 socket.setSendBufferSize(socket_buffer_size);
703                 if(log.isDebugEnabled())
704                     log.debug("Running on " + Thread.currentThread()
705                             + ". Accepted request for state transfer from "
706                             + socket.getInetAddress() + ":" + socket.getPort()
707                             + ", original buffer size was " + bufferSize + " and was reset to "
708                             + socket.getSendBufferSize() + ", passing outputstream up... ");
709
710                 // read out state requesters state_id and address and clear this
711
// request
712
ois = new ObjectInputStream(socket.getInputStream());
713                 String JavaDoc state_id = (String JavaDoc) ois.readObject();
714                 Address stateRequester = (Address) ois.readObject();
715                 removeFromStateRequesters(stateRequester, state_id);
716
717                 wrapper = new StreamingOutputStreamWrapper(socket);
718                 StateTransferInfo sti = new StateTransferInfo(stateRequester, wrapper, state_id);
719                 up_prot.up(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM, sti));
720             }catch(IOException e){
721                 if(log.isWarnEnabled()){
722                     log.warn("State writer socket thread spawned abnormaly", e);
723                 }
724             }catch(ClassNotFoundException JavaDoc e){
725                 // thrown by ois.readObject()
726
// should never happen since String/Address are core classes
727
}finally{
728                 if(socket != null && !socket.isConnected()){
729                     if(log.isWarnEnabled())
730                         log.warn("Accepted request for state transfer but socket " + socket
731                                 + " not connected properly. Closing it...");
732                     try{
733                         if(wrapper != null){
734                             wrapper.close();
735                         }else{
736                             socket.close();
737                         }
738                     }catch(IOException e){
739                     }
740                 }
741             }
742         }
743     }
744
745     private class StreamingInputStreamWrapper extends InputStream {
746
747         private Socket inputStreamOwner;
748
749         private InputStream delegate;
750
751         private Channel channelOwner;
752
753         public StreamingInputStreamWrapper(Socket inputStreamOwner) throws IOException{
754             super();
755             this.inputStreamOwner = inputStreamOwner;
756             this.delegate = new BufferedInputStream(inputStreamOwner.getInputStream());
757             this.channelOwner = stack.getChannel();
758         }
759
760         public int available() throws IOException {
761             return delegate.available();
762         }
763
764         public void close() throws IOException {
765             if(log.isDebugEnabled()){
766                 log.debug("State reader " + inputStreamOwner + " is closing the socket ");
767             }
768             if(channelOwner != null && channelOwner.isConnected()){
769                 channelOwner.down(new Event(Event.STATE_TRANSFER_INPUTSTREAM_CLOSED));
770             }
771             inputStreamOwner.close();
772         }
773
774         public synchronized void mark(int readlimit) {
775             delegate.mark(readlimit);
776         }
777
778         public boolean markSupported() {
779             return delegate.markSupported();
780         }
781
782         public int read() throws IOException {
783             return delegate.read();
784         }
785
786         public int read(byte[] b, int off, int len) throws IOException {
787             return delegate.read(b, off, len);
788         }
789
790         public int read(byte[] b) throws IOException {
791             return delegate.read(b);
792         }
793
794         public synchronized void reset() throws IOException {
795             delegate.reset();
796         }
797
798         public long skip(long n) throws IOException {
799             return delegate.skip(n);
800         }
801     }
802
803     private class StreamingOutputStreamWrapper extends OutputStream {
804         private Socket outputStreamOwner;
805
806         private OutputStream delegate;
807
808         private long bytesWrittenCounter = 0;
809
810         private Channel channelOwner;
811
812         public StreamingOutputStreamWrapper(Socket outputStreamOwner) throws IOException{
813             super();
814             this.outputStreamOwner = outputStreamOwner;
815             this.delegate = new BufferedOutputStream(outputStreamOwner.getOutputStream());
816             this.channelOwner = stack.getChannel();
817         }
818
819         public void close() throws IOException {
820             if(log.isDebugEnabled()){
821                 log.debug("State writer " + outputStreamOwner + " is closing the socket ");
822             }
823             try{
824                 if(channelOwner != null && channelOwner.isConnected()){
825                     channelOwner.down(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM_CLOSED));
826                 }
827                 outputStreamOwner.close();
828             }catch(IOException e){
829                 throw e;
830             }finally{
831                 if(stats){
832                     avg_state_size = num_bytes_sent.addAndGet(bytesWrittenCounter) / num_state_reqs.doubleValue();
833                 }
834             }
835         }
836
837         public void flush() throws IOException {
838             delegate.flush();
839         }
840
841         public void write(byte[] b, int off, int len) throws IOException {
842             delegate.write(b, off, len);
843             bytesWrittenCounter += len;
844         }
845
846         public void write(byte[] b) throws IOException {
847             delegate.write(b);
848             if(b != null){
849                 bytesWrittenCounter += b.length;
850             }
851         }
852
853         public void write(int b) throws IOException {
854             delegate.write(b);
855             bytesWrittenCounter += 1;
856         }
857     }
858
859     public static class StateHeader extends Header implements Streamable {
860         public static final byte STATE_REQ = 1;
861
862         public static final byte STATE_RSP = 2;
863
864         public static final byte STATE_REMOVE_REQUESTER = 3;
865
866         long id = 0; // state transfer ID (to separate multiple state
867
// transfers at the same time)
868

869         byte type = 0;
870
871         Address sender; // sender of state STATE_REQ or STATE_RSP
872

873         Digest my_digest = null; // digest of sender (if type is STATE_RSP)
874

875         IpAddress bind_addr = null;
876
877         String JavaDoc state_id = null; // for partial state transfer
878

879         public StateHeader(){ // for externalization
880
}
881
882         public StateHeader(byte type,Address sender,String JavaDoc state_id){
883             this.type = type;
884             this.sender = sender;
885             this.state_id = state_id;
886         }
887
888         public StateHeader(byte type,Address sender,long id,Digest digest){
889             this.type = type;
890             this.sender = sender;
891             this.id = id;
892             this.my_digest = digest;
893         }
894
895         public StateHeader(
896                             byte type,
897                             Address sender,
898                             IpAddress bind_addr,
899                             Digest digest,
900                             String JavaDoc state_id){
901             this.type = type;
902             this.sender = sender;
903             this.my_digest = digest;
904             this.bind_addr = bind_addr;
905             this.state_id = state_id;
906         }
907
908         public int getType() {
909             return type;
910         }
911
912         public Digest getDigest() {
913             return my_digest;
914         }
915
916         public String JavaDoc getStateId() {
917             return state_id;
918         }
919
920         public boolean equals(Object JavaDoc o) {
921             StateHeader other;
922
923             if(sender != null && o != null){
924                 if(!(o instanceof StateHeader))
925                     return false;
926                 other = (StateHeader) o;
927                 return sender.equals(other.sender) && id == other.id;
928             }
929             return false;
930         }
931
932         public int hashCode() {
933             if(sender != null)
934                 return sender.hashCode() + (int) id;
935             else
936                 return (int) id;
937         }
938
939         public String JavaDoc toString() {
940             StringBuilder JavaDoc sb = new StringBuilder JavaDoc();
941             sb.append("type=").append(type2Str(type));
942             if(sender != null)
943                 sb.append(", sender=").append(sender).append(" id=").append(id);
944             if(my_digest != null)
945                 sb.append(", digest=").append(my_digest);
946             return sb.toString();
947         }
948
949         static String JavaDoc type2Str(int t) {
950             switch(t){
951             case STATE_REQ:
952                 return "STATE_REQ";
953             case STATE_RSP:
954                 return "STATE_RSP";
955             case STATE_REMOVE_REQUESTER:
956                 return "STATE_REMOVE_REQUESTER";
957             default:
958                 return "<unknown>";
959             }
960         }
961
962         public void writeExternal(ObjectOutput out) throws IOException {
963             out.writeObject(sender);
964             out.writeLong(id);
965             out.writeByte(type);
966             out.writeObject(my_digest);
967             out.writeObject(bind_addr);
968             out.writeUTF(state_id);
969         }
970
971         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
972             sender = (Address) in.readObject();
973             id = in.readLong();
974             type = in.readByte();
975             my_digest = (Digest) in.readObject();
976             bind_addr = (IpAddress) in.readObject();
977             state_id = in.readUTF();
978         }
979
980         public void writeTo(DataOutputStream out) throws IOException {
981             out.writeByte(type);
982             out.writeLong(id);
983             Util.writeAddress(sender, out);
984             Util.writeStreamable(my_digest, out);
985             Util.writeStreamable(bind_addr, out);
986             Util.writeString(state_id, out);
987         }
988
989         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc,
990                                                 InstantiationException JavaDoc {
991             type = in.readByte();
992             id = in.readLong();
993             sender = Util.readAddress(in);
994             my_digest = (Digest) Util.readStreamable(Digest.class, in);
995             bind_addr = (IpAddress) Util.readStreamable(IpAddress.class, in);
996             state_id = Util.readString(in);
997         }
998
999         public int size() {
1000            int retval = Global.LONG_SIZE + Global.BYTE_SIZE; // id and type
1001

1002            retval += Util.size(sender);
1003
1004            retval += Global.BYTE_SIZE; // presence byte for my_digest
1005
if(my_digest != null)
1006                retval += my_digest.serializedSize();
1007
1008            retval += Global.BYTE_SIZE; // presence byte for state_id
1009
if(state_id != null)
1010                retval += state_id.length() + 2;
1011            return retval;
1012        }
1013    }
1014}
1015
Popular Tags