package org.objectweb.joram.mom.dest;

import java.util.Enumeration;
import java.util.Vector;
import java.util.Properties;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.Notification;
import fr.dyade.aaa.agent.UnknownAgent;

import org.objectweb.joram.mom.notifications.*;
import org.objectweb.joram.shared.excepts.*;
import org.objectweb.joram.mom.messages.Message;
import org.objectweb.joram.shared.selectors.Selector;

import org.objectweb.joram.shared.JoramTracing;
import org.objectweb.util.monolog.api.BasicLevel;

/**
 * A {@link QueueImpl} specialisation which stores every incoming message with
 * its expiration reset to zero, ignores acknowledge / deny requests, and
 * refuses DMQ and threshold settings (they are only logged as unexpected).
 * <p>
 * NOTE(review): the class name suggests this is the dead message queue
 * behaviour of the MOM; the inherited state used here ({@code messages},
 * {@code requests}, {@code arrivalsCounter}, {@code destId}) is declared
 * outside this file -- confirm details against {@code QueueImpl}.
 */
public class DeadMQueueImpl extends QueueImpl {

  /**
   * Value returned by {@link #getId()}. It is never assigned in this class,
   * so it stays <code>null</code> until set from elsewhere in the package.
   */
  static AgentId id = null;

  /**
   * Value returned by {@link #getDefaultThreshold()}. It is never assigned in
   * this class, so it stays <code>null</code> until set from elsewhere in the
   * package.
   */
  static Integer threshold = null;

  /**
   * Constructs a <code>DeadMQueueImpl</code> instance by delegating all
   * arguments to the {@link QueueImpl} constructor.
   *
   * @param destId   passed through to the super constructor.
   * @param adminId  passed through to the super constructor.
   * @param prop     passed through to the super constructor.
   */
  public DeadMQueueImpl(AgentId destId, AgentId adminId, Properties prop) {
    super(destId, adminId, prop);
  }

  /**
   * Returns <code>"DeadMQueueImpl:"</code> followed by the string form of the
   * inherited <code>destId</code>.
   */
  public String toString() {
    return "DeadMQueueImpl:" + destId.toString();
  }

  /**
   * Returns the current value of the static {@link #id} field, possibly
   * <code>null</code>.
   */
  public static AgentId getId() {
    return id;
  }

  /**
   * Returns the current value of the static {@link #threshold} field,
   * possibly <code>null</code>.
   */
  public static Integer getDefaultThreshold() {
    return threshold;
  }

