1 package net.walend.somnifugi; 2 3 import java.util.Set ; 4 import java.util.HashSet ; 5 import java.util.Map ; 6 import java.util.HashMap ; 7 import java.util.Iterator ; 8 import java.util.List ; 9 import java.util.LinkedList ; 10 11 import java.io.Serializable ; 12 13 import javax.naming.Context ; 14 15 import javax.jms.Session ; 16 import javax.jms.BytesMessage ; 17 import javax.jms.JMSException ; 18 import javax.jms.MapMessage ; 19 import javax.jms.Message ; 20 import javax.jms.ObjectMessage ; 21 import javax.jms.StreamMessage ; 22 import javax.jms.TextMessage ; 23 import javax.jms.MessageListener ; 26 import javax.jms.Destination ; 27 import javax.jms.MessageProducer ; 28 import javax.jms.MessageConsumer ; 29 import javax.jms.Queue ; 30 import javax.jms.Topic ; 31 import javax.jms.TopicSubscriber ; 32 import javax.jms.QueueBrowser ; 33 import javax.jms.TemporaryQueue ; 34 import javax.jms.TemporaryTopic ; 35 36 37 38 44 45 public abstract class SomniSession 46 implements Session 47 { 48 protected final Object guard = new Object (); 49 50 private boolean closed = false; 51 private boolean started; 52 private Set <SomniMessageProducer> producers = new HashSet <SomniMessageProducer>(); 53 private Set <SomniMessageConsumer> consumers = new HashSet <SomniMessageConsumer>(); 54 private SomniExceptionListener exceptionListener; 55 private int acknowledgeMode; 56 private String connectionClientID; 57 58 private final String name; 59 60 private Context context; 61 62 private List <Message > messagesWaitingForAck = new LinkedList <Message >(); 63 64 private int consumerCounter = 0; 65 66 protected SomniSession(String name,SomniExceptionListener exceptionListener,boolean started,Context context,int acknowledgeMode,String connectionClientID) 67 { 68 this.exceptionListener = exceptionListener; 69 this.started = started; 70 this.name = name; 71 this.context = context; 72 this.acknowledgeMode = acknowledgeMode; 73 this.connectionClientID = connectionClientID; 74 } 75 76 83 public Map <String ,SomniConsumerReport> createConsumerReports() 84 { 85 Map <String ,SomniConsumerReport> report = new HashMap <String ,SomniConsumerReport>(consumers.size()); 86 synchronized(guard) 87 { 88 for(SomniMessageConsumer consumer : consumers) 89 { 90 report.put(consumer.getName(),consumer.createSomniConsumerReport()); 91 } 92 } 93 return report; 94 } 95 96 97 protected Context getContext() 98 { 99 return context; 100 } 101 102 protected String getConnectionClientID() 103 { 104 return connectionClientID; 105 } 106 107 private int producerCounter = 0; 108 109 protected String createProducerName(String destName,String senderOrPublisher) 110 { 111 synchronized(guard) 112 { 113 StringBuffer buffy = new StringBuffer (); 114 115 buffy.append(destName); 116 buffy.append(":"); 117 buffy.append(name); 118 buffy.append(senderOrPublisher); 119 buffy.append(":"+producerCounter); 120 121 producerCounter++; 122 return buffy.toString(); 123 } 124 } 125 126 protected String createConsumerName(String destName,String receiverOrSubscriber) 127 { 128 synchronized(guard) 129 { 130 StringBuffer buffy = new StringBuffer (); 131 132 buffy.append(destName); 133 buffy.append(":"); 134 buffy.append(name); 135 buffy.append(receiverOrSubscriber); 136 buffy.append(":"+consumerCounter); 137 138 consumerCounter++; 139 return buffy.toString(); 140 } 141 } 142 143 protected SomniExceptionListener getExceptionListener() 144 { 145 return exceptionListener; 146 } 147 148 protected void addProducer(SomniMessageProducer producer) 149 { 150 synchronized(guard) 151 { 152 producers.add(producer); 153 154 StringBuffer buffy = new StringBuffer (); 155 buffy.append("Added "); 156 buffy.append(producer.getName()); 157 buffy.append(" to "); 158 buffy.append(getName()); 159 160 SomniLogger.IT.finer(buffy.toString()); 161 } 162 } 163 164 protected void removeProducer(SomniMessageProducer producer) 165 throws JMSException 166 { 167 synchronized(guard) 168 { 169 producers.remove(producer); 170 171 StringBuffer buffy = new StringBuffer (); 172 buffy.append("Removed "); 173 buffy.append(producer.getName()); 174 buffy.append(" from "); 175 buffy.append(getName()); 176 177 SomniLogger.IT.finer(buffy.toString()); 178 179 producer.close(); 180 } 181 } 182 183 protected void addConsumer(SomniMessageConsumer consumer) 184 { 185 synchronized(guard) 186 { 187 consumers.add(consumer); 188 189 StringBuffer buffy = new StringBuffer (); 190 buffy.append("Added "); 191 buffy.append(consumer.getName()); 192 buffy.append(" to "); 193 buffy.append(getName()); 194 195 SomniLogger.IT.finer(buffy.toString()); 196 197 if(started) 198 { 199 consumer.start(); 200 } 201 } 202 } 203 204 protected void removeConsumer(SomniMessageConsumer consumer) 205 throws JMSException 206 { 207 synchronized(guard) 208 { 209 consumers.remove(consumer); 210 211 StringBuffer buffy = new StringBuffer (); 212 buffy.append("Removed "); 213 buffy.append(consumer.getName()); 214 buffy.append(" from "); 215 buffy.append(getName()); 216 217 SomniLogger.IT.finer(buffy.toString()); 218 219 consumer.close(); 220 } 221 } 222 223 protected void start() 224 { 225 synchronized(guard) 226 { 227 checkClosed(); 228 started = true; 229 230 Iterator it = consumers.iterator(); 231 while(it.hasNext()) 232 { 233 SomniMessageConsumer consumer = (SomniMessageConsumer)it.next(); 234 consumer.start(); 235 } 236 SomniLogger.IT.fine(getName()+" started."); 237 } 238 } 239 240 protected void stop() 241 { 242 synchronized(guard) 243 { 244 started = false; 245 246 Iterator it = consumers.iterator(); 247 while(it.hasNext()) 248 { 249 SomniMessageConsumer consumer = (SomniMessageConsumer)it.next(); 250 consumer.stop(); 251 } 252 SomniLogger.IT.fine(getName()+" stopped."); 253 } 254 } 255 256 protected void checkClosed() 257 { 258 synchronized(guard) 259 { 260 if(closed) 261 { 262 throw new IllegalStateException ("This Session is closed."); 263 } 264 } 265 } 266 267 276 public BytesMessage createBytesMessage() 277 throws JMSException 278 { 279 throw new UnsupportedOperationException ("I haven't done this yet."); 280 } 281 282 292 public MapMessage createMapMessage() 293 throws JMSException 294 { 295 throw new UnsupportedOperationException ("I haven't done this yet."); 296 } 297 298 308 public Message createMessage() 309 throws JMSException 310 { 311 synchronized(guard) 312 { 313 checkClosed(); 314 Message result = new SomniMessage(); 315 return result; 316 } 317 } 318 319 327 public ObjectMessage createObjectMessage() 328 throws JMSException 329 { 330 synchronized(guard) 331 { 332 checkClosed(); 333 ObjectMessage result = new SomniObjectMessage(); 334 335 return result; 336 } 337 } 338 339 349 public ObjectMessage createObjectMessage(Serializable object) 350 throws JMSException 351 { 352 synchronized(guard) 353 { 354 checkClosed(); 355 ObjectMessage result = createObjectMessage(); 356 result.setObject(object); 357 return result; 358 } 359 } 360 361 371 public StreamMessage createStreamMessage() 372 throws JMSException 373 { 374 throw new UnsupportedOperationException ("I haven't done this yet."); 375 376 } 377 378 387 public TextMessage createTextMessage() 388 throws JMSException 389 { 390 synchronized(guard) 391 { 392 checkClosed(); 393 TextMessage result = new SomniTextMessage(); 394 395 return result; 396 } 397 } 398 399 410 public TextMessage createTextMessage(String text) 411 throws JMSException 412 { 413 synchronized(guard) 414 { 415 checkClosed(); 416 TextMessage result = createTextMessage(); 417 result.setText(text); 418 return result; 419 } 420 } 421 422 430 public boolean getTransacted() 431 throws JMSException 432 { 433 return false; 434 } 435 436 450 public int getAcknowledgeMode() throws JMSException 451 { 452 return acknowledgeMode; 453 } 454 455 467 public void commit() 468 throws JMSException 469 { 470 throw new IllegalStateException ("somnifugi methods are never transactional."); 471 } 472 473 483 public void rollback() 484 throws JMSException 485 { 486 throw new IllegalStateException ("somnifugi methods are never transactional."); 487 } 488 489 518 public void close() 519 throws JMSException 520 { 521 try 522 { 523 synchronized(guard) 524 { 525 closed = true; 526 stop(); 527 Iterator it = producers.iterator(); 528 while(it.hasNext()) 529 { 530 SomniMessageProducer producer = (SomniMessageProducer)it.next(); 531 producer.close(); 532 } 533 Iterator itt = consumers.iterator(); 534 while(itt.hasNext()) 535 { 536 SomniMessageConsumer consumer = (SomniMessageConsumer)itt.next(); 537 consumer.close(); 538 } 539 SomniLogger.IT.fine(getName()+" closed."); 540 } 541 } 542 catch(RuntimeException re) 543 { 544 throw new SomniCannotCloseException(re); 545 } 546 } 547 548 574 public void recover() 575 throws JMSException 576 { 577 throw new UnsupportedOperationException ("I haven't done this yet."); 578 } 579 580 593 public MessageListener getMessageListener() 594 throws JMSException 595 { 596 throw new UnsupportedOperationException ("I haven't done this yet."); 597 } 598 599 618 public void setMessageListener(MessageListener listener) 619 throws JMSException 620 { 621 throw new UnsupportedOperationException ("I haven't done this yet."); 622 } 623 624 631 public void run() 632 { 633 throw new UnsupportedOperationException ("I haven't done this yet."); 634 } 635 636 protected String getName() 637 { 638 return name; 639 } 640 641 643 663 664 public abstract MessageProducer createProducer(Destination destination) throws JMSException ; 665 666 667 681 682 public abstract MessageConsumer createConsumer(Destination destination) throws JMSException ; 683 684 710 public abstract MessageConsumer createConsumer(Destination destination, java.lang.String messageSelector) 711 throws JMSException ; 712 713 714 751 public abstract MessageConsumer createConsumer(Destination destination, java.lang.String messageSelector,boolean NoLocal) 752 throws JMSException ; 753 754 755 776 777 public abstract Queue createQueue(String queueName) throws JMSException ; 778 779 800 801 public abstract Topic createTopic(String topicName) throws JMSException ; 802 803 813 815 851 852 public abstract TopicSubscriber createDurableSubscriber(Topic topic,String name) 853 throws JMSException ; 854 855 856 898 899 public abstract TopicSubscriber createDurableSubscriber(Topic topic,String name,String messageSelector,boolean noLocal) 900 throws JMSException ; 901 902 915 public abstract QueueBrowser createBrowser(Queue queue) throws JMSException ; 916 917 918 936 937 public abstract QueueBrowser createBrowser(Queue queue,String messageSelector) 938 throws JMSException ; 939 940 941 951 952 public abstract TemporaryQueue createTemporaryQueue() throws JMSException ; 953 954 955 965 966 public abstract TemporaryTopic createTemporaryTopic() throws JMSException ; 967 968 969 989 990 public abstract void unsubscribe(String name) throws JMSException ; 991 992 993 void addMessageToAcknowledge(SomniMessage message) 995 { 996 synchronized(guard) 997 { 998 messagesWaitingForAck.add(message); 999 } 1000 } 1001 1002 void acknowledge() 1003 { 1004 synchronized(guard) 1005 { 1006 messagesWaitingForAck.clear(); 1007 } 1008 } 1009} 1010 1011 1031 | Popular Tags |