KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > dest > Queue


/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2001 - 2007 ScalAgent Distributed Technologies
 * Copyright (C) 1996 - 2000 Dyade
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): Frederic Maistre (INRIA)
 * Contributor(s): ScalAgent Distributed Technologies
 */

24 package org.objectweb.joram.mom.dest;
25
26 import java.io.IOException JavaDoc;
27 import java.io.ObjectInputStream JavaDoc;
28 import java.io.ObjectOutputStream JavaDoc;
29 import java.util.Properties JavaDoc;
30
31 import org.objectweb.joram.mom.notifications.AbortReceiveRequest;
32 import org.objectweb.joram.mom.notifications.AbstractRequest;
33 import org.objectweb.joram.mom.notifications.AcknowledgeRequest;
34 import org.objectweb.joram.mom.notifications.BrowseRequest;
35 import org.objectweb.joram.mom.notifications.DenyRequest;
36 import org.objectweb.joram.mom.notifications.ExceptionReply;
37 import org.objectweb.joram.mom.notifications.Monit_GetNbMaxMsg;
38 import org.objectweb.joram.mom.notifications.Monit_GetPendingMessages;
39 import org.objectweb.joram.mom.notifications.Monit_GetPendingRequests;
40 import org.objectweb.joram.mom.notifications.ReceiveRequest;
41 import org.objectweb.joram.mom.notifications.SetNbMaxMsgRequest;
42 import org.objectweb.joram.mom.notifications.SetThreshRequest;
43 import org.objectweb.joram.mom.notifications.WakeUpNot;
44 import org.objectweb.joram.mom.proxies.ConnectionManager;
45 import org.objectweb.joram.shared.JoramTracing;
46 import org.objectweb.joram.shared.excepts.MomException;
47 import org.objectweb.util.monolog.api.BasicLevel;
48
49 import fr.dyade.aaa.agent.AgentId;
50 import fr.dyade.aaa.agent.BagSerializer;
51 import fr.dyade.aaa.agent.Channel;
52 import fr.dyade.aaa.agent.Notification;
53 import fr.dyade.aaa.util.Timer;
54 import fr.dyade.aaa.util.TimerTask;
55
56 /**
57  * A <code>Queue</code> agent is an agent hosting a MOM queue, and which
58  * behaviour is provided by a <code>QueueImpl</code> instance.
59  *
60  * @see QueueImpl
61  */

