KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > tcp > ReplicationListener


1 /*
2  * Copyright 1999,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.catalina.cluster.tcp;
18
19
20 import java.net.InetSocketAddress JavaDoc;
21 import java.net.ServerSocket JavaDoc;
22 import java.nio.channels.SelectableChannel JavaDoc;
23 import java.nio.channels.SelectionKey JavaDoc;
24 import java.nio.channels.Selector JavaDoc;
25 import java.nio.channels.ServerSocketChannel JavaDoc;
26 import java.nio.channels.SocketChannel JavaDoc;
27 import java.util.Iterator JavaDoc;
28
29 import org.apache.catalina.cluster.CatalinaCluster;
30 import org.apache.catalina.cluster.ClusterReceiver;
31 import org.apache.catalina.cluster.tcp.Constants;
32 import org.apache.catalina.cluster.io.ListenCallback;
33 import org.apache.catalina.cluster.io.ObjectReader;
34 import org.apache.catalina.util.StringManager;
35 /**
36 * FIXME i18n log messages
37 * FIXME jmx support
38 * @author Peter Rossbach
39 * @author Filip Hanik
40 * @version $Revision: 1.20 $ $Date: 2005/03/25 22:10:25 $
41 */

42 public class ReplicationListener implements Runnable JavaDoc,ClusterReceiver
43 {
44     private static org.apache.commons.logging.Log log =
45         org.apache.commons.logging.LogFactory.getLog( ReplicationListener.class );
46
47     /**
48      * The descriptive information about this implementation.
49      */

50     private static final String JavaDoc info = "ReplicationListener/1.1";
51
52     /**
53      * The string manager for this package.
54      */

55     protected StringManager sm = StringManager.getManager(Constants.Package);
56
57     
58     private ThreadPool pool = null;
59     private boolean doListen = false;
60     private ListenCallback callback;
61     private java.net.InetAddress JavaDoc bind;
62     private String JavaDoc tcpListenAddress;
63     private int tcpThreadCount;
64     private long tcpSelectorTimeout;
65     private int tcpListenPort;
66     private boolean sendAck;
67     /**
68      * Compress message data bytes
69      */

70     private boolean compress = true ;
71     
72     private Selector JavaDoc selector = null;
73     
74     private Object JavaDoc interestOpsMutex = new Object JavaDoc();
75     
76     public ReplicationListener() {
77     }
78
79     /**
80      * @return Returns the compress.
81      */

82     public boolean isCompress() {
83         return compress;
84     }
85     
86     /**
87      * @param compress The compress to set.
88      */

89     public void setCompress(boolean compressMessageData) {
90         this.compress = compressMessageData;
91     }
92     
93     /**
94      * start cluster receiver
95      * @see org.apache.catalina.cluster.ClusterReceiver#start()
96      */

97     public void start() {
98         try {
99             pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, interestOpsMutex);
100             if ( "auto".equals(tcpListenAddress) ) {
101                 tcpListenAddress = java.net.InetAddress.getLocalHost().
102                     getHostAddress();
103             }
104             if(log.isDebugEnabled())
105                 log.debug("Starting replication listener on address:"+tcpListenAddress);
106             bind = java.net.InetAddress.getByName(tcpListenAddress);
107             Thread JavaDoc t = new Thread JavaDoc(this,"ClusterReceiver");
108             t.setDaemon(true);
109             t.start();
110         } catch ( Exception JavaDoc x ) {
111             log.fatal("Unable to start cluster receiver",x);
112         }
113
114     }
115     
116     public void stop() {
117         stopListening();
118     }
119     
120
121     public void run()
122     {
123         try
124         {
125             listen();
126         }
127         catch ( Exception JavaDoc x )
128         {
129             log.error("Unable to start cluster listener.",x);
130         }
131     }
132
133     /**
134      * get data from channel and store in byte array
135      * send it to cluster
136      * @throws Exception
137      */

