KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jfox > jms > connector > JMSContainerImpl


1 /* JFox, the OpenSource J2EE Application Server
2  *
3  * Distributable under GNU LGPL license by gun.org
4  * more details please visit http://www.huihoo.org/jfox
5  */

6
7 package org.jfox.jms.connector;
8
9 import java.util.ArrayList JavaDoc;
10 import java.util.Collections JavaDoc;
11 import java.util.HashMap JavaDoc;
12 import java.util.Iterator JavaDoc;
13 import java.util.List JavaDoc;
14 import java.util.Map JavaDoc;
15 import java.util.Random JavaDoc;
16 import javax.jms.Destination JavaDoc;
17 import javax.jms.IllegalStateException JavaDoc;
18 import javax.jms.InvalidDestinationException JavaDoc;
19 import javax.jms.JMSException JavaDoc;
20 import javax.jms.JMSSecurityException JavaDoc;
21 import javax.jms.Message JavaDoc;
22 import javax.jms.Queue JavaDoc;
23 import javax.jms.TemporaryQueue JavaDoc;
24 import javax.jms.TemporaryTopic JavaDoc;
25 import javax.jms.Topic JavaDoc;
26
27 import org.jfox.ioc.connector.AbstractContainer;
28 import org.jfox.ioc.ext.ActiveComponent;
29 import org.jfox.ioc.util.Marshaller;
30 import org.jfox.jms.JMSConnectionRemote;
31 import org.jfox.jms.message.JMSMessage;
32
33
34 /**
35  * @author <a HREF="mailto:young_yy@hotmail.com">Young Yang</a>
36  */

