KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > jms > connection > Requestor


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - 2004 ScalAgent Distributed Technologies
4  * Copyright (C) 1996 - 2000 Dyade
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): ScalAgent Distributed Technologies
22  */

23 package org.objectweb.joram.client.jms.connection;
24
25 import org.objectweb.joram.shared.client.AbstractJmsRequest;
26 import org.objectweb.joram.shared.client.AbstractJmsReply;
27 import org.objectweb.joram.shared.client.ConsumerMessages;
28
29 import javax.jms.JMSException JavaDoc;
30
31 import org.objectweb.joram.shared.JoramTracing;
32 import org.objectweb.util.monolog.api.BasicLevel;
33
34 public class Requestor
35     implements ReplyListener, ErrorListener {
36
37   private static class Status {
38     /**
39      * The requestor is free: it can be called by a client thread.
40      */

41     public static final int INIT = 0;
42
43     /**
44      * The requestor is busy: the client thread is waiting.
45      * Two threads can make a call:
46      * 1- the demultiplexer thread can call replyReceived and replyAborted.
47      * 2- another client thread can abort the request.
48      */

49     public static final int RUN = 1;
50
51     /**
52      * The requestor is either completed (by the demultiplxer thread) or
53      * aborted (by another client thread or a timeout).
54      * This state is transitional. It enables the requesting client thread to
55      * finalize its request.
56      */

57     public static final int DONE = 2;
58
59     public static final int CLOSE = 3;
60
61     private static final String JavaDoc[] names = {
62       "INIT", "RUN", "DONE", "CLOSE"};
63
64     public static String JavaDoc toString(int status) {
65       return names[status];
66     }
67   }
68
69   public static final String JavaDoc DEFAULT_REQUEST_TIMEOUT_PROPERTY = "org.objectweb.joram.client.jms.connection.Requestor.defaultRequestTimeout";
70
71   public static final long DEFAULT_REQUEST_TIMEOUT_VALUE = 0;
72
73   private long defaultRequestTimeout;
74
75   private RequestMultiplexer mtpx;
76
77   private Object JavaDoc reply;
78
79   private int requestId;
80
81   private int status;
82
83   public Requestor(RequestMultiplexer mtpx) {
84     this.mtpx = mtpx;
85     init();
86   }
87
88   private void setStatus(int status) {
89     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
90       JoramTracing.dbgClient.log(
91         BasicLevel.DEBUG, "Requestor.setStatus(" +
92         Status.toString(status) + ')');
93     this.status = status;
94   }
95
96   public final synchronized int getRequestId() {
97     return requestId;
98   }
99
100   private void init() {
101    // set the default request timeout
102
defaultRequestTimeout = Long.getLong(DEFAULT_REQUEST_TIMEOUT_PROPERTY,
103                                         DEFAULT_REQUEST_TIMEOUT_VALUE).longValue();
104     if (status == Status.DONE) {
105       setStatus(Status.INIT);
106       reply = null;
107       requestId = -1;
108     }
109     // Else the requestor can be closed.
110
// Nothing to do.
111
}
112
113   public synchronized AbstractJmsReply request(
114     AbstractJmsRequest request)
115     throws JMSException JavaDoc {
116     return request(request, defaultRequestTimeout);
117   }
118
119   /**
120    * Method sending a synchronous request to the server and waiting for an
121    * answer.
122    *
123    * @exception IllegalStateException If the connection is closed or broken,
124    * if the server state does not allow to
125    * process the request.
126    * @exception JMSSecurityException When sending a request to a destination
127    * not accessible because of security.
128    * @exception InvalidDestinationException When sending a request to a
129    * destination that no longer exists.
130    * @exception JMSException If the request failed for any other reason.
131    */

132   public synchronized AbstractJmsReply request(
133     AbstractJmsRequest request,
134     long timeout)
135     throws JMSException JavaDoc {
136     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
137       JoramTracing.dbgClient.log(
138         BasicLevel.DEBUG, "Requestor.request(" +
139         request + ',' + timeout + ')');
140
141     if (status != Status.INIT) {
142       if (status == Status.CLOSE) {
143         // throw new javax.jms.IllegalStateException("Closed requestor");
144
return null;
145       } else {
146         throw new javax.jms.IllegalStateException JavaDoc("Requestor already used");
147       }
148     }
149     mtpx.sendRequest(request, this);
150     setStatus(Status.RUN);
151     requestId = request.getRequestId();
152     
153     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
154       JoramTracing.dbgClient.log(
155         BasicLevel.DEBUG, " -> request #" + requestId + " wait");
156     
157     try {
158       wait(timeout);
159     } catch (InterruptedException JavaDoc exc) {
160       if (JoramTracing.dbgClient.isLoggable(BasicLevel.WARN))
161         JoramTracing.dbgClient.log(BasicLevel.WARN, "", exc);
162       setStatus(Status.DONE);
163     }
164
165     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
166       JoramTracing.dbgClient.log(
167         BasicLevel.DEBUG, " -> request #" + requestId + " awake");
168     
169     try {
170       if (status == Status.RUN) {
171         // Means that the wait ended with a timeout.
172
// Abort the request.
173
mtpx.abortRequest(requestId);
174         return null;
175       } else if (status == Status.CLOSE) {
176         if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
177           JoramTracing.dbgClient.log(
178             BasicLevel.DEBUG, " -> deny " + reply);
179         if (reply instanceof ConsumerMessages) {
180           mtpx.deny((ConsumerMessages)reply);
181         }
182         return null;
183       } else if (status == Status.DONE) {
184         // Status
185
if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
186           JoramTracing.dbgClient.log(
187             BasicLevel.DEBUG, " -> request #" + requestId + " done");
188         if (reply instanceof AbstractJmsReply) {
189           return (AbstractJmsReply)reply;
190         } else if (reply instanceof JMSException JavaDoc) {
191           throw (JMSException JavaDoc)reply;
192         } else {
193           // Reply aborted or thread interrupted.
194
return null;
195         }
196       } else throw new Error JavaDoc();
197     } finally {
198       init();
199     }
200   }
201
202   public synchronized boolean replyReceived(AbstractJmsReply reply)
203     throws AbortedRequestException {
204     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
205       JoramTracing.dbgClient.log(
206         BasicLevel.DEBUG, "Requestor.replyReceived(" +
207         reply + ')');
208
209     if (status == Status.RUN &&
210         reply.getCorrelationId() == requestId) {
211       this.reply = reply;
212       setStatus(Status.DONE);
213       notify();
214       return true;
215     } else {
216       // The request has been aborted.
217
throw new AbortedRequestException();
218     }
219   }
220
221   public synchronized void errorReceived(
222     int replyId, JMSException JavaDoc exc) {
223     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
224       JoramTracing.dbgClient.log(
225         BasicLevel.DEBUG, "Requestor.errorReceived(" +
226         replyId + ',' + exc + ')');
227     
228     if (status == Status.RUN &&
229         replyId == requestId) {
230       reply = exc;
231       setStatus(Status.DONE);
232       notify();
233     }
234     // Else The request has been aborted.
235
// Do nothing
236
}
237   
238   public synchronized void replyAborted(int replyId) {
239     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
240       JoramTracing.dbgClient.log(
241         BasicLevel.DEBUG, "Requestor.replyAborted(" +
242         replyId + ')');
243     if (status == Status.RUN &&
244         replyId == requestId) {
245       reply = null;
246       setStatus(Status.DONE);
247       notify();
248     }
249     // Else the request has been aborted.
250
// Do nothing
251
}
252
253   public synchronized void abortRequest() {
254     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
255       JoramTracing.dbgClient.log(
256         BasicLevel.DEBUG,
257         "Requestor[" + Status.toString(status) +
258         ',' + requestId + "].abortRequest()");
259     if (status == Status.RUN && requestId > 0) {
260       mtpx.abortRequest(requestId);
261       setStatus(Status.DONE);
262       if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
263         JoramTracing.dbgClient.log(
264           BasicLevel.DEBUG, " -> notify requestor");
265       notify();
266     }
267     // Else the request has been completed.
268
// Do nothing
269
}
270
271   public synchronized void close() {
272     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
273       JoramTracing.dbgClient.log(
274         BasicLevel.DEBUG, "Requestor.close()");
275     if (status != Status.CLOSE) {
276       abortRequest();
277       setStatus(Status.CLOSE);
278     }
279     // Else idempotent.
280
}
281 }
282
Popular Tags