1 24 25 package org.mr.kernel.services.queues; 26 27 import org.mr.core.protocol.MantaBusMessage; 28 import org.mr.core.protocol.MantaBusMessageConsts; 29 import org.mr.core.util.byteable.Byteable; 30 import org.mr.api.jms.MantaTextMessage; 31 import org.mr.api.jms.MantaQueue; 32 import org.mr.api.jms.MantaBytesMessage; 33 34 import java.util.List ; 35 36 45 import org.mr.kernel.services.queues.msmq.*; 46 import org.apache.commons.logging.Log; 47 import org.apache.commons.logging.LogFactory; 48 49 import javax.jms.JMSException ; 50 import javax.jms.TextMessage ; 51 import javax.jms.BytesMessage ; 52 import javax.jms.MessageNotReadableException ; 53 54 public class MSMQQueueImpl implements ForeignQueueImpl { 55 String queueName; 56 Queue queue; 57 MantaQueue mqueue; 58 Log log; 59 60 public MSMQQueueImpl(String service) { 61 this.queueName = getMSMQName(service); 62 this.mqueue = new MantaQueue(service); 63 this.log = LogFactory.getLog("MSMQQueueImpl-" + service); 64 } 66 public void init() { 67 try { 68 this.queue = new Queue(this.queueName); 69 } catch (MessageQueueException e) { 70 if (e.hresult == 0xC00E0003) { try { 72 this.queue = Queue.create(this.queueName, 73 "Created by MantaRay", false); 74 } catch (MessageQueueException e1) { 75 System.err.println("Error creating queue " + 76 this.queueName + ": " + 77 MessageQueueException. 78 HrToString(e1.hresult)); 79 } 80 } else { 81 System.err.println("Error opening queue " + 82 this.queueName + ": " + 83 MessageQueueException. 84 HrToString(e.hresult)); 85 } 86 } 87 } 88 89 public void sendToHead(MantaBusMessage message) { 91 send(message); 92 } 93 public void send(MantaBusMessage message) { 94 try { 95 Message msMsg = busMessageToMSMQ(message); 96 if (msMsg == null) { 97 this.log.error("Message will not be sent to foreign queue " + 98 this.queueName); 99 return; 100 } 101 this.queue.send(msMsg); 102 } catch (MessageQueueException e) { 103 this.log.error("Error sending message to " + this.queueName + 104 ": " + MessageQueueException.HrToString(e.hresult)); 105 } 106 } 107 public MantaBusMessage receiveNoWait() { 108 return receive(0); 109 } 110 111 112 public MantaBusMessage receive() { 113 return receive(-1); 114 } 115 116 public MantaBusMessage receive(int timeout) { 117 try { 118 Message msMsg = this.queue.receive(timeout); 119 MantaBusMessage mbm = msmqMessageToBus(msMsg); 120 return mbm; 121 } catch (MessageQueueException e) { 122 if (e.hresult != 0xC00E001B) { System.err.println("Error receiving message from " + 124 queueName + ": " + MessageQueueException. 125 HrToString(e.hresult)); 126 } 127 } 128 return null; 129 } 130 131 public List getCopy() { return null; } 132 133 public boolean isEmpty() { 134 try { 135 this.queue.peek(0); 136 return false; 137 } catch (MessageQueueException e) { 138 if (e.hresult != 0xC00E001B) { System.err.println("Error checking emptiness of " + 140 queueName + ": " + MessageQueueException. 141 HrToString(e.hresult)); 142 } 143 } 144 145 return false; 146 } 147 148 public void waitForMessages() { 149 try { 150 this.queue.peek(-1); 151 } catch (MessageQueueException e) { 152 if (e.hresult != 0xC00E001B) { System.err.println("Error waiting for messages of " + 154 queueName + ": " + MessageQueueException. 155 HrToString(e.hresult)); 156 } 157 } 158 } 159 160 private MantaBusMessage msmqMessageToBus(Message msg) { 161 MantaBusMessage mbm = MantaBusMessage.getInstance(); 162 163 try { 164 byte[] body = msg.getMessage(); 165 MantaBytesMessage bytesMessage = new MantaBytesMessage(); 166 167 bytesMessage.writeBytes(body); 168 bytesMessage.setJMSMessageID("ID:" + mbm.getMessageId()); 169 bytesMessage.setJMSDestination(this.mqueue); 170 mbm.setPayload(bytesMessage); 171 mbm.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT); 172 mbm.setPriority((byte) bytesMessage.getJMSPriority()); 173 mbm.addHeader(MantaBusMessageConsts.HEADER_NAME_PAYLOAD_TYPE, 174 MantaBusMessageConsts.PAYLOAD_TYPE_JMS); 175 mbm.setDeliveryMode((byte) bytesMessage.getJMSDeliveryMode()); 176 mbm.setValidUntil(bytesMessage.getJMSExpiration()); 177 } catch (JMSException e) { 178 return null; 179 } 180 181 return mbm; 182 } 183 184 private Message busMessageToMSMQ(MantaBusMessage mbm) { 185 Message message = null; 186 String text; 187 Byteable b = mbm.getPayload(); 188 if (b instanceof TextMessage) { 189 try { 190 text = ((TextMessage) b).getText(); 191 message = new Message(text.getBytes(), "MantaRay", 192 "L:none", 0); 193 } catch (JMSException e) { 194 this.log.error("Cannot convert Manta message to MSMQ " + 195 "message: " + e.getMessage()); 196 } 197 } else if (b instanceof BytesMessage) { 198 try { 199 BytesMessage bm = (BytesMessage) b; 200 bm.reset(); 201 byte[] body = new byte[(int) bm.getBodyLength()]; 202 bm.readBytes(body); 203 message = new Message(body, "MantaRay", "L:none", 0); 204 } catch (MessageNotReadableException mnre) { 205 System.out.println("MSMQQueueImpl: BytesMessage unreadable!"); 206 } catch (JMSException e) { 207 this.log.error("Cannot convert Manta message to MSMQ " + 208 "message: " + e.getMessage()); 209 } 210 } 211 if (message == null) { 212 this.log.error("Cannot convert Manta message to MSMQ message: " + 213 "Unsupported message type: " + 214 b.getClass().getName()); 215 } 216 return message; 217 } 218 219 static String getMSMQName(String service) { 220 if (service.startsWith("msmq:")) { 222 service = service.substring(5); 223 } 224 return "DIRECT=OS:.\\private$\\" + service; 225 } 226 } | Popular Tags |