1 29 30 package com.caucho.jms.amq; 31 32 import com.caucho.server.connection.Connection; 33 import com.caucho.server.port.ServerRequest; 34 import com.caucho.util.ByteBuffer; 35 36 import java.io.IOException ; 37 import java.io.InputStream ; 38 import java.util.HashMap ; 39 import java.util.logging.Logger ; 40 41 47 public class AmqRequest extends AmqConnection implements ServerRequest 48 { 49 private static final Logger log = 50 Logger.getLogger(AmqRequest.class.getName()); 51 52 private static final byte []AMQP_HEADER = new byte[] { 53 (byte) 'A', (byte) 'M', (byte) 'Q', (byte) 'P', 54 1, 1, 9, 1 55 }; 56 57 private Connection _conn; 58 private ClassLoader _loader; 59 60 private ByteBuffer _packet = new ByteBuffer(); 61 62 AmqRequest(Connection conn, ClassLoader loader) 63 { 64 _conn = conn; 65 _loader = loader; 66 } 67 68 72 public void init() 73 { 74 } 75 76 public boolean isWaitForRead() 77 { 78 return false; 79 } 80 81 89 public boolean handleRequest() throws IOException 90 { 91 _is = _conn.getReadStream(); 92 _os = _conn.getWriteStream(); 93 94 return doRequest(); 95 } 96 97 protected boolean doHello() 98 throws IOException 99 { 100 if (_is.read() != 'M' 101 || _is.read() != 'Q' 102 || _is.read() != 'P' 103 || _is.read() != 1 104 || _is.read() != 1 105 || _is.read() != 9 106 || _is.read() != 1) { 107 _os.write(AMQP_HEADER, 0, AMQP_HEADER.length); 108 _os.flush(); 109 110 return false; 111 } 112 113 _packet.clear(); 114 _packet.addShort(CLASS_CONNECTION); 115 _packet.addShort(ID_CONNECTION_START); 116 _packet.add(9); _packet.add(1); addTable(_packet, null); addLongString(_packet, "PLAIN"); addLongString(_packet, "en_US"); 122 writePacket(FRAME_METHOD, 0, _packet); 123 124 return true; 125 } 126 127 protected boolean doConnectionStartOk(InputStream is) 128 throws IOException 129 { 130 HashMap <String ,String > props = readTable(is); 131 132 String auth = readShortString(is); 133 String credentials = readLongString(is); 134 String locale = readShortString(is); 135 136 _packet.clear(); 137 _packet.addShort(CLASS_CONNECTION); 138 _packet.addShort(ID_CONNECTION_TUNE); 139 _packet.addShort(256); _packet.addInt(MAX_FRAME); _packet.addShort(HEARTBEAT); 143 writePacket(FRAME_METHOD, 0, _packet); 144 145 return true; 146 } 147 148 protected boolean doConnectionTuneOk(InputStream is) 149 throws IOException 150 { 151 int channelMax = readShort(is); 152 int frameMax = readInt(is); 153 154 if (frameMax < 4096 || MAX_FRAME < frameMax) 155 return fatalProtocolError(frameMax + " is an invalid frame size"); 156 157 int heartbeat = readShort(is); 158 159 return true; 160 } 161 162 protected boolean doConnectionOpen(InputStream is) 163 throws IOException 164 { 165 String host = readShortString(is); 166 167 System.out.println("VHOST: " + host); 168 169 _packet.clear(); 170 _packet.addShort(CLASS_CONNECTION); 171 _packet.addShort(ID_CONNECTION_OPEN_OK); 172 addShortString(_packet, ""); 173 174 writePacket(FRAME_METHOD, 0, _packet); 175 176 return true; 177 } 178 179 protected boolean doChannelOpen(int id, InputStream is) 180 throws IOException 181 { 182 int prefetch = readInt(is); 183 String oob = readShortString(is); 184 185 synchronized (_channels) { 186 if (_channels[id] != null) 187 return fatalProtocolError(id + " is an existing channel"); 188 189 _channels[id] = new AmqServerChannel(this, id); 190 191 } 193 194 _packet.clear(); 195 _packet.addShort(CLASS_CHANNEL); 196 _packet.addShort(ID_CHANNEL_OPEN_OK); 197 198 writePacket(FRAME_METHOD, id, _packet); 199 200 return true; 201 } 202 203 public void protocolCloseEvent() 204 { 205 } 206 } 207 | Popular Tags |