KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > kernel > services > queues > MSMQQueueImpl


1 /*
2  * Copyright © Coridan Inc.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License as
6  * published by the Free Software Foundation; either version 2 of the
7  * License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17  * USA
18  *
19  * Support at:
20  * http://sourceforge.net/projects/mantaray/
21  * Or
22  * support@coridan.com
23  */

24
25 package org.mr.kernel.services.queues;
26
27 import org.mr.core.protocol.MantaBusMessage;
28 import org.mr.core.protocol.MantaBusMessageConsts;
29 import org.mr.core.util.byteable.Byteable;
30 import org.mr.api.jms.MantaTextMessage;
31 import org.mr.api.jms.MantaQueue;
32 import org.mr.api.jms.MantaBytesMessage;
33
34 import java.util.List JavaDoc;
35
36 /**
37  * MSMQQueueImpl.java
38  *
39  *
40  * Created: Thu Apr 06 18:06:09 2006
41  *
42  * @author Uri Schneider
43  * @version 1.0
44  */

45 import org.mr.kernel.services.queues.msmq.*;
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48
49 import javax.jms.JMSException JavaDoc;
50 import javax.jms.TextMessage JavaDoc;
51 import javax.jms.BytesMessage JavaDoc;
52 import javax.jms.MessageNotReadableException JavaDoc;
53
54 public class MSMQQueueImpl implements ForeignQueueImpl {
55     String JavaDoc queueName;
56     Queue queue;
57     MantaQueue mqueue;
58     Log log;
59
60     public MSMQQueueImpl(String JavaDoc service) {
61         this.queueName = getMSMQName(service);
62         this.mqueue = new MantaQueue(service);
63         this.log = LogFactory.getLog("MSMQQueueImpl-" + service);
64     } // MSMQQueueImpl constructor
65

66     public void init() {
67         try {
68             this.queue = new Queue(this.queueName);
69         } catch (MessageQueueException e) {
70             if (e.hresult == 0xC00E0003) { // queue not found
71
try {
72                     this.queue = Queue.create(this.queueName,
73                                               "Created by MantaRay", false);
74                 } catch (MessageQueueException e1) {
75                     System.err.println("Error creating queue " +
76                                        this.queueName + ": " +
77                                        MessageQueueException.
78                                        HrToString(e1.hresult));
79                 }
80             } else {
81                 System.err.println("Error opening queue " +
82                                    this.queueName + ": " +
83                                    MessageQueueException.
84                                    HrToString(e.hresult));
85             }
86         }
87     }
88
89     // TODO this doesn't send to head
90
public void sendToHead(MantaBusMessage message) {
91         send(message);
92     }
93     public void send(MantaBusMessage message) {
94         try {
95             Message msMsg = busMessageToMSMQ(message);
96             if (msMsg == null) {
97                 this.log.error("Message will not be sent to foreign queue " +
98                                this.queueName);
99                 return;
100             }
101             this.queue.send(msMsg);
102         } catch (MessageQueueException e) {
103             this.log.error("Error sending message to " + this.queueName +
104                            ": " + MessageQueueException.HrToString(e.hresult));
105         }
106     }
107     public MantaBusMessage receiveNoWait() {
108         return receive(0);
109     }
110
111     /** inifinite wait */
112     public MantaBusMessage receive() {
113         return receive(-1);
114     }
115
116     public MantaBusMessage receive(int timeout) {
117         try {
118             Message msMsg = this.queue.receive(timeout);
119             MantaBusMessage mbm = msmqMessageToBus(msMsg);
120             return mbm;
121         } catch (MessageQueueException e) {
122             if (e.hresult != 0xC00E001B) { // MQ_ERROR_IO_TIMEOUT
123
System.err.println("Error receiving message from " +
124                                    queueName + ": " + MessageQueueException.
125                                    HrToString(e.hresult));
126             }
127         }
128         return null;
129     }
130
131     public List JavaDoc getCopy() { return null; }
132
133     public boolean isEmpty() {
134         try {
135             this.queue.peek(0);
136             return false;
137         } catch (MessageQueueException e) {
138             if (e.hresult != 0xC00E001B) { // MQ_ERROR_IO_TIMEOUT
139
System.err.println("Error checking emptiness of " +
140                                    queueName + ": " + MessageQueueException.
141                                    HrToString(e.hresult));
142             }
143         }
144
145         return false;
146     }
147
148     public void waitForMessages() {
149         try {
150             this.queue.peek(-1);
151         } catch (MessageQueueException e) {
152             if (e.hresult != 0xC00E001B) { // MQ_ERROR_IO_TIMEOUT
153
System.err.println("Error waiting for messages of " +
154                                    queueName + ": " + MessageQueueException.
155                                    HrToString(e.hresult));
156             }
157         }
158     }
159
160     private MantaBusMessage msmqMessageToBus(Message msg) {
161         MantaBusMessage mbm = MantaBusMessage.getInstance();
162
163         try {
164             byte[] body = msg.getMessage();
165             MantaBytesMessage bytesMessage = new MantaBytesMessage();
166
167             bytesMessage.writeBytes(body);
168             bytesMessage.setJMSMessageID("ID:" + mbm.getMessageId());
169             bytesMessage.setJMSDestination(this.mqueue);
170             mbm.setPayload(bytesMessage);
171             mbm.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT);
172             mbm.setPriority((byte) bytesMessage.getJMSPriority());
173             mbm.addHeader(MantaBusMessageConsts.HEADER_NAME_PAYLOAD_TYPE,
174                           MantaBusMessageConsts.PAYLOAD_TYPE_JMS);
175             mbm.setDeliveryMode((byte) bytesMessage.getJMSDeliveryMode());
176             mbm.setValidUntil(bytesMessage.getJMSExpiration());
177         } catch (JMSException JavaDoc e) {
178             return null;
179         }
180
181         return mbm;
182     }
183
184     private Message busMessageToMSMQ(MantaBusMessage mbm) {
185         Message message = null;
186         String JavaDoc text;
187         Byteable b = mbm.getPayload();
188         if (b instanceof TextMessage) {
189             try {
190                 text = ((TextMessage) b).getText();
191                 message = new Message(text.getBytes(), "MantaRay",
192                                       "L:none", 0);
193             } catch (JMSException JavaDoc e) {
194                 this.log.error("Cannot convert Manta message to MSMQ " +
195                                "message: " + e.getMessage());
196             }
197         } else if (b instanceof BytesMessage) {
198             try {
199                 BytesMessage bm = (BytesMessage) b;
200                 bm.reset();
201                 byte[] body = new byte[(int) bm.getBodyLength()];
202                 bm.readBytes(body);
203                 message = new Message(body, "MantaRay", "L:none", 0);
204             } catch (MessageNotReadableException JavaDoc mnre) {
205                 System.out.println("MSMQQueueImpl: BytesMessage unreadable!");
206             } catch (JMSException JavaDoc e) {
207                 this.log.error("Cannot convert Manta message to MSMQ " +
208                                "message: " + e.getMessage());
209             }
210         }
211         if (message == null) {
212             this.log.error("Cannot convert Manta message to MSMQ message: " +
213                            "Unsupported message type: " +
214                            b.getClass().getName());
215         }
216         return message;
217     }
218
219     static String JavaDoc getMSMQName(String JavaDoc service) {
220         // remove "msmq:" prefix
221
if (service.startsWith("msmq:")) {
222             service = service.substring(5);
223         }
224         return "DIRECT=OS:.\\private$\\" + service;
225     }
226 } // MSMQQueueImpl
227
Popular Tags