1 7 package com.maverick.multiplex; 8 9 import java.io.EOFException ; 10 import java.io.IOException ; 11 import java.io.InterruptedIOException ; 12 13 19 public class MessageStore { 20 21 22 static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 24 .getLog(MessageStore.class); 25 27 Message header = new Message(); 28 public static final int NO_MESSAGES = -1; 29 Channel channel; 30 boolean closed = false; 31 MessageObserver stickyMessageObserver; 32 33 public MessageStore(Channel channel, MessageObserver stickyMessageObserver) { 34 this.channel = channel; 35 this.stickyMessageObserver = stickyMessageObserver; 36 header.next = header.previous = header; 37 } 38 39 46 public Message nextMessage(MessageObserver observer) throws IOException , EOFException { 47 return nextMessage(observer, 0); 48 } 49 50 public Message nextMessage(MessageObserver observer, int timeoutMS) throws IOException , EOFException { 51 if (log.isDebugEnabled()) 53 log.debug("Waiting for next message timeout=" + timeoutMS); 54 56 try { 57 58 long startTime = System.currentTimeMillis(); 59 60 synchronized (header) { 61 Message msg = null; 62 63 while (msg == null && !isClosed()) { 64 65 msg = hasMessage(observer); 66 67 if (msg != null) { 68 if (stickyMessageObserver != null && stickyMessageObserver.wantsNotification(msg)) { 69 if (log.isDebugEnabled()) 71 log.debug("Message that wants notification found"); 72 return msg; 74 } 75 76 if (log.isDebugEnabled()) 78 log.debug("Message that wants notification found"); 79 81 remove(msg); 82 return msg; 83 } 84 85 if (timeoutMS > 0) { 86 if ((System.currentTimeMillis() - startTime) > timeoutMS) 87 throw new InterruptedIOException ("Timeout waiting for message"); 88 } 89 header.wait(1000); 90 } 91 } 92 } catch (InterruptedException ex) { 93 } 94 if (log.isDebugEnabled()) 96 log.debug("Message could not be found"); 97 99 throw new EOFException ("The required message could not be found in the message store"); 100 } 101 102 public boolean isClosed() { 103 synchronized (header) { 104 return closed; 105 } 106 } 107 108 private void remove(Message e) { 109 110 if (e == header) { 111 throw new IndexOutOfBoundsException (); 112 } 113 114 e.previous.next = e.next; 115 e.next.previous = e.previous; 116 } 117 118 public Message hasMessage(MessageObserver observer) { 119 120 synchronized (header) { 121 122 if (header.next == null) { 123 return null; 124 } 125 126 for (Message e = header.next; e != header; e = e.next) { 127 if (observer.wantsNotification(e)) 128 return e; 129 } 130 131 return null; 132 133 } 134 } 135 136 public void close() { 137 138 synchronized (header) { 139 closed = true; 140 header.notifyAll(); 141 } 142 } 143 144 void addMessage(Message msg) { 145 synchronized (header) { 146 msg.next = header; 147 msg.previous = header.previous; 148 msg.previous.next = msg; 149 msg.next.previous = msg; 150 header.notifyAll(); 151 } 152 } 153 } 154 | Popular Tags |