62 public class Queue extends Destination implements BagSerializer {
63   
64   public static final String JavaDoc QUEUE_TYPE = "queue";
65
66   public static String JavaDoc getDestinationType() {
67     return QUEUE_TYPE;
68   }
69
70   /**
71    * Empty constructor for newInstance().
72    */

73   public Queue() {}
74
75   /**
76    * Creates the <tt>QueueImpl</tt>.
77    *
78    * @param adminId Identifier of the queue administrator.
79    * @param prop The initial set of properties.
80    */

81   public DestinationImpl createsImpl(AgentId adminId, Properties JavaDoc prop) {
82     return new QueueImpl(getId(), adminId, prop);
83   }
84
85   private transient Task task;
86
87   /**
88    * Gives this agent an opportunity to initialize after having been deployed,
89    * and each time it is loaded into memory.
90    *
91    * @param firstTime true when first called by the factory
92    *
93    * @exception Exception
94    * unspecialized exception
95    */

96   protected void agentInitialize(boolean firstTime) throws Exception JavaDoc {
97     super.agentInitialize(firstTime);
98     task = new Task(getId());
99     task.schedule();
100   }
101   
102   /**
103    * Distributes the received notifications to the appropriate reactions.
104    * @throws Exception
105    */

106   public void react(AgentId from, Notification not) throws Exception JavaDoc {
107     if (logger.isLoggable(BasicLevel.DEBUG))
108       logger.log(BasicLevel.DEBUG,
109                  "Queue.react(" + from + ',' + not + ')');
110
111     int reqId = -1;
112     if (not instanceof AbstractRequest)
113       reqId = ((AbstractRequest) not).getRequestId();
114
115     try {
116       if (not instanceof SetThreshRequest)
117         ((QueueImpl)destImpl).setThreshRequest(from, (SetThreshRequest) not);
118       else if (not instanceof SetNbMaxMsgRequest)
119         ((QueueImpl)destImpl).setNbMaxMsgRequest(from, (SetNbMaxMsgRequest) not);
120       else if (not instanceof Monit_GetPendingMessages)
121         ((QueueImpl)destImpl).monitGetPendingMessages(from, (Monit_GetPendingMessages) not);
122       else if (not instanceof Monit_GetPendingRequests)
123         ((QueueImpl)destImpl).monitGetPendingRequests(from, (Monit_GetPendingRequests) not);
124       else if (not instanceof Monit_GetNbMaxMsg)
125         ((QueueImpl)destImpl).monitGetNbMaxMsg(from, (Monit_GetNbMaxMsg) not);
126       else if (not instanceof ReceiveRequest)
127         ((QueueImpl)destImpl).receiveRequest(from, (ReceiveRequest) not);
128       else if (not instanceof BrowseRequest)
129         ((QueueImpl)destImpl).browseRequest(from, (BrowseRequest) not);
130       else if (not instanceof AcknowledgeRequest)
131         ((QueueImpl)destImpl).acknowledgeRequest(from, (AcknowledgeRequest) not);
132       else if (not instanceof DenyRequest)
133         ((QueueImpl)destImpl).denyRequest(from, (DenyRequest) not);
134       else if (not instanceof AbortReceiveRequest)
135         ((QueueImpl)destImpl).abortReceiveRequest(from, (AbortReceiveRequest) not);
136 // else if (not instanceof DestinationAdminRequestNot)
137
// ((QueueImpl)destImpl).destinationAdminRequestNot(from, (DestinationAdminRequestNot) not);
138
else if (not instanceof WakeUpNot) {
139         if (task == null)
140           task = new Task(getId());
141         task.schedule();
142         ((QueueImpl)destImpl).wakeUpNot((WakeUpNot) not);
143       }else
144         super.react(from, not);
145
146     } catch (MomException exc) {
147       // MOM Exceptions are sent to the requester.
148
if (logger.isLoggable(BasicLevel.WARN))
149         logger.log(BasicLevel.WARN, exc);
150
151       if (not instanceof AbstractRequest) {
152         AbstractRequest req = (AbstractRequest) not;
153         Channel.sendTo(from, new ExceptionReply(req, exc));
154       }
155     }
156   }
157
158   public void readBag(ObjectInputStream JavaDoc in)
159     throws IOException JavaDoc, ClassNotFoundException JavaDoc {
160     ((QueueImpl) destImpl).readBag(in);
161   }
162
163   public void writeBag(ObjectOutputStream JavaDoc out)
164     throws IOException JavaDoc {
165     ((QueueImpl) destImpl).writeBag(out);
166   }
167
168   private class Task extends TimerTask {
169     private AgentId to;
170
171     private Task(AgentId to) {
172       this.to = to;
173     }
174     
175     /** Method called when the timer expires. */
176     public void run() {
177       try {
178         Channel.sendTo(to, new WakeUpNot());
179       } catch (Exception JavaDoc e) {}
180     }
181
182     public void schedule() {
183       long period = ((QueueImpl) destImpl).getPeriod();
184
185       if (period != -1) {
186         try {
187           Timer timer = ConnectionManager.getTimer();
188           timer.schedule(this, period);
189         } catch (Exception JavaDoc exc) {
190           if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
191             JoramTracing.dbgDestination.log(BasicLevel.ERROR,
192                                             "--- " + this + " Queue(...)", exc);
193         }
194       }
195     }
196   }
197 }
198
Popular Tags