KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > maverick > multiplex > MessageStore


1 /*
2  * Created on 08-May-2005
3  *
4  * TODO To change the template for this generated file go to
5  * Window - Preferences - Java - Code Style - Code Templates
6  */

7 package com.maverick.multiplex;
8
9 import java.io.EOFException JavaDoc;
10 import java.io.IOException JavaDoc;
11 import java.io.InterruptedIOException JavaDoc;
12
13 /**
14  * @author lee
15  *
16  * TODO To change the template for this generated type comment go to Window -
17  * Preferences - Java - Code Style - Code Templates
18  */

19 public class MessageStore {
20
21
22     // #ifdef DEBUG
23
static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
24         .getLog(MessageStore.class);
25     // #endif
26

27     Message header = new Message();
28     public static final int NO_MESSAGES = -1;
29     Channel channel;
30     boolean closed = false;
31     MessageObserver stickyMessageObserver;
32
33     public MessageStore(Channel channel, MessageObserver stickyMessageObserver) {
34         this.channel = channel;
35         this.stickyMessageObserver = stickyMessageObserver;
36         header.next = header.previous = header;
37     }
38
39     /**
40      *
41      * @param messagefilter
42      * @return
43      * @throws IOException
44      * @throws InterruptedIOException
45      */

46     public Message nextMessage(MessageObserver observer) throws IOException JavaDoc, EOFException JavaDoc {
47         return nextMessage(observer, 0);
48     }
49
50     public Message nextMessage(MessageObserver observer, int timeoutMS) throws IOException JavaDoc, EOFException JavaDoc {
51         // #ifdef DEBUG
52
if (log.isDebugEnabled())
53             log.debug("Waiting for next message timeout=" + timeoutMS);
54         // #endif
55

56         try {
57
58             long startTime = System.currentTimeMillis();
59
60             synchronized (header) {
61                 Message msg = null;
62
63                 while (msg == null && !isClosed()) {
64
65                     msg = hasMessage(observer);
66
67                     if (msg != null) {
68                         if (stickyMessageObserver != null && stickyMessageObserver.wantsNotification(msg)) {
69                             // #ifdef DEBUG
70
if (log.isDebugEnabled())
71                                 log.debug("Message that wants notification found");
72                             // #endif
73
return msg;
74                         }
75
76                         // #ifdef DEBUG
77
if (log.isDebugEnabled())
78                             log.debug("Message that wants notification found");
79                         // #endif
80

81                         remove(msg);
82                         return msg;
83                     }
84
85                     if (timeoutMS > 0) {
86                         if ((System.currentTimeMillis() - startTime) > timeoutMS)
87                             throw new InterruptedIOException JavaDoc("Timeout waiting for message");
88                     }
89                     header.wait(1000);
90                 }
91             }
92         } catch (InterruptedException JavaDoc ex) {
93         }
94         // #ifdef DEBUG
95
if (log.isDebugEnabled())
96             log.debug("Message could not be found");
97         // #endif
98

99         throw new EOFException JavaDoc("The required message could not be found in the message store");
100     }
101
102     public boolean isClosed() {
103         synchronized (header) {
104             return closed;
105         }
106     }
107
108     private void remove(Message e) {
109
110         if (e == header) {
111             throw new IndexOutOfBoundsException JavaDoc();
112         }
113
114         e.previous.next = e.next;
115         e.next.previous = e.previous;
116     }
117
118     public Message hasMessage(MessageObserver observer) {
119
120         synchronized (header) {
121
122             if (header.next == null) {
123                 return null;
124             }
125
126             for (Message e = header.next; e != header; e = e.next) {
127                 if (observer.wantsNotification(e))
128                     return e;
129             }
130
131             return null;
132
133         }
134     }
135
136     public void close() {
137
138         synchronized (header) {
139             closed = true;
140             header.notifyAll();
141         }
142     }
143
144     void addMessage(Message msg) {
145         synchronized (header) {
146             msg.next = header;
147             msg.previous = header.previous;
148             msg.previous.next = msg;
149             msg.next.previous = msg;
150             header.notifyAll();
151         }
152     }
153 }
154
Popular Tags