KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > jms > tcp > ReliableTcpClient


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - ScalAgent Distributed Technologies
4  * Copyright (C) 2004 - France Telecom R&D
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): ScalAgent Distributed Technologies
22  * Contributor(s):
23  */

24 package org.objectweb.joram.client.jms.tcp;
25
26 import java.io.*;
27 import java.net.*;
28 import java.util.*;
29 import java.util.Timer JavaDoc;
30
31 import javax.jms.*;
32 import javax.jms.IllegalStateException JavaDoc;
33
34 import org.objectweb.joram.shared.client.AbstractJmsMessage;
35 import org.objectweb.joram.client.jms.FactoryParameters;
36 import org.objectweb.joram.shared.stream.StreamUtil;
37
38 import fr.dyade.aaa.util.ReliableTcpConnection;
39
40 import fr.dyade.aaa.util.Debug;
41 import org.objectweb.util.monolog.api.BasicLevel;
42 import org.objectweb.util.monolog.api.Logger;
43
44 public class ReliableTcpClient {
45   public static Logger logger = Debug.getLogger(ReliableTcpClient.class.getName());
46
47   public static final int INIT = 0;
48   public static final int CONNECT = 1;
49   public static final int CLOSE = 2;
50
51   public static final String JavaDoc[] statusNames =
52   {"INIT", "CONNECT", "CLOSE"};
53
54   private FactoryParameters params;
55
56   private String JavaDoc name;
57   
58   private String JavaDoc password;
59
60   private int key;
61
62   private ReliableTcpConnection connection;
63
64   private volatile int status;
65
66   private Vector addresses;
67   /**
68    * True if the client must try to reconnect in case of connection
69    * failure. It depends of cnxPendingTimer on a "normal" TCP connection,
70    * always true on HA.
71    */

72   private boolean reconnect;
73   /**
74    * Time in ms during the client try to reconnect to the server. It depends
75    * of connectingTimer and cnxPendingTimer from the connection parameters.
76    */

77   private int reconnectTimeout = 0;
78   
79   private Timer JavaDoc timer;
80
81   public ReliableTcpClient() {}
82   
83   public void setTimer(Timer JavaDoc timer2) {
84     timer = timer2;
85   }
86
87   public void init(FactoryParameters params,
88                    String JavaDoc name,
89                    String JavaDoc password,
90                    boolean reconnect) {
91     if (logger.isLoggable(BasicLevel.DEBUG))
92       logger.log(BasicLevel.DEBUG,
93                  "ReliableTcpClient.init(" + params + ',' + name + ',' + password + ',' + reconnect + ')');
94
95     this.params = params;
96     this.name = name;
97     this.password = password;
98     this.reconnect = reconnect;
99     if (params.cnxPendingTimer > 0)
100       this.reconnectTimeout =
101         Math.max(2*params.cnxPendingTimer,
102                  (params.connectingTimer*1000)+params.cnxPendingTimer);
103     addresses = new Vector();
104     key = -1;
105     setStatus(INIT);
106   }
107
108   private void setStatus(int status) {
109     if (logger.isLoggable(BasicLevel.DEBUG))
110       logger.log(BasicLevel.DEBUG,
111                  "ReliableTcpClient[" + name + ',' + key + "].setStatus(" + statusNames[status] + ')');
112     this.status = status;
113   }
114
115   public void connect() throws JMSException {
116     connect(false);
117   }
118
119   public synchronized void connect(boolean reconnect) throws JMSException {
120     if (logger.isLoggable(BasicLevel.DEBUG))
121       logger.log(BasicLevel.DEBUG,
122                  "ReliableTcpClient[" + name + ',' + key + "].connect(" + reconnect + ')');
123     
124     if (status != INIT)
125       throw new IllegalStateException JavaDoc("Connect: state error");
126
127     long startTime = System.currentTimeMillis();
128     long endTime = startTime;
129     if (addresses.size() > 1) {
130       // infinite retry in case of HA.
131
endTime = Long.MAX_VALUE;
132     } else {
133       if (reconnect) {
134         endTime += reconnectTimeout;
135       } else {
136         endTime += params.connectingTimer * 1000L;
137       }
138     }
139
140     int attemptsC = 0;
141     long nextSleep = 100;
142     while (true) {
143       if (status == CLOSE)
144         throw new IllegalStateException JavaDoc("Closed connection");
145       attemptsC++;
146       for (int i = 0; i < addresses.size(); i++) {
147         ServerAddress sa = (ServerAddress)addresses.elementAt(i);
148         try {
149           doConnect(sa.hostName, sa.port);
150           setStatus(CONNECT);
151           return;
152         } catch (JMSSecurityException exc) {
153           throw exc;
154         } catch (UnknownHostException uhe) {
155           if (logger.isLoggable(BasicLevel.DEBUG))
156             logger.log(BasicLevel.DEBUG, "ReliableTcpClient.connect", uhe);
157           IllegalStateException JavaDoc jmsExc =
158             new IllegalStateException JavaDoc("Server's host is unknown: " + sa.hostName);
159           jmsExc.setLinkedException(uhe);
160           throw jmsExc;
161         } catch (IOException ioe) {
162           if (logger.isLoggable(BasicLevel.DEBUG))
163             logger.log(BasicLevel.DEBUG, "ReliableTcpClient.connect", ioe);
164           // continue
165
} catch (JMSException jmse) {
166           if (logger.isLoggable(BasicLevel.DEBUG))
167             logger.log(BasicLevel.DEBUG, "ReliableTcpClient.connect", jmse);
168           // continue
169
} catch (Exception JavaDoc e) {
170           if (logger.isLoggable(BasicLevel.DEBUG))
171             logger.log(BasicLevel.DEBUG, "ReliableTcpClient.connect", e);
172           // continue
173
}
174       }
175       long currentTime = System.currentTimeMillis();
176
177       if (logger.isLoggable(BasicLevel.DEBUG))
178         logger.log(BasicLevel.DEBUG,
179                    " -> currentTime = " + currentTime + ",endTime = " + endTime);
180       
181       // Keep on trying as long as timer is ok:
182
if (currentTime < endTime) {
183         if (logger.isLoggable(BasicLevel.DEBUG))
184           logger.log(BasicLevel.DEBUG,
185                      " -> retry connection " + name + ',' + key);
186         
187         if (currentTime + nextSleep > endTime) {
188           nextSleep = endTime - currentTime;
189         }
190         
191         // Sleeping for a while:
192
try {
193           wait(nextSleep);
194         } catch (InterruptedException JavaDoc intExc) {
195           IllegalStateException JavaDoc jmsExc =
196             new IllegalStateException JavaDoc("Could not open the connection with "
197                                       + addresses + ": interrupted");
198         }
199
200         // Trying again!
201
nextSleep = nextSleep * 2;
202       } else {
203           if (logger.isLoggable(BasicLevel.DEBUG))
204             logger.log(BasicLevel.DEBUG,
205                        " -> close connection " + name + ',' + key);
206
207           // If timer is over, throwing an IllegalStateException:
208
long attemptsT = (System.currentTimeMillis() - startTime) / 1000;
209           IllegalStateException JavaDoc jmsExc =
210             new IllegalStateException JavaDoc("Could not connect to JMS server with "
211                                       + addresses
212                                       + " after " + attemptsC
213                                       + " attempts during "
214                                       + attemptsT + " secs: server is"
215                                       + " not listening" );
216           throw jmsExc;
217       }
218     }
219   }
220
221   protected Socket createSocket(String JavaDoc hostName, int port)
222     throws Exception JavaDoc {
223     return new Socket(hostName, port);
224   }
225
226   private void doConnect(String JavaDoc hostName, int port)
227     throws Exception JavaDoc, JMSException {
228     if (logger.isLoggable(BasicLevel.DEBUG))
229       logger.log(BasicLevel.DEBUG,
230                  "ReliableTcpClient[" + name + ',' + key + "].doConnect(" + hostName + ',' + port + ')');
231
232     Socket socket = createSocket(hostName, port);
233     socket.setTcpNoDelay(true);
234     socket.setSoTimeout(0);
235     socket.setSoLinger(true, 1000);
236     
237     ByteArrayOutputStream baos = new ByteArrayOutputStream();
238     OutputStream os = socket.getOutputStream();
239     InputStream is = socket.getInputStream();
240     
241     if (logger.isLoggable(BasicLevel.DEBUG))
242     logger.log(BasicLevel.DEBUG, " -> write name = " + name);
243     StreamUtil.writeTo(name, baos);
244     if (logger.isLoggable(BasicLevel.DEBUG))
245     logger.log(BasicLevel.DEBUG, " -> write password = " + password);
246     StreamUtil.writeTo(password, baos);
247     if (logger.isLoggable(BasicLevel.DEBUG))
248     logger.log(BasicLevel.DEBUG, " -> write key = " + key);
249     StreamUtil.writeTo(key, baos);
250
251     if (key == -1) {
252       if (logger.isLoggable(BasicLevel.DEBUG))
253         logger.log(BasicLevel.DEBUG, " -> open new connection");
254       StreamUtil.writeTo(reconnectTimeout, baos);
255       StreamUtil.writeTo(baos.size(), os);
256       baos.writeTo(os);
257       os.flush();
258
259       int len = StreamUtil.readIntFrom(is);
260       int res = StreamUtil.readIntFrom(is);
261       if (res > 0) {
262         String JavaDoc info = StreamUtil.readStringFrom(is);
263         throwSecurityError(info);
264       }
265
266       key = StreamUtil.readIntFrom(is);
267       if (logger.isLoggable(BasicLevel.DEBUG))
268         logger.log(BasicLevel.DEBUG, " -> key = " + name + ',' + key);
269       connection = new ReliableTcpConnection(timer);
270       if (logger.isLoggable(BasicLevel.DEBUG))
271         logger.log(BasicLevel.DEBUG, " -> init reliable connection");
272     } else {
273       if (logger.isLoggable(BasicLevel.DEBUG))
274         logger.log(BasicLevel.DEBUG, " -> reopen connection " + name + ',' + key);
275       StreamUtil.writeTo(baos.size(), os);
276       baos.writeTo(os);
277       os.flush();
278
279       int len = StreamUtil.readIntFrom(is);
280       int res = StreamUtil.readIntFrom(is);
281       if (logger.isLoggable(BasicLevel.DEBUG))
282         logger.log(BasicLevel.DEBUG, " -> read res = " + res);
283       if (res > 0) {
284         String JavaDoc info = StreamUtil.readStringFrom(is);
285         throwSecurityError(info);
286       }
287
288       if (logger.isLoggable(BasicLevel.DEBUG))
289         logger.log(BasicLevel.DEBUG, " -> reset reliable connection");
290     }
291
292     connection.init(socket);
293   }
294
295   private void throwSecurityError(String JavaDoc info)
296     throws JMSSecurityException {
297     JMSSecurityException jmsExc =
298       new JMSSecurityException("Can't open the connection with the server " +
299                                params.getHost() + " on port " +
300                                params.getPort() + ": " + info);
301     throw jmsExc;
302   }
303
304   public void send(AbstractJmsMessage request)
305     throws Exception JavaDoc {
306     if (logger.isLoggable(BasicLevel.DEBUG))
307       logger.log( BasicLevel.DEBUG,
308                   "ReliableTcpClient[" + name + ',' + key + "].send(" + request + ')');
309     if (status == CLOSE) throw new IOException("Closed connection");
310     if (status != CONNECT) {
311       if (reconnect) waitForReconnection();
312       else throw new IOException("Closed connection");
313     }
314     while (true) {
315       try {
316         connection.send(request);
317         return;
318       } catch (IOException exc) {
319         if (logger.isLoggable(BasicLevel.DEBUG))
320           logger.log(BasicLevel.DEBUG,
321                      "ReliableTcpClient[" + name + ',' + key + "]", exc);
322         if (reconnect) {
323           waitForReconnection();
324         } else {
325           close();
326           throw exc;
327         }
328       }
329     }
330   }
331
332   public Object JavaDoc receive()
333     throws Exception JavaDoc {
334     if (logger.isLoggable(BasicLevel.DEBUG))
335       logger.log(BasicLevel.DEBUG,
336                  "ReliableTcpClient[" + name + ',' + key + "].receive()");
337     while (true) {
338       try {
339         return connection.receive();
340       } catch (IOException exc) {
341         if (logger.isLoggable(BasicLevel.DEBUG))
342           logger.log(BasicLevel.DEBUG,
343                      "ReliableTcpClient[" + name + ',' + key + "]", exc);
344         if (reconnect) {
345           reconnect();
346         } else {
347           close();
348           throw exc;
349         }
350       }
351     }
352   }
353
354   private synchronized void waitForReconnection() throws Exception JavaDoc {
355     if (logger.isLoggable(BasicLevel.DEBUG))
356       logger.log(BasicLevel.DEBUG,
357                  "ReliableTcpClient[" + name + ',' + key + "].waitForReconnection()");
358     while (status == INIT) {
359       try {
360         wait();
361       } catch (InterruptedException JavaDoc exc) {
362         //continue
363
}
364     }
365     switch (status) {
366     case CONNECT:
367       break;
368     case CLOSE:
369       throw new Exception JavaDoc("Connection closed");
370     }
371   }
372
373   private synchronized void reconnect() throws Exception JavaDoc {
374     if (logger.isLoggable(BasicLevel.DEBUG))
375       logger.log(BasicLevel.DEBUG,
376                  "ReliableTcpClient[" + name + ',' + key + "].reconnect()");
377     switch (status) {
378     case CONNECT:
379       setStatus(INIT);
380     case INIT:
381       try {
382         connect(true);
383       } catch (JMSException exc) {
384         close();
385         throw exc;
386       } finally {
387         notifyAll();
388       }
389       break;
390     case CLOSE:
391       throw new Exception JavaDoc("Connection closed");
392     default:
393       throw new Error JavaDoc("State error");
394     }
395   }
396
397   public synchronized void close() {
398     if (logger.isLoggable(BasicLevel.DEBUG))
399       logger.log(BasicLevel.DEBUG,
400                  "ReliableTcpClient[" + name + ',' + key + "].close()");
401     if (status != CLOSE) {
402       setStatus(CLOSE);
403       connection.close();
404     }
405   }
406
407   public void addServerAddress(String JavaDoc host, int port) {
408     addresses.addElement(new ServerAddress(host, port));
409   }
410
411   public String JavaDoc toString() {
412     return '(' + super.toString() + ",params=" + params + ",name=" + name +
413       ",password=" + password + ",key=" + key + ",connection=" + connection +
414       ",status=" + statusNames[status] + ",addresses=" + addresses + ')';
415   }
416
417   static class ServerAddress {
418     String JavaDoc hostName;
419     int port;
420
421     public ServerAddress(String JavaDoc hostName, int port) {
422       this.hostName = hostName;
423       this.port = port;
424     }
425
426     public String JavaDoc toString() {
427       return "(hostName=" + hostName + ",port=" + port + ')';
428     }
429   }
430 }
431
Popular Tags