package org.objectweb.joram.mom.dest;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Properties;

import org.objectweb.joram.mom.notifications.AbortReceiveRequest;
import org.objectweb.joram.mom.notifications.AbstractRequest;
import org.objectweb.joram.mom.notifications.AcknowledgeRequest;
import org.objectweb.joram.mom.notifications.BrowseRequest;
import org.objectweb.joram.mom.notifications.DenyRequest;
import org.objectweb.joram.mom.notifications.ExceptionReply;
import org.objectweb.joram.mom.notifications.Monit_GetNbMaxMsg;
import org.objectweb.joram.mom.notifications.Monit_GetPendingMessages;
import org.objectweb.joram.mom.notifications.Monit_GetPendingRequests;
import org.objectweb.joram.mom.notifications.ReceiveRequest;
import org.objectweb.joram.mom.notifications.SetNbMaxMsgRequest;
import org.objectweb.joram.mom.notifications.SetThreshRequest;
import org.objectweb.joram.mom.notifications.WakeUpNot;
import org.objectweb.joram.mom.proxies.ConnectionManager;
import org.objectweb.joram.shared.JoramTracing;
import org.objectweb.joram.shared.excepts.MomException;
import org.objectweb.util.monolog.api.BasicLevel;

import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.BagSerializer;
import fr.dyade.aaa.agent.Channel;
import fr.dyade.aaa.agent.Notification;
import fr.dyade.aaa.util.Timer;
import fr.dyade.aaa.util.TimerTask;

/**
 * A <code>Queue</code> agent is a {@link Destination} whose behaviour is
 * delegated to a {@link QueueImpl} instance.
 * <p>
 * The agent dispatches every queue-specific notification it receives to the
 * matching <code>QueueImpl</code> method, and keeps a timer task that posts
 * a {@link WakeUpNot} back to itself.
 */
public class Queue extends Destination implements BagSerializer {

  /** Type name returned by {@link #getDestinationType()}. */
  public static final String QUEUE_TYPE = "queue";

  /**
   * Returns the type name of this kind of destination.
   *
   * @return {@link #QUEUE_TYPE}.
   */
  public static String getDestinationType() {
    return QUEUE_TYPE;
  }

  /**
   * Empty constructor.
   */
  public Queue() {}

  /**
   * Creates the <code>QueueImpl</code> holding this destination's behaviour.
   *
   * @param adminId  identifier handed to the <code>QueueImpl</code>
   *                 constructor as its administrator.
   * @param prop     properties handed to the <code>QueueImpl</code>
   *                 constructor.
   * @return a new <code>QueueImpl</code> bound to this agent's identifier.
   */
  public DestinationImpl createsImpl(AgentId adminId, Properties prop) {
    return new QueueImpl(getId(), adminId, prop);
  }

  /**
   * Timer task posting <code>WakeUpNot</code> notifications to this agent.
   * Transient: it is rebuilt in {@link #agentInitialize(boolean)} and, if
   * still missing, when a <code>WakeUpNot</code> is handled.
   */
  private transient Task task;

  /**
   * Initializes the agent: runs the superclass initialization, then creates
   * the wake-up task and schedules it.
   *
   * @param firstTime  forwarded unchanged to the superclass initialization.
   * @exception Exception  if the superclass initialization fails.
   */
  protected void agentInitialize(boolean firstTime) throws Exception {
    super.agentInitialize(firstTime);
    task = new Task(getId());
    task.schedule();
  }

  /**
   * Dispatches a notification to the matching <code>QueueImpl</code> method.
   * Notifications that are not queue-specific are handed to the superclass.
   * <p>
   * When the processing throws a <code>MomException</code>, the exception is
   * logged and, if the notification is an <code>AbstractRequest</code>, an
   * <code>ExceptionReply</code> is sent back to the requester.
   *
   * @param from  identifier of the agent that sent the notification.
   * @param not   the notification to react to.
   * @exception Exception  any non-<code>MomException</code> failure raised
   *                       while processing the notification.
   */
  public void react(AgentId from, Notification not) throws Exception {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                 "Queue.react(" + from + ',' + not + ')');

