1 24 package org.objectweb.joram.mom.dest.bridge; 25 26 import java.util.Enumeration ; 27 import java.util.Hashtable ; 28 import java.util.Properties ; 29 import java.util.Vector ; 30 31 import org.objectweb.joram.mom.dest.TopicForwardNot; 32 import org.objectweb.joram.mom.dest.TopicImpl; 33 import org.objectweb.joram.mom.messages.Message; 34 import org.objectweb.joram.mom.notifications.ClientMessages; 35 import org.objectweb.joram.mom.notifications.SubscribeRequest; 36 import org.objectweb.joram.mom.notifications.UnsubscribeRequest; 37 import org.objectweb.joram.shared.JoramTracing; 38 import org.objectweb.joram.shared.excepts.AccessException; 39 import org.objectweb.util.monolog.api.BasicLevel; 40 41 import fr.dyade.aaa.agent.AgentId; 42 import fr.dyade.aaa.agent.DeleteNot; 43 44 53 public class BridgeTopicImpl extends TopicImpl { 54 55 private BridgeModule jmsModule; 56 57 58 private long arrivalsCounter = 0; 59 60 67 private Hashtable outTable; 68 69 76 public BridgeTopicImpl(AgentId destId, AgentId adminId, Properties prop) { 77 super(destId, adminId, prop); 78 outTable = new Hashtable (); 79 jmsModule = new BridgeModule(); 80 81 jmsModule.init(destId, prop); 83 } 84 85 public String toString() { 86 return "BridgeTopicImpl:" + destId.toString(); 87 } 88 89 93 public void bridgeDeliveryNot(AgentId from, BridgeDeliveryNot not) { 94 ClientMessages clientMessages = new ClientMessages(); 95 clientMessages.addMessage(not.getMessage()); 96 super.doClientMessages(destId, clientMessages); 97 } 98 99 103 public void bridgeAckNot(BridgeAckNot not) { 104 outTable.remove(not.getIdentifier()); 105 } 106 107 115 public void postSubscribe(SubscribeRequest not) { 116 117 try { 119 if (subscribers.size() == 1) 120 jmsModule.setMessageListener(); 121 } catch (Exception exc) { 122 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) 123 JoramTracing.dbgDestination.log(BasicLevel.ERROR, 124 "Failing subscribe request on remote destination: ", exc); 125 } 126 } 127 128 136 public void preUnsubscribe(UnsubscribeRequest not) { 137 if (subscribers.isEmpty()) 139 jmsModule.unsetMessageListener(); 140 } 141 142 150 public void topicForwardNot(AgentId from, TopicForwardNot not) { 151 if (not.toFather && fatherId != null) 153 forward(fatherId, not); 154 155 Message message; 157 for (Enumeration msgs = not.messages.getMessages().elements(); 158 msgs.hasMoreElements();) { 159 message = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement()); 161 message.order = arrivalsCounter++; 162 163 outTable.put(message.getIdentifier(), message); 164 165 try { 166 jmsModule.send(message.msg); 167 } catch (Exception exc) { 168 outTable.remove(message.getIdentifier()); 169 ClientMessages deadM; 170 deadM = new ClientMessages(); 171 deadM.addMessage(message.msg); 172 sendToDMQ(deadM, null); 173 } 174 } 175 } 176 177 184 public ClientMessages preProcess(AgentId from, ClientMessages not) { 185 if (destId.equals(from)) 186 return not; 187 188 forwardMessages(not); 190 191 Message message; 193 for (Enumeration msgs = not.getMessages().elements(); 194 msgs.hasMoreElements();) { 195 message = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement()); 196 message.order = arrivalsCounter++; 197 198 outTable.put(message.getIdentifier(), message); 199 200 try { 201 jmsModule.send(message.msg); 202 } catch (Exception exc) { 203 outTable.remove(message.getIdentifier()); 204 ClientMessages deadM; 205 deadM = new ClientMessages(not.getClientContext(), not.getRequestId()); 206 deadM.addMessage(message.msg); 207 sendToDMQ(deadM, not.getDMQId()); 208 } 209 } 210 return null; 211 } 212 213 220 protected void doDeleteNot(DeleteNot not) { 221 jmsModule.close(); 222 super.doDeleteNot(not); 223 } 224 225 226 private void readObject(java.io.ObjectInputStream in) 227 throws java.io.IOException , ClassNotFoundException { 228 in.defaultReadObject(); 229 230 try { 232 jmsModule.connect(); 233 234 if (! subscribers.isEmpty()) 235 jmsModule.setMessageListener(); 236 237 Message momMsg; 239 Vector outMessages = new Vector (); 240 Message currentMsg; 241 for (Enumeration keys = outTable.keys(); keys.hasMoreElements();) { 242 momMsg = (Message) outTable.get(keys.nextElement()); 243 244 int i = 0; 245 while (i < outMessages.size()) { 246 currentMsg = (Message) outMessages.get(i); 247 248 if (momMsg.order < currentMsg.order) 249 break; 250 251 i++; 252 } 253 outMessages.insertElementAt(momMsg, i); 254 } 255 256 while (! outMessages.isEmpty()) { 257 momMsg = (Message) outMessages.remove(0); 258 jmsModule.send(momMsg.msg); 259 } 260 } 261 catch (Exception exc) { 262 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) 263 JoramTracing.dbgDestination.log(BasicLevel.ERROR, "", exc); 264 } 265 } 266 } 267 | Popular Tags |