package com.caucho.jms.amq;

import com.caucho.util.ByteBuffer;
import com.caucho.vfs.ReadStream;
import com.caucho.vfs.WriteStream;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.logging.Logger;

/**
 * Common framing logic for an AMQ connection: reads frames from the input
 * stream, dispatches method frames by class/method id, and writes frames
 * to the output stream.
 *
 * <p>Every {@code doXxx} handler defined here reports a fatal protocol error.
 * NOTE(review): presumably the client/server subclasses override the
 * handlers that are legal for their side of the connection -- confirm.
 */
public class AmqConnection implements AmqConstants {
  private static final Logger log =
    Logger.getLogger(AmqConnection.class.getName());

  /** Milliseconds to sleep before closing after a protocol error. */
  private static final long DOS_TIME = 5000L;

  /** Initial value of the largest frame size accepted by this connection. */
  protected static final int MAX_FRAME = 65536;
  /** Not referenced in this class. */
  protected static final int HEARTBEAT = 10;

  /** Not referenced in this class. */
  private static final byte []AMQP_HEADER = new byte[] {
    (byte) 'A', (byte) 'M', (byte) 'Q', (byte) 'P',
    1, 1, 9, 1
  };

  // Combined (class << 16) + method codes; not referenced in this class.
  private static final int CONNECTION_START
    = (CLASS_CONNECTION << 16) + ID_CONNECTION_START;
  private static final int CONNECTION_START_OK
    = (CLASS_CONNECTION << 16) + ID_CONNECTION_START_OK;
  private static final int CONNECTION_TUNE
    = (CLASS_CONNECTION << 16) + ID_CONNECTION_TUNE;
  private static final int CONNECTION_TUNE_OK
    = (CLASS_CONNECTION << 16) + ID_CONNECTION_TUNE_OK;
  private static final int CONNECTION_OPEN
    = (CLASS_CONNECTION << 16) + ID_CONNECTION_OPEN;
  private static final int CONNECTION_OPEN_OK
    = (CLASS_CONNECTION << 16) + ID_CONNECTION_OPEN_OK;

  /** Channels indexed by channel id; slot 0 is never handed out. */
  protected AmqChannel []_channels = new AmqChannel[256];

  protected ReadStream _is;
  protected WriteStream _os;

  private int _maxFrameSize = MAX_FRAME;

  /** Scratch buffer reused for the payload of method and header frames. */
  private ByteBuffer _packet = new ByteBuffer();

  /**
   * Registers a channel in the first free slot, starting at index 1.
   *
   * @param channel the channel to register; its id is set to the slot index
   * @return the assigned channel id, or 0 if every slot is taken
   */
  protected int addChannel(AmqChannel channel)
  {
    synchronized (_channels) {
      for (int i = 1; i < _channels.length; i++) {
        if (_channels[i] == null) {
          channel.setId(i);
          _channels[i] = channel;

          return i;
        }
      }
    }

    return 0;
  }

  /**
   * Reads and processes one frame from the input stream.
   *
   * @return true if the frame was handled and the connection may continue,
   *         false if the stream ended, the connection is already closed,
   *         or the frame was rejected
   * @throws IOException if reading from the stream fails
   */
  protected boolean doRequest() throws IOException
  {
    ReadStream is = _is;

    if (is == null)
      return false;

    int code = is.read();

    switch (code) {
    case -1:
      close();
      return false;

    case 'A':
      return doHello();

    case FRAME_METHOD:
      return readMethodFrame(is);

    case FRAME_HEADER:
      return readHeaderFrame(is);

    case FRAME_BODY:
      return readBodyFrame(is);

    default:
      log.fine("AMQ: unknown frame type " + code);
      return false;
    }
  }

  /** Reads the remainder of a method frame and dispatches it. */
  private boolean readMethodFrame(ReadStream is) throws IOException
  {
    is.read(); // cycle byte, not used
    int channel = readUnsignedShort(is);
    int size = readInt32(is);
    int methodClass = readUnsignedShort(is);
    int methodId = readUnsignedShort(is);

    if (_maxFrameSize < size)
      return fatalProtocolError("Frame size too large at " + size);

    // the announced size includes the 4 bytes of class/method id already read
    if (size < 4)
      return fatalProtocolError("Frame size too small at " + size);

    if (! fillPacket(is, size - 4))
      return fatalProtocolError("Unexpected end of stream in method frame");

    int end = is.read();

    if (end != FRAME_END) {
      return fatalProtocolError("Bad packet end at 0x" + Integer.toHexString(end));
    }

    return doMethod(channel, methodClass, methodId, _packet);
  }

