KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > tcp > PooledSocketSender


1 /*
2  * Copyright 1999,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.catalina.cluster.tcp;
18
19 import java.io.IOException JavaDoc;
20 import java.net.InetAddress JavaDoc;
21 import java.util.LinkedList JavaDoc;
22
23 /**
24  * Send cluster messages with a pool of sockets (25).
25  *
26  * FIXME support processing stats
27  *
28  * @author Filip Hanik
29  * @author Peter Rossbach
30  * @version 1.2
31  */

32
33 public class PooledSocketSender extends DataSender {
34
35     private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
36             .getLog(org.apache.catalina.cluster.tcp.PooledSocketSender.class);
37
38     /**
39      * The descriptive information about this implementation.
40      */

41     private static final String JavaDoc info = "PooledSocketSender/1.2";
42
43     // ----------------------------------------------------- Instance Variables
44

45     private int maxPoolSocketLimit = 25;
46
47     private SenderQueue senderQueue = null;
48
49     // ----------------------------------------------------- Constructor
50

51     public PooledSocketSender(InetAddress JavaDoc host, int port) {
52         super(host, port);
53         senderQueue = new SenderQueue(this, maxPoolSocketLimit);
54     }
55
56     // ----------------------------------------------------- Public Properties
57

58     /**
59      * Return descriptive information about this implementation and the
60      * corresponding version number, in the format
61      * <code>&lt;description&gt;/&lt;version&gt;</code>.
62      */

63     public String JavaDoc getInfo() {
64
65         return (info);
66
67     }
68
69     public void setMaxPoolSocketLimit(int limit) {
70         maxPoolSocketLimit = limit;
71         senderQueue.setLimit(limit);
72     }
73
74     public int getMaxPoolSocketLimit() {
75         return maxPoolSocketLimit;
76     }
77
78     public int getInPoolSize() {
79         return senderQueue.getInPoolSize();
80     }
81
82     public int getInUsePoolSize() {
83         return senderQueue.getInUsePoolSize();
84     }
85
86     // ----------------------------------------------------- Public Methode
87

88     public void connect() throws java.io.IOException JavaDoc {
89         //do nothing, happens in the socket sender itself
90
senderQueue.open();
91         setSocketConnected(true);
92         connectCounter++;
93     }
94
95     public void disconnect() {
96         senderQueue.close();
97         setSocketConnected(false);
98         disconnectCounter++;
99     }
100
101     /**
102      * send Message and use a pool of SocketSenders
103      *
104      * @param messageId Message unique identifier
105      * @param data Message data
106      * @throws java.io.IOException
107      */

108     public void sendMessage(String JavaDoc messageId, byte[] data) throws IOException JavaDoc {
109         //get a socket sender from the pool
110
SocketSender sender = senderQueue.getSender(0);
111         if (sender == null) {
112             log.warn(sm.getString("PoolSocketSender.noMoreSender", this
113                     .getAddress(), new Integer JavaDoc(this.getPort())));
114             return;
115         }
116         //send the message
117
try {
118             sender.sendMessage(messageId, data);
119         } finally {
120             //return the connection to the pool
121
senderQueue.returnSender(sender);
122         }
123         addStats(data.length);
124     }
125
126     public String JavaDoc toString() {
127         StringBuffer JavaDoc buf = new StringBuffer JavaDoc("PooledSocketSender[");
128         buf.append(getAddress()).append(":").append(getPort()).append("]");
129         return buf.toString();
130     }
131
132     // ----------------------------------------------------- Inner Class
133

134     private class SenderQueue {
135         private int limit = 25;
136
137         PooledSocketSender parent = null;
138
139         private LinkedList JavaDoc queue = new LinkedList JavaDoc();
140
141         private LinkedList JavaDoc inuse = new LinkedList JavaDoc();
142
143         private Object JavaDoc mutex = new Object JavaDoc();
144
145         private boolean isOpen = true;
146
147         public SenderQueue(PooledSocketSender parent, int limit) {
148             this.limit = limit;
149             this.parent = parent;
150         }
151
152         /**
153          * @return Returns the limit.
154          */

155         public int getLimit() {
156             return limit;
157         }
158         /**
159          * @param limit The limit to set.
160          */

161         public void setLimit(int limit) {
162             this.limit = limit;
163         }
164         /**
165          * @return
166          */

167         public int getInUsePoolSize() {
168             return inuse.size();
169         }
170
171         /**
172          * @return
173          */

174         public int getInPoolSize() {
175             return queue.size();
176         }
177
178         public SocketSender getSender(long timeout) {
179             SocketSender sender = null;
180             long start = System.currentTimeMillis();
181             long delta = 0;
182             do {
183                 synchronized (mutex) {
184                     if (!isOpen)
185                         throw new IllegalStateException JavaDoc(
186                                 "Socket pool is closed.");
187                     if (queue.size() > 0) {
188                         sender = (SocketSender) queue.removeFirst();
189                     } else if (inuse.size() < limit) {
190                         sender = getNewSocketSender();
191                     } else {
192                         try {
193                             mutex.wait(timeout);
194                         } catch (Exception JavaDoc x) {
195                             PooledSocketSender.log
196                                     .warn(
197                                             sm
198                                                     .getString(
199                                                             "PoolSocketSender.senderQueue.sender.failed",
200                                                             parent.getAddress(),
201                                                             new Integer JavaDoc(parent
202                                                                     .getPort())),
203                                             x);
204                         }//catch
205
}//end if
206
if (sender != null) {
207                         inuse.add(sender);
208                     }
209                 }//synchronized
210
delta = System.currentTimeMillis() - start;
211             } while ((isOpen) && (sender == null)
212                     && (timeout == 0 ? true : (delta < timeout)));
213             //to do
214
return sender;
215         }
216
217         public void returnSender(SocketSender sender) {
218             //to do
219
synchronized (mutex) {
220                 queue.add(sender);
221                 inuse.remove(sender);
222                 mutex.notify();
223             }
224         }
225
226         private SocketSender getNewSocketSender() {
227             //new SocketSender(
228
SocketSender sender = new SocketSender(parent.getAddress(), parent
229                     .getPort());
230             sender.setKeepAliveMaxRequestCount(parent
231                     .getKeepAliveMaxRequestCount());
232             sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
233             sender.setAckTimeout(parent.getAckTimeout());
234             sender.setWaitForAck(parent.isWaitForAck());
235             return sender;
236
237         }
238
239         public void close() {
240             synchronized (mutex) {
241                 for (int i = 0; i < queue.size(); i++) {
242                     SocketSender sender = (SocketSender) queue.get(i);
243                     sender.disconnect();
244                 }//for
245
for (int i = 0; i < inuse.size(); i++) {
246                     SocketSender sender = (SocketSender) inuse.get(i);
247                     sender.disconnect();
248                 }//for
249
queue.clear();
250                 inuse.clear();
251                 isOpen = false;
252                 mutex.notifyAll();
253             }
254         }
255
256         public void open() {
257             synchronized (mutex) {
258                 isOpen = true;
259                 mutex.notifyAll();
260             }
261         }
262     }
263 }
Popular Tags