| 1 20 package com.presumo.jms.router; 21 22 import com.presumo.jms.message.JmsMessage; 23 import com.presumo.jms.plugin.MessageQueue; 24 import com.presumo.jms.resources.Resources; 25 import com.presumo.jms.selector.JmsOperand; 26 import com.presumo.jms.selector.Parser; 27 28 import com.presumo.util.log.Logger; 29 import com.presumo.util.log.LoggerFactory; 30 31 import java.io.IOException ; 32 import java.util.ArrayList ; 33 import javax.jms.ExceptionListener ; 34 import javax.jms.DeliveryMode ; 35 import javax.jms.JMSException ; 36 37 38 49 public final class Router extends RouterAdapter 50 { 51 52 private final Parser parser; 53 54 55 private RoutingTarget [] targets = new RoutingTarget[0]; 56 57 58 private int numOfTargets; 59 60 61 private final Object targetChangeLock = "JVM_Router_Target_change_lock"; 62 63 64 private final ArrayList eListeners = new ArrayList (); 65 66 67 private long msgsRouted; 68 69 70 private String name = "router"; 71 72 76 public Router(MessageQueue queue) 77 { 78 super(queue, 100, "JVM Router"); 79 parser = Parser.getInstance(); 80 } 81 82 86 public final String getName() 87 { 88 return name; 89 } 90 91 public final void setName(String value) 92 { 93 name = value; 94 } 95 96 100 public final void addTarget(RoutingTarget target) 101 { 102 synchronized (targetChangeLock) { 103 logger.entry("addTarget: " + target.toString()); 104 105 boolean targetAdded = false; 106 for (int i=0; i < targets.length; ++i) { 107 if (targets[i] == null) { 108 targets[i] = target; 109 target.setTargetID(i); 110 targetAdded = true; 111 break; 112 } 113 } 114 115 if (! targetAdded) { 116 int newSize = targets.length + 1; 117 int targetID = targets.length; 118 RoutingTarget [] tmp = new RoutingTarget[newSize]; 119 System.arraycopy(targets, 0, tmp, 0, targets.length); 120 tmp[targetID] = target; 121 targets = tmp; 122 target.setTargetID(targetID); 123 } 124 125 ++numOfTargets; 126 recalculateFilters(true); 127 128 if (numOfTargets == 1) 129 this.startRouter(); 130 131 logger.exit("addTarget"); 132 } 133 } 134 135 138 public final void removeTarget(RoutingTarget target) 139 { 140 synchronized (targetChangeLock) 141 { 142 logger.entry("removeTarget"); 143 144 for (int i=0; i < targets.length; ++i) { 145 if (target.equals(targets[i])) { 146 targets[i] = null; 147 --numOfTargets; 148 recalculateFilters(false); 149 } 150 } 151 152 if (numOfTargets == 0) { 153 this.stopRouter(); 154 } 155 logger.exit("removeTarget"); 156 } 157 } 158 159 162 public final void recalculateFilters(boolean filterAdded) 163 { 164 synchronized (targetChangeLock) { 165 logger.entry("recalculateFilters"); 166 167 int i,j, targetLoc; 168 169 for (i=0; i < targets.length; ++i) { 170 RoutingTarget t = targets[i]; 171 if (t != null && t.needsFilterUpdates()) { 172 if (numOfTargets == 1) { 173 try { 174 t.setRemoteRoutingFilter(parser.parseFilter("false"), false); 175 } catch (javax.jms.InvalidSelectorException ise) {} 176 break; 177 } 178 JmsOperand [] allFilters = new JmsOperand[numOfTargets-1]; 179 180 for (j=0, targetLoc=0; targetLoc < targets.length; ++targetLoc) { 181 182 if (targetLoc != i && targets[targetLoc] != null) { 184 allFilters[j] = targets[targetLoc].getRoutingFilter(); 185 ++j; 186 } 187 } 188 JmsOperand joinedFilter = parser.orTogether(allFilters); 189 t.setRemoteRoutingFilter(joinedFilter, filterAdded); 190 } 191 } 193 logger.exit("recalculateFilters"); 194 } } 196 197 public final void routeMessage(JmsMessage msg) throws IOException  198 { 199 ++msgsRouted; 200 queueMessage(msg); 201 } 202 203 public final void routeMessages(JmsMessage [] msgs) throws IOException  204 { 205 msgsRouted += msgs.length; 206 queueMessages(msgs); 207 } 208 209 public final void addExceptionListener(ExceptionListener listener) 210 { 211 synchronized (eListeners) { 212 eListeners.add(listener); 213 } 214 } 215 216 public final void removeExceptionListener(ExceptionListener listener) 217 { 218 synchronized (eListeners) { 219 eListeners.remove(listener); 220 } 221 } 222 223 224 228 232 protected final void routeMessages(int batchsize) 233 { 234 235 try { 236 JmsMessage [] msgs = getNext(batchsize); 238 if (msgs == null) return; 239 int length = msgs.length; 240 241 synchronized(targetChangeLock) 242 { 243 parser.obtainLock(); 244 int targetLength = targets.length; 245 246 for (int i=0; i < length; i++) 247 { 248 JmsMessage message = msgs[i]; parser.resetEvaluateOnce(); 252 for (int j=0; j < targetLength; j++) 254 { 255 RoutingTarget t = targets[j]; 256 257 if ( t != null) { 258 message = t.takeMessage(message); 259 if (message == null) 260 break; 261 } 262 } 263 } 265 } 266 parser.releaseLock(); 267 } catch (IOException ioe) { 268 } 270 } 271 272 276 280 private void reportException(Exception e) 281 { 282 JMSException jmsex; 283 if (e instanceof JMSException ) { 284 jmsex = (JMSException ) e; 285 } else { 286 jmsex = new JMSException ("An exception occurred in the Router: " + 287 e.toString()); 288 jmsex.setLinkedException(e); 289 } 290 synchronized (eListeners) { 291 if (eListeners.size() == 0) { 292 jmsex.printStackTrace(); 293 } else { 294 for (int i=0; i < eListeners.size(); ++i) 295 ( (ExceptionListener ) eListeners.get(i)).onException(jmsex); 296 } 297 } 298 } 299 300 private static Logger logger = 302 LoggerFactory.getLogger(Router.class, Resources.getBundle()); 303 } 305 | Popular Tags |