KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > jms > amq > AmqClient


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  *
23  * Free Software Foundation, Inc.
24  * 59 Temple Place, Suite 330
25  * Boston, MA 02111-1307 USA
26  *
27  * @author Scott Ferguson
28  */

29
30 package com.caucho.jms.amq;
31
32 import com.caucho.util.ByteBuffer;
33 import com.caucho.util.ThreadPool;
34 import com.caucho.vfs.Path;
35 import com.caucho.vfs.ReadStream;
36 import com.caucho.vfs.ReadWritePair;
37 import com.caucho.vfs.Vfs;
38 import com.caucho.vfs.WriteStream;
39
40 import java.io.IOException JavaDoc;
41 import java.io.InputStream JavaDoc;
42 import java.util.HashMap JavaDoc;
43 import java.util.logging.Level JavaDoc;
44 import java.util.logging.Logger JavaDoc;
45
46 /**
47  * AMQ client.
48  */

49 public class AmqClient extends AmqConnection {
50   private static final Logger JavaDoc log
51     = Logger.getLogger(AmqClient.class.getName());
52
53   private static final byte []AMQP_HEADER = new byte[] {
54     (byte) 'A', (byte) 'M', (byte) 'Q', (byte) 'P',
55     1, 1, 9, 1
56   };
57
58   private static final int MAX_PREFETCH = 0;
59
60   private ClassLoader JavaDoc _loader = Thread.currentThread().getContextClassLoader();
61   
62   private String JavaDoc _host;
63   private int _port;
64   private Path _path;
65
66   private ByteBuffer _packet = new ByteBuffer();
67
68   private ReadThread _readThread;
69
70   AmqClient(String JavaDoc host, int port)
71   {
72     _host = host;
73     _port = port;
74
75     _path = Vfs.lookup("tcp://" + host + ":" + port);
76   }
77   
78   /**
79    * Opens a new channel.
80    */

81   public AmqClientChannel openChannel()
82     throws IOException JavaDoc
83   {
84     connect();
85
86     ByteBuffer packet = new ByteBuffer();
87     packet.addShort(CLASS_CHANNEL);
88     packet.addShort(ID_CHANNEL_OPEN);
89     packet.addInt(MAX_PREFETCH);
90     addShortString(packet, "");
91     
92     AmqClientChannel channel = new AmqClientChannel(this);
93
94     int channelId = addChannel(channel);
95     System.out.println("I: " + channelId);
96
97     writePacket(FRAME_METHOD, channelId, packet);
98
99     channel.waitOpen();
100
101     return channel;
102   }
103
104   private void connect()
105     throws IOException JavaDoc
106   {
107     synchronized (this) {
108       if (_readThread != null)
109     return;
110
111       log.fine("AMQ client connecting to " + _path);
112     
113       ReadWritePair pair = _path.openReadWrite();
114
115       _is = pair.getReadStream();
116       _os = pair.getWriteStream();
117
118       _os.write(AMQP_HEADER, 0, AMQP_HEADER.length);
119
120       doRequest();
121     }
122   }
123
124   // type=byte,empty=byte,channel=short,size=int data frame-end=byte
125

126   protected boolean doConnectionStart(InputStream JavaDoc is)
127     throws IOException JavaDoc
128   {
129     int major = is.read();
130     int minor = is.read();
131
132     HashMap JavaDoc<String JavaDoc,String JavaDoc> props = readTable(is);
133
134     String JavaDoc security = readLongString(is);
135     String JavaDoc locales = readLongString(is);
136     
137     _packet.clear();
138     _packet.addShort(CLASS_CONNECTION);
139     _packet.addShort(ID_CONNECTION_START_OK);
140
141     addTable(_packet, null);
142     addShortString(_packet, "PLAIN"); // auth method
143
addLongString(_packet, "harry:quidditch"); // auth credentials
144
addShortString(_packet, "en_US"); // locale
145

146     writePacket(FRAME_METHOD, 0, _packet);
147
148     return doRequest();
149   }
150
151   protected boolean doConnectionTune(InputStream JavaDoc is)
152     throws IOException JavaDoc
153   {
154     int channelMax = readShort(is);
155     int frameMax = readInt(is);
156     int heartbeat = readShort(is);
157     
158     _packet.clear();
159     _packet.addShort(CLASS_CONNECTION);
160     _packet.addShort(ID_CONNECTION_TUNE_OK);
161
162     _packet.addShort(256);
163     _packet.addInt(MAX_FRAME);
164     _packet.addShort(HEARTBEAT);
165
166     writePacket(FRAME_METHOD, 0, _packet);
167     
168     _packet.clear();
169     _packet.addShort(CLASS_CONNECTION);
170     _packet.addShort(ID_CONNECTION_OPEN);
171
172     addShortString(_packet, "/" + _host + ":" + _port); // virtual host
173
addShortString(_packet, ""); // capabilities
174
_packet.add(0); // insist
175

176     writePacket(FRAME_METHOD, 0, _packet);
177
178     return doRequest();
179   }
180
181   protected boolean doConnectionOpenOk(InputStream JavaDoc is)
182     throws IOException JavaDoc
183   {
184     String JavaDoc hosts = readShortString(is);
185
186     _readThread = new ReadThread();
187
188     log.fine("AMQ: openOk(" + _host + ":" + _port + ")");
189     System.out.println("OPEN:");
190     // also the heartbeak
191

192     ThreadPool.getThreadPool().schedule(_readThread);
193
194     return true;
195   }
196
197   public void close()
198   {
199     try {
200       WriteStream os = _os;
201       _os = null;
202       
203       if (os != null)
204     os.close();
205     } catch (Throwable JavaDoc e) {
206     }
207     
208     try {
209       ReadStream is = _is;
210       _is = null;
211       
212       if (is != null)
213     is.close();
214     } catch (Throwable JavaDoc e) {
215     }
216
217     ReadThread readThread = _readThread;
218     _readThread = null;
219     
220     if (readThread != null)
221       readThread.close();
222   }
223
224   class ReadThread implements Runnable JavaDoc {
225     private Thread JavaDoc _thread;
226     
227     public void run()
228     {
229       _thread = Thread.currentThread();
230       try {
231     while (_is != null && doRequest()) {
232     }
233       } catch (IOException JavaDoc e) {
234     log.log(Level.FINE, e.toString(), e);
235       } finally {
236     _thread = null;
237       }
238     }
239
240     void close()
241     {
242       Thread JavaDoc thread = _thread;
243
244       if (thread != null)
245     thread.interrupt();
246     }
247   }
248 }
249
Popular Tags