KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jfox > jms > connector > SessionMeta


1 /*
2  * Copyright (c) 2004 Your Corporation. All Rights Reserved.
3  */

4 package org.jfox.jms.connector;
5
6 import java.util.HashMap JavaDoc;
7 import java.util.Iterator JavaDoc;
8 import java.util.Map JavaDoc;
9
10 import org.jfox.jms.message.JMSMessage;
11
12 /**
13  * @author <a HREF="mailto:yy.young@gmail.com">Young Yang</a>
14  */

15
16 public class SessionMeta implements Runnable JavaDoc {
17     private String JavaDoc sessionId;
18     private ConnectionMeta connectionMeta;
19     private Map JavaDoc<String JavaDoc, ConsumerMeta> consumers = new HashMap JavaDoc<String JavaDoc, ConsumerMeta>();
20
21     private boolean closed = false;
22
23     public SessionMeta(final String JavaDoc sessionId, ConnectionMeta connMeta) {
24         this.sessionId = sessionId;
25         this.connectionMeta = connMeta;
26         //一个 Session 由一个线程负责发送异步消息
27
new Thread JavaDoc(this, "Session - " + sessionId + " Async Sender").start();
28     }
29
30     public String JavaDoc getConnectionId() {
31         return connectionMeta.getConnectionId();
32     }
33
34     public ConsumerMeta registerCunsumer(String JavaDoc consumerId) {
35         ConsumerMeta consumerMeta = new ConsumerMeta(consumerId, this);
36         consumers.put(consumerId, consumerMeta);
37         return consumerMeta;
38     }
39
40     public ConsumerMeta getConsumer(String JavaDoc consumerId) {
41         return consumers.get(consumerId);
42     }
43
44     public void unregisterConsumer(String JavaDoc consumerId) {
45         consumers.remove(consumerId);
46     }
47
48     public String JavaDoc getSessionId() {
49         return sessionId;
50     }
51
52     public void close() {
53         closed = true;
54         connectionMeta.unregisterSession(sessionId);
55         synchronized (this) {
56             notifyAll();
57         }
58     }
59
60     public void run() {
61
62         while (!closed) {
63             try {
64                 if (beWaiting()) {
65                     synchronized (this) {
66                         wait();
67                     }
68                     if (closed) break;
69                 }
70                 for (Iterator JavaDoc it = consumers.entrySet().iterator(); it.hasNext();) {
71                     Map.Entry JavaDoc<String JavaDoc, ConsumerMeta> entry = (Map.Entry JavaDoc<String JavaDoc, ConsumerMeta>) it.next();
72                     String JavaDoc consumerId = entry.getKey();
73                     ConsumerMeta meta = entry.getValue();
74                     if (meta.isAsync()) {
75                         JMSMessage msg = meta.popMessage();
76                         if (msg != null) {
77                             connectionMeta.getJMSConnection().onMessage(sessionId, consumerId, msg);
78                         }
79                     }
80                 }
81             } catch (Exception JavaDoc e) {
82                 e.printStackTrace();
83             }
84         }
85
86     }
87
88     private boolean beWaiting() {
89         if (!connectionMeta.isStarted() || consumers.isEmpty()) {
90             return true;
91         } else {
92             for (ConsumerMeta meta : consumers.values()) {
93                 if (meta.isAsync() && meta.hasMessage()) {
94                     return false;
95                 }
96             }
97             return true;
98         }
99     }
100
101
102     public static void main(String JavaDoc[] args) {
103
104     }
105 }
106
107
Popular Tags