  /** Reads the remainder of a content-header frame and hands it to its channel. */
  private boolean readHeaderFrame(ReadStream is) throws IOException
  {
    is.read(); // cycle byte, not used
    int channelId = readUnsignedShort(is);
    int size = readInt32(is);
    int classId = readUnsignedShort(is);
    int weight = readUnsignedShort(is);
    long bodySize = readInt64(is);

    if (_maxFrameSize < size)
      return fatalProtocolError("Frame size too large at " + size);

    // the announced size includes the 12 bytes of class/weight/body-size
    if (size < 12)
      return fatalProtocolError("Frame size too small at " + size);

    if (! fillPacket(is, size - 12))
      return fatalProtocolError("Unexpected end of stream in header frame");

    int end = is.read();

    if (end != FRAME_END) {
      return fatalProtocolError("Bad packet end at 0x" + Integer.toHexString(end));
    }

    if (! isChannelIdInRange(channelId))
      return fatalProtocolError("header illegal channel at " + channelId);

    AmqChannel channel = _channels[channelId];
    if (channel == null)
      return fatalProtocolError("header illegal channel at " + channelId);

    // The handler has always been passed an int body size, so reject
    // values that cannot be represented instead of silently truncating.
    if (bodySize < 0 || Integer.MAX_VALUE < bodySize)
      return fatalProtocolError("header body size out of range at " + bodySize);

    return channel.doHeader(classId, weight, (int) bodySize,
                            _packet.createInputStream());
  }

  /** Reads the remainder of a content-body frame, streaming it to its channel in chunks. */
  private boolean readBodyFrame(ReadStream is) throws IOException
  {
    is.read(); // cycle byte, not used
    int channelId = readUnsignedShort(is);
    int size = readInt32(is);

    if (! isChannelIdInRange(channelId))
      return fatalProtocolError("body illegal channel at " + channelId);

    AmqChannel channel = _channels[channelId];
    if (channel == null)
      return fatalProtocolError("body illegal channel at " + channelId);

    while (size > 0) {
      Chunk chunk = new Chunk(size);

      byte []buffer = chunk.getBuffer();
      int offset = chunk.getOffset();

      // never read past the end of this frame, whatever the chunk capacity
      int sublen = Math.min(size, buffer.length - offset);

      sublen = is.read(buffer, offset, sublen);

      // a zero-length read would otherwise spin forever
      if (sublen <= 0)
        return false;

      chunk.setOffset(offset + sublen);

      channel.addChunk(chunk, offset, sublen);

      size -= sublen;
    }

    int end = is.read();

    if (end != FRAME_END) {
      return fatalProtocolError("Bad packet end at 0x" + Integer.toHexString(end));
    }

    channel.endContentFrame();

    return true;
  }

  /**
   * Resets the scratch packet and fills it with exactly {@code length} bytes.
   *
   * @return false if the stream ended before {@code length} bytes arrived
   */
  private boolean fillPacket(ReadStream is, int length) throws IOException
  {
    _packet.clear();
    _packet.ensureCapacity(length);

    // fetch the array after ensureCapacity, which may have replaced it
    byte []buffer = _packet.getBuffer();
    int offset = 0;

    // a single read() may return fewer bytes than requested
    while (offset < length) {
      int sublen = is.read(buffer, offset, length - offset);

      if (sublen <= 0)
        return false;

      offset += sublen;
    }

    _packet.setLength(length);

    return true;
  }

  /** Reads a big-endian 16-bit value from the frame stream. */
  private static int readUnsignedShort(ReadStream is) throws IOException
  {
    return 256 * is.read() + is.read();
  }

  /** Reads a big-endian 32-bit value from the frame stream. */
  private static int readInt32(ReadStream is) throws IOException
  {
    return ((is.read() << 24)
            + (is.read() << 16)
            + (is.read() << 8)
            + (is.read()));
  }

