1 23 package org.objectweb.joram.client.jms.connection; 24 25 import org.objectweb.joram.shared.client.AbstractJmsRequest; 26 import org.objectweb.joram.shared.client.AbstractJmsReply; 27 import org.objectweb.joram.shared.client.ConsumerMessages; 28 29 import javax.jms.JMSException ; 30 31 import org.objectweb.joram.shared.JoramTracing; 32 import org.objectweb.util.monolog.api.BasicLevel; 33 34 public class Requestor 35 implements ReplyListener, ErrorListener { 36 37 private static class Status { 38 41 public static final int INIT = 0; 42 43 49 public static final int RUN = 1; 50 51 57 public static final int DONE = 2; 58 59 public static final int CLOSE = 3; 60 61 private static final String [] names = { 62 "INIT", "RUN", "DONE", "CLOSE"}; 63 64 public static String toString(int status) { 65 return names[status]; 66 } 67 } 68 69 public static final String DEFAULT_REQUEST_TIMEOUT_PROPERTY = "org.objectweb.joram.client.jms.connection.Requestor.defaultRequestTimeout"; 70 71 public static final long DEFAULT_REQUEST_TIMEOUT_VALUE = 0; 72 73 private long defaultRequestTimeout; 74 75 private RequestMultiplexer mtpx; 76 77 private Object reply; 78 79 private int requestId; 80 81 private int status; 82 83 public Requestor(RequestMultiplexer mtpx) { 84 this.mtpx = mtpx; 85 init(); 86 } 87 88 private void setStatus(int status) { 89 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 90 JoramTracing.dbgClient.log( 91 BasicLevel.DEBUG, "Requestor.setStatus(" + 92 Status.toString(status) + ')'); 93 this.status = status; 94 } 95 96 public final synchronized int getRequestId() { 97 return requestId; 98 } 99 100 private void init() { 101 defaultRequestTimeout = Long.getLong(DEFAULT_REQUEST_TIMEOUT_PROPERTY, 103 DEFAULT_REQUEST_TIMEOUT_VALUE).longValue(); 104 if (status == Status.DONE) { 105 setStatus(Status.INIT); 106 reply = null; 107 requestId = -1; 108 } 109 } 112 113 public synchronized AbstractJmsReply request( 114 AbstractJmsRequest request) 115 throws JMSException { 116 return request(request, defaultRequestTimeout); 117 } 118 119 132 public synchronized AbstractJmsReply request( 133 AbstractJmsRequest request, 134 long timeout) 135 throws JMSException { 136 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 137 JoramTracing.dbgClient.log( 138 BasicLevel.DEBUG, "Requestor.request(" + 139 request + ',' + timeout + ')'); 140 141 if (status != Status.INIT) { 142 if (status == Status.CLOSE) { 143 return null; 145 } else { 146 throw new javax.jms.IllegalStateException ("Requestor already used"); 147 } 148 } 149 mtpx.sendRequest(request, this); 150 setStatus(Status.RUN); 151 requestId = request.getRequestId(); 152 153 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 154 JoramTracing.dbgClient.log( 155 BasicLevel.DEBUG, " -> request #" + requestId + " wait"); 156 157 try { 158 wait(timeout); 159 } catch (InterruptedException exc) { 160 if (JoramTracing.dbgClient.isLoggable(BasicLevel.WARN)) 161 JoramTracing.dbgClient.log(BasicLevel.WARN, "", exc); 162 setStatus(Status.DONE); 163 } 164 165 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 166 JoramTracing.dbgClient.log( 167 BasicLevel.DEBUG, " -> request #" + requestId + " awake"); 168 169 try { 170 if (status == Status.RUN) { 171 mtpx.abortRequest(requestId); 174 return null; 175 } else if (status == Status.CLOSE) { 176 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 177 JoramTracing.dbgClient.log( 178 BasicLevel.DEBUG, " -> deny " + reply); 179 if (reply instanceof ConsumerMessages) { 180 mtpx.deny((ConsumerMessages)reply); 181 } 182 return null; 183 } else if (status == Status.DONE) { 184 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 186 JoramTracing.dbgClient.log( 187 BasicLevel.DEBUG, " -> request #" + requestId + " done"); 188 if (reply instanceof AbstractJmsReply) { 189 return (AbstractJmsReply)reply; 190 } else if (reply instanceof JMSException ) { 191 throw (JMSException )reply; 192 } else { 193 return null; 195 } 196 } else throw new Error (); 197 } finally { 198 init(); 199 } 200 } 201 202 public synchronized boolean replyReceived(AbstractJmsReply reply) 203 throws AbortedRequestException { 204 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 205 JoramTracing.dbgClient.log( 206 BasicLevel.DEBUG, "Requestor.replyReceived(" + 207 reply + ')'); 208 209 if (status == Status.RUN && 210 reply.getCorrelationId() == requestId) { 211 this.reply = reply; 212 setStatus(Status.DONE); 213 notify(); 214 return true; 215 } else { 216 throw new AbortedRequestException(); 218 } 219 } 220 221 public synchronized void errorReceived( 222 int replyId, JMSException exc) { 223 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 224 JoramTracing.dbgClient.log( 225 BasicLevel.DEBUG, "Requestor.errorReceived(" + 226 replyId + ',' + exc + ')'); 227 228 if (status == Status.RUN && 229 replyId == requestId) { 230 reply = exc; 231 setStatus(Status.DONE); 232 notify(); 233 } 234 } 237 238 public synchronized void replyAborted(int replyId) { 239 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 240 JoramTracing.dbgClient.log( 241 BasicLevel.DEBUG, "Requestor.replyAborted(" + 242 replyId + ')'); 243 if (status == Status.RUN && 244 replyId == requestId) { 245 reply = null; 246 setStatus(Status.DONE); 247 notify(); 248 } 249 } 252 253 public synchronized void abortRequest() { 254 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 255 JoramTracing.dbgClient.log( 256 BasicLevel.DEBUG, 257 "Requestor[" + Status.toString(status) + 258 ',' + requestId + "].abortRequest()"); 259 if (status == Status.RUN && requestId > 0) { 260 mtpx.abortRequest(requestId); 261 setStatus(Status.DONE); 262 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 263 JoramTracing.dbgClient.log( 264 BasicLevel.DEBUG, " -> notify requestor"); 265 notify(); 266 } 267 } 270 271 public synchronized void close() { 272 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 273 JoramTracing.dbgClient.log( 274 BasicLevel.DEBUG, "Requestor.close()"); 275 if (status != Status.CLOSE) { 276 abortRequest(); 277 setStatus(Status.CLOSE); 278 } 279 } 281 } 282 | Popular Tags |