1 17 18 package org.apache.sandesha; 19 20 import org.apache.axis.AxisFault; 21 import org.apache.axis.SimpleChain; 22 import org.apache.axis.client.Call; 23 import org.apache.axis.components.logger.LogFactory; 24 import org.apache.commons.logging.Log; 25 import org.apache.sandesha.client.ClientHandlerUtil; 26 import org.apache.sandesha.client.ClientListener; 27 import org.apache.sandesha.client.ClientStorageManager; 28 import org.apache.sandesha.server.InvokeStrategy; 29 import org.apache.sandesha.server.InvokerFactory; 30 import org.apache.sandesha.server.Sender; 31 import org.apache.sandesha.server.ServerStorageManager; 32 import org.apache.sandesha.util.PolicyLoader; 33 import org.apache.sandesha.util.PropertyLoader; 34 35 import java.io.IOException ; 36 import java.net.InetAddress ; 37 import java.net.UnknownHostException ; 38 import java.util.ArrayList ; 39 import java.util.HashMap ; 40 import java.util.Iterator ; 41 42 48 public class SandeshaContext { 49 50 private static final Log log = LogFactory.getLog(SandeshaContext.class.getName()); 51 52 private static boolean rmInvokerStarted = false; 53 private static boolean cleintSenderStarted = false; 54 private static boolean serverSenderStarted = false; 55 private static boolean listenerStarted = false; 56 private static ClientListener clientListner = null; 57 private static Sender cleintSender; 58 private static Sender serverSender; 59 private static boolean insideServer; 60 61 private static HashMap seqMap = new HashMap (); 62 private HashMap callMap = new HashMap (); 63 private long key; 64 65 private String toURL; 66 private String sourceURL; 67 private String replyToURL; 68 69 private String faultToURL; 70 private String fromURL; 71 private String acksToURL; 72 private boolean sendOffer; 73 private long messageNumber; 74 private boolean sync; 75 76 private RMReport report; 77 78 public String getReplyToURL() { 79 return replyToURL; 80 } 81 82 public void setReplyToURL(String replyToURL) { 83 this.replyToURL = replyToURL; 84 } 85 86 public boolean isSync() { 87 return sync; 88 } 89 90 public void setSync(boolean sync) { 91 this.sync = sync; 92 } 93 94 public long getMessageNumber() { 95 return messageNumber; 96 } 97 98 public void setMessageNumber(long messageNumber) { 99 this.messageNumber = messageNumber; 100 } 101 102 public boolean isSendOffer() { 103 return sendOffer; 104 } 105 106 public void setSendOffer(boolean sendOffer) { 107 this.sendOffer = sendOffer; 108 } 109 110 public final String getAcksToURL() { 111 return acksToURL; 112 } 113 114 public void setAcksToURL(String acksToURL) { 115 this.acksToURL = acksToURL; 116 } 117 118 public String getFromURL() { 119 return fromURL; 120 } 121 122 public void setFromURL(String fromURL) { 123 this.fromURL = fromURL; 124 } 125 126 public final String getFaultURL() { 127 return faultToURL; 128 } 129 130 public void setFaultToURL(String faultURL) { 131 this.faultToURL = faultURL; 132 } 133 134 public String getSourceURL() { 135 return sourceURL; 136 } 137 138 public void setSourceURL(String sourceURL) { 139 this.sourceURL = sourceURL; 140 } 141 142 public String getToURL() { 143 return toURL; 144 } 145 146 public void setToURL(String toURL) { 147 this.toURL = toURL; 148 } 149 150 public SandeshaContext() throws AxisFault { 151 messageNumber = 0; 152 key = System.currentTimeMillis(); 153 SandeshaContext.insideServer = false; 154 init(true); 155 startListener(); 156 seqMap.put(new Long (key), this); 157 report = new RMReport(); 158 } 159 160 public SandeshaContext(int sync) throws AxisFault { 161 this.sync = true; 162 messageNumber = 0; 163 key = System.currentTimeMillis(); 164 SandeshaContext.insideServer = false; 165 init(true); 166 seqMap.put(new Long (key), this); 167 report = new RMReport(); 168 } 169 170 public SandeshaContext(boolean insideServer) throws AxisFault { 171 messageNumber = 0; 172 key = System.currentTimeMillis(); 173 SandeshaContext.insideServer = insideServer; 174 init(true); 175 seqMap.put(new Long (key), this); 176 report = new RMReport(); 177 } 178 179 public SandeshaContext(boolean insideServer, int sync) throws AxisFault { 180 this.sync = true; 181 messageNumber = 0; 182 key = System.currentTimeMillis(); 183 SandeshaContext.insideServer = insideServer; 184 init(true); 185 seqMap.put(new Long (key), this); 186 report = new RMReport(); 187 } 188 189 public void initCall(Call call, String targetUrl, String action, short MEP) throws AxisFault { 190 if (toURL != null) 191 call.setProperty(Constants.ClientProperties.TO, toURL); 192 if (sourceURL != null) 193 call.setProperty(Constants.ClientProperties.SOURCE_URL, sourceURL); 194 if (faultToURL != null) 195 call.setProperty(Constants.ClientProperties.FAULT_TO, faultToURL); 196 if (fromURL != null) 197 call.setProperty(Constants.ClientProperties.FROM, fromURL); 198 if (replyToURL != null) 199 call.setProperty(Constants.ClientProperties.REPLY_TO, replyToURL); 200 if (acksToURL != null) 201 call.setProperty(Constants.ClientProperties.ACKS_TO, acksToURL); 202 203 call.setProperty(Constants.ClientProperties.SEND_OFFER, Boolean.valueOf(sendOffer)); 204 call.setProperty(Constants.ClientProperties.SYNC, Boolean.valueOf(sync)); 205 call.setProperty(Constants.CONTEXT, this); 206 207 String key = initialize(call, targetUrl, action, MEP); 208 callMap.put(key, call); 209 } 210 211 public final HashMap getCallMap() { 212 return callMap; 213 } 214 215 public void setCallMap(HashMap callMap) { 216 this.callMap = callMap; 217 } 218 219 public static IStorageManager init(boolean client) throws AxisFault { 220 if (client) { 221 IStorageManager storageManager = new ClientStorageManager(); 222 if (!cleintSenderStarted) { 223 startClientSender(storageManager); 224 } 225 return storageManager; 226 } else { 227 if (!serverSenderStarted) { 228 startServerSender(); 229 } 230 if (!rmInvokerStarted) { 231 InvokeStrategy strategy = null; 232 try { 233 strategy = InvokerFactory.getInstance().createInvokerStrategy(); 234 } catch (Exception e) { 235 log.error(e); 236 throw new AxisFault("Could not start the Invoker."); 237 } 238 strategy.start(); 239 rmInvokerStarted = true; 240 } 241 return new ServerStorageManager(); 242 } 243 } 244 245 private static void startClientSender(IStorageManager storageManager) throws AxisFault { 246 if (log.isDebugEnabled()) { 247 log.debug(Constants.InfomationMessage.SENDER_STARTED); 248 } 249 250 cleintSender = new Sender(storageManager); 251 SimpleChain reqChain = null; 252 SimpleChain resChain = null; 253 try { 254 reqChain = getRequestChain(); 255 resChain = getResponseChain(); 256 } catch (Exception e) { 257 throw new AxisFault(e.getMessage()); 258 } 259 if (reqChain != null) 260 cleintSender.setRequestChain(reqChain); 261 if (resChain != null) 262 cleintSender.setResponseChain(resChain); 263 cleintSender.startSender(); 264 cleintSenderStarted = true; 265 } 266 267 private static void startServerSender() { 268 if (log.isDebugEnabled()) { 269 log.debug(Constants.InfomationMessage.SENDER_STARTED); 270 } 271 serverSender = new Sender(); 272 serverSender.startSender(); 273 serverSenderStarted = true; 274 } 275 276 private void validateProperties(Call call, String targetUrl, String action, short MEP) 277 throws AxisFault { 278 if (action == null) 279 throw new AxisFault("Please sepeicfy Action"); 280 if (targetUrl == null) 281 throw new AxisFault("TargetUrl cannot be null"); 282 if (call == null) 283 throw new AxisFault("Call cannot be null"); 284 if (!(MEP == Constants.ClientProperties.IN_ONLY || MEP == Constants.ClientProperties.IN_OUT)) 285 throw new AxisFault("Invalid MEP"); 286 } 287 288 public final RMReport endSequence() throws AxisFault { 289 290 IStorageManager storageManager = new ClientStorageManager(); 291 long startingTime = System.currentTimeMillis(); 292 long inactivityTimeOut = PolicyLoader.getInstance().getInactivityTimeout(); 293 294 Iterator ite = callMap.keySet().iterator(); 295 296 while (ite.hasNext()) { 297 String key = (String ) ite.next(); 298 Call tempCall = (Call) callMap.get(key); 299 String seqId = (String ) tempCall.getProperty(Constants.ClientProperties.CALL_KEY); 300 while (!storageManager.isSequenceComplete(seqId)) { 301 try { 302 if (log.isDebugEnabled()) { 303 log.debug(Constants.InfomationMessage.WAITING_TO_STOP_CLIENT); 304 } 305 Thread.sleep(Constants.CLIENT_WAIT_PERIOD_FOR_COMPLETE); 306 if ((System.currentTimeMillis() - startingTime) >= inactivityTimeOut) { 307 stopClientByForce(); 308 this.report.setError("Inactivity Time Out Reached. Sequence not complete"); 309 } 310 } catch (InterruptedException e) { 311 log.error(e); 312 } 313 } 314 } 315 316 if (this.report.getError() == null) { 317 this.report.setAllAcked(true); 318 } 319 320 seqMap.remove(new Long (key)); 321 if (seqMap.isEmpty()) { 322 if (listenerStarted) { 323 clientListner.stop(); 324 listenerStarted = false; 325 } 326 cleintSender.stop(); 327 cleintSenderStarted = false; 328 storageManager.clearStorage(); 329 } 330 331 return this.report; 332 333 } 334 335 336 public void stopClientByForce() throws AxisFault { 337 if (listenerStarted) { 338 clientListner.stop(); 339 listenerStarted = false; 340 } 341 cleintSender.stop(); 342 cleintSenderStarted = false; 343 throw new AxisFault("Inactivity Timeout Reached, No Response from the Server"); 344 } 345 346 private String initialize(Call call, String targetUrl, String action, short MEP) 347 throws AxisFault { 348 validateProperties(call, targetUrl, action, MEP); 349 String keyOfCall = this.key + action; 350 call.setTargetEndpointAddress(targetUrl); 351 call.setProperty(Constants.ClientProperties.ACTION, action); 352 call.setTransport(new RMTransport(targetUrl, "")); 353 call.setProperty(Constants.ClientProperties.MEP, new Short (MEP)); 354 call.setProperty(Constants.ClientProperties.CALL_KEY, keyOfCall); 355 call.setProperty(Constants.ClientProperties.REPORT, this.report); 356 357 if (!insideServer) { 358 InetAddress addr = null; 359 try { 360 addr = InetAddress.getLocalHost(); 361 } catch (UnknownHostException e) { 362 log.error(e); 363 } 364 365 String sourceURL = null; 366 367 sourceURL = Constants.HTTP + Constants.COLON + Constants.SLASH + 368 Constants.SLASH + addr.getHostAddress() + Constants.COLON + 369 PropertyLoader.getClientSideListenerPort() + Constants.URL_RM_SERVICE; 370 371 372 call.setProperty(Constants.ClientProperties.SOURCE_URL, sourceURL); 373 } 374 return keyOfCall; 375 } 376 377 378 private static void startListener() { 379 if (!insideServer) { 380 if (!listenerStarted) { 381 listenerStarted = true; 382 try { 383 clientListner = new ClientListener(PropertyLoader.getClientSideListenerPort()); 384 clientListner.start(); 385 } catch (IOException e) { 386 log.error(e); 387 } 388 } 389 } 390 391 } 392 393 394 private static SimpleChain getRequestChain() { 395 ArrayList arr = PropertyLoader.getRequestHandlerNames(); 396 return ClientHandlerUtil.getHandlerChain(arr); 397 } 398 399 400 private static SimpleChain getResponseChain() { 401 ArrayList arr = PropertyLoader.getResponseHandlerNames(); 402 return ClientHandlerUtil.getHandlerChain(arr); 403 } 404 405 406 public void setLastMessage(Call call) { 407 call.setProperty(Constants.ClientProperties.LAST_MESSAGE, Boolean.valueOf(true)); 408 } 409 410 public boolean isLastMessage(Call call) { 411 return ((Boolean ) call.getProperty(Constants.ClientProperties.LAST_MESSAGE)).booleanValue(); 412 } 413 414 public long getMessageNumber(Call call) { 415 return ((Long ) call.getProperty(Constants.ClientProperties.MSG_NUMBER)).longValue(); 416 } 417 418 public void setMessageNumber(Call call, long msgNumber) { 419 call.setProperty(Constants.ClientProperties.MSG_NUMBER, new Long (msgNumber)); 420 } 421 } 422 | Popular Tags |