138     public void listen ()
139         throws Exception JavaDoc
140     {
141         doListen = true;
142         // allocate an unbound server socket channel
143
ServerSocketChannel JavaDoc serverChannel = ServerSocketChannel.open();
144         // Get the associated ServerSocket to bind it with
145
ServerSocket JavaDoc serverSocket = serverChannel.socket();
146         // create a new Selector for use below
147
selector = Selector.open();
148         // set the port the server channel will listen to
149
serverSocket.bind (new InetSocketAddress JavaDoc (bind,tcpListenPort));
150         // set non-blocking mode for the listening socket
151
serverChannel.configureBlocking (false);
152         // register the ServerSocketChannel with the Selector
153
serverChannel.register (selector, SelectionKey.OP_ACCEPT);
154         while (doListen) {
155             // this may block for a long time, upon return the
156
// selected set contains keys of the ready channels
157
try {
158
159                 int n = selector.select(tcpSelectorTimeout);
160                 if (n == 0) {
161                     //there is a good chance that we got here
162
//because the TcpReplicationThread called
163
//selector wakeup().
164
//if that happens, we must ensure that that
165
//thread has enough time to call interestOps
166
synchronized (interestOpsMutex) {
167                         //if we got the lock, means there are no
168
//keys trying to register for the
169
//interestOps method
170
}
171                     continue; // nothing to do
172
}
173                 // get an iterator over the set of selected keys
174
Iterator JavaDoc it = selector.selectedKeys().iterator();
175                 // look at each key in the selected set
176
while (it.hasNext()) {
177                     SelectionKey JavaDoc key = (SelectionKey JavaDoc) it.next();
178                     // Is a new connection coming in?
179
if (key.isAcceptable()) {
180                         ServerSocketChannel JavaDoc server =
181                             (ServerSocketChannel JavaDoc) key.channel();
182                         SocketChannel JavaDoc channel = server.accept();
183                         Object JavaDoc attach = attach = new ObjectReader(channel, selector,
184                                     callback,isCompress()) ;
185                         registerChannel(selector,
186                                         channel,
187                                         SelectionKey.OP_READ,
188                                         attach);
189                     }
190                     // is there data to read on this channel?
191
if (key.isReadable()) {
192                         readDataFromSocket(key);
193                     } else {
194                         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
195                     }
196
197                     // remove key from selected set, it's been handled
198
it.remove();
199                 }
200             }
201             catch (java.nio.channels.CancelledKeyException JavaDoc nx) {
202                 log.warn(
203                     "Replication client disconnected, error when polling key. Ignoring client.");
204             }
205             catch (Exception JavaDoc x) {
206                 log.error("Unable to process request in ReplicationListener", x);
207             }
208
209         }
210         serverChannel.close();
211         selector.close();
212     }
213
214     public void stopListening(){
215         doListen = false;
216         if ( selector != null ) {
217             try {
218                 selector.close();
219                 selector = null;
220             } catch ( Exception JavaDoc x ) {
221                 log.error("Unable to close cluster receiver selector.",x);
222             }
223         }
224     }
225     
226     public void setCatalinaCluster(CatalinaCluster cluster) {
227         callback = cluster;
228     }
229
230     public CatalinaCluster getCatalinaCluster() {
231         return (CatalinaCluster)callback ;
232     }
233     
234     // ----------------------------------------------------------
235

236     /**
237      * Register the given channel with the given selector for
238      * the given operations of interest
239      */

240     protected void registerChannel (Selector JavaDoc selector,
241                                     SelectableChannel JavaDoc channel,
242                                     int ops,
243                                     Object JavaDoc attach)
244     throws Exception JavaDoc {
245         if (channel == null) return; // could happen
246
// set the new channel non-blocking
247
channel.configureBlocking (false);
248         // register it with the selector
249
channel.register (selector, ops, attach);
250     }
251
252     // ----------------------------------------------------------
253

254     /**
255      * Sample data handler method for a channel with data ready to read.
256      * @param key A SelectionKey object associated with a channel
257      * determined by the selector to be ready for reading. If the
258      * channel returns an EOF condition, it is closed here, which
259      * automatically invalidates the associated key. The selector
260      * will then de-register the channel on the next select call.
261      */

262     protected void readDataFromSocket (SelectionKey JavaDoc key)
263         throws Exception JavaDoc
264     {
265         TcpReplicationThread worker = (TcpReplicationThread)pool.getWorker();
266         if (worker == null) {
267             // No threads available, do nothing, the selection
268
// loop will keep calling this method until a
269
// thread becomes available.
270
// FIXME: This design could be improved.
271
if(log.isDebugEnabled())
272                 log.debug("No TcpReplicationThread available");
273         } else {
274             // invoking this wakes up the worker thread then returns
275
worker.serviceChannel(key, sendAck);
276         }
277     }
278     public String JavaDoc getTcpListenAddress() {
279         return tcpListenAddress;
280     }
281     public void setTcpListenAddress(String JavaDoc tcpListenAddress) {
282         this.tcpListenAddress = tcpListenAddress;
283     }
284     public int getTcpListenPort() {
285         return tcpListenPort;
286     }
287     public void setTcpListenPort(int tcpListenPort) {
288         this.tcpListenPort = tcpListenPort;
289     }
290     public long getTcpSelectorTimeout() {
291         return tcpSelectorTimeout;
292     }
293     public void setTcpSelectorTimeout(long tcpSelectorTimeout) {
294         this.tcpSelectorTimeout = tcpSelectorTimeout;
295     }
296     public int getTcpThreadCount() {
297         return tcpThreadCount;
298     }
299     public void setTcpThreadCount(int tcpThreadCount) {
300         this.tcpThreadCount = tcpThreadCount;
301     }
302     public boolean isSendAck() {
303         return sendAck;
304     }
305     public void setSendAck(boolean sendAck) {
306         this.sendAck = sendAck;
307     }
308     
309     public String JavaDoc getHost() {
310         return getTcpListenAddress();
311     }
312
313     public int getPort() {
314         return getTcpListenPort();
315     }
316     public Object JavaDoc getInterestOpsMutex() {
317         return interestOpsMutex;
318     }
319
320 }
321
Popular Tags