package com.caucho.jms;

import com.caucho.jms.message.ObjectMessageImpl;
import com.caucho.jms.message.TextMessageImpl;
import com.caucho.jms.selector.Selector;
import com.caucho.jms.session.MessageAvailableListener;
import com.caucho.jms.session.MessageProducerImpl;
import com.caucho.jms.session.QueueSenderImpl;
import com.caucho.jms.session.SessionImpl;
import com.caucho.jms.session.TopicPublisherImpl;
import com.caucho.loader.Environment;
import com.caucho.services.message.MessageSender;
import com.caucho.services.message.MessageServiceException;
import com.caucho.util.Alarm;
import com.caucho.util.Base64;
import com.caucho.util.CharBuffer;
import com.caucho.util.NullEnumeration;
import com.caucho.util.RandomUtil;

import javax.jms.*;
import java.io.Serializable;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Base class for JMS destinations.
 *
 * <p>Provides message-id generation, a consumer sequence counter, a
 * registry of {@link MessageAvailableListener}s, and default
 * implementations of the consumer/browser/subscriber factories that
 * simply throw {@link UnsupportedOperationException}.  Concrete
 * destinations are expected to override the operations they support.
 *
 * <p>Listeners are held through {@link SoftReference}s, so a listener
 * that is not strongly reachable elsewhere may be dropped by the garbage
 * collector; cleared references are purged lazily.
 */
abstract public class AbstractDestination implements Destination, MessageSender
{
  protected static Logger log
    = Logger.getLogger(AbstractDestination.class.getName());

  // Per-instance prefix for generated message ids; built once in the
  // constructor.
  private final String _idPrefix;
  // Guarded by "this" (see generateMessageID).
  private long _idCount;

  // Written under "this" in nextConsumerSequenceId(); volatile so that
  // getConsumerSequenceId() can read it without locking.
  private volatile long _consumerSequenceId;

  // Doubles as the monitor guarding the listener list, so it must never be
  // reassigned.
  private final ArrayList<SoftReference<MessageAvailableListener>> _listenerRefs
    = new ArrayList<SoftReference<MessageAvailableListener>>();

  /**
   * Builds the message-id prefix for this destination.
   *
   * <p>The prefix is "ID:", followed by the "caucho.server-id" environment
   * attribute when it is set, followed by whatever
   * {@code Base64.encode} writes to the buffer for a random long and for
   * the current time.
   */
  protected AbstractDestination()
  {
    CharBuffer cb = new CharBuffer();

    cb.append("ID:");

    Object serverId = Environment.getAttribute("caucho.server-id");
    if (serverId != null)
      cb.append(serverId);

    Base64.encode(cb, RandomUtil.getRandomLong());
    Base64.encode(cb, Alarm.getCurrentTime());

    _idPrefix = cb.toString();
  }

  /**
   * Generates a new message id: the destination's prefix followed by a
   * counter that is incremented on every call.
   *
   * @return the generated id
   */
  public synchronized String generateMessageID()
  {
    return _idPrefix + _idCount++;
  }

  /**
   * Returns the current consumer sequence id without modifying it.
   *
   * @return the most recently assigned consumer sequence id
   */
  public long getConsumerSequenceId()
  {
    return _consumerSequenceId;
  }

  /**
   * Increments the consumer sequence id and returns the new value.
   *
   * @return the incremented consumer sequence id
   */
  protected synchronized long nextConsumerSequenceId()
  {
    return ++_consumerSequenceId;
  }

  /**
   * Creates a producer matching the kind of this destination: a
   * {@link QueueSenderImpl} for a {@link Queue}, a
   * {@link TopicPublisherImpl} for a {@link Topic}, and a plain
   * {@link MessageProducerImpl} otherwise.
   *
   * @param session the session passed to the new producer
   * @return the new producer
   */
  public MessageProducer createProducer(SessionImpl session)
  {
    if (this instanceof Queue)
      return new QueueSenderImpl(session, (Queue) this);
    else if (this instanceof Topic)
      return new TopicPublisherImpl(session, (Topic) this);
    else
      return new MessageProducerImpl(session, (Destination) this);
  }

  /**
   * Sends a message to the destination.
   *
   * <p>This base implementation does not support sending and always
   * throws.
   *
   * @param message the message to send
   * @throws JMSException declared for overriding implementations
   * @throws UnsupportedOperationException always, in this base class
   */
  public void send(Message message)
    throws JMSException
  {
    throw new UnsupportedOperationException(getClass().getName());
  }

  /**
   * Registers a listener (held through a soft reference) and immediately
   * invokes its {@code messageAvailable()} callback once.
   *
   * @param listener the listener to register
   */
  public void addListener(MessageAvailableListener listener)
  {
    synchronized (_listenerRefs) {
      _listenerRefs.add(new SoftReference<MessageAvailableListener>(listener));
    }

    // The initial callback runs outside the lock.
    listener.messageAvailable();
  }

