package org.objectweb.joram.mom.dest;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.Channel;
import fr.dyade.aaa.agent.DeleteNot;
import fr.dyade.aaa.agent.Notification;
import fr.dyade.aaa.agent.UnknownNotificationException;
import org.objectweb.joram.mom.MomTracing;
import org.objectweb.joram.mom.notifications.*;
import org.objectweb.joram.mom.util.*;
import org.objectweb.joram.shared.excepts.*;
import org.objectweb.joram.shared.messages.Message;

import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Properties;
import java.util.Vector;

import org.objectweb.util.monolog.api.BasicLevel;


/**
 * Topic implementation that hands every message it receives to a JMS bridge
 * module, and that publishes locally the messages the bridge delivers back
 * through <code>BridgeDeliveryNot</code> notifications.
 * <p>
 * Messages handed to the bridge module are kept in an internal table until
 * a <code>BridgeAckNot</code> carrying their identifier is received, so that
 * they can be sent again when the object is deserialized.
 * <p>
 * NOTE(review): the helper methods added to this class are deliberately
 * <code>private</code>: private methods do not take part in the default
 * <code>serialVersionUID</code> computation, so previously serialized
 * instances should remain readable.
 */
public class BridgeTopicImpl extends TopicImpl {

  /** Module used for exchanging messages with the remote destination. */
  private BridgeUnifiedModule jmsModule;

  /**
   * Counter used for stamping outgoing messages (<code>Message.order</code>);
   * it restarts from 0 once it reaches <code>Long.MAX_VALUE</code>.
   */
  private long arrivalsCounter = 0;

  /**
   * Table of the messages handed to the bridge module and not yet
   * acknowledged.
   * <p>
   * <b>Key:</b> message identifier<br>
   * <b>Value:</b> message
   */
  private Hashtable outTable;


  /**
   * Constructs a <code>BridgeTopicImpl</code> instance.
   *
   * @param destId  Identifier of the agent hosting the topic.
   * @param adminId  Identifier of the administrator of the topic.
   */
  public BridgeTopicImpl(AgentId destId, AgentId adminId)
  {
    super(destId, adminId);
    outTable = new Hashtable();
  }


  /** Returns <code>"BridgeTopicImpl:"</code> followed by the destination identifier. */
  public String toString()
  {
    return "BridgeTopicImpl:" + destId.toString();
  }


  /**
   * Creates the bridge module matching the <code>jmsMode</code> property
   * and initializes it with the destination identifier and the properties.
   * <p>
   * Accepted <code>jmsMode</code> values (case insensitive) are
   * <code>PTP</code>, <code>PubSub</code> and <code>Unified</code>.
   *
   * @param prop  Configuration properties; must contain <code>jmsMode</code>.
   *
   * @exception IllegalArgumentException  If <code>jmsMode</code> is missing
   *              or has an unsupported value.
   */
  public void init(Properties prop) {
    String jmsMode = (String) prop.get("jmsMode");

    if (jmsMode == null) {
      throw new IllegalArgumentException("Missing jmsMode property");
    }

    if (jmsMode.equalsIgnoreCase("PTP")) {
      jmsModule = new BridgePtpModule();
    }
    else if (jmsMode.equalsIgnoreCase("PubSub")) {
      jmsModule = new BridgePubSubModule();
    }
    else if (jmsMode.equalsIgnoreCase("Unified")) {
      jmsModule = new BridgeUnifiedModule();
    }
    else {
      throw new IllegalArgumentException("Invalid jmsMode value: " + jmsMode);
    }

    jmsModule.init(destId, prop);
  }


  /**
   * Dispatches the bridge specific notifications
   * (<code>BridgeDeliveryNot</code>, <code>BridgeAckNot</code>) to their
   * handlers and delegates every other notification to the superclass.
   *
   * @param from  Identifier of the agent that sent the notification.
   * @param not  The notification to react to.
   *
   * @exception UnknownNotificationException  Declared for the superclass
   *              delegation.
   */
  public void react(AgentId from, Notification not)
              throws UnknownNotificationException
  {
    if (not instanceof BridgeDeliveryNot) {
      doReact((BridgeDeliveryNot) not);
    }
    else if (not instanceof BridgeAckNot) {
      doReact((BridgeAckNot) not);
    }
    else {
      super.react(from, not);
    }
  }


  /**
   * Reacts to a <code>BridgeDeliveryNot</code> by wrapping the carried
   * message in a <code>ClientMessages</code> and processing it with the
   * superclass implementation.
   * <p>
   * The superclass <code>doProcess</code> is called on purpose: this class's
   * own <code>doProcess(ClientMessages)</code> would hand the message back
   * to the bridge module.
   */
  protected void doReact(BridgeDeliveryNot not)
  {
    ClientMessages clientMessages = new ClientMessages();
    clientMessages.addMessage(not.getMessage());
    super.doProcess(clientMessages);
  }


  /**
   * Reacts to a <code>BridgeAckNot</code> by forgetting the message whose
   * identifier is carried by the notification.
   */
  protected void doReact(BridgeAckNot not)
  {
    outTable.remove(not.getIdentifier());
  }


