KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > jms > amq > AmqConnection


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  *
23  * Free Software Foundation, Inc.
24  * 59 Temple Place, Suite 330
25  * Boston, MA 02111-1307 USA
26  *
27  * @author Scott Ferguson
28  */

29
30 package com.caucho.jms.amq;
31
32 import com.caucho.util.ByteBuffer;
33 import com.caucho.vfs.ReadStream;
34 import com.caucho.vfs.WriteStream;
35
36 import java.io.IOException JavaDoc;
37 import java.io.InputStream JavaDoc;
38 import java.util.HashMap JavaDoc;
39 import java.util.logging.Logger JavaDoc;
40
41 /**
42  * Common for client and server
43  */

44 public class AmqConnection implements AmqConstants {
45   private static final Logger JavaDoc log =
46     Logger.getLogger(AmqConnection.class.getName());
47
48   // denial-of-service sleep time
49
private static final long DOS_TIME = 5000L;
50   protected static final int MAX_FRAME = 65536;
51   protected static final int HEARTBEAT = 10;
52
53   private static final byte []AMQP_HEADER = new byte[] {
54     (byte) 'A', (byte) 'M', (byte) 'Q', (byte) 'P',
55     1, 1, 9, 1
56   };
57
58   private static final int CONNECTION_START
59     = (CLASS_CONNECTION << 16) + ID_CONNECTION_START;
60   private static final int CONNECTION_START_OK
61     = (CLASS_CONNECTION << 16) + ID_CONNECTION_START_OK;
62   private static final int CONNECTION_TUNE
63     = (CLASS_CONNECTION << 16) + ID_CONNECTION_TUNE;
64   private static final int CONNECTION_TUNE_OK
65     = (CLASS_CONNECTION << 16) + ID_CONNECTION_TUNE_OK;
66   private static final int CONNECTION_OPEN
67     = (CLASS_CONNECTION << 16) + ID_CONNECTION_OPEN;
68   private static final int CONNECTION_OPEN_OK
69     = (CLASS_CONNECTION << 16) + ID_CONNECTION_OPEN_OK;
70
71   protected AmqChannel []_channels = new AmqChannel[256];
72
73   protected ReadStream _is;
74   protected WriteStream _os;
75
76   private int _maxFrameSize = MAX_FRAME;
77
78   private ByteBuffer _packet = new ByteBuffer();
79
80   /**
81    * Sets a channel callback.
82    */

83   protected int addChannel(AmqChannel channel)
84   {
85     int id;
86
87     synchronized (_channels) {
88       for (int i = 1; i < _channels.length; i++) {
89     if (_channels[i] == null) {
90       channel.setId(i);
91       _channels[i] = channel;
92
93       return i;
94     }
95       }
96     }
97
98     return 0;
99   }
100   
101   /**
102    * Handles a new request. The input stream and output stream are
103    * already initialized.
104    */

105   protected boolean doRequest() throws IOException JavaDoc
106   {
107     ReadStream is = _is;
108     
109     int code = is.read();
110
111     switch (code) {
112     case -1:
113       close();
114       return false;
115       
116     case 'A':
117       return doHello();
118
119     case FRAME_METHOD:
120       {
121     int cycle = is.read();
122     int channel = 256 * is.read() + is.read();
123     int size = ((is.read() << 24)
124             + (is.read() << 16)
125             + (is.read() << 8)
126             + (is.read()));
127     int methodClass = 256 * is.read() + is.read();
128     int methodId = 256 * is.read() + is.read();
129
130     if (_maxFrameSize < size)
131       return fatalProtocolError("Frame size too large at " + size);
132
133     _packet.clear();
134     _packet.ensureCapacity(size - 4);
135     is.read(_packet.getBuffer(), 0, size - 4);
136     _packet.setLength(size - 4);
137     int end = is.read();
138
139     if (end != FRAME_END) {
140       return fatalProtocolError("Bad packet end at 0x" + Integer.toHexString(end));
141     }
142
143     return doMethod(channel, methodClass, methodId, _packet);
144       }
145
146     case FRAME_HEADER:
147       {
148     int cycle = is.read();
149     int channelId = 256 * is.read() + is.read();
150     int size = ((is.read() << 24)
151             + (is.read() << 16)
152             + (is.read() << 8)
153             + (is.read()));
154     int classId = 256 * is.read() + is.read();
155     int weight = 256 * is.read() + is.read();
156     int bodySize = ((is.read() << 56)
157             + (is.read() << 48)
158             + (is.read() << 40)
159             + (is.read() << 32)
160             + (is.read() << 24)
161             + (is.read() << 16)
162             + (is.read() << 8)
163             + (is.read()));
164     
165     _packet.clear();
166     _packet.ensureCapacity(size - 12);
167     is.read(_packet.getBuffer(), 0, size - 12);
168     _packet.setLength(size - 12);
169     int end = is.read();
170
171     if (end != FRAME_END) {
172       return fatalProtocolError("Bad packet end at 0x" + Integer.toHexString(end));
173     }
174
175     if (channelId <= 0 || _channels.length <= channelId)
176       return fatalProtocolError("header illegal channel at " + channelId);
177
178     AmqChannel channel = _channels[channelId];
179     if (_channels[channelId] == null)
180       return fatalProtocolError("header illegal channel at " + channelId);
181
182     return channel.doHeader(classId, weight, bodySize, _packet.createInputStream());
183       }
184
185     case FRAME_BODY:
186       {
187     int cycle = is.read();
188     int channelId = 256 * is.read() + is.read();
189     int size = ((is.read() << 24)
190             + (is.read() << 16)
191             + (is.read() << 8)
192             + (is.read()));
193
194     if (channelId <= 0 || _channels.length <= channelId)
195       return fatalProtocolError("header illegal channel at " + channelId);
196
197     AmqChannel channel = _channels[channelId];
198     if (_channels[channelId] == null)
199       return fatalProtocolError("header illegal channel at " + channelId);
200
201     while (size > 0) {
202       Chunk chunk = new Chunk(size);
203
204       byte []buffer = chunk.getBuffer();
205       int offset = chunk.getOffset();
206
207       int sublen = is.read(buffer, offset, buffer.length - offset);
208
209       if (sublen < 0)
210         return false;
211
212       chunk.setOffset(offset + sublen);
213
214       channel.addChunk(chunk, offset, sublen);
215
216       size -= sublen;
217     }
218     
219     int end = is.read();
220
221     if (end != FRAME_END) {
222       return fatalProtocolError("Bad packet end at 0x" + Integer.toHexString(end));
223     }
224
225     channel.endContentFrame();
226
227     return true;
228       }
229       
230     default:
231       System.out.println("BOGUS:" + code);
232       return false;
233     }
234   }
235
236   private boolean doMethod(int channel,
237                int methodClass, int methodId,
238                ByteBuffer packet)
239     throws IOException JavaDoc
240   {
241     System.out.println("METHOD: " + methodClass + "." + methodId);
242     
243     switch (methodClass) {
244     case CLASS_CONNECTION:
245       if (channel != 0)
246     return fatalProtocolError("Connection requires channel 0 at " +
247                   channel);
248       
249       switch (methodId) {
250       case ID_CONNECTION_START:
251     return doConnectionStart(packet.createInputStream());
252       
253       case ID_CONNECTION_START_OK:
254     return doConnectionStartOk(packet.createInputStream());
255       
256       case ID_CONNECTION_TUNE:
257     return doConnectionTune(packet.createInputStream());
258       
259       case ID_CONNECTION_TUNE_OK:
260     return doConnectionTuneOk(packet.createInputStream());
261       
262       case ID_CONNECTION_OPEN:
263     return doConnectionOpen(packet.createInputStream());
264       
265       case ID_CONNECTION_OPEN_OK:
266     return doConnectionOpenOk(packet.createInputStream());
267       }
268       break;
269       
270     case CLASS_CHANNEL:
271       {
272     if (channel <= 0 || _channels.length <= channel)
273       return fatalProtocolError(methodClass + "." + methodId + " illegal channel at " + channel);
274
275     if (methodId == ID_CHANNEL_OPEN)
276       return doChannelOpen(channel, packet.createInputStream());
277       
278     AmqChannel channelCallback = _channels[channel];
279     if (channelCallback == null)
280       return fatalProtocolError(methodClass + "." + methodId + " illegal channel at " + channel);
281       
282     switch (methodId) {
283     case ID_CHANNEL_OPEN_OK:
284       return channelCallback.doOpenOk(packet.createInputStream());
285     }
286       }
287       break;
288
289     case CLASS_QUEUE:
290       {
291     if (channel <= 0 || _channels.length <= channel)
292       return fatalProtocolError(methodClass + '.' + methodId + " illegal channel at " + channel);
293       
294     AmqChannel channelCallback = _channels[channel];
295     if (channelCallback == null)
296       return fatalProtocolError(methodClass + '.' + methodId + " illegal channel at " + channel);
297
298     switch (methodId) {
299     case ID_QUEUE_DECLARE:
300       return channelCallback.doQueueDeclare(packet.createInputStream());
301       
302     case ID_QUEUE_DECLARE_OK:
303       return channelCallback.doQueueDeclareOk(packet.createInputStream());
304     }
305       }
306       break;
307
308     case CLASS_BASIC:
309       {
310     if (channel <= 0 || _channels.length <= channel)
311       return fatalProtocolError(methodClass + '.' + methodId + " illegal channel at " + channel);
312       
313     AmqChannel channelCallback = _channels[channel];
314     if (channelCallback == null)
315       return fatalProtocolError(methodClass + '.' + methodId + " illegal channel at " + channel);
316
317     switch (methodId) {
318     case ID_BASIC_PUBLISH:
319       return channelCallback.doBasicPublish(packet.createInputStream());
320     }
321       }
322       break;
323     }
324
325     System.out.println("UNKNOWN METHOD: " + methodClass + "." + methodId);
326       
327     return fatalProtocolError("Unknown method " + methodClass + "." + methodId);
328   }
329
330   protected boolean doHello()
331     throws IOException JavaDoc
332   {
333     return fatalProtocolError("doHello() should not be called");
334   }
335
336   protected boolean doConnectionStart(InputStream JavaDoc is)
337     throws IOException JavaDoc
338   {
339     return fatalProtocolError("doConnectionStart() should not be called");
340   }
341
342   protected boolean doConnectionStartOk(InputStream JavaDoc is)
343     throws IOException JavaDoc
344   {
345     return fatalProtocolError("doConnectionStartOk() should not be called");
346   }
347
348   protected boolean doConnectionTune(InputStream JavaDoc is)
349     throws IOException JavaDoc
350   {
351     return fatalProtocolError("doConnectionTune() should not be called");
352   }
353
354   protected boolean doConnectionTuneOk(InputStream JavaDoc is)
355     throws IOException JavaDoc
356   {
357     return fatalProtocolError("doConnectionTuneOk() should not be called");
358   }
359
360   protected boolean doConnectionOpen(InputStream JavaDoc is)
361     throws IOException JavaDoc
362   {
363     return fatalProtocolError("doConnectionOpen() should not be called");
364   }
365
366   protected boolean doConnectionOpenOk(InputStream JavaDoc is)
367     throws IOException JavaDoc
368   {
369     return fatalProtocolError("doConnectionOpenOk() should not be called");
370   }
371
372   protected boolean doChannelOpen(int id, InputStream JavaDoc is)
373     throws IOException JavaDoc
374   {
375     return fatalProtocolError("doChannelOpen() should not be called");
376   }
377
378   protected boolean fatalProtocolError(String JavaDoc msg)
379     throws IOException JavaDoc
380   {
381     System.out.println("AMQ: " + msg);
382     log.warning("AMQ: " + msg);
383
384     try {
385       Thread.sleep(DOS_TIME);
386     } catch (InterruptedException JavaDoc e) {
387     }
388     
389     close();
390
391     return false;
392   }
393
394   protected final void writePacket(int frame, int channel, ByteBuffer packet)
395     throws IOException JavaDoc
396   {
397     WriteStream os = _os;
398
399     if (os == null)
400       return;
401
402     synchronized (os) {
403       os.write(frame);
404       os.write(CYCLE_TBD);
405       os.write(channel >> 8);
406       os.write(channel);
407       writeInt(os, packet.size());
408       os.write(packet.getBuffer(), 0, packet.getLength());
409       os.write(FRAME_END);
410     }
411
412     Thread.yield();
413
414     synchronized (os) {
415       os.flush(); // XXX: batch?
416
}
417   }
418
419   protected final void writeData(int channel, long length, InputStream JavaDoc is)
420     throws IOException JavaDoc
421   {
422     WriteStream os = _os;
423
424     while (length > 0) {
425       synchronized (os) {
426     os.write(FRAME_BODY);
427     os.write(CYCLE_TBD);
428     os.write(channel >> 8);
429     os.write(channel);
430
431     int offset = os.getBufferOffset() + 4;
432     byte []buffer = os.getBuffer();
433
434     int sublen = buffer.length - offset - 1;
435     if (sublen <= 0) {
436       os.flush();
437       offset = os.getBufferOffset() + 4;
438       buffer = os.getBuffer();
439       sublen = buffer.length - offset - 1;
440     }
441     if (length < sublen)
442       sublen = (int) length;
443
444     sublen = is.read(buffer, offset, sublen);
445
446     if (sublen <= 0)
447       throw new IOException JavaDoc("unexpected EOF");
448
449     buffer[offset - 4] = (byte) (sublen >> 24);
450     buffer[offset - 3] = (byte) (sublen >> 16);
451     buffer[offset - 2] = (byte) (sublen >> 8);
452     buffer[offset - 1] = (byte) (sublen);
453     buffer[offset + sublen] = (byte) FRAME_END;
454
455     os.setBufferOffset(offset + sublen + 1);
456
457     length -= sublen;
458       }
459
460       Thread.yield();
461     }
462
463     synchronized (os) {
464       os.flush(); // XXX: batch?
465
}
466   }
467
468   protected final void addTable(ByteBuffer packet,
469                 HashMap JavaDoc<String JavaDoc,String JavaDoc> props)
470   {
471     packet.addShort(0);
472   }
473
474   protected final void addShort(ByteBuffer packet, String JavaDoc v)
475   {
476     packet.add(v.length());
477     packet.addString(v);
478   }
479
480   protected final void addLongString(ByteBuffer packet, String JavaDoc v)
481   {
482     packet.addInt(v.length());
483     packet.addString(v);
484   }
485
486   protected final void addShortString(ByteBuffer packet, String JavaDoc v)
487   {
488     packet.add(v.length());
489     packet.addString(v);
490   }
491
492   protected final HashMap JavaDoc<String JavaDoc,String JavaDoc> readTable(InputStream JavaDoc is)
493     throws IOException JavaDoc
494   {
495     int length = 256 * is.read() + is.read();
496
497     return null;
498   }
499   
500   protected final int readShort(InputStream JavaDoc is)
501     throws IOException JavaDoc
502   {
503     return 256 * is.read() + is.read();
504   }
505   
506   protected final int readInt(InputStream JavaDoc is)
507     throws IOException JavaDoc
508   {
509     return ((is.read() << 24)
510         + (is.read() << 16)
511         + (is.read() << 8)
512         + (is.read()));
513   }
514
515   protected final String JavaDoc readLongString(InputStream JavaDoc is)
516     throws IOException JavaDoc
517   {
518     int length = ((is.read() << 24)
519           + (is.read() << 16)
520           + (is.read() << 8)
521           + (is.read()));
522
523     char []buf = new char[length];
524
525     for (int i = 0; i < length; i++)
526       buf[i] = (char) is.read();
527
528     return new String JavaDoc(buf, 0, length);
529   }
530
531   protected final String JavaDoc readShortString(InputStream JavaDoc is)
532     throws IOException JavaDoc
533   {
534     int length = is.read();
535
536     char []buf = new char[length];
537
538     for (int i = 0; i < length; i++)
539       buf[i] = (char) is.read();
540
541     return new String JavaDoc(buf, 0, length);
542   }
543
544   protected final void writeInt(WriteStream os, int v)
545     throws IOException JavaDoc
546   {
547     os.write(v >> 24);
548     os.write(v >> 16);
549     os.write(v >> 8);
550     os.write(v);
551   }
552
553   private final void writeShort(WriteStream os, int v)
554     throws IOException JavaDoc
555   {
556     os.write(v >> 8);
557     os.write(v);
558   }
559   
560   public void close()
561   {
562     try {
563       WriteStream os = _os;
564       _os = null;
565
566       if (os != null)
567     os.close();
568     } catch (IOException JavaDoc e) {
569     }
570     
571     ReadStream is = _is;
572     _is = null;
573
574     if (is != null)
575       is.close();
576   }
577 }
578
Popular Tags