1 16 17 package org.apache.axis.transport.jms; 18 19 import org.apache.axis.components.jms.JMSVendorAdapter; 20 21 import javax.jms.Connection ; 22 import javax.jms.ConnectionFactory ; 23 import javax.jms.Destination ; 24 import javax.jms.JMSException ; 25 import javax.jms.Message ; 26 import javax.jms.MessageConsumer ; 27 import javax.jms.Queue ; 28 import javax.jms.QueueConnection ; 29 import javax.jms.QueueConnectionFactory ; 30 import javax.jms.QueueReceiver ; 31 import javax.jms.QueueSender ; 32 import javax.jms.QueueSession ; 33 import javax.jms.Session ; 34 import javax.jms.TemporaryQueue ; 35 36 44 public class QueueConnector extends JMSConnector 45 { 46 47 public QueueConnector(ConnectionFactory factory, 48 int numRetries, 49 int numSessions, 50 long connectRetryInterval, 51 long interactRetryInterval, 52 long timeoutTime, 53 boolean allowReceive, 54 String clientID, 55 String username, 56 String password, 57 JMSVendorAdapter adapter, 58 JMSURLHelper jmsurl) 59 throws JMSException 60 { 61 super(factory, numRetries, numSessions, connectRetryInterval, 62 interactRetryInterval, timeoutTime, allowReceive, clientID, 63 username, password, adapter, jmsurl); 64 } 65 66 public JMSEndpoint createEndpoint(String destination) 67 { 68 return new QueueEndpoint(destination); 69 } 70 71 78 public JMSEndpoint createEndpoint(Destination destination) 79 throws JMSException 80 { 81 if(!(destination instanceof Queue )) 82 throw new IllegalArgumentException ("The input must be a queue for this connector"); 83 return new QueueDestinationEndpoint((Queue )destination); 84 } 85 86 protected Connection internalConnect(ConnectionFactory connectionFactory, 87 String username, 88 String password) 89 throws JMSException 90 { 91 QueueConnectionFactory qcf = (QueueConnectionFactory )connectionFactory; 92 if(username == null) 93 return qcf.createQueueConnection(); 94 95 return qcf.createQueueConnection(username, password); 96 } 97 98 99 protected SyncConnection createSyncConnection(ConnectionFactory factory, 100 Connection connection, 101 int numSessions, 102 String threadName, 103 String clientID, 104 String username, 105 String password) 106 107 throws JMSException 108 { 109 return new QueueSyncConnection((QueueConnectionFactory )factory, 110 (QueueConnection )connection, numSessions, 111 threadName, clientID, username, password); 112 } 113 114 private QueueSession createQueueSession(QueueConnection connection, int ackMode) 115 throws JMSException 116 { 117 return connection.createQueueSession(false, ackMode); 118 } 119 120 private Queue createQueue(QueueSession session, String subject) 121 throws Exception 122 { 123 return m_adapter.getQueue(session, subject); 124 } 125 126 private QueueReceiver createReceiver(QueueSession session, 127 Queue queue, 128 String messageSelector) 129 throws JMSException 130 { 131 return session.createReceiver(queue, messageSelector); 132 } 133 134 private final class QueueSyncConnection extends SyncConnection 135 { 136 QueueSyncConnection(QueueConnectionFactory connectionFactory, 137 QueueConnection connection, 138 int numSessions, 139 String threadName, 140 String clientID, 141 String username, 142 String password) 143 throws JMSException 144 { 145 super(connectionFactory, connection, numSessions, threadName, 146 clientID, username, password); 147 } 148 149 protected SendSession createSendSession(javax.jms.Connection connection) 150 throws JMSException 151 { 152 QueueSession session = createQueueSession((QueueConnection )connection, 153 JMSConstants.DEFAULT_ACKNOWLEDGE_MODE); 154 QueueSender sender = session.createSender(null); 155 return new QueueSendSession(session, sender); 156 } 157 158 private final class QueueSendSession extends SendSession 159 { 160 QueueSendSession(QueueSession session, 161 QueueSender sender) 162 throws JMSException 163 { 164 super(session, sender); 165 } 166 167 protected MessageConsumer createConsumer(Destination destination) 168 throws JMSException 169 { 170 return createReceiver((QueueSession )m_session, (Queue )destination, null); 171 } 172 173 174 protected Destination createTemporaryDestination() 175 throws JMSException 176 { 177 return ((QueueSession )m_session).createTemporaryQueue(); 178 } 179 180 protected void deleteTemporaryDestination(Destination destination) 181 throws JMSException 182 { 183 ((TemporaryQueue )destination).delete(); 184 } 185 186 protected void send(Destination destination, Message message, 187 int deliveryMode, int priority, long timeToLive) 188 throws JMSException 189 { 190 ((QueueSender )m_producer).send((Queue )destination, message, 191 deliveryMode, priority, timeToLive); 192 } 193 194 } 195 } 196 197 private class QueueEndpoint 198 extends JMSEndpoint 199 { 200 String m_queueName; 201 202 QueueEndpoint(String queueName) 203 { 204 super(QueueConnector.this); 205 m_queueName = queueName; 206 } 207 208 Destination getDestination(Session session) 209 throws Exception 210 { 211 return createQueue((QueueSession )session, m_queueName); 212 } 213 214 public String toString() 215 { 216 StringBuffer buffer = new StringBuffer ("QueueEndpoint:"); 217 buffer.append(m_queueName); 218 return buffer.toString(); 219 } 220 221 public boolean equals(Object object) 222 { 223 if(!super.equals(object)) 224 return false; 225 226 if(!(object instanceof QueueEndpoint)) 227 return false; 228 229 return m_queueName.equals(((QueueEndpoint)object).m_queueName); 230 } 231 } 232 233 234 private final class QueueDestinationEndpoint 235 extends QueueEndpoint 236 { 237 Queue m_queue; 238 239 QueueDestinationEndpoint(Queue queue) 240 throws JMSException 241 { 242 super(queue.getQueueName()); 243 m_queue = queue; 244 } 245 246 Destination getDestination(Session session) 247 { 248 return m_queue; 249 } 250 251 } 252 253 protected AsyncConnection createAsyncConnection(ConnectionFactory factory, 254 Connection connection, 255 String threadName, 256 String clientID, 257 String username, 258 String password) 259 throws JMSException 260 { 261 return new QueueAsyncConnection((QueueConnectionFactory )factory, 262 (QueueConnection )connection, threadName, 263 clientID, username, password); 264 } 265 266 private final class QueueAsyncConnection extends AsyncConnection 267 { 268 269 QueueAsyncConnection(QueueConnectionFactory connectionFactory, 270 QueueConnection connection, 271 String threadName, 272 String clientID, 273 String username, 274 String password) 275 throws JMSException 276 { 277 super(connectionFactory, connection, threadName, clientID, username, password); 278 } 279 280 protected ListenerSession createListenerSession(javax.jms.Connection connection, 281 Subscription subscription) 282 throws Exception 283 { 284 QueueSession session = createQueueSession((QueueConnection )connection, 285 subscription.m_ackMode); 286 QueueReceiver receiver = createReceiver(session, 287 (Queue )subscription.m_endpoint.getDestination(session), 288 subscription.m_messageSelector); 289 return new ListenerSession(session, receiver, subscription); 290 } 291 292 } 293 294 } | Popular Tags |