KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > tcp > Jdk13ReplicationListener


1 /*
2  * Copyright 1999,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.catalina.cluster.tcp;
18
19
20
21
22 import java.net.Socket JavaDoc;
23 import java.net.ServerSocket JavaDoc;
24 import java.net.InetSocketAddress JavaDoc;
25 import java.nio.channels.Selector JavaDoc;
26
27 import org.apache.catalina.cluster.io.ListenCallback;
28 import org.apache.catalina.cluster.io.Jdk13ObjectReader;
29
30 /**
31  * @author Filip Hanik
32  * @version $Revision: 1.5 $, $Date: 2005/03/25 22:08:59 $
33  */

34 public class Jdk13ReplicationListener implements Runnable JavaDoc
35 {
36
37     private static org.apache.commons.logging.Log log =
38         org.apache.commons.logging.LogFactory.getLog( Jdk13ReplicationListener.class );
39     private ThreadPool pool = null;
40     private boolean doListen = false;
41     private ListenCallback callback;
42     private java.net.InetAddress JavaDoc bind;
43     private int port;
44     private long timeout = 0;
45     ServerSocket JavaDoc serverSocket = null;
46     
47     /**
48      * sendAck
49      */

50     private boolean sendAck = true ;
51     /**
52      * Compress message data bytes
53      */

54     private boolean compress = true ;
55     
56
57     public Jdk13ReplicationListener(ListenCallback callback,
58                                int poolSize,
59                                java.net.InetAddress JavaDoc bind,
60                                int port,
61                                long timeout,
62                                boolean sendAck)
63     {
64         this.sendAck=sendAck;
65         this.callback = callback;
66         this.bind = bind;
67         this.port = port;
68         this.timeout = timeout;
69     }
70
71     /**
72      * @return Returns the compress.
73      */

74     public boolean isCompress() {
75         return compress;
76     }
77     
78     /**
79      * @param compress The compress to set.
80      */

81     public void setCompress(boolean compress) {
82         this.compress = compress;
83     }
84     public boolean isSendAck() {
85         return sendAck;
86     }
87     public void setSendAck(boolean sendAck) {
88         this.sendAck = sendAck;
89     }
90
91     public void run()
92     {
93         try
94         {
95             listen();
96         }
97         catch ( Exception JavaDoc x )
98         {
99             log.fatal("Unable to start cluster listener.",x);
100         }
101     }
102
103     public void listen ()
104         throws Exception JavaDoc
105     {
106         doListen = true;
107         // Get the associated ServerSocket to bind it with
108
serverSocket = new ServerSocket JavaDoc();
109         serverSocket.bind (new InetSocketAddress JavaDoc (bind,port));
110         while (doListen) {
111             Socket JavaDoc socket = serverSocket.accept();
112             ClusterListenThread t = new ClusterListenThread(socket,new Jdk13ObjectReader(socket,callback,compress),sendAck);
113             t.setDaemon(true);
114             t.start();
115         }//while
116
serverSocket.close();
117     }
118
119     public void stopListening(){
120         doListen = false;
121         try {
122             serverSocket.close();
123         } catch ( Exception JavaDoc x ) {
124             log.error("Unable to stop the replication listen socket",x);
125         }
126     }
127
128     protected static class ClusterListenThread extends Thread JavaDoc {
129         private Socket JavaDoc socket;
130         private Jdk13ObjectReader reader;
131         private boolean keepRunning = true;
132         private boolean sendAck ;
133         private static byte[] ACK_COMMAND = new byte[] {6,2,3};
134         ClusterListenThread(Socket JavaDoc socket, Jdk13ObjectReader reader, boolean sendAck) {
135             this.socket = socket;
136             this.reader = reader;
137             this.sendAck = sendAck ;
138         }
139
140         public void run() {
141             try {
142                 byte[] buffer = new byte[1024];
143                 while (keepRunning) {
144                     java.io.InputStream JavaDoc in = socket.getInputStream();
145                     int cnt = in.read(buffer);
146                     int ack = 0;
147                     if ( cnt > 0 ) {
148                         ack = reader.append(buffer, 0, cnt);
149                     }
150                     if(sendAck) {
151                         while ( ack > 0 ) {
152                             sendAck();
153                             ack--;
154                         }
155                     }
156                 }
157             } catch ( Exception JavaDoc x ) {
158                 keepRunning = false;
159                 log.error("Unable to read data from client, disconnecting.",x);
160                 try { socket.close(); } catch ( Exception JavaDoc ignore ) {}
161             }
162         }
163
164         private void sendAck() throws java.io.IOException JavaDoc {
165             //send a reply-acknowledgement
166
socket.getOutputStream().write(ACK_COMMAND);
167         }
168
169     }
170 }
171
Popular Tags