37
38 public class JMSContainerImpl extends AbstractContainer implements JMSContainer, ActiveComponent {
39     /**
40      * 随机数发生器,用来随机分配消息给 Queue
41      */

42     private static Random JavaDoc rand = new Random JavaDoc();
43     /**
44      * all destinations
45      */

46     private JMSDestinations destinations = JMSDestinations.getInstance();
47
48     /**
49      * 所有新到来的消息都先存放在这里,等待分发
50      */

51     private static List JavaDoc<JMSMessage> tempMessages = new ArrayList JavaDoc<JMSMessage>();
52
53     /**
54      * Client connections ClientID=>JMSConnectionRemote stub
55      * for async call back
56      */

57     private Map JavaDoc<String JavaDoc, ConnectionMeta> connections = new HashMap JavaDoc<String JavaDoc, ConnectionMeta>();
58
59     private String JavaDoc user = "";
60     private String JavaDoc password = "";
61
62     public JMSContainerImpl() {
63
64     }
65
66     public Class JavaDoc getHandlerClass() {
67         return JMSHandler.class;
68     }
69
70     public boolean auth(String JavaDoc userName, String JavaDoc password) throws JMSException JavaDoc {
71         if (user.equals("") && this.password.equals("")) {
72             return true;
73         } else {
74             if (user.equals(userName) && this.password.equals(password)) {
75                 return true;
76             } else {
77                 throw new JMSSecurityException JavaDoc("auth error, used user=" + user + ", password=" + password);
78             }
79         }
80     }
81
82     public void setUser(String JavaDoc user) {
83         this.user = user;
84     }
85
86     public void setPassword(String JavaDoc password) {
87         this.password = password;
88     }
89
90     public Queue JavaDoc createQueue(java.lang.String JavaDoc queueName) throws JMSException JavaDoc {
91         return null;
92     }
93
94     public Topic JavaDoc createTopic(String JavaDoc topicName) throws JMSException JavaDoc {
95         return null;
96     }
97
98     public TemporaryQueue JavaDoc createTemporaryQueue() throws JMSException JavaDoc {
99         return null;
100     }
101
102     public TemporaryTopic JavaDoc createTemporaryTopic() throws JMSException JavaDoc {
103         return null;
104     }
105
106     public synchronized void registerConnection(String JavaDoc clientId, Object JavaDoc conn) throws JMSException JavaDoc {
107         JMSConnectionRemote jmsconn = (JMSConnectionRemote) Marshaller.unmarshall(conn);
108         logger.debug("register jms client connection: " + clientId + " => " + jmsconn);
109         if (connections.containsKey(clientId)) {
110             throw new JMSException JavaDoc("connection clientId " + clientId + " already registered.");
111         }
112         ConnectionMeta meta = new ConnectionMeta(clientId, jmsconn);
113         connections.put(clientId, meta);
114     }
115
116     public synchronized void unregisterConnection(String JavaDoc clientId) throws JMSException JavaDoc {
117         logger.debug("unregisterConnection clientId=" + clientId);
118         ConnectionMeta meta = connections.remove(clientId);
119         if (meta != null) {
120             meta.close();
121         }
122     }
123
124     public synchronized boolean isConnectionRegistered(String JavaDoc clientId) {
125         return connections.containsKey(clientId);
126     }
127
128     public void registerSession(String JavaDoc connectionId, String JavaDoc sessionId) throws JMSException JavaDoc {
129         logger.debug("register jms session, sessionId = " + sessionId + ", clientId = " + connectionId);
130         if (!connections.containsKey(connectionId)) {
131             throw new JMSException JavaDoc("connection clientId " + connectionId + " not registered.");
132         }
133         connections.get(connectionId).registerSession(sessionId);
134     }
135
136     public void registerConsumer(String JavaDoc clientId, String JavaDoc sessionId, String JavaDoc consumerId, Destination JavaDoc destnation) throws JMSException JavaDoc {
137         logger.debug("register jms consumer, consumerId =" + consumerId + ", sessionId =" + sessionId + " ,clientId = " + clientId);
138         if (!connections.containsKey(clientId)) {
139             throw new JMSException JavaDoc("connection clientId " + clientId + " not registered.");
140         }
141         SessionMeta sessionMeta = getSession(clientId, sessionId);
142         ConsumerMeta meta = sessionMeta.registerCunsumer(consumerId);
143         destinations.registerConsumer(destnation, meta);
144         synchronized (this) {
145             notifyAll();
146         }
147     }
148
149     /**
150      * 客户端调用该方法发送消息
151      * 服务器端接收到消息之后,把引用存到每一个注册了消息地址的Cosumer中
152      * 真正发送的时候,建立该消息的副本发送
153      *
154      * @param message
155      */

156     public void sendMessage(JMSMessage message) throws JMSException JavaDoc {
157         logger.debug("receive message: " + message);
158         Destination JavaDoc destination = message.getJMSDestination();
159         if (!destinations.isDestinationRegistered(destination)) {
160             throw new InvalidDestinationException JavaDoc("Destination " + destination + " not exists.");
161         }
162
163         tempMessages.add((JMSMessage) message);
164
165         synchronized (this) {
166             notifyAll();
167         }
168     }
169
170     /**
171      * 客户端调用该方法批量发送消息,用于本地事务的Commit
172      *
173      * @param messages
174      */

175     public void sendMessageBatch(JMSMessage[] messages) throws JMSException JavaDoc {
176         throw new JMSException JavaDoc("not support now!");
177     }
178
179     /**
180      * 客户端调用该方法得到消息
181      *
182      * @param clientId
183      * @param sessionId
184      * @param consumerId
185      * @param timeout 0 forever; -1 noWait; >1 timeToWait
186      * @return
187      */

188     public JMSMessage receiveMessage(String JavaDoc clientId, String JavaDoc sessionId, String JavaDoc consumerId, long timeout) throws JMSException JavaDoc {
189         ConnectionMeta connMeta = getConnection(clientId);
190         if (!connMeta.isStarted()) {
191             throw new IllegalStateException JavaDoc("connection " + clientId + " not started, can't receive message.");
192         }
193         ConsumerMeta meta = getConsumer(clientId, sessionId, consumerId);
194         JMSMessage message = meta.popMessage();
195         while (message == null) {
196             try {
197                 if (timeout == 0) {
198                     Thread.sleep(50L);
199                 } else if (timeout > 0) {
200                     Thread.sleep(timeout);
201                     timeout = -1; // end loop
202
} else {
203                     break;
204                 }
205             } catch (Exception JavaDoc e) {
206                 e.printStackTrace();
207             }
208             message = meta.popMessage();
209         }
210         logger.debug("receive message " + message + ", clientId=" + clientId + ", sessionId=" + sessionId + ", consumerId=" + consumerId);
211         return message;
212     }
213
214     public void acknowledge(String JavaDoc clientId, String JavaDoc sessionId, String JavaDoc consumerId, String JavaDoc messageId) throws JMSException JavaDoc {
215         logger.debug("acknowledge message: messageId=" + messageId + ", clientId=" + clientId + ", sessionId=" + sessionId + ", consumerId=" + consumerId);
216         ConsumerMeta meta = getConsumer(clientId, sessionId, consumerId);
217         meta.acknowlege(messageId);
218     }
219
220     public void startConnection(String JavaDoc clientId) throws JMSException JavaDoc {
221         logger.debug("startConnection clientId=" + clientId);
222         ConnectionMeta connMeta = getConnection(clientId);
223         connMeta.start();
224     }
225
226     public void stopConnection(String JavaDoc clientId) throws JMSException JavaDoc {
227         logger.debug("stopConnection clientId=" + clientId);
228         ConnectionMeta connMeta = getConnection(clientId);
229         connMeta.stop();
230     }
231
232     public void setConsumerAsync(String JavaDoc clientId, String JavaDoc sessionId, String JavaDoc consumerId, boolean async) throws JMSException JavaDoc {
233         ConsumerMeta meta = getConsumer(clientId, sessionId, consumerId);
234         meta.setAsync(async);
235     }
236
237     public void closeSession(String JavaDoc clientId, String JavaDoc sessionId) throws JMSException JavaDoc {
238         logger.debug("closeSession sessionId=" + sessionId + ", clientId=" + clientId);
239         SessionMeta meta = getSession(clientId, sessionId);
240         meta.close();
241     }
242
243     protected void doInit() throws Exception JavaDoc {
244         //负责把TempMessages分发到给Consumer
245
new Thread JavaDoc(this, getName()).start();
246     }
247
248     protected void doDestroy() throws Exception JavaDoc {
249         super.doDestroy();
250     }
251
252     protected void doStart() throws Exception JavaDoc {
253         super.doStart();
254     }
255
256     protected void doStop() throws Exception JavaDoc {
257         super.doStop();
258     }
259
260     public void run() {
261         while (isStarted()) {
262             try {
263                 while (beWait()) {
264                     synchronized (this) {
265                         wait();
266                     }
267                 }
268                 Collections.sort(tempMessages);
269                 for (Iterator JavaDoc it = tempMessages.iterator(); it.hasNext();) {
270 // for(JMSMessage message : tempMessages) {
271
JMSMessage message = (JMSMessage) it.next();
272                     Destination JavaDoc destination = message.getJMSDestination();
273
274                     if (destinations.hashConsumer(destination)) {
275                         List JavaDoc<ConsumerMeta> consumers = destinations.getConsumerMetas(destination);
276                         boolean isQueue = (destination instanceof Queue JavaDoc) ? true : false;
277                         if (isQueue) {
278                             //随机分配给一个Consumer
279
int index = rand.nextInt(consumers.size());
280                             ConsumerMeta meta = consumers.get(index);
281                             meta.addMessage(message);
282                             logger.debug("dispatch message " + message + " to consumer " + meta.getConsumerId());
283                         } else {
284                             //分配给每一个Consumer
285
for (ConsumerMeta meta : consumers) {
286                                 meta.addMessage(message);
287                             }
288                         }
289                         it.remove();
290                     }
291                 }
292             } catch (Exception JavaDoc e) {
293                 e.printStackTrace();
294             }
295         }
296
297     }
298
299     /**
300      * 分派线程是否进入等待状态
301      * 1.没有消息
302      * 2.有消息,但是所有的消息都没有consumer
303      *
304      * @return
305      * @throws JMSException
306      */

307     private boolean beWait() throws JMSException JavaDoc {
308         if (tempMessages.isEmpty()) {
309             return true;
310         } else {
311             for (Message JavaDoc message : tempMessages) {
312                 Destination JavaDoc destination = message.getJMSDestination();
313                 if (destinations.hashConsumer(destination)) {
314                     return false;
315                 }
316             }
317             return true;
318         }
319     }
320
321     private ConnectionMeta getConnection(String JavaDoc clientId) throws JMSException JavaDoc {
322         if (!connections.containsKey(clientId)) {
323             throw new JMSException JavaDoc("connection clientId " + clientId + " not registered.");
324         }
325         ConnectionMeta connMeta = connections.get(clientId);
326         return connMeta;
327     }
328
329     private SessionMeta getSession(String JavaDoc clientId, String JavaDoc sessionId) throws JMSException JavaDoc {
330         ConnectionMeta connMeta = getConnection(clientId);
331         SessionMeta sessionMeta = connMeta.getSession(sessionId);
332         if (sessionMeta == null) {
333             throw new JMSException JavaDoc("no session " + sessionId + " in connection " + clientId);
334         }
335         return sessionMeta;
336     }
337
338     private ConsumerMeta getConsumer(String JavaDoc clientId, String JavaDoc sessionId, String JavaDoc consumerId) throws JMSException JavaDoc {
339         SessionMeta sessionMeta = getSession(clientId, sessionId);
340         ConsumerMeta consumerMeta = sessionMeta.getConsumer(consumerId);
341         if (consumerMeta == null) {
342             throw new JMSException JavaDoc("no consumer " + consumerId + " in session " + sessionId + " of connection " + clientId);
343         }
344         return consumerMeta;
345     }
346
347     public static void main(String JavaDoc[] args) {
348
349     }
350
351 }
352
Popular Tags