  /**
   * Removes a listener, compared by identity.
   *
   * <p>The list is scanned from the end; cleared references met on the
   * way are purged, and the scan stops at the first matching entry.
   *
   * @param listener the listener to remove
   */
  public void removeListener(MessageAvailableListener listener)
  {
    synchronized (_listenerRefs) {
      for (int i = _listenerRefs.size() - 1; i >= 0; i--) {
        SoftReference<MessageAvailableListener> ref = _listenerRefs.get(i);

        MessageAvailableListener oldListener = ref.get();

        if (oldListener == null)
          _listenerRefs.remove(i);
        else if (oldListener == listener) {
          _listenerRefs.remove(i);
          return;
        }
      }
    }
  }

  /**
   * Notifies every registered listener, most recently added first, and
   * purges references that have been cleared.
   *
   * <p>Callbacks run while the listener lock is held, so a
   * {@link #removeListener} call from another thread does not return
   * until the current pass has finished.  The listeners are collected
   * into a snapshot before any callback runs: a callback may re-enter
   * {@link #addListener} or {@link #removeListener} on the same thread,
   * and iterating the live list by index while it shrinks could read past
   * its end.
   */
  protected void messageAvailable()
  {
    synchronized (_listenerRefs) {
      ArrayList<MessageAvailableListener> liveListeners
        = new ArrayList<MessageAvailableListener>(_listenerRefs.size());

      for (int i = _listenerRefs.size() - 1; i >= 0; i--) {
        MessageAvailableListener listener = _listenerRefs.get(i).get();

        if (listener != null)
          liveListeners.add(listener);
        else
          _listenerRefs.remove(i);
      }

      for (MessageAvailableListener listener : liveListeners)
        listener.messageAvailable();
    }
  }

  /**
   * Creates a consumer for the destination.
   *
   * <p>Not supported by this base class.
   *
   * @param session the owning session
   * @param selector the message selector
   * @param noLocal the no-local flag requested by the caller
   * @throws JMSException declared for overriding implementations
   * @throws UnsupportedOperationException always, in this base class
   */
  public MessageConsumer createConsumer(SessionImpl session,
                                        String selector,
                                        boolean noLocal)
    throws JMSException
  {
    throw new UnsupportedOperationException(getClass().getName());
  }

  /**
   * Creates a queue browser for the destination.
   *
   * <p>Not supported by this base class.
   *
   * @param session the owning session
   * @param selector the message selector
   * @throws JMSException declared for overriding implementations
   * @throws UnsupportedOperationException always, in this base class
   */
  public QueueBrowser createBrowser(SessionImpl session,
                                    String selector)
    throws JMSException
  {
    throw new UnsupportedOperationException(getClass().getName());
  }

  /**
   * Creates a durable subscriber for the destination.
   *
   * <p>Not supported by this base class.
   *
   * @param session the owning session
   * @param selector the message selector
   * @param noLocal the no-local flag requested by the caller
   * @param name the subscription name
   * @throws JMSException declared for overriding implementations
   * @throws UnsupportedOperationException always, in this base class
   */
  public TopicSubscriber createDurableSubscriber(SessionImpl session,
                                                 String selector,
                                                 boolean noLocal,
                                                 String name)
    throws JMSException
  {
    throw new UnsupportedOperationException(getClass().getName());
  }

  /**
   * Returns an enumeration for the given selector.
   *
   * <p>This base implementation ignores the selector and returns
   * {@code NullEnumeration.create()} (presumably an empty enumeration).
   *
   * @param selector the selector; unused here
   * @throws JMSException declared for overriding implementations
   */
  public Enumeration getEnumeration(Selector selector)
    throws JMSException
  {
    return NullEnumeration.create();
  }

  /**
   * Wraps a payload in a JMS message, copies the headers onto it as object
   * properties, and passes it to {@link #send(Message)}.
   *
   * <p>A {@link String} payload becomes a text message; any other
   * {@link Serializable} payload becomes an object message.
   *
   * @param headers property name to value map; may be null.  Keys must
   *   be Strings.
   * @param data the payload
   * @throws MessageServiceException if the payload is neither a String nor
   *   Serializable (this includes null), or wrapping any other exception
   *   raised while building or sending the message
   */
  public void send(HashMap headers, Object data)
    throws MessageServiceException
  {
    try {
      Message message;

      if (data instanceof String) {
        TextMessage msg = new TextMessageImpl();
        msg.setText((String) data);

        message = msg;
      }
      else if (data instanceof Serializable) {
        ObjectMessage msg = new ObjectMessageImpl();
        msg.setObject((Serializable) data);

        message = msg;
      }
      else
        throw new MessageServiceException("not a serializable object: " + data);

      if (headers != null) {
        // The map is raw, so the key type is checked explicitly; a
        // non-String key fails here and is wrapped by the catch below.
        Iterator<?> iter = headers.entrySet().iterator();
        while (iter.hasNext()) {
          Map.Entry<?, ?> entry = (Map.Entry<?, ?>) iter.next();
          String key = (String) entry.getKey();

          message.setObjectProperty(key, entry.getValue());
        }
      }

      send(message);
    } catch (MessageServiceException e) {
      throw e;
    } catch (Exception e) {
      throw new MessageServiceException(e);
    }
  }
}