  /**
   * Handles a <code>SetDMQRequest</code> by logging it as unexpected at WARN
   * level (when that level is enabled); no other action is taken.
   *
   * @param from  sender of the request (not used).
   * @param req   the request, included in the log message.
   * @exception AccessException  declared for signature compatibility; this
   *              implementation never throws it.
   */
  public void setDMQRequest(AgentId from, SetDMQRequest req)
    throws AccessException {
    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) {
      JoramTracing.dbgDestination.log(BasicLevel.WARN,
                                      "Unexpected request: " + req);
    }
  }

  /**
   * Stores every message carried by the notification.
   * <p>
   * Each shared message is wrapped into a MOM {@link Message}, its expiration
   * is forced to <code>0L</code>, it is given the next arrival order, added
   * to <code>messages</code>, named through <code>setMsgTxName</code> and
   * saved.
   *
   * @param from  sender of the notification (not used).
   * @param not   notification carrying the messages to store.
   * @return always <code>null</code>.
   */
  public ClientMessages preProcess(AgentId from, ClientMessages not) {
    for (Enumeration msgs = not.getMessages().elements();
         msgs.hasMoreElements();) {
      Message msg = new Message(
        (org.objectweb.joram.shared.messages.Message) msgs.nextElement());
      msg.setExpiration(0L);
      msg.order = arrivalsCounter++;
      messages.add(msg);
      // The transaction name is set before the message is saved.
      setMsgTxName(msg);
      msg.save();
    }
    return null;
  }

  /**
   * Handles a <code>SetThreshRequest</code> by logging it as unexpected at
   * WARN level (when that level is enabled); no other action is taken.
   *
   * @param from  sender of the request (not used).
   * @param req   the request, included in the log message.
   * @exception AccessException  declared for signature compatibility; this
   *              implementation never throws it.
   */
  public void setThreshRequest(AgentId from, SetThreshRequest req)
    throws AccessException {
    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) {
      JoramTracing.dbgDestination.log(BasicLevel.WARN,
                                      "Unexpected request: " + req);
    }
  }

  /**
   * Answers a browse request with every stored message matching the
   * request's selector. Stored messages are left in place.
   *
   * @param from  requester; the reply is forwarded to it.
   * @param not   the browse request, providing the selector.
   * @exception AccessException  if <code>isReader(from)</code> is false.
   */
  public void browseRequest(AgentId from, BrowseRequest not)
    throws AccessException {
    if (! isReader(from)) {
      throw new AccessException("READ right not granted");
    }

    BrowseReply rep = new BrowseReply(not);

    for (int i = 0; i < messages.size(); i++) {
      Message message = (Message) messages.get(i);
      if (Selector.matches(message.msg, not.getSelector())) {
        rep.addMessage(message.msg);
      }
    }
    forward(from, rep);

    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) {
      JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Request answered.");
    }
  }

  /**
   * Does nothing: acknowledge requests are ignored by this queue.
   *
   * @param from  sender of the request (not used).
   * @param not   the request (not used).
   */
  public void acknowledgeRequest(AgentId from, AcknowledgeRequest not) {}

  /**
   * Does nothing: deny requests are ignored by this queue.
   *
   * @param from  sender of the request (not used).
   * @param not   the request (not used).
   */
  public void denyRequest(AgentId from, DenyRequest not) {}

  /**
   * Reacts to an <code>UnknownAgent</code> notification.
   * <p>
   * Only notifications wrapping a <code>QueueMsgReply</code> are handled: the
   * messages of that reply are put back into <code>messages</code> with a new
   * arrival order, saved, and a delivery attempt is started from the first
   * pending request. Any other wrapped notification is ignored.
   *
   * @param uA  the notification; only its <code>not</code> field is read.
   */
  protected void doUnknownAgent(UnknownAgent uA) {
    Notification not = uA.not;

    if (! (not instanceof QueueMsgReply)) {
      return;
    }

    Vector msgList = ((QueueMsgReply) not).getMessages();
    for (int i = 0; i < msgList.size(); i++) {
      // NOTE(review): elements are cast to the MOM Message here, whereas
      // deliverMessages() fills replies with message.msg -- confirm the
      // element type returned by QueueMsgReply.getMessages().
      Message msg = (Message) msgList.elementAt(i);
      msg.order = arrivalsCounter++;
      messages.add(msg);
      setMsgTxName(msg);
      msg.save();
    }
    deliverMessages(0);
  }

  /**
   * Tries to answer pending receive requests, starting at the given index of
   * <code>requests</code>.
   * <p>
   * For each request the stored messages are scanned in order; the first one
   * matching the request's selector is removed from <code>messages</code>,
   * deleted, and forwarded to the requester, and the request itself is
   * removed. A request is therefore answered with at most one message.
   * Requests with no matching message are left in place and skipped.
   *
   * @param index  position in <code>requests</code> of the first request to
   *               consider.
   */
  protected void deliverMessages(int index) {
    while (! messages.isEmpty() && index < requests.size()) {
      ReceiveRequest notRec = (ReceiveRequest) requests.get(index);
      QueueMsgReply notMsg = new QueueMsgReply(notRec);
      boolean replied = false;

      for (int j = 0; j < messages.size(); j++) {
        Message message = (Message) messages.get(j);

        if (! Selector.matches(message.msg, notRec.getSelector())) {
          continue;
        }

        notMsg.addMessage(message.msg);

        if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) {
          JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Message "
                                          + message.getIdentifier()
                                          + " sent to "
                                          + notRec.requester
                                          + " as a reply to "
                                          + notRec.getRequestId());
        }

        messages.remove(j);
        message.delete();
        replied = true;
        // Removing the request shifts the following ones down, so the
        // same index designates the next request on the next iteration.
        requests.remove(index);
        break;
      }

      if (notMsg.getSize() > 0) {
        forward(notRec.requester, notMsg);
      }

      // Only step forward when the current request is still in the list.
      if (! replied) {
        index++;
      }
    }
  }

  /**
   * Does nothing: the given messages are not sent anywhere by this queue.
   *
   * @param deadMessages  messages to send (not used).
   * @param dmqId         target identifier (not used).
   */
  protected void sendToDMQ(Vector deadMessages, AgentId dmqId) {}
}