    try {
      if (not instanceof SetThreshRequest)
        ((QueueImpl) destImpl).setThreshRequest(from, (SetThreshRequest) not);
      else if (not instanceof SetNbMaxMsgRequest)
        ((QueueImpl) destImpl).setNbMaxMsgRequest(from, (SetNbMaxMsgRequest) not);
      else if (not instanceof Monit_GetPendingMessages)
        ((QueueImpl) destImpl).monitGetPendingMessages(from, (Monit_GetPendingMessages) not);
      else if (not instanceof Monit_GetPendingRequests)
        ((QueueImpl) destImpl).monitGetPendingRequests(from, (Monit_GetPendingRequests) not);
      else if (not instanceof Monit_GetNbMaxMsg)
        ((QueueImpl) destImpl).monitGetNbMaxMsg(from, (Monit_GetNbMaxMsg) not);
      else if (not instanceof ReceiveRequest)
        ((QueueImpl) destImpl).receiveRequest(from, (ReceiveRequest) not);
      else if (not instanceof BrowseRequest)
        ((QueueImpl) destImpl).browseRequest(from, (BrowseRequest) not);
      else if (not instanceof AcknowledgeRequest)
        ((QueueImpl) destImpl).acknowledgeRequest(from, (AcknowledgeRequest) not);
      else if (not instanceof DenyRequest)
        ((QueueImpl) destImpl).denyRequest(from, (DenyRequest) not);
      else if (not instanceof AbortReceiveRequest)
        ((QueueImpl) destImpl).abortReceiveRequest(from, (AbortReceiveRequest) not);
      else if (not instanceof WakeUpNot) {
        // Re-arm the wake-up task before delegating; the task field is
        // transient, so it is created here if it does not exist yet.
        if (task == null)
          task = new Task(getId());
        task.schedule();
        ((QueueImpl) destImpl).wakeUpNot((WakeUpNot) not);
      } else
        super.react(from, not);

    } catch (MomException exc) {
      // MOM exceptions are reported to the requester rather than propagated.
      if (logger.isLoggable(BasicLevel.WARN))
        logger.log(BasicLevel.WARN, exc);

      if (not instanceof AbstractRequest) {
        AbstractRequest req = (AbstractRequest) not;
        Channel.sendTo(from, new ExceptionReply(req, exc));
      }
    }
  }

  /**
   * Delegates the reading of the bag to the <code>QueueImpl</code>.
   *
   * @param in  stream to read from.
   * @exception IOException             if thrown by the delegate.
   * @exception ClassNotFoundException  if thrown by the delegate.
   */
  public void readBag(ObjectInputStream in)
    throws IOException, ClassNotFoundException {
    ((QueueImpl) destImpl).readBag(in);
  }

  /**
   * Delegates the writing of the bag to the <code>QueueImpl</code>.
   *
   * @param out  stream to write to.
   * @exception IOException  if thrown by the delegate.
   */
  public void writeBag(ObjectOutputStream out)
    throws IOException {
    ((QueueImpl) destImpl).writeBag(out);
  }

  /**
   * Timer task that sends a <code>WakeUpNot</code> to a target agent each
   * time it runs. It is an inner (non-static) class because
   * {@link #schedule()} reads the enclosing queue's <code>destImpl</code>.
   */
  private class Task extends TimerTask {
    /** Identifier of the agent that receives the wake-up notification. */
    private AgentId to;

    /**
     * @param to  identifier of the agent to wake up.
     */
    private Task(AgentId to) {
      this.to = to;
    }

    /**
     * Sends a <code>WakeUpNot</code> to the target agent. A failure is
     * logged and never propagated, so that it cannot disturb the thread
     * running the timer.
     */
    public void run() {
      try {
        Channel.sendTo(to, new WakeUpNot());
      } catch (Exception exc) {
        // Best effort: report the failure instead of silently dropping it.
        if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN))
          JoramTracing.dbgDestination.log(BasicLevel.WARN,
                                          "Queue.Task: cannot send WakeUpNot to " + to,
                                          exc);
      }
    }

    /**
     * Registers this task on the timer provided by
     * <code>ConnectionManager</code>, using the period returned by the
     * <code>QueueImpl</code>. Nothing is scheduled when that period is
     * <code>-1</code> (presumably meaning "no periodic wake-up" -- to be
     * confirmed against <code>QueueImpl.getPeriod()</code>). Scheduling
     * failures are logged and not propagated.
     */
    public void schedule() {
      long period = ((QueueImpl) destImpl).getPeriod();

      if (period != -1) {
        try {
          Timer timer = ConnectionManager.getTimer();
          timer.schedule(this, period);
        } catch (Exception exc) {
          if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
            JoramTracing.dbgDestination.log(BasicLevel.ERROR,
                                            "--- " + this + " Queue(...)", exc);
        }
      }
    }
  }
}