  /** Reads a big-endian 64-bit value from the frame stream. */
  private static long readInt64(ReadStream is) throws IOException
  {
    long value = 0;

    // accumulate in a long: int shifts of 32 or more wrap around
    for (int i = 0; i < 8; i++)
      value = (value << 8) | (is.read() & 0xff);

    return value;
  }

  /** Returns true if the id is a usable index into the channel table (slot 0 excluded). */
  private boolean isChannelIdInRange(int channelId)
  {
    return 0 < channelId && channelId < _channels.length;
  }

  /** Builds the "illegal channel" message for a method frame. */
  private static String illegalChannel(int methodClass, int methodId,
                                       int channel)
  {
    // "." must be a String: int + '.' would be numeric addition
    return methodClass + "." + methodId + " illegal channel at " + channel;
  }

  /**
   * Dispatches a method frame to the connection handler or the channel
   * callback selected by class and method id.
   *
   * @param channel channel id taken from the frame
   * @param methodClass method class id taken from the frame
   * @param methodId method id taken from the frame
   * @param packet the method arguments
   * @return the handler's result, or false after a fatal protocol error
   */
  private boolean doMethod(int channel,
                           int methodClass, int methodId,
                           ByteBuffer packet)
    throws IOException
  {
    log.fine("AMQ: method " + methodClass + "." + methodId);

    switch (methodClass) {
    case CLASS_CONNECTION:
      if (channel != 0)
        return fatalProtocolError("Connection requires channel 0 at " +
                                  channel);

      switch (methodId) {
      case ID_CONNECTION_START:
        return doConnectionStart(packet.createInputStream());

      case ID_CONNECTION_START_OK:
        return doConnectionStartOk(packet.createInputStream());

      case ID_CONNECTION_TUNE:
        return doConnectionTune(packet.createInputStream());

      case ID_CONNECTION_TUNE_OK:
        return doConnectionTuneOk(packet.createInputStream());

      case ID_CONNECTION_OPEN:
        return doConnectionOpen(packet.createInputStream());

      case ID_CONNECTION_OPEN_OK:
        return doConnectionOpenOk(packet.createInputStream());
      }
      break;

    case CLASS_CHANNEL:
      {
        if (! isChannelIdInRange(channel))
          return fatalProtocolError(illegalChannel(methodClass, methodId, channel));

        // open is the only channel method allowed before the slot is filled
        if (methodId == ID_CHANNEL_OPEN)
          return doChannelOpen(channel, packet.createInputStream());

        AmqChannel channelCallback = _channels[channel];
        if (channelCallback == null)
          return fatalProtocolError(illegalChannel(methodClass, methodId, channel));

        if (methodId == ID_CHANNEL_OPEN_OK)
          return channelCallback.doOpenOk(packet.createInputStream());
      }
      break;

    case CLASS_QUEUE:
      {
        if (! isChannelIdInRange(channel))
          return fatalProtocolError(illegalChannel(methodClass, methodId, channel));

        AmqChannel channelCallback = _channels[channel];
        if (channelCallback == null)
          return fatalProtocolError(illegalChannel(methodClass, methodId, channel));

        switch (methodId) {
        case ID_QUEUE_DECLARE:
          return channelCallback.doQueueDeclare(packet.createInputStream());

        case ID_QUEUE_DECLARE_OK:
          return channelCallback.doQueueDeclareOk(packet.createInputStream());
        }
      }
      break;

    case CLASS_BASIC:
      {
        if (! isChannelIdInRange(channel))
          return fatalProtocolError(illegalChannel(methodClass, methodId, channel));

        AmqChannel channelCallback = _channels[channel];
        if (channelCallback == null)
          return fatalProtocolError(illegalChannel(methodClass, methodId, channel));

        if (methodId == ID_BASIC_PUBLISH)
          return channelCallback.doBasicPublish(packet.createInputStream());
      }
      break;
    }

    return fatalProtocolError("Unknown method " + methodClass + "." + methodId);
  }

  /** Called when a frame starts with 'A'; fatal in this class. */
  protected boolean doHello()
    throws IOException
  {
    return fatalProtocolError("doHello() should not be called");
  }

