1 30 31 32 package org.objectweb.jonas_jms; 33 34 import java.util.Enumeration ; 35 import java.util.Hashtable ; 36 import java.util.Vector ; 37 38 import javax.jms.ConnectionFactory ; 39 import javax.jms.JMSException ; 40 import javax.jms.Queue ; 41 import javax.jms.QueueConnectionFactory ; 42 import javax.jms.Topic ; 43 import javax.jms.TopicConnectionFactory ; 44 import javax.jms.XAConnectionFactory ; 45 import javax.jms.XAQueueConnectionFactory ; 46 import javax.jms.XATopicConnectionFactory ; 47 import javax.naming.InitialContext ; 48 import javax.naming.NamingException ; 49 50 import org.objectweb.jonas_jms.api.JmsAdministration; 51 import org.objectweb.jonas_jms.api.JmsManager; 52 import org.objectweb.transaction.jta.TransactionManager; 53 import org.objectweb.util.monolog.api.BasicLevel; 54 55 66 public class JmsManagerImpl implements JmsManager, JmsJmxManagement { 67 68 private JmsAdministration momadmin = null; 69 private InitialContext ictx = null; 70 private ConnectionFactory cf = null; 71 private TopicConnectionFactory tcf = null; 72 private QueueConnectionFactory qcf = null; 73 private Hashtable queues = new Hashtable (); 74 private Hashtable topics = new Hashtable (); 75 private Vector namelist = new Vector (); 76 77 private static TransactionManager tm = null; 78 private static JmsManagerImpl unique = null; 79 80 83 private JmsManagerImpl() { 84 TraceJms.logger.log(BasicLevel.DEBUG,""); 85 } 86 87 90 public static JmsManager getJmsManager() { 91 if (unique == null) { 92 unique = new JmsManagerImpl(); 93 } 94 return (JmsManager) unique; 95 } 96 97 100 public static JmsJmxManagement getJmsJmxManagement() { 101 if (unique == null) { 102 unique = new JmsManagerImpl(); 103 } 104 return (JmsJmxManagement) unique; 105 } 106 107 111 114 public static TransactionManager getTransactionManager() { 115 return tm; 116 } 117 118 122 129 public void init(Class cl, boolean collocated, String url, TransactionManager trm) throws Exception { 130 TraceJms.logger.log(BasicLevel.DEBUG,""); 131 tm = trm; 132 int maxloops = collocated ? 1 : 5; 133 for (int i = 0; i < maxloops; i++) { 134 try { 135 ictx = new InitialContext (); 137 momadmin = (JmsAdministration) cl.newInstance(); 139 momadmin.start(collocated, url); 140 break; 141 } catch (NamingException e) { 142 TraceJms.logger.log(BasicLevel.ERROR, "cannot get Initial Context", e); 143 throw e; 144 } catch (NullPointerException e) { 145 TraceJms.logger.log(BasicLevel.ERROR, "exception: ", e); 146 throw e; 147 } catch (Exception e) { 148 if (i < maxloops) { 149 if(TraceJms.isDebug()) 150 TraceJms.logger.log(BasicLevel.DEBUG, "cannot reach the MOM - retrying..."); 151 try { 152 Thread.sleep(2000*(i+1)); 154 } catch (InterruptedException e2) { 155 throw new JMSException ("Cannot reach the MOM"); 156 } 157 } else { 158 TraceJms.logger.log(BasicLevel.ERROR, "cannot load admin class "+e); 159 throw e; 160 } 161 } 162 } 163 getConnectionFactory(); 166 getTopicConnectionFactory(); 167 getQueueConnectionFactory(); 168 } 169 170 173 public Queue createQueue(String name) throws Exception { 174 Queue queue = null; 175 try { 177 queue = (Queue ) ictx.lookup(name); 178 if(TraceJms.isDebug()) 179 TraceJms.logger.log(BasicLevel.DEBUG, "queue " + name +" already found"); 180 return queue; 181 } catch (NamingException ex) { 182 } 183 if(TraceJms.isDebug()) 184 TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering queue "+name); 185 queue = momadmin.createQueue(name); 186 namelist.addElement(name); 189 queues.put(name, queue); 190 return queue; 191 } 192 193 196 public Topic createTopic(String name) throws Exception { 197 Topic topic = null; 198 try { 200 topic = (Topic ) ictx.lookup(name); 201 if(TraceJms.isDebug()) 202 TraceJms.logger.log(BasicLevel.DEBUG, "topic " + name + " already found"); 203 return topic; 204 } catch (NamingException ex) { 205 } 206 if(TraceJms.isDebug()) 207 TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering topic " + name); 208 topic = momadmin.createTopic(name); 209 namelist.addElement(name); 212 topics.put(name, topic); 213 return topic; 214 } 215 216 219 public ConnectionFactory getConnectionFactory() { 220 if (cf == null) { 221 String name = "CF"; 222 cf = new org.objectweb.jonas_jms.JConnectionFactory(name); 223 try { 224 if(TraceJms.isDebug()) 225 TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering "+name); 226 ictx.rebind(name, cf); 227 } catch (NamingException e) { 228 TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind "+name+" :"+e); 229 } 230 } 231 return cf; 232 } 233 234 237 public TopicConnectionFactory getTopicConnectionFactory() { 238 if (tcf == null) { 239 String name = "TCF"; 240 tcf = new org.objectweb.jonas_jms.JTopicConnectionFactory(name); 241 try { 242 if(TraceJms.isDebug()) 243 TraceJms.logger.log(BasicLevel.DEBUG,"creating and registering " + name); 244 ictx.rebind(name, tcf); 245 } catch (NamingException e) { 246 TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind " + name + " :" + e); 247 } 248 } 249 return tcf; 250 } 251 252 255 public QueueConnectionFactory getQueueConnectionFactory() { 256 if (qcf == null) { 257 String name = "QCF"; 258 qcf = new org.objectweb.jonas_jms.JQueueConnectionFactory(name); 259 try { 260 if(TraceJms.isDebug()) 261 TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering " + name); 262 ictx.rebind(name, qcf); 263 } catch (NamingException e) { 264 TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind " + name + " :" + e); 265 } 266 } 267 return qcf; 268 } 269 270 273 public XAConnectionFactory getXAConnectionFactory() { 274 return momadmin.getXAConnectionFactory(); 275 } 276 277 280 public XATopicConnectionFactory getXATopicConnectionFactory() { 281 return momadmin.getXATopicConnectionFactory(); 282 } 283 284 287 public XAQueueConnectionFactory getXAQueueConnectionFactory() { 288 return momadmin.getXAQueueConnectionFactory(); 289 } 290 291 294 public Queue getQueue(String name) throws Exception { 295 Queue q = (Queue )queues.get(name); 296 if (q == null) { 297 q = createQueue(name); 298 } 299 return q; 300 } 301 302 305 public Topic getTopic(String name) throws Exception { 306 Topic t = (Topic )topics.get(name); 307 if (t == null) { 308 t = createTopic(name); 309 } 310 return t; 311 } 312 313 316 public Enumeration getTopicsNames() { 317 return topics.keys(); 318 } 319 320 323 public Enumeration getQueuesNames() { 324 return queues.keys(); 325 } 326 327 330 public void stop() throws Exception { 331 if (cf != null) { 333 ((JConnectionFactory) cf).cleanPool(); 334 } 335 336 if (tcf != null) { 337 ((JConnectionFactory) tcf).cleanPool(); 338 } 339 340 if (qcf != null) { 341 ((JConnectionFactory) qcf).cleanPool(); 342 } 343 344 if (momadmin != null) { 346 momadmin.stop(); 347 } 348 349 Enumeration ln = namelist.elements(); 351 while (ln.hasMoreElements()) { 352 String name = (String )ln.nextElement(); 353 try { 354 ictx.unbind(name); 355 if(TraceJms.isDebug()) 356 TraceJms.logger.log(BasicLevel.DEBUG, "unbind " + name); 357 } catch(NamingException e) { 358 if(TraceJms.isDebug()) 359 TraceJms.logger.log(BasicLevel.ERROR, "cannot unbind " + name); 360 } 361 } 362 363 } 364 365 369 372 public int getCurrentNumberOfJmsConnectionFactory() { 373 int result = 0; 374 if (cf != null) result++; 375 return result; 376 } 377 378 381 public int getCurrentNumberOfJmsTopicConnectionFactory() { 382 int result = 0; 383 if (tcf != null) result++; 384 return result; 385 } 386 387 390 public int getCurrentNumberOfJmsQueueConnectionFactory() { 391 int result = 0; 392 if (qcf != null) result++; 393 return result; 394 } 395 396 399 public int getCurrentNumberOfJmsTopicDestination() { 400 return topics.size(); 401 } 402 403 406 public int getCurrentNumberOfJmsQueueDestination() { 407 return queues.size(); 408 } 409 410 415 public String removeJmsDestination(String jndiName) throws Exception { 416 417 momadmin.deleteDestination(jndiName); 418 419 String destType = null; 420 if ((queues.containsKey(jndiName))||(topics.containsKey(jndiName))) { 421 try { 422 InitialContext ictx = new InitialContext (); 423 ictx.unbind(jndiName); 424 Object o = null; o = queues.remove(jndiName); 426 if (o == null) { 427 o = topics.remove(jndiName); 429 if (o != null) { 430 destType = "topic"; 431 } else { 432 destType = "unknown"; 434 } 435 } else { 436 destType = "queue"; 437 } 438 return destType; 439 } catch(Exception ex) { 440 throw new Exception ("JmsManagerImpl remove destination : cannot unbind " + jndiName ); 441 } 442 } else { 443 throw new Exception ("Unexisting jms destination :" + jndiName +" remove abord"); 444 } 445 } 446 447 450 public String getDefaultConnectionFactoryName() { 451 return "CF"; 452 } 453 454 457 public String getDefaultQueueConnectionFactoryName() { 458 return "QCF"; 459 } 460 461 464 public String getDefaultTopicConnectionFactoryName() { 465 return "TCF"; 466 } 467 468 470 476 public String getConnectionFactoryMode(String jndiName) throws Exception { 477 TraceJms.logger.log(BasicLevel.DEBUG, ""); 478 479 if (jndiName.equals("CF")) 480 return "Point-To-Point and Publish/Subscribe"; 481 else if (jndiName.equals("QCF")) 482 return "Point-To-Point"; 483 else if (jndiName.equals("TCF")) 484 return "Publish/Subscribe"; 485 else 486 throw new IllegalStateException ("Unknown factory."); 487 } 488 489 490 496 public int getPendingMessages(String jndiName) throws Exception { 497 TraceJms.logger.log(BasicLevel.DEBUG, ""); 498 Queue queue = (Queue )queues.get(jndiName); 499 int n; 500 if (queue != null) { 501 n = momadmin.getPendingMessages(queue); 503 } else { 504 throw new IllegalStateException (jndiName + " not a queue created by the jms service"); 505 } 506 return n; 507 } 508 509 515 public int getPendingRequests(String jndiName) throws Exception { 516 TraceJms.logger.log(BasicLevel.DEBUG, ""); 517 Queue queue = (Queue )queues.get(jndiName); 518 int n; 519 if (queue != null) { 520 n = momadmin.getPendingRequests(queue); 522 } else { 523 throw new IllegalStateException (jndiName + " not a queue created by the jms service"); 524 } 525 return n; 526 } 527 528 534 public int getSubscriptions(String jndiName) throws Exception { 535 Topic topic = (Topic )topics.get(jndiName); 536 int n; 537 if (topic != null) { 538 n = momadmin.getSubscriptions(topic); 540 } else { 541 throw new IllegalStateException (jndiName + " not a topic created by the jms service"); 542 } 543 return n; 544 } 545 } 546 | Popular Tags |