1 package com.ubermq.jms.client.impl; 2 3 import com.ubermq.jms.client.*; 4 import com.ubermq.kernel.*; 5 import com.ubermq.util.*; 6 7 import java.util.*; 8 import javax.jms.Message ; 9 import EDU.oswego.cs.dl.util.concurrent.*; 10 11 15 public class SimpleDeliveryManager 16 implements IDeliveryManager 17 { 18 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(SimpleDeliveryManager.class); 19 20 23 private final HashMap senders; 24 25 private static final int SENDERS_INITIAL_MAP_SIZE = 100; 26 27 31 private static ClockDaemon cd = new ClockDaemon(); 32 private static final int UNSEQUENCED_FLUSH_INTERVAL = 33 Integer.valueOf(Configurator.getProperty(ClientConfig.SUB_UNSEQUENCED_FLUSH_TIMEOUT, "750")).intValue(); 34 static { 35 cd.setThreadFactory(new ThreadFactory() { 36 public Thread newThread(Runnable command) 37 { 38 Thread t = new Thread (command, "SimpleDeliveryManager Resequencing Thread"); 39 t.setDaemon(true); 40 return t; 41 } 42 }); 43 } 44 45 49 private final class SimpleSenderState 50 { 51 private long lastSequence; private TreeSet outgoingQueue; 54 SimpleSenderState(long initialSequence) 55 { 56 lastSequence = initialSequence; 57 outgoingQueue = new TreeSet(); 58 } 59 60 long getLastSequence() {return lastSequence;} 61 62 synchronized void deliver(long sequence, 63 Message msg, 64 IMessageSender s) 65 { 66 if (sequence == lastSequence + 1) 67 sendImmediately(sequence, msg, s); 68 else if (sequence <= lastSequence) 69 drop(); 70 else { 72 enqueue(sequence, msg, s); 73 } 74 } 75 76 private void sendImmediately(long sequence, Message msg, IMessageSender s) 77 { 78 s.sendMessage(msg); 80 lastSequence = sequence; 81 82 flush(); 84 } 85 86 private void drop() 87 { 88 } 89 90 private void enqueue(long sequence, Message msg, IMessageSender s) 91 { 92 outgoingQueue.add(new OrderedMessage(sequence, msg, s)); 93 } 94 95 private void flush() 96 { 97 long nextSequence = lastSequence + 1; 98 OrderedMessage om; 99 100 while(outgoingQueue.size() > 0 && 101 (om = (OrderedMessage)outgoingQueue.first()).sequence == nextSequence) 102 { 103 outgoingQueue.remove(om); 104 om.send(); 105 106 log.debug("flushing out of order, sequence=" + nextSequence); 107 108 lastSequence++; 109 nextSequence++; 110 } 111 112 } 113 114 final synchronized void flushQueue() 115 { 116 if (outgoingQueue.size() == 0) return; 117 118 lastSequence = ((OrderedMessage)outgoingQueue.first()).sequence - 1; 119 flush(); 120 } 121 } 122 123 private final class OrderedMessage 124 implements Comparable 125 { 126 private long sequence; 127 private Message payload; 128 private IMessageSender sender; 129 130 OrderedMessage(long sequence, Message payload, IMessageSender sender) 131 { 132 this.sequence = sequence; 133 this.payload = payload; 134 this.sender = sender; 135 } 136 137 public int compareTo(Object o) 138 { 139 OrderedMessage om = (OrderedMessage)o; 140 141 if (om.sequence == sequence) return 0; 142 else return (sequence > om.sequence) ? 1 : -1; 143 } 144 145 public boolean equals(Object o) 146 { 147 OrderedMessage om = (OrderedMessage)o; 148 return om.sequence == sequence; 149 } 150 151 public int hashCode() 152 { 153 return (int)(sequence ^ (sequence >>> 32)); 154 } 155 156 public long getSequence() {return sequence;} 157 public void send() {sender.sendMessage(payload);} 158 } 159 160 164 public SimpleDeliveryManager() 165 { 166 senders = new HashMap(SENDERS_INITIAL_MAP_SIZE); 167 cd.executePeriodically(UNSEQUENCED_FLUSH_INTERVAL, 168 new Runnable () { 169 public void run() 170 { 171 synchronized(senders) 172 { 173 Iterator iter = senders.values().iterator(); 174 while (iter.hasNext()) 175 { 176 SimpleSenderState element = (SimpleSenderState)iter.next(); 177 element.flushQueue(); 178 } 179 } 180 } 181 }, false); 182 } 183 184 198 public void deliver(long senderId, 199 long sequence, 200 Message msg, 201 IMessageSender s) 202 { 203 SimpleSenderState senderState = ((SimpleSenderState)senders.get(new Long (senderId))); 204 if (senderState == null) { 205 senderState = new SimpleSenderState(sequence - 1); synchronized(senders) 207 { 208 senders.put(new Long (senderId), 209 senderState); 210 } 211 } 212 213 senderState.deliver(sequence, 214 msg, 215 s); 216 } 217 218 public IDeliveryManager newInstance() 219 { 220 return new SimpleDeliveryManager(); 221 } 222 223 224 } 225 | Popular Tags |