package org.objectweb.joram.mom.dest.bridge;

import java.io.IOException;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Properties;
import java.util.Vector;

import org.objectweb.joram.mom.dest.QueueImpl;
import org.objectweb.joram.mom.messages.Message;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.QueueMsgReply;
import org.objectweb.joram.mom.notifications.ReceiveRequest;
import org.objectweb.joram.shared.JoramTracing;
import org.objectweb.joram.shared.excepts.AccessException;
import org.objectweb.joram.shared.selectors.Selector;
import org.objectweb.util.monolog.api.BasicLevel;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.DeleteNot;

/**
 * Queue implementation that relays messages between this destination and a
 * remote destination accessed through a {@link BridgeModule}.
 * <p>
 * Messages sent to this queue by clients are handed to the bridge module
 * (see {@link #preProcess}); messages obtained from the bridge module are
 * injected into the regular queue processing (see {@link #bridgeDelivery}).
 */
public class BridgeQueueImpl extends QueueImpl {

  /** Module used to send to, and receive from, the remote destination. */
  private BridgeModule jmsModule;

  /**
   * Messages handed to the bridge module whose acknowledgement has not been
   * received yet, keyed by message identifier. Entries are added in
   * {@link #preProcess}, removed in {@link #bridgeAck} (or when the send
   * fails), and re-sent in order when the object is deserialized.
   */
  private Hashtable outTable;

  /**
   * Builds the bridge queue and initializes its bridge module.
   *
   * @param destId  identifier of this destination, also passed to the module.
   * @param adminId identifier of the administrator, passed to the superclass.
   * @param prop    properties passed to both the superclass and the module.
   */
  public BridgeQueueImpl(AgentId destId, AgentId adminId, Properties prop) {
    super(destId, adminId, prop);
    outTable = new Hashtable();
    jmsModule = new BridgeModule();

    jmsModule.init(destId, prop);
  }

  /** Returns <code>"BridgeQueueImpl:"</code> followed by the destination id. */
  public String toString() {
    return "BridgeQueueImpl:" + destId.toString();
  }

  /**
   * Handles a message delivered by the bridge: wraps it in a
   * {@link ClientMessages} and processes it as if it had been sent by this
   * destination itself.
   *
   * @param from sender of the notification (not used here).
   * @param not  notification carrying the delivered message.
   */
  public void bridgeDelivery(AgentId from, BridgeDeliveryNot not) {
    ClientMessages clientMessages = new ClientMessages();
    clientMessages.addMessage(not.getMessage());
    // The destination's own id is used as sender; preProcess() lets such
    // notifications through untouched instead of bridging them out again.
    super.doClientMessages(destId, clientMessages);
  }

  /**
   * Handles a bridge acknowledgement by forgetting the corresponding
   * outgoing message.
   *
   * @param not notification carrying the acknowledged message identifier.
   */
  public void bridgeAck(BridgeAckNot not) {
    outTable.remove(not.getIdentifier());
  }

