1 package net.walend.somnifugi; 2 3 import javax.naming.Context ; 4 import javax.naming.NamingException ; 5 6 import javax.jms.TopicSession ; 7 import javax.jms.Topic ; 8 import javax.jms.Message ; 9 import javax.jms.JMSException ; 10 import javax.jms.TopicSubscriber ; 11 import javax.jms.TopicPublisher ; 12 import javax.jms.TemporaryTopic ; 13 import javax.jms.InvalidDestinationException ; 14 import javax.jms.Destination ; 15 import javax.jms.MessageProducer ; 16 import javax.jms.MessageConsumer ; 17 import javax.jms.Queue ; 18 import javax.jms.QueueBrowser ; 19 import javax.jms.TemporaryQueue ; 20 21 import net.walend.somnifugi.channel.Takable; 22 23 29 30 public class SomniTopicSession 31 extends SomniSession 32 implements TopicSession 33 { 34 private int tempCount=0; 35 36 protected SomniTopicSession(String name,SomniExceptionListener exceptionLisetener,boolean started,Context context,int acknowledgeMode,String connectionClientID) 37 { 38 super(name,exceptionLisetener,started,context,acknowledgeMode,connectionClientID); 39 } 40 41 private TopicSubscriber createSubscriber(Topic topic,String subscriberName,boolean durable,SomniMessageSelector messageSelector,boolean noLocal) 42 throws JMSException 43 { 44 SomniTopic atopic = (SomniTopic)topic; 45 46 synchronized(guard) 47 { 48 checkClosed(); 49 50 Takable<Message > takable = atopic.addSubscriber(subscriberName,durable,messageSelector,noLocal,getConnectionClientID()); 51 52 SomniTopicSubscriber result = new SomniTopicSubscriber(atopic,takable,subscriberName,getExceptionListener(),this); 53 addConsumer(result); 54 55 return result; 56 } 57 } 58 59 91 public TopicSubscriber createSubscriber(Topic topic,SomniMessageSelector messageSelector) 92 throws JMSException 93 { 94 String subscriberName = createConsumerName(topic.getTopicName(),"Subscriber"); 95 96 return createSubscriber(topic,subscriberName,false,messageSelector,false); 97 } 98 99 101 121 public Topic createTopic(String topicName) 122 throws JMSException 123 { 124 return SomniTopicCache.IT.getTopic(topicName,getContext()); 125 } 126 127 146 public TopicSubscriber createSubscriber(Topic topic) 147 throws JMSException 148 { 149 String subscriberName = createConsumerName(topic.getTopicName(),"Subscriber"); 150 151 return createSubscriber(topic,subscriberName,false,null,false); 152 } 153 154 187 public TopicSubscriber createSubscriber(Topic topic, String messageSelector,boolean noLocal) 188 throws JMSException 189 { 190 String subscriberName = createConsumerName(topic.getTopicName(),"Subscriber"); 191 SomniMessageSelector ms = null; 192 if((messageSelector != null)&&(!("".equals(messageSelector)))) 193 { 194 ms = new SQL92MessageSelector(messageSelector); 195 } 196 197 return createSubscriber(topic,subscriberName,false,ms,noLocal); 198 } 199 200 234 public TopicSubscriber createDurableSubscriber(Topic topic,String name) 235 throws JMSException 236 { 237 return createSubscriber(topic,name,true,null,false); 238 } 239 240 281 public TopicSubscriber createDurableSubscriber(Topic topic,String name,String messageSelector,boolean noLocal) 282 throws JMSException 283 { 284 SomniMessageSelector ms = null; 285 if((messageSelector != null)&&(!("".equals(messageSelector)))) 286 { 287 ms = new SQL92MessageSelector(messageSelector); 288 } 289 return createSubscriber(topic,name,true,ms,noLocal); 290 } 291 292 308 public TopicPublisher createPublisher(Topic topic) 309 throws JMSException 310 { 311 synchronized(guard) 312 { 313 checkClosed(); 314 SomniTopicPublisher result = new SomniTopicPublisher((SomniTopic)topic,createProducerName(topic.getTopicName(),"Publisher"),getConnectionClientID()); 315 addProducer(result); 316 return result; 317 } 318 } 319 320 328 public TemporaryTopic createTemporaryTopic() 329 throws JMSException 330 { 331 try 332 { 333 String tempName; 334 synchronized(guard) 335 { 336 tempName = getName()+":temp"+tempCount; 337 tempCount++; 338 } 339 SomniTemporaryTopic topic = new SomniTemporaryTopic(tempName,ChannelFactoryCache.IT.getChannelFactory(tempName,getContext(),false),ChannelFactoryCache.IT.getFanOutFactory(tempName,getContext()),getContext()); 340 SomniTopicCache.IT.putTemporaryTopic(topic); 341 342 return topic; 343 } 344 catch(NamingException ne) 345 { 346 throw new SomniNamingException(ne); 347 } 348 } 349 350 367 public void unsubscribe(String name) 368 throws JMSException 369 { 370 SomniTopicCache.IT.endDurableSubscription(name); 371 SomniLogger.IT.finer(getName()+" unsubscribed from "+name); 372 } 373 374 376 396 397 public MessageProducer createProducer(Destination destination) throws JMSException 398 { 399 try 400 { 401 return createPublisher((Topic )destination); 402 } 403 catch(ClassCastException cce) 404 { 405 throw new InvalidDestinationException ("destination must be a Topic, not a "+destination.getClass().getName(),cce.getMessage()); 406 } 407 } 408 409 423 424 public MessageConsumer createConsumer(Destination destination) throws JMSException 425 { 426 try 427 { 428 return createSubscriber((Topic )destination); 429 } 430 catch(ClassCastException cce) 431 { 432 throw new InvalidDestinationException ("destination must be a Topic, not a "+destination.getClass().getName(),cce.getMessage()); 433 } 434 } 435 436 462 public MessageConsumer createConsumer(Destination destination, String messageSelector) 463 throws JMSException 464 { 465 try 466 { 467 return createSubscriber((Topic )destination,messageSelector,false); 469 } 470 catch(ClassCastException cce) 471 { 472 throw new InvalidDestinationException ("destination must be a Topic, not a "+destination.getClass().getName(),cce.getMessage()); 473 } 474 } 475 476 477 514 public MessageConsumer createConsumer(Destination destination,String messageSelector,boolean NoLocal) 515 throws JMSException 516 { 517 try 518 { 519 return createSubscriber((Topic )destination,messageSelector,NoLocal); 520 } 521 catch(ClassCastException cce) 522 { 523 throw new InvalidDestinationException ("destination must be a Topic, not a "+destination.getClass().getName(),cce.getMessage()); 524 } 525 } 526 527 548 549 public Queue createQueue(String queueName) throws JMSException 550 { 551 throw new IllegalStateException ("Don't use a TopicSession to create a Queue."); 552 } 553 554 567 public QueueBrowser createBrowser(Queue queue) throws JMSException 568 { 569 throw new IllegalStateException ("Don't use a TopicSession to work with Queues."); 570 } 571 572 573 591 592 public QueueBrowser createBrowser(Queue queue,String messageSelector) 593 throws JMSException 594 { 595 throw new IllegalStateException ("Don't use a TopicSession to work with Queues."); 596 } 597 598 599 609 610 public TemporaryQueue createTemporaryQueue() throws JMSException 611 { 612 throw new IllegalStateException ("Don't use a TopicSession to work with Queues."); 613 } 614 } 615 616 636 | Popular Tags |