KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > util > CommandMessageListener


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.activemq.broker.util;
18
19 import org.apache.activemq.command.ActiveMQTextMessage;
20 import org.apache.activemq.util.FactoryFinder;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23
24 import javax.jms.Destination JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.Message JavaDoc;
27 import javax.jms.MessageListener JavaDoc;
28 import javax.jms.MessageProducer JavaDoc;
29 import javax.jms.Session JavaDoc;
30 import javax.jms.TextMessage JavaDoc;
31 import java.io.BufferedReader JavaDoc;
32 import java.io.IOException JavaDoc;
33 import java.io.InputStreamReader JavaDoc;
34
35 /**
36  * @version $Revision: $
37  */

38 public class CommandMessageListener implements MessageListener JavaDoc {
39     private static final Log log = LogFactory.getLog(CommandMessageListener.class);
40
41     private Session JavaDoc session;
42     private MessageProducer JavaDoc producer;
43     private CommandHandler handler;
44
45     public CommandMessageListener(Session JavaDoc session) {
46         this.session = session;
47     }
48
49     public void onMessage(Message message) {
50         if (log.isDebugEnabled()) {
51             log.debug("Received command: " + message);
52         }
53         if (message instanceof TextMessage) {
54             TextMessage request = (TextMessage) message;
55             try {
56                 Destination JavaDoc replyTo = message.getJMSReplyTo();
57                 if (replyTo == null) {
58                     log.warn("Ignored message as no JMSReplyTo set: " + message);
59                     return;
60                 }
61                 Message response = processCommand(request);
62                 addReplyHeaders(request, response);
63                 getProducer().send(replyTo, response);
64             }
65             catch (Exception JavaDoc e) {
66                 log.error("Failed to process message due to: " + e + ". Message: " + message, e);
67             }
68         }
69         else {
70             log.warn("Ignoring invalid message: " + message);
71         }
72     }
73
74     protected void addReplyHeaders(TextMessage request, Message response) throws JMSException JavaDoc {
75         String JavaDoc correlationID = request.getJMSCorrelationID();
76         if (correlationID != null) {
77             response.setJMSCorrelationID(correlationID);
78         }
79     }
80
81     /**
82      * Processes an incoming JMS message returning the response message
83      */

84     public Message processCommand(TextMessage request) throws Exception JavaDoc {
85         TextMessage response = session.createTextMessage();
86         getHandler().processCommand(request, response);
87         return response;
88     }
89
90     /**
91      * Processes an incoming command from a console and returning the text to output
92      */

93     public String JavaDoc processCommandText(String JavaDoc line) throws Exception JavaDoc {
94         TextMessage request = new ActiveMQTextMessage();
95         request.setText(line);
96         TextMessage response = new ActiveMQTextMessage();
97         getHandler().processCommand(request, response);
98         return response.getText();
99     }
100
101     public Session JavaDoc getSession() {
102         return session;
103     }
104
105     public MessageProducer JavaDoc getProducer() throws JMSException JavaDoc {
106         if (producer == null) {
107             producer = getSession().createProducer(null);
108         }
109         return producer;
110     }
111
112     public CommandHandler getHandler() throws IllegalAccessException JavaDoc, IOException JavaDoc, InstantiationException JavaDoc, ClassNotFoundException JavaDoc {
113         if (handler == null) {
114             handler = createHandler();
115         }
116         return handler;
117     }
118
119     private CommandHandler createHandler() throws IllegalAccessException JavaDoc, IOException JavaDoc, ClassNotFoundException JavaDoc, InstantiationException JavaDoc {
120         FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/broker/");
121         return (CommandHandler) factoryFinder.newInstance("agent");
122     }
123 }
124
Popular Tags