1 55 56 package org.jboss.axis.transport.jms; 57 58 import org.jboss.axis.components.jms.JMSVendorAdapter; 59 60 import javax.jms.Connection ; 61 import javax.jms.ConnectionFactory ; 62 import javax.jms.Destination ; 63 import javax.jms.JMSException ; 64 import javax.jms.Message ; 65 import javax.jms.MessageConsumer ; 66 import javax.jms.Queue ; 67 import javax.jms.QueueConnection ; 68 import javax.jms.QueueConnectionFactory ; 69 import javax.jms.QueueReceiver ; 70 import javax.jms.QueueSender ; 71 import javax.jms.QueueSession ; 72 import javax.jms.Session ; 73 import javax.jms.TemporaryQueue ; 74 75 83 public class QueueConnector extends JMSConnector 84 { 85 86 public QueueConnector(ConnectionFactory factory, 87 int numRetries, 88 int numSessions, 89 long connectRetryInterval, 90 long interactRetryInterval, 91 long timeoutTime, 92 boolean allowReceive, 93 String clientID, 94 String username, 95 String password, 96 JMSVendorAdapter adapter) 97 throws JMSException 98 { 99 super(factory, numRetries, numSessions, connectRetryInterval, 100 interactRetryInterval, timeoutTime, allowReceive, clientID, 101 username, password, adapter); 102 } 103 104 public JMSEndpoint createEndpoint(String destination) 105 { 106 return new QueueEndpoint(destination); 107 } 108 109 116 public JMSEndpoint createEndpoint(Destination destination) 117 throws JMSException 118 { 119 if (!(destination instanceof Queue )) 120 throw new IllegalArgumentException ("The input must be a queue for this connector"); 121 return new QueueDestinationEndpoint((Queue )destination); 122 } 123 124 protected Connection internalConnect(ConnectionFactory connectionFactory, 125 String username, 126 String password) 127 throws JMSException 128 { 129 QueueConnectionFactory qcf = (QueueConnectionFactory )connectionFactory; 130 if (username == null) 131 return qcf.createQueueConnection(); 132 133 return qcf.createQueueConnection(username, password); 134 } 135 136 137 protected SyncConnection createSyncConnection(ConnectionFactory factory, 138 Connection connection, 139 int numSessions, 140 String threadName, 141 String clientID, 142 String username, 143 String password) 144 145 throws JMSException 146 { 147 return new QueueSyncConnection((QueueConnectionFactory )factory, 148 (QueueConnection )connection, numSessions, 149 threadName, clientID, username, password); 150 } 151 152 private QueueSession createQueueSession(QueueConnection connection, int ackMode) 153 throws JMSException 154 { 155 return connection.createQueueSession(false, ackMode); 156 } 157 158 private Queue createQueue(QueueSession session, String subject) 159 throws Exception 160 { 161 return m_adapter.getQueue(session, subject); 162 } 163 164 private QueueReceiver createReceiver(QueueSession session, 165 Queue queue, 166 String messageSelector) 167 throws JMSException 168 { 169 return session.createReceiver(queue, messageSelector); 170 } 171 172 private final class QueueSyncConnection extends SyncConnection 173 { 174 QueueSyncConnection(QueueConnectionFactory connectionFactory, 175 QueueConnection connection, 176 int numSessions, 177 String threadName, 178 String clientID, 179 String username, 180 String password) 181 throws JMSException 182 { 183 super(connectionFactory, connection, numSessions, threadName, 184 clientID, username, password); 185 } 186 187 protected SendSession createSendSession(javax.jms.Connection connection) 188 throws JMSException 189 { 190 QueueSession session = createQueueSession((QueueConnection )connection, 191 JMSConstants.DEFAULT_ACKNOWLEDGE_MODE); 192 QueueSender sender = session.createSender(null); 193 return new QueueSendSession(session, sender); 194 } 195 196 private final class QueueSendSession extends SendSession 197 { 198 QueueSendSession(QueueSession session, 199 QueueSender sender) 200 throws JMSException 201 { 202 super(session, sender); 203 } 204 205 protected MessageConsumer createConsumer(Destination destination) 206 throws JMSException 207 { 208 return createReceiver((QueueSession )m_session, (Queue )destination, null); 209 } 210 211 212 protected Destination createTemporaryDestination() 213 throws JMSException 214 { 215 return ((QueueSession )m_session).createTemporaryQueue(); 216 } 217 218 protected void deleteTemporaryDestination(Destination destination) 219 throws JMSException 220 { 221 ((TemporaryQueue )destination).delete(); 222 } 223 224 protected void send(Destination destination, Message message, 225 int deliveryMode, int priority, long timeToLive) 226 throws JMSException 227 { 228 ((QueueSender )m_producer).send((Queue )destination, message, 229 deliveryMode, priority, timeToLive); 230 } 231 232 } 233 } 234 235 private class QueueEndpoint 236 extends JMSEndpoint 237 { 238 String m_queueName; 239 240 QueueEndpoint(String queueName) 241 { 242 super(QueueConnector.this); 243 m_queueName = queueName; 244 } 245 246 Destination getDestination(Session session) 247 throws Exception 248 { 249 return createQueue((QueueSession )session, m_queueName); 250 } 251 252 public String toString() 253 { 254 StringBuffer buffer = new StringBuffer ("QueueEndpoint:"); 255 buffer.append(m_queueName); 256 return buffer.toString(); 257 } 258 259 public boolean equals(Object object) 260 { 261 if (!super.equals(object)) 262 return false; 263 264 if (!(object instanceof QueueEndpoint)) 265 return false; 266 267 return m_queueName.equals(((QueueEndpoint)object).m_queueName); 268 } 269 } 270 271 272 private final class QueueDestinationEndpoint 273 extends QueueEndpoint 274 { 275 Queue m_queue; 276 277 QueueDestinationEndpoint(Queue queue) 278 throws JMSException 279 { 280 super(queue.getQueueName()); 281 m_queue = queue; 282 } 283 284 Destination getDestination(Session session) 285 { 286 return m_queue; 287 } 288 289 } 290 291 protected AsyncConnection createAsyncConnection(ConnectionFactory factory, 292 Connection connection, 293 String threadName, 294 String clientID, 295 String username, 296 String password) 297 throws JMSException 298 { 299 return new QueueAsyncConnection((QueueConnectionFactory )factory, 300 (QueueConnection )connection, threadName, 301 clientID, username, password); 302 } 303 304 private final class QueueAsyncConnection extends AsyncConnection 305 { 306 307 QueueAsyncConnection(QueueConnectionFactory connectionFactory, 308 QueueConnection connection, 309 String threadName, 310 String clientID, 311 String username, 312 String password) 313 throws JMSException 314 { 315 super(connectionFactory, connection, threadName, clientID, username, password); 316 } 317 318 protected ListenerSession createListenerSession(javax.jms.Connection connection, 319 Subscription subscription) 320 throws Exception 321 { 322 QueueSession session = createQueueSession((QueueConnection )connection, 323 subscription.m_ackMode); 324 QueueReceiver receiver = createReceiver(session, 325 (Queue )subscription.m_endpoint.getDestination(session), 326 subscription.m_messageSelector); 327 return new ListenerSession(session, receiver, subscription); 328 } 329 330 } 331 332 } | Popular Tags |