1 package com.ubermq.jms.client.impl; 2 3 import EDU.oswego.cs.dl.util.concurrent.*; 4 import com.ubermq.jms.client.*; 5 import com.ubermq.jms.common.datagram.*; 6 import com.ubermq.jms.common.routing.*; 7 import com.ubermq.jms.common.routing.impl.*; 8 import com.ubermq.kernel.*; 9 import com.ubermq.kernel.overflow.*; 10 import com.ubermq.util.*; 11 import com.ubermq.*; 12 import java.io.*; 13 import java.util.*; 14 import javax.jms.*; 15 16 55 final class LocalTopicSubscriber 56 extends AbstractConsumer 57 implements javax.jms.TopicSubscriber 58 { 59 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(LocalTopicSubscriber.class); 60 61 private static int BOUNDED_BUFFER_SIZE = Integer.valueOf(Configurator.getProperty(ClientConfig.SUB_BOUNDED_BUFFER_SIZE, "50")).intValue(); 63 64 67 private Topic t; 68 private boolean noLocal; 69 private String name; private boolean durable; 71 72 85 LocalTopicSubscriber(Topic t, 86 String selector, 87 boolean noLocal, 88 Session ts, 89 IDeliveryManager delivery) 90 throws JMSException 91 { 92 super(ts, 93 selector, 94 delivery, 95 new BoundedPriorityQueue(BOUNDED_BUFFER_SIZE)); 96 97 this.t = t; 99 this.noLocal = noLocal; 100 this.durable = false; 101 102 ts.conn.getClientProcessor().registerSubscription(t.getTopicName(), 104 selector, 105 this); 106 } 107 108 119 LocalTopicSubscriber(Topic t, 120 String selector, 121 boolean noLocal, 122 String name, 123 Session ts, 124 IDeliveryManager delivery) 125 throws JMSException 126 { 127 super(ts, 128 selector, 129 delivery, 130 new LinkedQueue()); 131 132 this.t = t; 133 this.noLocal = noLocal; 134 this.name = name; 135 this.durable = true; 136 137 ts.conn.getClientProcessor().registerDurableSubscription(t.getTopicName(), 139 name, 140 selector, 141 this); 142 } 143 144 public Topic getTopic() {return this.t;} 148 public boolean getNoLocal() {return this.noLocal;} 149 150 public void deliver(IDatagram d) 151 { 152 if (noLocal && 155 getConnection().isSenderLocal(((IMessageDatagram)d).getSenderId())) 156 { 157 log.debug("msg dropped: sender " + ((IMessageDatagram)d).getSenderId() + " is local for topic " + ((IMessageDatagram)d).getTopicName()); 158 return; 159 } 160 161 super.deliver(d); 162 } 163 164 protected void internalAcknowledge(IMessageDatagram md) 165 throws IOException 166 { 167 if (durable) { 168 super.internalAcknowledge(md); 169 } else { 170 } 173 } 174 175 188 public void close() throws JMSException 189 { 190 super.close(); 191 getClientProcessor().unregisterSubscription(t.getTopicName(), 192 this); 193 194 if (durable) 195 { 196 getClientProcessor().durableGoingAway(this.name); 197 } 198 } 199 200 } 201 | Popular Tags |