package com.caucho.jms.jca;

import com.caucho.log.Log;

import javax.jms.*;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.work.Work;
import java.lang.IllegalStateException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Work item that polls a JMS destination and hands each received message
 * to a message endpoint.
 *
 * <p>The connection factory and destination are obtained from the
 * resource adapter.  {@link #run} loops on a blocking receive until
 * {@link #release} is called; {@link #release} closes the JMS resources
 * and releases the endpoint.
 */
public class MessageListenerTask implements Work {
  private static final Logger log = Log.open(MessageListenerTask.class);

  /** Maximum time, in milliseconds, a single receive call may block. */
  private static final long RECEIVE_TIMEOUT = 60000L;

  private MessageEndpoint _endpoint;
  private MessageListener _listener;

  private ResourceAdapterImpl _ra;

  // Queue- and topic-specific objects are stored through their generic
  // JMS supertypes so that a single shutdown path in release() closes
  // whichever variant init() created.
  private Connection _connection;
  private Session _session;
  private MessageConsumer _consumer;

  private volatile boolean _isClosed;

  /**
   * Creates the task and opens its JMS connection, session and consumer.
   *
   * @param ra the resource adapter supplying the connection factory and
   *        the destination
   * @param endpoint the endpoint receiving the messages; it is cast to
   *        {@link MessageListener}
   * @throws JMSException if the JMS resources cannot be created or started
   */
  MessageListenerTask(ResourceAdapterImpl ra, MessageEndpoint endpoint)
    throws JMSException
  {
    _endpoint = endpoint;
    _listener = (MessageListener) endpoint;

    _ra = ra;

    init();
  }

  /**
   * Creates and starts the connection, session and consumer for the
   * resource adapter's destination.
   *
   * <p>The queue- or topic-specific factory methods are used when both the
   * destination and the factory are of the matching domain; otherwise the
   * domain-independent methods are used.  If any step fails, everything
   * created so far is closed before the exception propagates.
   *
   * @throws JMSException if a JMS resource cannot be created or started
   */
  void init()
    throws JMSException
  {
    ConnectionFactory factory = _ra.getConnectionFactory();

    Destination destination = _ra.getDestination();

    Connection connection = null;
    Session session = null;
    MessageConsumer consumer = null;

    boolean isValid = false;

    try {
      if (destination instanceof Queue
          && factory instanceof QueueConnectionFactory) {
        QueueConnectionFactory queueFactory = (QueueConnectionFactory) factory;

        QueueConnection queueConnection = queueFactory.createQueueConnection();
        connection = queueConnection;

        QueueSession queueSession
          = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        session = queueSession;

        consumer = queueSession.createReceiver((Queue) destination);
      }
      else if (destination instanceof Topic
               && factory instanceof TopicConnectionFactory) {
        TopicConnectionFactory topicFactory = (TopicConnectionFactory) factory;

        TopicConnection topicConnection = topicFactory.createTopicConnection();
        connection = topicConnection;

        TopicSession topicSession
          = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        session = topicSession;

        consumer = topicSession.createSubscriber((Topic) destination);
      }
      else {
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createConsumer(destination);
      }

      connection.start();

      _connection = connection;
      _session = session;
      _consumer = consumer;

      isValid = true;
    } finally {
      // Do not leak a half-built connection when creation or start fails.
      if (! isValid)
        closeAll(consumer, session, connection);
    }
  }

  /**
   * Receives messages and dispatches them to the listener until the task
   * is released.
   *
   * <p>Any failure while receiving or dispatching is logged as a warning
   * and the loop continues.  If no consumer or listener is available and
   * the task has not been released, the task marks itself closed and logs
   * an {@link IllegalStateException}.
   */
  public void run()
  {
    while (! _isClosed) {
      try {
        // Snapshot the fields: release() may clear them from another thread.
        MessageConsumer consumer = _consumer;
        MessageListener listener = _listener;

        if (consumer == null || listener == null) {
          // Cleared by a concurrent release(): a normal shutdown, not an error.
          if (_isClosed)
            return;

          _isClosed = true;
          throw new IllegalStateException();
        }

        Message msg = consumer.receive(RECEIVE_TIMEOUT);

        if (msg != null)
          listener.onMessage(msg);
      } catch (Throwable e) {
        log.log(Level.WARNING, e.toString(), e);
      }
    }
  }

  /**
   * Stops the task: marks it closed, stops and closes the JMS resources,
   * and releases the endpoint.
   *
   * <p>Closing the consumer is what lets a receive blocked in {@link #run}
   * return promptly instead of waiting for its timeout.
   */
  public void release()
  {
    _isClosed = true;

    Connection connection = _connection;
    try {
      if (connection != null)
        connection.stop();
    } catch (Throwable e) {
      log.log(Level.WARNING, e.toString(), e);
    }
    _connection = null;

    Session session = _session;
    _session = null;

    MessageConsumer consumer = _consumer;
    _consumer = null;

    MessageEndpoint endpoint = _endpoint;
    _endpoint = null;

    closeAll(consumer, session, connection);

    _listener = null;

    if (endpoint != null)
      endpoint.release();
  }

  /**
   * Closes the given JMS resources, consumer first and connection last.
   *
   * <p>Each resource is closed independently so that a failure closing one
   * does not prevent the others from being closed.  Failures are logged at
   * {@code FINE} because closing is best-effort.
   *
   * @param consumer the consumer to close, or {@code null}
   * @param session the session to close, or {@code null}
   * @param connection the connection to close, or {@code null}
   */
  private static void closeAll(MessageConsumer consumer,
                               Session session,
                               Connection connection)
  {
    try {
      if (consumer != null)
        consumer.close();
    } catch (Throwable e) {
      log.log(Level.FINE, e.toString(), e);
    }

    try {
      if (session != null)
        session.close();
    } catch (Throwable e) {
      log.log(Level.FINE, e.toString(), e);
    }

    try {
      if (connection != null)
        connection.close();
    } catch (Throwable e) {
      log.log(Level.FINE, e.toString(), e);
    }
  }
}