1 54 package extensions.wsrm; 55 56 import org.apache.axis.MessageContext; 57 import org.apache.axis.AxisFault; 58 import org.apache.axis.client.Call; 59 import org.apache.axis.message.SOAPEnvelope; 60 import org.apache.axis.message.SOAPHeaderElement; 61 import org.apache.axis.message.PrefixedQName; 62 import org.apache.axis.message.MessageElement; 63 import org.apache.axis.handlers.BasicHandler; 64 65 import javax.xml.namespace.QName ; 66 import java.util.*; 67 68 public class AckService extends BasicHandler implements RMConstants { 69 static RetryGuy myRetryGuy = null; 70 71 75 class RetryGuy implements Runnable { 76 public void run() { 77 while (true) { 78 try { 79 Thread.sleep(3000); 80 } catch (InterruptedException e) { 81 e.printStackTrace(); 82 } 83 84 Iterator i = sequences.values().iterator(); 85 while (i.hasNext()) { 86 MySequence seq = (MySequence)i.next(); 87 synchronized (seq.activeMessages) { 88 Iterator msgs = seq.activeMessages.iterator(); 89 while (msgs.hasNext()) { 90 MessageRecord msg = (MessageRecord)msgs.next(); 91 if (msg.timestamp < (new Date().getTime() - 20000)) { 92 try { 93 resendMsg(msg, seq.destination); 94 } catch (Exception e) { 95 e.printStackTrace(); } 97 } 98 } 99 } 100 } 101 } 102 } 103 } 104 105 class MySequence { 106 String id; 107 String destination; 108 109 int currentMsgNum = 1; 110 111 LinkedList activeMessages = new LinkedList(); 113 } 114 115 class MessageRecord { 116 int sequenceNumber; 117 long timestamp = new Date().getTime(); 118 SOAPEnvelope env; 119 } 120 121 void resendMsg(MessageRecord msg, String destination) throws Exception { 122 System.out.println("Resending : dest = " + destination + ", msg " + msg.sequenceNumber); 123 124 Call call = new Call(destination); 125 call.setProperty("OneWay", Boolean.TRUE); 126 call.invoke(msg.env); 127 msg.timestamp = new Date().getTime(); 128 } 129 130 Map sequences = new HashMap(); 131 132 public void invoke(MessageContext msgContext) throws AxisFault { 133 processAck(msgContext.getRequestMessage().getSOAPEnvelope()); 134 } 135 136 public void processAck(SOAPEnvelope req) throws AxisFault 137 { 138 SOAPHeaderElement header = req.getHeaderByName(NS_URI_WSRM, "SequenceAcknowledgement"); 139 if (header == null) 140 return; 142 Iterator i = header.getChildElements(new PrefixedQName(NS_URI_WSU, "Identifier", null)); 143 if (!i.hasNext()) { 144 throw new AxisFault("WSRM.Fault", "Missing identifier in Sequence", null, null); 146 } 147 148 MessageElement el = (MessageElement)i.next(); 149 String id = el.getValue(); 150 MySequence seq = (MySequence)sequences.get(id); 151 if (seq == null) { 152 throw new AxisFault("WSRM.UnknownSequence", "Don't recognize ack of sequence '" + id + "'", null, null); 154 } 155 156 i = header.getChildElements(new PrefixedQName(NS_URI_WSRM, "AcknowledgementRange", null)); 157 while (i.hasNext()) { 158 el = (MessageElement)i.next(); 159 String val = el.getAttributeValue("Upper"); 160 int upper = Integer.parseInt(val); 161 val = el.getAttributeValue("Lower"); 162 int lower = Integer.parseInt(val); 163 acknowledgeRange(seq, lower, upper); 164 } 165 166 header.setProcessed(true); 167 168 header = req.getHeaderByName(NS_URI_WSA, "From"); 169 if (header != null) { 170 header.setProcessed(true); 171 } 172 173 header = req.getHeaderByName(NS_URI_WSA, "To"); 174 if (header != null) { 175 header.setProcessed(true); 176 } 177 178 header = req.getHeaderByName(NS_URI_WSA, "MessageID"); 179 if (header != null) { 180 header.setProcessed(true); 181 } 182 183 header = req.getHeaderByName(NS_URI_WSA, "Action"); 184 if (header != null) { 185 header.setProcessed(true); 186 } 187 } 188 189 197 public void acknowledgeRange(MySequence seq, int min, int max) { 198 LinkedList activeMessages = seq.activeMessages; 199 synchronized (activeMessages) { 200 for (int i = 0; i < activeMessages.size(); i++) { 201 MessageRecord curMsg = (MessageRecord)activeMessages.get(i); 202 if (min <= curMsg.sequenceNumber && max >= curMsg.sequenceNumber) { 203 System.out.println("Removed msg #" + curMsg.sequenceNumber); 204 activeMessages.remove(i); 205 i--; 206 } 207 } 208 } 209 } 210 211 224 public void doit(SOAPEnvelope env, 225 String toAddr, 226 String fromAddr, 227 String identifier, 228 boolean isLast, 229 boolean skip) throws Exception { 230 MySequence myseq = (MySequence)sequences.get(identifier); 231 if (myseq == null) { 232 myseq = new MySequence(); 234 myseq.id = identifier; 235 myseq.destination = toAddr; 236 sequences.put(identifier, myseq); 237 } 238 239 int curMsgNum = myseq.currentMsgNum++; 240 if (skip) { 241 curMsgNum = myseq.currentMsgNum++; 242 } 243 Integer seq = new Integer (curMsgNum); 244 String myAddr = fromAddr; 245 246 SOAPHeaderElement header = 247 new SOAPHeaderElement(SEQUENCE_QNAME.getNamespaceURI(), 248 SEQUENCE_QNAME.getLocalPart()); 249 MessageElement el; 250 251 el = new MessageElement(IDENTIFIER_QNAME, identifier); 252 header.addChild(el); 253 254 el = new MessageElement(MSGNUM_QNAME, seq); 255 header.addChild(el); 256 if (isLast) { 257 el = new MessageElement(NS_URI_WSRM, "LastMessage"); 258 header.addChild(el); 259 } 260 261 env.addHeader(header); 262 263 header = new SOAPHeaderElement(NS_URI_WSA, "Action", PING_URI); 264 header.setMustUnderstand(true); 265 env.addHeader(header); 266 267 header = new SOAPHeaderElement(NS_URI_WSA, "From"); 268 header.setMustUnderstand(true); 269 el = new MessageElement(new QName(NS_URI_WSA, "Address"), myAddr); 270 header.addChild(el); 271 env.addHeader(header); 272 273 header = new SOAPHeaderElement(NS_URI_WSA, "To", toAddr); 274 header.setMustUnderstand(true); 275 env.addHeader(header); 276 277 header = new SOAPHeaderElement(NS_URI_WSA, "MessageID", ReliableMessagingHandler.generateNewMsgID()); 278 header.setMustUnderstand(true); 279 env.addHeader(header); 280 281 MessageRecord msgRec = new MessageRecord(); 282 msgRec.env = env; 283 msgRec.sequenceNumber = curMsgNum; 284 myseq.activeMessages.add(msgRec); 285 286 Iterator seqs = ReliableMessagingHandler.sequences.values().iterator(); 288 while (seqs.hasNext()) { 289 Sequence s = (Sequence)seqs.next(); 290 if (s.endpoint.equals(toAddr)) { 291 ReliableMessagingHandler.generateAck(env, s); 292 } 293 } 294 295 synchronized (this) { 296 if (myRetryGuy == null) { 297 myRetryGuy = new RetryGuy(); 298 new Thread (myRetryGuy).start(); 299 } 300 } 301 } 302 } 303 | Popular Tags |