KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jfox > jms > connector > ConsumerMeta


1 /*
2  * Copyright (c) 2004 Your Corporation. All Rights Reserved.
3  */

4 package org.jfox.jms.connector;
5
6 import java.util.ArrayList JavaDoc;
7 import java.util.Collections JavaDoc;
8 import java.util.HashMap JavaDoc;
9 import java.util.List JavaDoc;
10 import java.util.Map JavaDoc;
11 import javax.jms.JMSException JavaDoc;
12
13 import org.jfox.jms.message.JMSMessage;
14
15 /**
16  * @author <a HREF="mailto:yy.young@gmail.com">Young Yang</a>
17  */

18
19 public class ConsumerMeta {
20
21     private String JavaDoc consumerId;
22
23     private SessionMeta sessionMeta;
24
25     private List JavaDoc<JMSMessage> messages = new ArrayList JavaDoc<JMSMessage>();
26     private Map JavaDoc<String JavaDoc, JMSMessage> unackMessages = new HashMap JavaDoc<String JavaDoc, JMSMessage>();
27
28     private boolean async = false;
29
30     public ConsumerMeta(String JavaDoc consumerId, SessionMeta sessionMeta) {
31         this.consumerId = consumerId;
32         this.sessionMeta = sessionMeta;
33     }
34
35     public String JavaDoc getSessionId() {
36         return sessionMeta.getSessionId();
37     }
38
39     public void addMessage(JMSMessage message) {
40         messages.add(message);
41         // 如果是异步接收,通知sessionMeta线程发送消息
42
if (this.isAsync()) {
43             synchronized (sessionMeta) {
44                 sessionMeta.notifyAll();
45             }
46         }
47     }
48
49     public synchronized JMSMessage popMessage() throws JMSException JavaDoc {
50         Collections.sort(messages);
51         JMSMessage message = null;
52         while (!messages.isEmpty()) {
53             JMSMessage msg = messages.remove(0);
54             //消息未过时
55
if (msg.getJMSExpiration() == 0 || (System.currentTimeMillis() < msg.getJMSExpiration())) {
56                 message = msg;
57                 //将消息添加到 unack 中
58
unackMessages.put(message.getJMSMessageID(), message);
59                 break;
60             }
61         }
62
63         return message;
64
65     }
66
67     public String JavaDoc getConsumerId() {
68         return consumerId;
69     }
70
71     public void setAsync(boolean isAsync) {
72         this.async = isAsync;
73     }
74
75     public boolean isAsync() {
76         return async;
77     }
78
79     public boolean hasMessage() {
80         return !messages.isEmpty();
81     }
82
83     public void acknowlege(String JavaDoc messageId) throws JMSException JavaDoc {
84         if (!unackMessages.containsKey(messageId)) {
85             throw new JMSException JavaDoc("message " + messageId + " not exists.");
86         }
87         unackMessages.remove(messageId);
88     }
89
90     public static void main(String JavaDoc[] args) {
91
92     }
93 }
94
95
Popular Tags