  /** Handles connection.start; fatal in this class. */
  protected boolean doConnectionStart(InputStream is)
    throws IOException
  {
    return fatalProtocolError("doConnectionStart() should not be called");
  }

  /** Handles connection.start-ok; fatal in this class. */
  protected boolean doConnectionStartOk(InputStream is)
    throws IOException
  {
    return fatalProtocolError("doConnectionStartOk() should not be called");
  }

  /** Handles connection.tune; fatal in this class. */
  protected boolean doConnectionTune(InputStream is)
    throws IOException
  {
    return fatalProtocolError("doConnectionTune() should not be called");
  }

  /** Handles connection.tune-ok; fatal in this class. */
  protected boolean doConnectionTuneOk(InputStream is)
    throws IOException
  {
    return fatalProtocolError("doConnectionTuneOk() should not be called");
  }

  /** Handles connection.open; fatal in this class. */
  protected boolean doConnectionOpen(InputStream is)
    throws IOException
  {
    return fatalProtocolError("doConnectionOpen() should not be called");
  }

  /** Handles connection.open-ok; fatal in this class. */
  protected boolean doConnectionOpenOk(InputStream is)
    throws IOException
  {
    return fatalProtocolError("doConnectionOpenOk() should not be called");
  }

  /** Handles channel.open for the given channel id; fatal in this class. */
  protected boolean doChannelOpen(int id, InputStream is)
    throws IOException
  {
    return fatalProtocolError("doChannelOpen() should not be called");
  }

  /**
   * Logs the error, sleeps for {@code DOS_TIME} milliseconds, and closes
   * the connection.
   *
   * @param msg description of the protocol violation
   * @return always false, so callers can {@code return fatalProtocolError(...)}
   */
  protected boolean fatalProtocolError(String msg)
    throws IOException
  {
    log.warning("AMQ: " + msg);

    try {
      Thread.sleep(DOS_TIME);
    } catch (InterruptedException e) {
      // keep the interrupt visible to the caller instead of swallowing it
      Thread.currentThread().interrupt();
    }

    close();

    return false;
  }

  /**
   * Writes one complete frame and then flushes the output stream.
   * Does nothing if the connection has been closed.
   *
   * @param frame the frame type byte
   * @param channel the channel id
   * @param packet the frame payload
   */
  protected final void writePacket(int frame, int channel, ByteBuffer packet)
    throws IOException
  {
    WriteStream os = _os;

    if (os == null)
      return;

    synchronized (os) {
      // announce exactly the number of bytes written below
      int length = packet.getLength();

      os.write(frame);
      os.write(CYCLE_TBD);
      writeShort(os, channel);
      writeInt(os, length);
      os.write(packet.getBuffer(), 0, length);
      os.write(FRAME_END);
    }

    // NOTE(review): the flush is taken under a separate lock after a yield,
    // presumably so other writers can queue frames first -- confirm.
    Thread.yield();

    synchronized (os) {
      os.flush();
    }
  }

  /**
   * Copies {@code length} bytes from {@code is} to the output stream as a
   * sequence of body frames, then flushes. Does nothing if the connection
   * has been closed, matching {@code writePacket}.
   *
   * @param channel the channel id
   * @param length number of content bytes to send
   * @param is source of the content
   * @throws IOException if {@code is} ends before {@code length} bytes
   */
  protected final void writeData(int channel, long length, InputStream is)
    throws IOException
  {
    WriteStream os = _os;

    if (os == null)
      return;

    while (length > 0) {
      synchronized (os) {
        os.write(FRAME_BODY);
        os.write(CYCLE_TBD);
        writeShort(os, channel);

        // Reserve 4 bytes for the frame size and 1 for the end marker; the
        // payload is read directly into the stream's buffer.
        int offset = os.getBufferOffset() + 4;
        byte []buffer = os.getBuffer();

        int sublen = buffer.length - offset - 1;
        if (sublen <= 0) {
          // no room left after the frame prefix: flush and start over
          os.flush();
          offset = os.getBufferOffset() + 4;
          buffer = os.getBuffer();
          sublen = buffer.length - offset - 1;
        }
        if (length < sublen)
          sublen = (int) length;

        sublen = is.read(buffer, offset, sublen);

        if (sublen <= 0)
          throw new IOException("unexpected EOF");

        // fill in the size now that the payload length is known
        buffer[offset - 4] = (byte) (sublen >> 24);
        buffer[offset - 3] = (byte) (sublen >> 16);
        buffer[offset - 2] = (byte) (sublen >> 8);
        buffer[offset - 1] = (byte) (sublen);
        buffer[offset + sublen] = (byte) FRAME_END;

        os.setBufferOffset(offset + sublen + 1);

        length -= sublen;
      }

      Thread.yield();
    }

    synchronized (os) {
      os.flush();
    }
  }

