package org.apache.activemq.transport.stomp;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import javax.jms.Destination;
import javax.jms.JMSException;

import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Translates between STOMP frames and ActiveMQ commands for a single STOMP
 * connection.
 *
 * <p>Incoming STOMP frames are handed to {@link #onStompCommad(StompFrame)},
 * which dispatches on the frame action and forwards the resulting ActiveMQ
 * command through the {@link StompTransportFilter}. Commands coming back from
 * ActiveMQ are handed to {@link #onActiveMQCommad(Command)}, which either
 * completes a pending {@link ResponseHandler} or routes a message dispatch to
 * the matching {@link StompSubscription}.
 */
public class ProtocolConverter {

    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();

    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final SessionId sessionId = new SessionId(connectionId, -1);
    private final ProducerId producerId = new ProducerId(sessionId, 1);

    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();

    /** Handlers waiting for a broker response, keyed by the command id they were sent with. */
    private final ConcurrentHashMap<Integer, ResponseHandler> responseHandlers =
            new ConcurrentHashMap<Integer, ResponseHandler>();

    /** Active subscriptions, keyed by the ActiveMQ consumer id created for them. */
    private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId =
            new ConcurrentHashMap<ConsumerId, StompSubscription>();

    /** Open transactions, keyed by the STOMP transaction header value. */
    private final Map<String, TransactionId> transactions = new ConcurrentHashMap<String, TransactionId>();

    private final StompTransportFilter transportFilter;

    private final Object commandIdMutex = new Object();
    private int lastCommandId;
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final FrameTranslator frameTranslator;

    /**
     * Creates a converter bound to one transport filter.
     *
     * @param stompTransportFilter filter used to send frames to the STOMP client and commands to ActiveMQ
     * @param translator           translator used for destinations and message bodies
     */
    public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator) {
        this.transportFilter = stompTransportFilter;
        this.frameTranslator = translator;
    }

    /**
     * Returns the next command id; ids are handed out sequentially under a lock.
     *
     * @return the id to assign to the next outgoing command
     */
    protected int generateCommandId() {
        synchronized (commandIdMutex) {
            return lastCommandId++;
        }
    }

    /**
     * Builds a handler that sends a RECEIPT frame once the broker responds, if
     * the frame asked for one.
     *
     * @param command the incoming STOMP frame
     * @return a receipt-sending handler, or {@code null} when the frame carries no receipt header
     */
    protected ResponseHandler createResponseHandler(StompFrame command) {
        final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
        if (receiptId == null) {
            return null;
        }
        return new ResponseHandler() {
            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
                StompFrame sc = new StompFrame();
                sc.setAction(Stomp.Responses.RECEIPT);
                sc.setHeaders(new HashMap(1));
                sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
                transportFilter.sendToStomp(sc);
            }
        };
    }

    /**
     * Assigns a fresh command id and forwards the command to ActiveMQ.
     *
     * @param command the command to send
     * @param handler invoked when the correlated response arrives; may be {@code null},
     *                in which case no response is requested here
     */
    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
        command.setCommandId(generateCommandId());
        if (handler != null) {
            command.setResponseRequired(true);
            // Register before sending so the handler is in place when the response arrives.
            responseHandlers.put(Integer.valueOf(command.getCommandId()), handler);
        }
        transportFilter.sendToActiveMQ(command);
    }

    /**
     * Sends a frame to the STOMP client.
     *
     * @param command the frame to send
     * @throws IOException if the transport filter fails to send
     */
    protected void sendToStomp(StompFrame command) throws IOException {
        transportFilter.sendToStomp(command);
    }

    /**
     * Converts an incoming STOMP frame into the matching ActiveMQ command(s).
     *
     * <p>A {@link ProtocolException} raised while handling the frame is reported
     * back to the client as an ERROR frame whose body is the stack trace; fatal
     * ones are additionally passed to the transport filter.
     *
     * @param command the frame received from the STOMP client
     * @throws IOException  if sending to the client fails
     * @throws JMSException if message conversion fails
     */
    public void onStompCommad(StompFrame command) throws IOException, JMSException {
        try {

            // A frame that failed to parse carries its own exception; surface it.
            if (command.getClass() == StompFrameError.class) {
                throw ((StompFrameError) command).getException();
            }

            String action = command.getAction();
            if (action.startsWith(Stomp.Commands.SEND)) {
                onStompSend(command);
            } else if (action.startsWith(Stomp.Commands.ACK)) {
                onStompAck(command);
            } else if (action.startsWith(Stomp.Commands.BEGIN)) {
                onStompBegin(command);
            } else if (action.startsWith(Stomp.Commands.COMMIT)) {
                onStompCommit(command);
            } else if (action.startsWith(Stomp.Commands.ABORT)) {
                onStompAbort(command);
            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
                onStompSubscribe(command);
            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
                onStompUnsubscribe(command);
            } else if (action.startsWith(Stomp.Commands.CONNECT)) {
                onStompConnect(command);
            } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
                onStompDisconnect(command);
            } else {
                throw new ProtocolException("Unknown STOMP action: " + action);
            }

        } catch (ProtocolException e) {

            // The ERROR frame body is the UTF-8 encoded stack trace.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
            e.printStackTrace(stream);
            stream.close();

            HashMap headers = new HashMap();
            headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());

            // Echo the receipt id so the client can correlate the failure.
            final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
            if (receiptId != null) {
                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
            }

            StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
            sendToStomp(errorMessage);

            if (e.isFatal()) {
                getTransportFilter().onException(e);
            }
        }
    }

    /**
     * Handles SEND: converts the frame to an ActiveMQ message, stamps producer,
     * message id and timestamp, optionally attaches the transaction, and sends it.
     *
     * @param command the SEND frame
     * @throws IOException  if frame conversion fails
     * @throws JMSException if frame conversion fails
     */
    protected void onStompSend(StompFrame command) throws IOException, JMSException {
        checkConnected();

        Map headers = command.getHeaders();
        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);

        ActiveMQMessage message = convertMessage(command);

        message.setProducerId(producerId);
        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
        message.setMessageId(id);
        message.setJMSTimestamp(System.currentTimeMillis());

        if (stompTx != null) {
            TransactionId activemqTx = transactions.get(stompTx);
            if (activemqTx == null) {
                throw new ProtocolException("Invalid transaction id: " + stompTx);
            }
            message.setTransactionId(activemqTx);
        }

        message.onSend();
        sendToActiveMQ(message, createResponseHandler(command));
    }

    /**
     * Handles ACK: asks each subscription in turn whether it owns the message id
     * and sends the first resulting acknowledgement to ActiveMQ.
     *
     * @param command the ACK frame
     * @throws ProtocolException if not connected, the message-id header is missing,
     *                           the transaction is unknown, or no subscription
     *                           recognizes the message id
     */
    protected void onStompAck(StompFrame command) throws ProtocolException {
        checkConnected();

        Map headers = command.getHeaders();
        String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
        if (messageId == null) {
            throw new ProtocolException("ACK received without a message-id to acknowledge!");
        }

        TransactionId activemqTx = null;
        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
        if (stompTx != null) {
            activemqTx = transactions.get(stompTx);
            if (activemqTx == null) {
                throw new ProtocolException("Invalid transaction id: " + stompTx);
            }
        }

        boolean acked = false;
        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
            MessageAck ack = sub.onStompMessageAck(messageId);
            if (ack != null) {
                ack.setTransactionId(activemqTx);
                sendToActiveMQ(ack, createResponseHandler(command));
                acked = true;
                break;
            }
        }

        if (!acked) {
            throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
        }
    }

    /**
     * Handles BEGIN: registers a new local transaction under the STOMP
     * transaction id and sends a BEGIN to ActiveMQ.
     *
     * @param command the BEGIN frame
     * @throws ProtocolException if not connected, the transaction header is missing,
     *                           or the transaction was already started
     */
    protected void onStompBegin(StompFrame command) throws ProtocolException {
        checkConnected();

        Map headers = command.getHeaders();

        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);

        // Null check (rather than containsKey) also rejects a present-but-null
        // header, which would otherwise hit the null-hostile transactions map.
        if (stompTx == null) {
            throw new ProtocolException("Must specify the transaction you are beginning");
        }

        if (transactions.get(stompTx) != null) {
            throw new ProtocolException("The transaction was allready started: " + stompTx);
        }

        LocalTransactionId activemqTx =
                new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
        transactions.put(stompTx, activemqTx);

        TransactionInfo tx = new TransactionInfo();
        tx.setConnectionId(connectionId);
        tx.setTransactionId(activemqTx);
        tx.setType(TransactionInfo.BEGIN);

        sendToActiveMQ(tx, createResponseHandler(command));
    }

    /**
     * Handles COMMIT: forgets the STOMP transaction and sends a one-phase commit.
     *
     * @param command the COMMIT frame
     * @throws ProtocolException if not connected, the transaction header is missing,
     *                           or the transaction id is unknown
     */
    protected void onStompCommit(StompFrame command) throws ProtocolException {
        checkConnected();

        Map headers = command.getHeaders();

        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
        if (stompTx == null) {
            throw new ProtocolException("Must specify the transaction you are committing");
        }

        TransactionId activemqTx = transactions.remove(stompTx);
        if (activemqTx == null) {
            throw new ProtocolException("Invalid transaction id: " + stompTx);
        }

        TransactionInfo tx = new TransactionInfo();
        tx.setConnectionId(connectionId);
        tx.setTransactionId(activemqTx);
        tx.setType(TransactionInfo.COMMIT_ONE_PHASE);

        sendToActiveMQ(tx, createResponseHandler(command));
    }

    /**
     * Handles ABORT: forgets the STOMP transaction and sends a rollback.
     *
     * @param command the ABORT frame
     * @throws ProtocolException if not connected, the transaction header is missing,
     *                           or the transaction id is unknown
     */
    protected void onStompAbort(StompFrame command) throws ProtocolException {
        checkConnected();
        Map headers = command.getHeaders();

        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
        if (stompTx == null) {
            throw new ProtocolException("Must specify the transaction you are aborting");
        }

        TransactionId activemqTx = transactions.remove(stompTx);
        if (activemqTx == null) {
            throw new ProtocolException("Invalid transaction id: " + stompTx);
        }

        TransactionInfo tx = new TransactionInfo();
        tx.setConnectionId(connectionId);
        tx.setTransactionId(activemqTx);
        tx.setType(TransactionInfo.ROLLBACK);

        sendToActiveMQ(tx, createResponseHandler(command));
    }

    /**
     * Handles SUBSCRIBE: creates an ActiveMQ consumer for the destination and
     * records the subscription under the new consumer id.
     *
     * <p>Note: the selector header is removed from the frame's header map as a
     * side effect.
     *
     * @param command the SUBSCRIBE frame
     * @throws ProtocolException if not connected or the destination header is missing
     */
    protected void onStompSubscribe(StompFrame command) throws ProtocolException {
        checkConnected();
        Map headers = command.getHeaders();

        String subscriptionId = (String) headers.get(Stomp.Headers.Subscribe.ID);
        String destination = (String) headers.get(Stomp.Headers.Subscribe.DESTINATION);
        if (destination == null) {
            throw new ProtocolException("Must specify the destination you are subscribing to");
        }

        // Convert once; the same destination is used for the consumer and the subscription.
        ActiveMQDestination actualDest = frameTranslator.convertDestination(destination);
        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
        ConsumerInfo consumerInfo = new ConsumerInfo(id);
        consumerInfo.setPrefetchSize(1000);
        consumerInfo.setDispatchAsync(true);

        String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
        consumerInfo.setSelector(selector);

        // Headers prefixed with "activemq." are applied as ConsumerInfo properties.
        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");

        // Set after the property pass so the destination always comes from the header.
        consumerInfo.setDestination(actualDest);

        StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
        stompSubscription.setDestination(actualDest);

        String ackMode = (String) headers.get(Stomp.Headers.Subscribe.ACK_MODE);
        if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
            stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
        } else {
            stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
        }

        subscriptionsByConsumerId.put(id, stompSubscription);
        sendToActiveMQ(consumerInfo, createResponseHandler(command));
    }

    /**
     * Handles UNSUBSCRIBE: removes the first subscription that matches the given
     * subscription id or destination and tells ActiveMQ to remove its consumer.
     *
     * @param command the UNSUBSCRIBE frame
     * @throws ProtocolException if not connected, neither an id nor a destination
     *                           is given, or no subscription matches
     */
    protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
        checkConnected();
        Map headers = command.getHeaders();

        ActiveMQDestination destination = null;
        Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
        if (o != null) {
            destination = frameTranslator.convertDestination((String) o);
        }

        String subscriptionId = (String) headers.get(Stomp.Headers.Unsubscribe.ID);

        if (subscriptionId == null && destination == null) {
            throw new ProtocolException(
                    "Must specify the subscriptionId or the destination you are unsubscribing from");
        }

        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
            StompSubscription sub = iter.next();
            boolean idMatches = subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId());
            boolean destinationMatches = destination != null && destination.equals(sub.getDestination());
            if (idMatches || destinationMatches) {
                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
                iter.remove();
                return;
            }
        }

        throw new ProtocolException("No subscription matched.");
    }

    /**
     * Handles CONNECT: sends the connection info to ActiveMQ, then (on response)
     * the session and producer infos, and finally replies with a CONNECTED frame
     * once the producer info has been answered.
     *
     * <p>NOTE(review): the response handlers do not inspect the {@code response}
     * they receive, so a failure response appears to be treated the same as a
     * success - confirm against the broker's behavior.
     *
     * @param command the CONNECT frame
     * @throws ProtocolException if this converter is already connected
     */
    protected void onStompConnect(StompFrame command) throws ProtocolException {

        if (connected.get()) {
            throw new ProtocolException("Allready connected.");
        }

        final Map headers = command.getHeaders();

        String login = (String) headers.get(Stomp.Headers.Connect.LOGIN);
        String passcode = (String) headers.get(Stomp.Headers.Connect.PASSCODE);
        String clientId = (String) headers.get(Stomp.Headers.Connect.CLIENT_ID);

        final ConnectionInfo connectionInfo = new ConnectionInfo();

        // Headers prefixed with "activemq." are applied as ConnectionInfo properties.
        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");

        connectionInfo.setConnectionId(connectionId);
        if (clientId != null) {
            connectionInfo.setClientId(clientId);
        } else {
            // No client id supplied: fall back to the connection id.
            connectionInfo.setClientId(connectionInfo.getConnectionId().toString());
        }

        connectionInfo.setResponseRequired(true);
        connectionInfo.setUserName(login);
        connectionInfo.setPassword(passcode);

        sendToActiveMQ(connectionInfo, new ResponseHandler() {
            public void onResponse(ProtocolConverter converter, Response response) throws IOException {

                final SessionInfo sessionInfo = new SessionInfo(sessionId);
                sendToActiveMQ(sessionInfo, null);

                final ProducerInfo producerInfo = new ProducerInfo(producerId);
                sendToActiveMQ(producerInfo, new ResponseHandler() {
                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {

                        connected.set(true);
                        HashMap responseHeaders = new HashMap();

                        responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());

                        // Prefer the request-id header; fall back to the receipt header.
                        String requestId = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID);
                        if (requestId == null) {
                            requestId = (String) headers.get(Stomp.Headers.RECEIPT_REQUESTED);
                        }
                        if (requestId != null) {
                            responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
                            responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
                        }

                        StompFrame sc = new StompFrame();
                        sc.setAction(Stomp.Responses.CONNECTED);
                        sc.setHeaders(responseHeaders);
                        sendToStomp(sc);
                    }
                });

            }
        });
    }

    /**
     * Handles DISCONNECT: sends a shutdown to ActiveMQ and marks this converter
     * as no longer connected.
     *
     * @param command the DISCONNECT frame
     * @throws ProtocolException if not connected
     */
    protected void onStompDisconnect(StompFrame command) throws ProtocolException {
        checkConnected();
        sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
        connected.set(false);
    }

    /**
     * Guards handlers that require an established connection.
     *
     * @throws ProtocolException if the CONNECT handshake has not completed
     */
    protected void checkConnected() throws ProtocolException {
        if (!connected.get()) {
            throw new ProtocolException("Not connected.");
        }
    }

    /**
     * Dispatches a command received from ActiveMQ: responses complete (and
     * remove) the handler registered for their correlation id; message
     * dispatches are routed to the subscription for their consumer id. Other
     * commands, and responses or dispatches with no registered target, are
     * ignored.
     *
     * @param command the command received from ActiveMQ
     * @throws IOException  if a handler or subscription fails to write to the client
     * @throws JMSException if message conversion fails
     */
    public void onActiveMQCommad(Command command) throws IOException, JMSException {

        if (command.isResponse()) {

            Response response = (Response) command;
            ResponseHandler rh = responseHandlers.remove(Integer.valueOf(response.getCorrelationId()));
            if (rh != null) {
                rh.onResponse(this, response);
            }

        } else if (command.isMessageDispatch()) {

            MessageDispatch md = (MessageDispatch) command;
            StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
            if (sub != null) {
                sub.onMessageDispatch(md);
            }
        }
    }

    /**
     * Converts a STOMP frame to an ActiveMQ message via the frame translator.
     *
     * @param command the frame to convert
     * @return the converted message
     * @throws IOException  if the translator fails
     * @throws JMSException if the translator fails
     */
    public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
        return frameTranslator.convertFrame(command);
    }

    /**
     * Converts an ActiveMQ message to a STOMP frame via the frame translator.
     *
     * @param message the message to convert
     * @return the converted frame
     * @throws IOException  if the translator fails
     * @throws JMSException if the translator fails
     */
    public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
        return frameTranslator.convertMessage(message);
    }

    /**
     * @return the transport filter this converter sends through
     */
    public StompTransportFilter getTransportFilter() {
        return transportFilter;
    }
}