1 43 44 package org.exolab.jms.persistence; 45 46 import java.sql.Connection ; 47 import java.sql.PreparedStatement ; 48 import java.sql.ResultSet ; 49 import java.sql.SQLException ; 50 import java.util.ArrayList ; 51 import java.util.Collections ; 52 import java.util.Date ; 53 import java.util.HashMap ; 54 import java.util.Iterator ; 55 import java.util.List ; 56 import java.util.Vector ; 57 58 import javax.jms.JMSException ; 59 import javax.sql.DataSource ; 60 61 import org.apache.commons.logging.Log; 62 import org.apache.commons.logging.LogFactory; 63 64 import org.exolab.jms.client.JmsDestination; 65 import org.exolab.jms.client.JmsTopic; 66 67 68 75 class Consumers { 76 77 80 private HashMap _consumers; 81 82 85 private static Consumers _instance; 86 87 91 private static final Boolean block = new Boolean (true); 92 93 96 private static final String CONSUMER_ID_SEED = "consumerId"; 97 98 102 private static final String CONSUMER_MESSAGE = "message_handles"; 103 104 107 private static final Log _log = LogFactory.getLog(Consumers.class); 108 109 110 118 public static Consumers instance() { 119 return _instance; 120 } 121 122 129 public static Consumers initialise(Connection connection) 130 throws PersistenceException { 131 132 if (_instance == null) { 133 synchronized (block) { 134 if (_instance == null) { 135 _instance = new Consumers(); 136 _instance.load(connection); 137 } 138 } 139 } 140 return _instance; 141 } 142 143 155 public synchronized void add(Connection connection, String dest, 156 String consumer) 157 throws PersistenceException { 158 159 JmsDestination destination = null; 160 Destinations singleton = Destinations.instance(); 161 long destinationId = 0; 162 163 synchronized (singleton) { 164 destination = singleton.get(dest); 165 if (destination == null) { 166 raise("add", consumer, dest, "destination is invalid"); 167 } 168 destinationId = singleton.getId(dest); 169 } 170 171 if ((destination instanceof JmsTopic) && 174 (consumer.equals(dest))) { 175 raise("add", consumer, dest, 176 "The consumer name and destination name cannot be the same"); 177 } 178 179 long consumerId = SeedGenerator.instance().next(connection, 181 CONSUMER_ID_SEED); 182 183 PreparedStatement insert = null; 184 try { 185 insert = connection.prepareStatement( 186 "insert into consumers values (?,?,?,?)"); 187 188 long created = (new Date ()).getTime(); 189 insert.setString(1, consumer); 190 insert.setLong(2, destinationId); 191 insert.setLong(3, consumerId); 192 insert.setLong(4, created); 193 insert.executeUpdate(); 194 195 Consumer map = new Consumer(consumer, consumerId, 196 destinationId, created); 197 198 if (!_consumers.containsKey(consumer)) { 201 _consumers.put(consumer, map); 202 } else { 203 _log.error("Durable consumer with name " + consumer 204 + " already exists."); 205 } 206 } catch (Exception exception) { 207 throw new PersistenceException( 208 "Failed to add consumer, destination=" + dest + 209 ", name=" + consumer, exception); 210 } finally { 211 SQLHelper.close(insert); 212 } 213 } 214 215 224 public synchronized void remove(Connection connection, String name) 225 throws PersistenceException { 226 227 PreparedStatement delete = null; 228 229 Consumer map = (Consumer) _consumers.get(name); 231 if (map == null) { 232 raise("remove", name, "consumer does not exist"); 233 } 234 235 try { 236 delete = connection.prepareStatement( 237 "delete from consumers where name=?"); 238 delete.setString(1, name); 239 delete.executeUpdate(); 240 241 remove(CONSUMER_MESSAGE, map.consumerId, connection); 244 245 _consumers.remove(name); 247 } catch (SQLException exception) { 248 throw new PersistenceException( 249 "Failed to remove consumer=" + name, exception); 250 } finally { 251 SQLHelper.close(delete); 252 } 253 } 254 255 262 public synchronized long getConsumerId(String name) { 263 Consumer map = (Consumer) _consumers.get(name); 264 return (map != null) ? map.consumerId : 0; 265 } 266 267 272 public synchronized boolean exists(String name) { 273 return (_consumers.get(name) != null); 274 } 275 276 281 public synchronized Vector getDurableConsumers(String destination) { 282 Vector result = new Vector (); 283 long destinationId = Destinations.instance().getId(destination); 284 if (destinationId != 0) { 285 Iterator iter = _consumers.values().iterator(); 286 while (iter.hasNext()) { 287 Consumer map = (Consumer) iter.next(); 288 if (map.destinationId == destinationId) { 289 result.add(map.name); 290 } 291 } 292 } 293 294 return result; 295 } 296 297 302 public synchronized HashMap getAllDurableConsumers() { 303 HashMap result = new HashMap (); 304 305 Iterator iter = _consumers.values().iterator(); 306 while (iter.hasNext()) { 307 Consumer map = (Consumer) iter.next(); 308 JmsDestination dest = Destinations.instance().get( 309 map.destinationId); 310 311 if (dest instanceof JmsTopic) { 312 result.put(map.name, dest.getName()); 313 } 314 } 315 316 return result; 317 } 318 319 324 public synchronized String getConsumerName(long id) { 325 String name = null; 326 Iterator iter = _consumers.values().iterator(); 327 328 while (iter.hasNext()) { 329 Consumer map = (Consumer) iter.next(); 330 if (map.consumerId == id) { 331 name = map.name; 332 break; 333 } 334 } 335 336 return name; 337 } 338 339 342 public synchronized void close() { 343 _consumers.clear(); 344 _consumers = null; 345 346 _instance = null; 347 } 348 349 354 protected synchronized void removeCached(long destinationId) { 355 Object [] list = _consumers.values().toArray(); 356 for (int i = 0; i < list.length; i++) { 357 Consumer map = (Consumer) list[i]; 358 if (map.destinationId == destinationId) { 359 _consumers.remove(map.name); 360 } 361 } 362 } 363 364 367 private Consumers() { 368 _consumers = new HashMap (); 369 } 370 371 383 private void load(Connection connection) 384 throws PersistenceException { 385 386 PreparedStatement select = null; 387 ResultSet set = null; 388 try { 389 select = connection.prepareStatement( 390 "select name, consumerid, destinationid, created " 391 + "from consumers"); 392 set = select.executeQuery(); 393 String name = null; 394 long consumerId = 0; 395 long destinationId = 0; 396 long created = 0; 397 Consumer map = null; 398 while (set.next()) { 399 name = set.getString(1); 400 consumerId = set.getLong(2); 401 destinationId = set.getLong(3); 402 created = set.getLong(4); 403 map = new Consumer(name, consumerId, destinationId, 404 created); 405 _consumers.put(name, map); 406 } 407 } catch (SQLException exception) { 408 throw new PersistenceException("Failed to retrieve consumers", 409 exception); 410 } finally { 411 SQLHelper.close(set); 412 SQLHelper.close(select); 413 } 414 } 415 416 425 private void remove(String table, long consumerId, Connection connection) 426 throws SQLException { 427 428 PreparedStatement delete = null; 429 try { 430 delete = connection.prepareStatement( 431 "delete from " + table + " where consumerId=?"); 432 delete.setLong(1, consumerId); 433 delete.executeUpdate(); 434 } finally { 435 SQLHelper.close(delete); 436 } 437 } 438 439 447 private void raise(String operation, String name, String destination, 448 String reason) 449 throws PersistenceException { 450 throw new PersistenceException("Cannot " + operation + " consumer=" + 451 name + ", destination=" + destination + ": " + reason); 452 } 453 454 461 private void raise(String operation, String name, String reason) 462 throws PersistenceException { 463 throw new PersistenceException("Cannot " + operation + " consumer=" + 464 name + ": " + reason); 465 } 466 467 470 private class Consumer { 471 472 475 public String name; 476 477 480 public long consumerId; 481 482 486 public long destinationId; 487 488 491 public long created; 492 493 494 public Consumer(String name, long consumerId, long destinationId, 495 long created) { 496 497 this.name = name; 498 this.consumerId = consumerId; 499 this.destinationId = destinationId; 500 this.created = created; 501 } 502 503 public String getKey() { 504 return name; 505 } 506 } 507 } 508 | Popular Tags |