1 24 package org.objectweb.joram.mom.dest; 25 26 import fr.dyade.aaa.agent.AgentId; 27 import fr.dyade.aaa.agent.Channel; 28 import fr.dyade.aaa.agent.DeleteNot; 29 import fr.dyade.aaa.agent.Notification; 30 import fr.dyade.aaa.agent.UnknownNotificationException; 31 import org.objectweb.joram.mom.MomTracing; 32 import org.objectweb.joram.mom.notifications.*; 33 import org.objectweb.joram.mom.util.*; 34 import org.objectweb.joram.shared.excepts.*; 35 import org.objectweb.joram.shared.messages.Message; 36 import org.objectweb.joram.shared.selectors.Selector; 37 38 import java.io.IOException ; 39 import java.util.Enumeration ; 40 import java.util.Hashtable ; 41 import java.util.Properties ; 42 import java.util.Vector ; 43 44 import org.objectweb.util.monolog.api.BasicLevel; 45 46 55 public class BridgeQueueImpl extends QueueImpl { 56 57 private BridgeUnifiedModule jmsModule; 58 65 private Hashtable outTable; 66 67 73 public BridgeQueueImpl(AgentId destId, AgentId adminId) 74 { 75 super(destId, adminId); 76 outTable = new Hashtable (); 77 } 78 79 80 public String toString() 81 { 82 return "BridgeQueueImpl:" + destId.toString(); 83 } 84 85 86 92 public void init(Properties prop) { 93 String jmsMode = (String ) prop.get("jmsMode"); 94 95 if (jmsMode == null) 96 throw new IllegalArgumentException ("Missing 'jmsMode' property"); 97 98 if (jmsMode.equalsIgnoreCase("PTP")) 99 jmsModule = new BridgePtpModule(); 100 else if (jmsMode.equalsIgnoreCase("PubSub")) 101 jmsModule = new BridgePubSubModule(); 102 else if (jmsMode.equalsIgnoreCase("Unified")) 103 jmsModule = new BridgeUnifiedModule(); 104 else 105 throw new IllegalArgumentException ("Invalid 'jmsMode' value: " 106 + jmsMode); 107 108 jmsModule.init(destId, prop); 110 } 111 112 116 public void react(AgentId from, Notification not) 117 throws UnknownNotificationException 118 { 119 if (not instanceof BridgeDeliveryNot) { 120 doReact((BridgeDeliveryNot) not); 121 } 122 else if (not instanceof BridgeAckNot) 123 doReact((BridgeAckNot) not); 124 else 125 super.react(from, not); 126 } 127 128 132 protected void doReact(BridgeDeliveryNot not) 133 { 134 ClientMessages clientMessages = new ClientMessages(); 135 clientMessages.addMessage(not.getMessage()); 136 super.doProcess(clientMessages); 137 } 138 139 143 protected void doReact(BridgeAckNot not) 144 { 145 outTable.remove(not.getIdentifier()); 146 } 147 148 157 protected void doReact(AgentId from, ReceiveRequest not) 158 throws AccessException 159 { 160 if (! isReader(from)) 162 throw new AccessException("READ right not granted"); 163 164 not.requester = from; 166 not.setExpiration(System.currentTimeMillis()); 167 requests.add(not); 168 169 int reqIndex = requests.size() - 1; 171 deliverMessages(reqIndex); 172 173 if ((requests.size() - 1) == reqIndex) { 175 if (not.getTimeOut() == -1) { 178 requests.remove(reqIndex); 179 180 Message msg = null; 181 182 try { 183 msg = jmsModule.receiveNoWait(); 184 } 185 catch (Exception exc) { 187 if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) 188 MomTracing.dbgDestination.log(BasicLevel.ERROR, 189 "Failing receive request on remote " 190 + "destination: " + exc); 191 } 192 193 if (msg != null && ! Selector.matches(msg, not.getSelector())) 195 msg = null; 196 197 QueueMsgReply reply = new QueueMsgReply(not); 198 reply.addMessage(msg); 199 Channel.sendTo(from, reply); 200 201 if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 202 MomTracing.dbgDestination.log(BasicLevel.DEBUG, 203 "Receive answered."); 204 } 205 else { 207 try { 208 jmsModule.receive(); 209 } 210 catch (Exception exc) { 212 if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) 213 MomTracing.dbgDestination.log(BasicLevel.ERROR, 214 "Failing receive request on remote " 215 + "destination: " + exc); 216 } 217 } 218 } 219 } 220 221 227 protected void doProcess(ClientMessages not) 228 { 229 Message msg; 231 for (Enumeration msgs = not.getMessages().elements(); 232 msgs.hasMoreElements();) { 233 234 if (arrivalsCounter == Long.MAX_VALUE) 235 arrivalsCounter = 0; 236 237 msg = (Message) msgs.nextElement(); 238 msg.order = arrivalsCounter++; 239 240 outTable.put(msg.getIdentifier(), msg); 241 242 try { 243 jmsModule.send(msg); 244 } 245 catch (Exception exc) { 246 if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) 247 MomTracing.dbgDestination.log(BasicLevel.ERROR, 248 "Failing sending to remote " 249 + "destination: " 250 + exc); 251 252 outTable.remove(msg.getIdentifier()); 253 ClientMessages deadM; 254 deadM = new ClientMessages(not.getClientContext(), not.getRequestId()); 255 deadM.addMessage(msg); 256 sendToDMQ(deadM, not.getDMQId()); 257 } 258 } 259 } 260 261 268 protected void doProcess(DeleteNot not) 269 { 270 jmsModule.close(); 271 super.doProcess(not); 272 } 273 274 275 276 private void readObject(java.io.ObjectInputStream in) 277 throws IOException , ClassNotFoundException 278 { 279 in.defaultReadObject(); 280 281 messages = new Vector (); 282 deliveredMsgs = new Hashtable (); 283 284 Vector persistedMsgs = MessagePersistenceModule.loadAll(getDestinationId()); 286 287 if (persistedMsgs != null) { 288 Message persistedMsg; 291 AgentId consId; 292 while (! persistedMsgs.isEmpty()) { 293 persistedMsg = (Message) persistedMsgs.remove(0); 294 consId = (AgentId) consumers.get(persistedMsg.getIdentifier()); 295 if (consId == null) 296 storeMessage(persistedMsg); 297 else { 298 deliveredMsgs.put(persistedMsg.getIdentifier(), persistedMsg); 299 persistedMsg.save(getDestinationId()); 300 } 301 } 302 } 303 304 try { 306 jmsModule.connect(); 307 308 for (int i = 0; i < requests.size(); i++) 310 jmsModule.receive(); 311 312 Message msg; 314 Vector outMessages = new Vector (); 315 Message currentMsg; 316 for (Enumeration keys = outTable.keys(); keys.hasMoreElements();) { 317 msg = (Message) outTable.get(keys.nextElement()); 318 319 int i = 0; 320 while (i < outMessages.size()) { 321 currentMsg = (Message) outMessages.get(i); 322 323 if (msg.order < currentMsg.order) 324 break; 325 326 i++; 327 } 328 outMessages.insertElementAt(msg, i); 329 } 330 331 while (! outMessages.isEmpty()) { 332 msg = (Message) outMessages.remove(0); 333 jmsModule.send(msg); 334 } 335 } 336 catch (Exception exc) { 337 if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) 338 MomTracing.dbgDestination.log(BasicLevel.ERROR, "" + exc); 339 } 340 } 341 } 342 | Popular Tags |