1 28 29 package com.caucho.jms.hessian; 30 31 import com.caucho.ejb.hessian.HessianWriter; 32 import com.caucho.hessian.io.HessianInput; 33 import com.caucho.jms.session.MessageAvailableListener; 34 import com.caucho.services.message.MessageSender; 35 import com.caucho.services.message.MessageServiceException; 36 import com.caucho.util.CharBuffer; 37 import com.caucho.util.L10N; 38 import com.caucho.util.Log; 39 import com.caucho.vfs.Path; 40 import com.caucho.vfs.ReadStream; 41 import com.caucho.vfs.ReadWritePair; 42 import com.caucho.vfs.Vfs; 43 import com.caucho.vfs.WriteStream; 44 45 import javax.jms.JMSException ; 46 import javax.jms.Message ; 47 import javax.jms.ObjectMessage ; 48 import javax.jms.TextMessage ; 49 import java.io.IOException ; 50 import java.util.Enumeration ; 51 import java.util.HashMap ; 52 import java.util.logging.Logger ; 53 54 57 public class HessianQueue extends com.caucho.jms.AbstractQueue 58 implements MessageSender { 59 private final static Logger log = Log.open(HessianQueue.class); 60 static L10N L = new L10N(HessianQueue.class); 61 62 private String queueName; 63 private String url; 64 private Path path; 65 66 public HessianQueue() 67 { 68 } 69 70 73 public String getQueueName() 74 { 75 return queueName; 76 } 77 78 81 public void setQueueName(String name) 82 { 83 this.queueName = name; 84 } 85 86 public void setURL(String url) 87 { 88 this.url = url; 89 } 90 91 public String getURL() 92 { 93 return url; 94 } 95 96 public void setPath(Path path) 97 { 98 this.path = path; 99 } 100 101 public Path getPath() 102 { 103 if (path == null) 104 path = Vfs.lookup(url); 105 106 return path; 107 } 108 109 112 public void addListener(MessageAvailableListener listener) 113 { 114 throw new UnsupportedOperationException (); 115 } 116 117 public void send(Message message) 118 throws JMSException 119 { 120 try { 121 HashMap <String ,Object > headers = new HashMap <String ,Object >(); 122 123 Enumeration names = message.getPropertyNames(); 124 while (names != null && names.hasMoreElements()) { 125 String name = (String ) names.nextElement(); 126 127 Object value = message.getObjectProperty(name); 128 129 headers.put(name, value); 130 } 131 132 if (message instanceof TextMessage ) 133 send(headers, ((TextMessage ) message).getText()); 134 else if (message instanceof ObjectMessage ) 135 send(headers, ((ObjectMessage ) message).getObject()); 136 else 137 send(headers, message); 138 } catch (Exception e) { 139 throw new JMSException (String.valueOf(e)); 140 } 141 } 142 143 public void send(HashMap headers, Object data) 144 throws MessageServiceException 145 { 146 ReadStream is = null; 147 WriteStream os = null; 148 149 try { 150 ReadWritePair pair = getPath().openReadWrite(); 151 is = pair.getReadStream(); 152 os = pair.getWriteStream(); 153 154 HessianWriter out = new HessianWriter(os); 155 HessianInput in = new HessianInput(is); 156 157 out.startCall("send"); 158 159 out.writeObject(headers); 160 out.writeObject(data); 161 162 out.completeCall(); 163 164 os.flush(); 165 166 String status = (String ) is.getAttribute("status"); 167 168 if (! "200".equals(status)) { 169 CharBuffer errorMsg = new CharBuffer(); 170 int ch; 171 172 while ((ch = is.readChar()) >= 0) { 173 errorMsg.append((char) ch); 174 } 175 176 throw new MessageServiceException(errorMsg.toString()); 177 } 178 179 Object result = in.readReply(null); 180 } catch (Throwable e) { 181 e.printStackTrace(); 182 throw new MessageServiceException(e); 183 } finally { 184 if (os != null) { 185 try { 186 os.close(); 187 } catch (IOException e) { 188 } 189 } 190 if (is != null) { 191 is.close(); 192 } 193 } 194 } 195 196 199 public String toString() 200 { 201 return "[HessianQueue " + queueName + "]"; 202 } 203 } 204 205 | Popular Tags |