  /**
   * Reacts to a <code>SubscribeRequest</code>: once the superclass has
   * processed the request, sets the message listener on the bridge module
   * if the topic now counts exactly one subscriber.
   * <p>
   * A failure to set the listener is logged and not propagated.
   *
   * @exception AccessException  Declared for the superclass delegation.
   */
  protected void doReact(AgentId from, SubscribeRequest not)
                 throws AccessException
  {
    super.doReact(from, not);

    try {
      if (subscribers.size() == 1) {
        jmsModule.setMessageListener();
      }
    }
    catch (Exception exc) {
      if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) {
        MomTracing.dbgDestination.log(BasicLevel.ERROR,
                                      "Failing subscribe request on remote "
                                      + "destination: " + exc);
      }
    }
  }


  /**
   * Reacts to an <code>UnsubscribeRequest</code>: once the superclass has
   * processed the request, unsets the message listener on the bridge module
   * if the topic has no subscriber left.
   * <p>
   * The emptiness test is done <i>after</i> the superclass processing,
   * mirroring the subscribe handler. Testing before (as was done
   * previously) only fired on an unsubscription arriving while the topic
   * was already empty, so the listener stayed set after the last
   * subscriber left.
   * NOTE(review): this assumes the superclass handler is what removes
   * <code>from</code> from <code>subscribers</code> - confirm against
   * <code>TopicImpl</code>.
   */
  protected void doReact(AgentId from, UnsubscribeRequest not)
  {
    super.doReact(from, not);

    if (subscribers.isEmpty()) {
      jmsModule.unsetMessageListener();
    }
  }


  /**
   * Reacts to a <code>TopicForwardNot</code>: forwards the notification to
   * the father topic when requested and when a father is set, then hands
   * each carried message to the bridge module.
   * <p>
   * Messages that could not be sent are routed to the dead message queue.
   */
  protected void doReact(AgentId from, TopicForwardNot not)
  {
    if (not.toFather && fatherId != null) {
      Channel.sendTo(fatherId, not);
    }

    for (Enumeration msgs = not.messages.getMessages().elements();
         msgs.hasMoreElements();) {
      Message msg = (Message) msgs.nextElement();

      if (! sendToBridge(msg)) {
        ClientMessages deadM = new ClientMessages();
        deadM.addMessage(msg);
        sendToDMQ(deadM, null);
      }
    }
  }


  /**
   * Processes a <code>ClientMessages</code> notification: forwards it with
   * <code>forwardMessages</code>, then hands each carried message to the
   * bridge module.
   * <p>
   * Messages that could not be sent are routed to the dead message queue
   * designated by the notification, in a <code>ClientMessages</code>
   * carrying the client context and request identifier of the original.
   */
  protected void doProcess(ClientMessages not)
  {
    forwardMessages(not);

    for (Enumeration msgs = not.getMessages().elements();
         msgs.hasMoreElements();) {
      Message msg = (Message) msgs.nextElement();

      if (! sendToBridge(msg)) {
        ClientMessages deadM =
          new ClientMessages(not.getClientContext(), not.getRequestId());
        deadM.addMessage(msg);
        sendToDMQ(deadM, not.getDMQId());
      }
    }
  }


  /**
   * Processes a <code>DeleteNot</code>: closes the bridge module before
   * delegating to the superclass.
   */
  protected void doProcess(DeleteNot not)
  {
    jmsModule.close();
    super.doProcess(not);
  }


  /**
   * Stamps a message with the next arrival order, records it as pending
   * and hands it to the bridge module.
   * <p>
   * On failure the message is removed from the pending table and the
   * failure is logged; routing the message to a dead message queue is left
   * to the caller.
   *
   * @param msg  The message to send.
   * @return  <code>true</code> if the bridge module accepted the message,
   *          <code>false</code> if sending raised an exception.
   */
  private boolean sendToBridge(Message msg)
  {
    if (arrivalsCounter == Long.MAX_VALUE) {
      arrivalsCounter = 0;
    }
    msg.order = arrivalsCounter++;

    // Recorded before sending so that the message survives a serialization
    // happening before its acknowledgement.
    outTable.put(msg.getIdentifier(), msg);

    try {
      jmsModule.send(msg);
      return true;
    }
    catch (Exception exc) {
      outTable.remove(msg.getIdentifier());

      if (MomTracing.dbgDestination.isLoggable(BasicLevel.WARN)) {
        MomTracing.dbgDestination.log(BasicLevel.WARN,
                                      "Failing to send message "
                                      + msg.getIdentifier()
                                      + " to remote destination: " + exc);
      }
      return false;
    }
  }


  /**
   * Returns the pending messages sorted by ascending
   * <code>Message.order</code>; a message is placed after the already
   * inserted messages of equal order.
   */
  private Vector pendingMessagesInOrder()
  {
    Vector ordered = new Vector();

    for (Enumeration keys = outTable.keys(); keys.hasMoreElements();) {
      Message msg = (Message) outTable.get(keys.nextElement());

      // Insertion sort: find the first element strictly greater than msg.
      int pos = 0;
      while (pos < ordered.size()
             && msg.order >= ((Message) ordered.get(pos)).order) {
        pos++;
      }
      ordered.insertElementAt(msg, pos);
    }
    return ordered;
  }


  /**
   * Deserializes the object, then reconnects the bridge module, sets its
   * message listener again if the topic has subscribers, and sends again
   * the pending messages in ascending order.
   * <p>
   * Any exception raised while reconnecting or sending is logged and not
   * propagated; the first failure stops the remaining work. Messages stay
   * in the pending table until acknowledged.
   */
  private void readObject(java.io.ObjectInputStream in)
               throws java.io.IOException, ClassNotFoundException
  {
    in.defaultReadObject();

    try {
      jmsModule.connect();

      if (! subscribers.isEmpty()) {
        jmsModule.setMessageListener();
      }

      Vector pending = pendingMessagesInOrder();
      for (int i = 0; i < pending.size(); i++) {
        jmsModule.send((Message) pending.get(i));
      }
    }
    catch (Exception exc) {
      if (MomTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) {
        MomTracing.dbgDestination.log(BasicLevel.ERROR, "" + exc);
      }
    }
  }
}