KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > stomp > ProtocolConverter


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.stomp;
19
20 import java.io.IOException JavaDoc;
21 import java.io.OutputStreamWriter JavaDoc;
22 import java.io.PrintWriter JavaDoc;
23 import java.util.HashMap JavaDoc;
24 import java.util.Iterator JavaDoc;
25 import java.util.Map JavaDoc;
26
27 import javax.jms.Destination JavaDoc;
28 import javax.jms.JMSException JavaDoc;
29
30 import org.apache.activemq.command.ActiveMQBytesMessage;
31 import org.apache.activemq.command.ActiveMQDestination;
32 import org.apache.activemq.command.ActiveMQMessage;
33 import org.apache.activemq.command.ActiveMQTextMessage;
34 import org.apache.activemq.command.Command;
35 import org.apache.activemq.command.ConnectionId;
36 import org.apache.activemq.command.ConnectionInfo;
37 import org.apache.activemq.command.ConsumerId;
38 import org.apache.activemq.command.ConsumerInfo;
39 import org.apache.activemq.command.LocalTransactionId;
40 import org.apache.activemq.command.MessageAck;
41 import org.apache.activemq.command.MessageDispatch;
42 import org.apache.activemq.command.MessageId;
43 import org.apache.activemq.command.ProducerId;
44 import org.apache.activemq.command.ProducerInfo;
45 import org.apache.activemq.command.Response;
46 import org.apache.activemq.command.SessionId;
47 import org.apache.activemq.command.SessionInfo;
48 import org.apache.activemq.command.ShutdownInfo;
49 import org.apache.activemq.command.TransactionId;
50 import org.apache.activemq.command.TransactionInfo;
51 import org.apache.activemq.util.ByteArrayOutputStream;
52 import org.apache.activemq.util.IdGenerator;
53 import org.apache.activemq.util.IntrospectionSupport;
54 import org.apache.activemq.util.LongSequenceGenerator;
55
56 import java.util.concurrent.ConcurrentHashMap JavaDoc;
57 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
58
59 /**
60  *
61  * @author <a HREF="http://hiramchirino.com">chirino</a>
62  */

63 public class ProtocolConverter {
64
65     private static final IdGenerator connectionIdGenerator = new IdGenerator();
66     private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
67     private final SessionId sessionId = new SessionId(connectionId, -1);
68     private final ProducerId producerId = new ProducerId(sessionId, 1);
69
70     private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
71     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
72     private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
73
74     private final ConcurrentHashMap JavaDoc resposeHandlers = new ConcurrentHashMap JavaDoc();
75     private final ConcurrentHashMap JavaDoc subscriptionsByConsumerId = new ConcurrentHashMap JavaDoc();
76     private final Map JavaDoc transactions = new ConcurrentHashMap JavaDoc();
77     private final StompTransportFilter transportFilter;
78
79     private final Object JavaDoc commnadIdMutex = new Object JavaDoc();
80     private int lastCommandId;
81     private final AtomicBoolean JavaDoc connected = new AtomicBoolean JavaDoc(false);
82     private final FrameTranslator frameTranslator;
83
/**
 * Creates a converter bound to one STOMP transport.
 *
 * @param stompTransportFilter filter through which frames and commands are sent
 * @param translator translator used for destinations and message bodies
 */
public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator) {
    this.frameTranslator = translator;
    this.transportFilter = stompTransportFilter;
}
89
90     protected int generateCommandId() {
91         synchronized(commnadIdMutex){
92             return lastCommandId++;
93         }
94     }
95
96     protected ResponseHandler createResponseHandler(StompFrame command){
97         final String JavaDoc receiptId = (String JavaDoc) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
98         // A response may not be needed.
99
if( receiptId != null ) {
100             return new ResponseHandler() {
101                 public void onResponse(ProtocolConverter converter, Response response) throws IOException JavaDoc {
102                     StompFrame sc = new StompFrame();
103                     sc.setAction(Stomp.Responses.RECEIPT);
104                     sc.setHeaders(new HashMap JavaDoc(1));
105                     sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
106                     transportFilter.sendToStomp(sc);
107                 }
108             };
109         }
110         return null;
111     }
112
113     protected void sendToActiveMQ(Command command, ResponseHandler handler) {
114         command.setCommandId(generateCommandId());
115         if(handler!=null) {
116             command.setResponseRequired(true);
117             resposeHandlers.put(new Integer JavaDoc(command.getCommandId()), handler);
118         }
119         transportFilter.sendToActiveMQ(command);
120     }
121
122     protected void sendToStomp(StompFrame command) throws IOException JavaDoc {
123         transportFilter.sendToStomp(command);
124     }
125
126     /**
127      * Convert a stomp command
128      * @param command
129      */

