1 29 30 package com.caucho.jms.amq; 31 32 import com.caucho.util.ByteBuffer; 33 import com.caucho.util.ThreadPool; 34 import com.caucho.vfs.Path; 35 import com.caucho.vfs.ReadStream; 36 import com.caucho.vfs.ReadWritePair; 37 import com.caucho.vfs.Vfs; 38 import com.caucho.vfs.WriteStream; 39 40 import java.io.IOException ; 41 import java.io.InputStream ; 42 import java.util.HashMap ; 43 import java.util.logging.Level ; 44 import java.util.logging.Logger ; 45 46 49 public class AmqClient extends AmqConnection { 50 private static final Logger log 51 = Logger.getLogger(AmqClient.class.getName()); 52 53 private static final byte []AMQP_HEADER = new byte[] { 54 (byte) 'A', (byte) 'M', (byte) 'Q', (byte) 'P', 55 1, 1, 9, 1 56 }; 57 58 private static final int MAX_PREFETCH = 0; 59 60 private ClassLoader _loader = Thread.currentThread().getContextClassLoader(); 61 62 private String _host; 63 private int _port; 64 private Path _path; 65 66 private ByteBuffer _packet = new ByteBuffer(); 67 68 private ReadThread _readThread; 69 70 AmqClient(String host, int port) 71 { 72 _host = host; 73 _port = port; 74 75 _path = Vfs.lookup("tcp://" + host + ":" + port); 76 } 77 78 81 public AmqClientChannel openChannel() 82 throws IOException 83 { 84 connect(); 85 86 ByteBuffer packet = new ByteBuffer(); 87 packet.addShort(CLASS_CHANNEL); 88 packet.addShort(ID_CHANNEL_OPEN); 89 packet.addInt(MAX_PREFETCH); 90 addShortString(packet, ""); 91 92 AmqClientChannel channel = new AmqClientChannel(this); 93 94 int channelId = addChannel(channel); 95 System.out.println("I: " + channelId); 96 97 writePacket(FRAME_METHOD, channelId, packet); 98 99 channel.waitOpen(); 100 101 return channel; 102 } 103 104 private void connect() 105 throws IOException 106 { 107 synchronized (this) { 108 if (_readThread != null) 109 return; 110 111 log.fine("AMQ client connecting to " + _path); 112 113 ReadWritePair pair = _path.openReadWrite(); 114 115 _is = pair.getReadStream(); 116 _os = pair.getWriteStream(); 117 118 _os.write(AMQP_HEADER, 0, AMQP_HEADER.length); 119 120 doRequest(); 121 } 122 } 123 124 126 protected boolean doConnectionStart(InputStream is) 127 throws IOException 128 { 129 int major = is.read(); 130 int minor = is.read(); 131 132 HashMap <String ,String > props = readTable(is); 133 134 String security = readLongString(is); 135 String locales = readLongString(is); 136 137 _packet.clear(); 138 _packet.addShort(CLASS_CONNECTION); 139 _packet.addShort(ID_CONNECTION_START_OK); 140 141 addTable(_packet, null); 142 addShortString(_packet, "PLAIN"); addLongString(_packet, "harry:quidditch"); addShortString(_packet, "en_US"); 146 writePacket(FRAME_METHOD, 0, _packet); 147 148 return doRequest(); 149 } 150 151 protected boolean doConnectionTune(InputStream is) 152 throws IOException 153 { 154 int channelMax = readShort(is); 155 int frameMax = readInt(is); 156 int heartbeat = readShort(is); 157 158 _packet.clear(); 159 _packet.addShort(CLASS_CONNECTION); 160 _packet.addShort(ID_CONNECTION_TUNE_OK); 161 162 _packet.addShort(256); 163 _packet.addInt(MAX_FRAME); 164 _packet.addShort(HEARTBEAT); 165 166 writePacket(FRAME_METHOD, 0, _packet); 167 168 _packet.clear(); 169 _packet.addShort(CLASS_CONNECTION); 170 _packet.addShort(ID_CONNECTION_OPEN); 171 172 addShortString(_packet, "/" + _host + ":" + _port); addShortString(_packet, ""); _packet.add(0); 176 writePacket(FRAME_METHOD, 0, _packet); 177 178 return doRequest(); 179 } 180 181 protected boolean doConnectionOpenOk(InputStream is) 182 throws IOException 183 { 184 String hosts = readShortString(is); 185 186 _readThread = new ReadThread(); 187 188 log.fine("AMQ: openOk(" + _host + ":" + _port + ")"); 189 System.out.println("OPEN:"); 190 192 ThreadPool.getThreadPool().schedule(_readThread); 193 194 return true; 195 } 196 197 public void close() 198 { 199 try { 200 WriteStream os = _os; 201 _os = null; 202 203 if (os != null) 204 os.close(); 205 } catch (Throwable e) { 206 } 207 208 try { 209 ReadStream is = _is; 210 _is = null; 211 212 if (is != null) 213 is.close(); 214 } catch (Throwable e) { 215 } 216 217 ReadThread readThread = _readThread; 218 _readThread = null; 219 220 if (readThread != null) 221 readThread.close(); 222 } 223 224 class ReadThread implements Runnable { 225 private Thread _thread; 226 227 public void run() 228 { 229 _thread = Thread.currentThread(); 230 try { 231 while (_is != null && doRequest()) { 232 } 233 } catch (IOException e) { 234 log.log(Level.FINE, e.toString(), e); 235 } finally { 236 _thread = null; 237 } 238 } 239 240 void close() 241 { 242 Thread thread = _thread; 243 244 if (thread != null) 245 thread.interrupt(); 246 } 247 } 248 } 249 | Popular Tags |