  /**
   * Handles a receive request: first tries to serve it from locally
   * available messages, then falls back on the remote destination.
   *
   * @param from requester.
   * @param not  the receive request.
   * @throws AccessException if the requester is not a reader of this queue.
   */
  public void receiveRequest(AgentId from, ReceiveRequest not)
    throws AccessException {
    if (!isReader(from)) {
      throw new AccessException("READ right not granted");
    }

    not.requester = from;
    not.setExpiration(System.currentTimeMillis());
    requests.add(not);

    int reqIndex = requests.size() - 1;
    deliverMessages(reqIndex);

    // The request being still the last one stored is taken to mean that it
    // has not been answered by the local delivery above.
    if ((requests.size() - 1) == reqIndex) {
      if (not.getTimeOut() == -1) {
        // Time-out of -1: answer right away, whatever the remote side holds.
        requests.remove(reqIndex);

        org.objectweb.joram.shared.messages.Message message = null;

        try {
          message = jmsModule.receiveNoWait();
        } catch (Exception exc) {
          if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) {
            JoramTracing.dbgDestination.log(BasicLevel.ERROR,
              "Failing receive request on remote destination: ", exc);
          }
        }

        // NOTE(review): a message obtained from the module that does not
        // match the selector is dropped here and is not stored locally -
        // confirm this is the intended behavior.
        if ((message != null)
            && !Selector.matches(message, not.getSelector())) {
          message = null;
        }

        QueueMsgReply reply = new QueueMsgReply(not);
        reply.addMessage(message);
        forward(from, reply);

        if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) {
          JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
            "Receive answered.");
        }
      } else {
        // The request stays stored; ask the module for a message.
        try {
          jmsModule.receive();
        } catch (Exception exc) {
          if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) {
            JoramTracing.dbgDestination.log(BasicLevel.ERROR,
              "Failing receive request on remote destination: ", exc);
          }
        }
      }
    }
  }

  /**
   * Intercepts incoming client messages and sends them to the remote
   * destination through the bridge module.
   * <p>
   * Notifications originating from this destination itself are returned
   * unchanged. Otherwise every message is stamped with an arrival order,
   * recorded in {@link #outTable} and sent; a message whose sending fails is
   * removed from the table and routed to the dead message queue.
   *
   * @param from sender of the notification.
   * @param not  notification carrying the messages.
   * @return <code>not</code> when it comes from this destination,
   *         <code>null</code> otherwise.
   */
  public ClientMessages preProcess(AgentId from, ClientMessages not) {
    if (destId.equals(from)) {
      return not;
    }

    Message message;
    for (Enumeration msgs = not.getMessages().elements();
         msgs.hasMoreElements();) {
      message = new Message(
        (org.objectweb.joram.shared.messages.Message) msgs.nextElement());
      message.order = arrivalsCounter++;

      // Recorded before sending so that the message is known as pending
      // from the moment the module gets it.
      outTable.put(message.getIdentifier(), message);

      try {
        jmsModule.send(message.msg);
      } catch (Exception exc) {
        if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) {
          JoramTracing.dbgDestination.log(BasicLevel.ERROR,
            "Failing sending to remote destination: ", exc);
        }

        outTable.remove(message.getIdentifier());
        ClientMessages deadM;
        deadM = new ClientMessages(not.getClientContext(), not.getRequestId());
        deadM.addMessage(message.msg);
        sendToDMQ(deadM, not.getDMQId());
      }
    }
    return null;
  }

  /**
   * Closes the bridge module, then lets the superclass handle the deletion.
   *
   * @param not the deletion notification.
   */
  protected void doDeleteNot(DeleteNot not) {
    jmsModule.close();
    super.doDeleteNot(not);
  }

  /**
   * Deserializes the queue, resets its message stores to empty, then
   * reconnects the bridge module, re-issues one receive per stored request
   * and re-sends the pending outgoing messages by increasing order.
   * <p>
   * Failures while reconnecting or re-sending are logged and do not abort
   * the deserialization.
   *
   * @param in stream the object is read from.
   * @throws IOException            if reading the default fields fails.
   * @throws ClassNotFoundException if a serialized class cannot be found.
   */
  private void readObject(java.io.ObjectInputStream in)
    throws IOException, ClassNotFoundException {
    in.defaultReadObject();

    messages = new Vector();
    deliveredMsgs = new Hashtable();

    try {
      jmsModule.connect();

      // One receive per request still stored.
      for (int i = 0; i < requests.size(); i++) {
        jmsModule.receive();
      }

      // Sort the pending outgoing messages by increasing order; a message
      // is inserted after those already placed with the same order.
      Message momMsg;
      Vector outMessages = new Vector();
      Message currentMsg;
      for (Enumeration keys = outTable.keys(); keys.hasMoreElements();) {
        momMsg = (Message) outTable.get(keys.nextElement());

        int i = 0;
        while (i < outMessages.size()) {
          currentMsg = (Message) outMessages.get(i);

          if (momMsg.order < currentMsg.order) {
            break;
          }

          i++;
        }
        outMessages.insertElementAt(momMsg, i);
      }

      while (!outMessages.isEmpty()) {
        momMsg = (Message) outMessages.remove(0);
        jmsModule.send(momMsg.msg);
      }
    } catch (Exception exc) {
      if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR)) {
        JoramTracing.dbgDestination.log(BasicLevel.ERROR, "", exc);
      }
    }
  }
}