130     public void onStompCommad( StompFrame command ) throws IOException JavaDoc, JMSException JavaDoc {
131         try {
132
133             if( command.getClass() == StompFrameError.class ) {
134                 throw ((StompFrameError)command).getException();
135             }
136
137             String JavaDoc action = command.getAction();
138             if (action.startsWith(Stomp.Commands.SEND))
139                 onStompSend(command);
140             else if (action.startsWith(Stomp.Commands.ACK))
141                 onStompAck(command);
142             else if (action.startsWith(Stomp.Commands.BEGIN))
143                 onStompBegin(command);
144             else if (action.startsWith(Stomp.Commands.COMMIT))
145                 onStompCommit(command);
146             else if (action.startsWith(Stomp.Commands.ABORT))
147                 onStompAbort(command);
148             else if (action.startsWith(Stomp.Commands.SUBSCRIBE))
149                 onStompSubscribe(command);
150             else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE))
151                 onStompUnsubscribe(command);
152             else if (action.startsWith(Stomp.Commands.CONNECT))
153                 onStompConnect(command);
154             else if (action.startsWith(Stomp.Commands.DISCONNECT))
155                 onStompDisconnect(command);
156             else
157                 throw new ProtocolException("Unknown STOMP action: "+action);
158
159         } catch (ProtocolException e) {
160
161             // Let the stomp client know about any protocol errors.
162
ByteArrayOutputStream baos = new ByteArrayOutputStream();
163             PrintWriter JavaDoc stream = new PrintWriter JavaDoc(new OutputStreamWriter JavaDoc(baos,"UTF-8"));
164             e.printStackTrace(stream);
165             stream.close();
166
167             HashMap JavaDoc headers = new HashMap JavaDoc();
168             headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
169
170             final String JavaDoc receiptId = (String JavaDoc) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
171             if( receiptId != null ) {
172                 headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
173             }
174
175             StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR,headers,baos.toByteArray());
176             sendToStomp(errorMessage);
177
178             if( e.isFatal() )
179                 getTransportFilter().onException(e);
180         }
181     }
182
183     protected void onStompSend(StompFrame command) throws IOException JavaDoc, JMSException JavaDoc {
184         checkConnected();
185
186         Map JavaDoc headers = command.getHeaders();
187         String JavaDoc stompTx = (String JavaDoc) headers.get(Stomp.Headers.TRANSACTION);
188
189         ActiveMQMessage message = convertMessage(command);
190
191         message.setProducerId(producerId);
192         MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
193         message.setMessageId(id);
194         message.setJMSTimestamp(System.currentTimeMillis());
195
196         if (stompTx!=null) {
197             TransactionId activemqTx = (TransactionId) transactions.get(stompTx);
198             if (activemqTx == null)
199                 throw new ProtocolException("Invalid transaction id: "+stompTx);
200             message.setTransactionId(activemqTx);
201         }
202
203         message.onSend();
204         sendToActiveMQ(message, createResponseHandler(command));
205
206     }
207
208
209     protected void onStompAck(StompFrame command) throws ProtocolException {
210         checkConnected();
211
212         // TODO: acking with just a message id is very bogus
213
// since the same message id could have been sent to 2 different subscriptions
214
// on the same stomp connection. For example, when 2 subs are created on the same topic.
215

216         Map JavaDoc headers = command.getHeaders();
217         String JavaDoc messageId = (String JavaDoc) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
218         if (messageId == null)
219             throw new ProtocolException("ACK received without a message-id to acknowledge!");
220
221         TransactionId activemqTx=null;
222         String JavaDoc stompTx = (String JavaDoc) headers.get(Stomp.Headers.TRANSACTION);
223         if (stompTx!=null) {
224             activemqTx = (TransactionId) transactions.get(stompTx);
225             if (activemqTx == null)
226                 throw new ProtocolException("Invalid transaction id: "+stompTx);
227         }
228
229         boolean acked=false;
230         for (Iterator JavaDoc iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
231             StompSubscription sub = (StompSubscription) iter.next();
232             MessageAck ack = sub.onStompMessageAck(messageId);
233             if( ack!=null ) {
234                 ack.setTransactionId(activemqTx);
235                 sendToActiveMQ(ack,createResponseHandler(command));
236                 acked=true;
237                 break;
238             }
239         }
240
241         if( !acked )
242             throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
243
244     }
245
246
247     protected void onStompBegin(StompFrame command) throws ProtocolException {
248         checkConnected();
249
250         Map JavaDoc headers = command.getHeaders();
251
252         String JavaDoc stompTx = (String JavaDoc)headers.get(Stomp.Headers.TRANSACTION);
253
254         if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
255             throw new ProtocolException("Must specify the transaction you are beginning");
256         }
257
258         if( transactions.get(stompTx)!=null ) {
259             throw new ProtocolException("The transaction was allready started: "+stompTx);
260         }
261
262         LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
263         transactions.put(stompTx, activemqTx);
264
265         TransactionInfo tx = new TransactionInfo();
266         tx.setConnectionId(connectionId);
267         tx.setTransactionId(activemqTx);
268         tx.setType(TransactionInfo.BEGIN);
269
270         sendToActiveMQ(tx, createResponseHandler(command));
271
272     }
273
274     protected void onStompCommit(StompFrame command) throws ProtocolException {
275         checkConnected();
276
277         Map JavaDoc headers = command.getHeaders();
278
279         String JavaDoc stompTx = (String JavaDoc) headers.get(Stomp.Headers.TRANSACTION);
280         if (stompTx==null) {
281             throw new ProtocolException("Must specify the transaction you are committing");
282         }
283
284         TransactionId activemqTx = (TransactionId) transactions.remove(stompTx);
285         if (activemqTx == null) {
286             throw new ProtocolException("Invalid transaction id: "+stompTx);
287         }
288
289         TransactionInfo tx = new TransactionInfo();
290         tx.setConnectionId(connectionId);
291         tx.setTransactionId(activemqTx);
292         tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
293
294         sendToActiveMQ(tx, createResponseHandler(command));
295     }
296
297     protected void onStompAbort(StompFrame command) throws ProtocolException {
298         checkConnected();
299         Map JavaDoc headers = command.getHeaders();
300
301         String JavaDoc stompTx = (String JavaDoc) headers.get(Stomp.Headers.TRANSACTION);
302         if (stompTx==null) {
303             throw new ProtocolException("Must specify the transaction you are committing");
304         }
305
306         TransactionId activemqTx = (TransactionId) transactions.remove(stompTx);
307         if (activemqTx == null) {
308             throw new ProtocolException("Invalid transaction id: "+stompTx);
309         }
310
311         TransactionInfo tx = new TransactionInfo();
312         tx.setConnectionId(connectionId);
313         tx.setTransactionId(activemqTx);
314         tx.setType(TransactionInfo.ROLLBACK);
315
316         sendToActiveMQ(tx, createResponseHandler(command));
317
318     }
319
320     protected void onStompSubscribe(StompFrame command) throws ProtocolException {
321         checkConnected();
322         Map JavaDoc headers = command.getHeaders();
323
324         String JavaDoc subscriptionId = (String JavaDoc)headers.get(Stomp.Headers.Subscribe.ID);
325         String JavaDoc destination = (String JavaDoc)headers.get(Stomp.Headers.Subscribe.DESTINATION);
326
327         ActiveMQDestination actual_dest = frameTranslator.convertDestination(destination);
328         ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
329         ConsumerInfo consumerInfo = new ConsumerInfo(id);
330         consumerInfo.setPrefetchSize(1000);
331         consumerInfo.setDispatchAsync(true);
332
333         String JavaDoc selector = (String JavaDoc) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
334         consumerInfo.setSelector(selector);
335
336         IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
337
338         consumerInfo.setDestination(frameTranslator.convertDestination(destination));
339
340         StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
341         stompSubscription.setDestination(actual_dest);
342
343         String JavaDoc ackMode = (String JavaDoc)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
344         if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
345             stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
346         } else {
347             stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
348         }
349
350         subscriptionsByConsumerId.put(id, stompSubscription);
351         sendToActiveMQ(consumerInfo, createResponseHandler(command));
352
353     }
354
355     protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
356         checkConnected();
357         Map JavaDoc headers = command.getHeaders();
358
359         ActiveMQDestination destination=null;
360         Object JavaDoc o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
361         if( o!=null )
362             destination = frameTranslator.convertDestination((String JavaDoc) o);
363
364         String JavaDoc subscriptionId = (String JavaDoc)headers.get(Stomp.Headers.Unsubscribe.ID);
365
366         if (subscriptionId==null && destination==null) {
367             throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
368         }
369
370         // TODO: Unsubscribing using a destination is a bit wierd if multiple subscriptions
371
// are created with the same destination. Perhaps this should be removed.
372
//
373
for (Iterator JavaDoc iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
374             StompSubscription sub = (StompSubscription) iter.next();
375             if (
376                 (subscriptionId!=null && subscriptionId.equals(sub.getSubscriptionId()) ) ||
377                 (destination!=null && destination.equals(sub.getDestination()) )
378             ) {
379                 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
380                 iter.remove();
381                 return;
382             }
383         }
384
385         throw new ProtocolException("No subscription matched.");
386     }
387
388     protected void onStompConnect(StompFrame command) throws ProtocolException {
389
390         if(connected.get()) {
391             throw new ProtocolException("Allready connected.");
392         }
393
394         final Map JavaDoc headers = command.getHeaders();
395
396         // allow anyone to login for now
397
String JavaDoc login = (String JavaDoc)headers.get(Stomp.Headers.Connect.LOGIN);
398         String JavaDoc passcode = (String JavaDoc)headers.get(Stomp.Headers.Connect.PASSCODE);
399         String JavaDoc clientId = (String JavaDoc)headers.get(Stomp.Headers.Connect.CLIENT_ID);
400
401         final ConnectionInfo connectionInfo = new ConnectionInfo();
402
403         IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
404
405         connectionInfo.setConnectionId(connectionId);
406         if( clientId!=null )
407             connectionInfo.setClientId(clientId);
408         else
409             connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
410
411         connectionInfo.setResponseRequired(true);
412         connectionInfo.setUserName(login);
413         connectionInfo.setPassword(passcode);
414
415         sendToActiveMQ(connectionInfo, new ResponseHandler(){
416             public void onResponse(ProtocolConverter converter, Response response) throws IOException JavaDoc {
417
418                 final SessionInfo sessionInfo = new SessionInfo(sessionId);
419                 sendToActiveMQ(sessionInfo,null);
420
421
422                 final ProducerInfo producerInfo = new ProducerInfo(producerId);
423                 sendToActiveMQ(producerInfo,new ResponseHandler(){
424                     public void onResponse(ProtocolConverter converter, Response response) throws IOException JavaDoc {
425
426                         connected.set(true);
427                         HashMap JavaDoc responseHeaders = new HashMap JavaDoc();
428
429                         responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
430                         String JavaDoc requestId = (String JavaDoc) headers.get(Stomp.Headers.Connect.REQUEST_ID);
431                         if (requestId == null) {
432                             // TODO legacy
433
requestId = (String JavaDoc) headers.get(Stomp.Headers.RECEIPT_REQUESTED);
434                         }
435                         if( requestId !=null ){
436                             // TODO legacy
437
responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
438                             responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
439                         }
440
441                         StompFrame sc = new StompFrame();
442                         sc.setAction(Stomp.Responses.CONNECTED);
443                         sc.setHeaders(responseHeaders);
444                         sendToStomp(sc);
445                     }
446                 });
447
448             }
449         });
450     }
451
452     protected void onStompDisconnect(StompFrame command) throws ProtocolException {
453         checkConnected();
454         sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
455         connected.set(false);
456     }
457
458
459     protected void checkConnected() throws ProtocolException {
460         if(!connected.get()) {
461             throw new ProtocolException("Not connected.");
462         }
463     }
464
465     /**
466      * Dispatch a ActiveMQ command
467      * @param command
468      * @throws IOException
469      */

470     public void onActiveMQCommad( Command command ) throws IOException JavaDoc, JMSException JavaDoc {
471
472         if ( command.isResponse() ) {
473
474             Response response = (Response) command;
475             ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(new Integer JavaDoc(response.getCorrelationId()));
476             if( rh !=null ) {
477                 rh.onResponse(this, response);
478             }
479
480         } else if( command.isMessageDispatch() ) {
481
482             MessageDispatch md = (MessageDispatch)command;
483             StompSubscription sub = (StompSubscription) subscriptionsByConsumerId.get(md.getConsumerId());
484             if (sub != null) {
485                 sub.onMessageDispatch(md);
486             }
487         }
488     }
489
490     public ActiveMQMessage convertMessage(StompFrame command) throws IOException JavaDoc, JMSException JavaDoc {
491         ActiveMQMessage msg = frameTranslator.convertFrame(command);
492         return msg;
493     }
494
495     public StompFrame convertMessage(ActiveMQMessage message) throws IOException JavaDoc, JMSException JavaDoc {
496         return frameTranslator.convertMessage(message);
497     }
498
499     public StompTransportFilter getTransportFilter() {
500         return transportFilter;
501     }
502 }
503
Popular Tags