KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > tcp > TcpReplicationThread


/*
 * Copyright 1999,2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.catalina.cluster.tcp;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

import org.apache.catalina.cluster.io.ObjectReader;
25 /**
26  * A worker thread class which can drain channels and echo-back the input. Each
27  * instance is constructed with a reference to the owning thread pool object.
28  * When started, the thread loops forever waiting to be awakened to service the
29  * channel associated with a SelectionKey object. The worker is tasked by
30  * calling its serviceChannel() method with a SelectionKey object. The
31  * serviceChannel() method stores the key reference in the thread object then
32  * calls notify() to wake it up. When the channel has been drained, the worker
33  * thread returns itself to its parent pool.
34  *
35  * @author Filip Hanik
36  * @version $Revision: 1.14 $, $Date: 2005/03/25 22:18:38 $
37  */

38 public class TcpReplicationThread extends WorkerThread {
39     private static final byte[] ACK_COMMAND = new byte[] {6, 2, 3};
40     private static org.apache.commons.logging.Log log =
41         org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );
42     private ByteBuffer JavaDoc buffer = ByteBuffer.allocate (1024);
43     private SelectionKey JavaDoc key;
44     private boolean waitForAck=true;
45
46     TcpReplicationThread ()
47     {
48     }
49
50     // loop forever waiting for work to do
51
public synchronized void run()
52     {
53         while (doRun) {
54             try {
55                 // sleep and release object lock
56
this.wait();
57             } catch (InterruptedException JavaDoc e) {
58                 if(log.isInfoEnabled())
59                     log.info("TCP worker thread interrupted in cluster",e);
60                 // clear interrupt status
61
Thread.interrupted();
62             }
63             if (key == null) {
64                 continue; // just in case
65
}
66             try {
67                 drainChannel (key);
68             } catch (Exception JavaDoc e) {
69                 log.error ("TCP Worker thread in cluster caught '"
70                     + e + "' closing channel", e);
71
72                 // close channel and nudge selector
73
try {
74                     key.channel().close();
75                 } catch (IOException JavaDoc ex) {
76                     log.error("Unable to close channel.",ex);
77                 }
78                 key.selector().wakeup();
79             }
80             key = null;
81             // done, ready for more, return to pool
82
this.pool.returnWorker (this);
83         }
84     }
85
86     /**
87      * Called to initiate a unit of work by this worker thread
88      * on the provided SelectionKey object. This method is
89      * synchronized, as is the run() method, so only one key
90      * can be serviced at a given time.
91      * Before waking the worker thread, and before returning
92      * to the main selection loop, this key's interest set is
93      * updated to remove OP_READ. This will cause the selector
94      * to ignore read-readiness for this channel while the
95      * worker thread is servicing it.
96      */

97     synchronized void serviceChannel (SelectionKey JavaDoc key, boolean waitForAck)
98     {
99         this.key = key;
100         this.waitForAck=waitForAck;
101         key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
102         key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
103         this.notify(); // awaken the thread
104
}
105
106     /**
107      * The actual code which drains the channel associated with
108      * the given key. This method assumes the key has been
109      * modified prior to invocation to turn off selection
110      * interest in OP_READ. When this method completes it
111      * re-enables OP_READ and calls wakeup() on the selector
112      * so the selector will resume watching this channel.
113      */

114     private void drainChannel (SelectionKey JavaDoc key)
115         throws Exception JavaDoc
116     {
117         boolean packetReceived=false;
118         SocketChannel JavaDoc channel = (SocketChannel JavaDoc) key.channel();
119         int count;
120         buffer.clear(); // make buffer empty
121
ObjectReader reader = (ObjectReader)key.attachment();
122         // loop while data available, channel is non-blocking
123
while ((count = channel.read (buffer)) > 0) {
124             buffer.flip(); // make buffer readable
125
reader.append(buffer.array(),0,count);
126             buffer.clear(); // make buffer empty
127
}
128         //check to see if any data is available
129
int pkgcnt = reader.execute();
130         if (waitForAck) {
131             while ( pkgcnt > 0 ) {
132                 sendAck(key,channel);
133                 pkgcnt--;
134             }
135         }
136         
137         if (count < 0) {
138             // close channel on EOF, invalidates the key
139
channel.close();
140             return;
141         }
142         
143         //acquire the interestOps mutex
144
Object JavaDoc mutex = this.getPool().getInterestOpsMutex();
145         synchronized (mutex) {
146             // cycle the selector so this key is active again
147
key.selector().wakeup();
148             // resume interest in OP_READ, OP_WRITE
149
int resumeOps = key.interestOps() | SelectionKey.OP_READ;
150             key.interestOps(resumeOps);
151         }
152         
153     }
154
155     /**
156      * send a reply-acknowledgement (6,2,3)
157      * @param key
158      * @param channel
159      */

160     private void sendAck(SelectionKey JavaDoc key, SocketChannel JavaDoc channel) {
161         
162         try {
163             channel.write(ByteBuffer.wrap(ACK_COMMAND));
164         } catch ( java.io.IOException JavaDoc x ) {
165             log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
166         }
167     }
168 }
169
Popular Tags