  /**
   * Appends a table to the packet. Currently always writes a zero length;
   * {@code props} is ignored.
   */
  protected final void addTable(ByteBuffer packet,
                                HashMap<String,String> props)
  {
    packet.addShort(0);
  }

  /** Appends a string preceded by its length; same as {@link #addShortString}. */
  protected final void addShort(ByteBuffer packet, String v)
  {
    addShortString(packet, v);
  }

  /** Appends a string preceded by its length written with {@code addInt}. */
  protected final void addLongString(ByteBuffer packet, String v)
  {
    packet.addInt(v.length());
    packet.addString(v);
  }

  /** Appends a string preceded by its length written with {@code add}. */
  protected final void addShortString(ByteBuffer packet, String v)
  {
    packet.add(v.length());
    packet.addString(v);
  }

  /**
   * Consumes a table (16-bit length followed by that many bytes) from the
   * stream. Table contents are not parsed yet.
   *
   * @return always null
   */
  protected final HashMap<String,String> readTable(InputStream is)
    throws IOException
  {
    int length = readShort(is);

    // skip the payload so the fields after the table stay aligned
    for (int i = 0; i < length; i++) {
      if (is.read() < 0)
        break;
    }

    return null;
  }

  /** Reads a big-endian 16-bit value. */
  protected final int readShort(InputStream is)
    throws IOException
  {
    return 256 * is.read() + is.read();
  }

  /** Reads a big-endian 32-bit value. */
  protected final int readInt(InputStream is)
    throws IOException
  {
    return ((is.read() << 24)
            + (is.read() << 16)
            + (is.read() << 8)
            + (is.read()));
  }

  /**
   * Reads a string preceded by a 32-bit length, one char per byte.
   *
   * @throws IOException if the length is negative or the stream ends early
   */
  protected final String readLongString(InputStream is)
    throws IOException
  {
    return readChars(is, readInt(is));
  }

  /**
   * Reads a string preceded by a one-byte length, one char per byte.
   *
   * @throws IOException if the stream ends before the string is complete
   */
  protected final String readShortString(InputStream is)
    throws IOException
  {
    return readChars(is, is.read());
  }

  /** Reads {@code length} bytes as chars, failing on EOF or a negative length. */
  private static String readChars(InputStream is, int length)
    throws IOException
  {
    if (length < 0)
      throw new IOException("illegal string length " + length);

    // grow on demand: the length is untrusted, so do not pre-allocate it
    StringBuilder sb = new StringBuilder(Math.min(length, 256));

    for (int i = 0; i < length; i++) {
      int ch = is.read();

      if (ch < 0)
        throw new IOException("unexpected EOF");

      sb.append((char) ch);
    }

    return sb.toString();
  }

  /** Writes a big-endian 32-bit value. */
  protected final void writeInt(WriteStream os, int v)
    throws IOException
  {
    os.write(v >> 24);
    os.write(v >> 16);
    os.write(v >> 8);
    os.write(v);
  }

  /** Writes a big-endian 16-bit value. */
  private final void writeShort(WriteStream os, int v)
    throws IOException
  {
    os.write(v >> 8);
    os.write(v);
  }

  /**
   * Closes both streams and clears the stream fields. Safe to call more
   * than once; a failure closing the write stream is logged and does not
   * prevent the read stream from being closed.
   */
  public void close()
  {
    WriteStream os = _os;
    _os = null;

    ReadStream is = _is;
    _is = null;

    try {
      if (os != null)
        os.close();
    } catch (IOException e) {
      // best-effort close: nothing useful the caller can do with this
      log.fine("AMQ: ignoring exception while closing write stream: " + e);
    } finally {
      if (is != null)
        is.close();
    }
  }
}