1 29 30 package com.caucho.jms.amq; 31 32 import java.io.IOException ; 33 import java.io.InputStream ; 34 import java.util.logging.Logger ; 35 36 39 public class AmqChannel implements AmqConstants { 40 private static final Logger log 41 = Logger.getLogger(AmqChannel.class.getName()); 42 43 protected AmqConnection _conn; 44 private int _id; 45 46 protected boolean _isOpen; 47 protected boolean _isClosed; 48 49 protected long _bodySize; 50 protected ChunkItem _contentHead; 51 protected ChunkItem _contentTail; 52 53 AmqChannel(AmqConnection conn) 54 { 55 _conn = conn; 56 } 57 58 void setId(int id) 59 { 60 _id = id; 61 } 62 63 int getId() 64 { 65 return _id; 66 } 67 68 boolean doOpenOk(InputStream is) 69 throws IOException 70 { 71 return fatal("doOpenOk() should not be called"); 72 } 73 74 boolean doQueueDeclare(InputStream is) 75 throws IOException 76 { 77 return fatal("doQueueDeclare() should not be called"); 78 } 79 80 boolean doQueueDeclareOk(InputStream is) 81 throws IOException 82 { 83 return fatal("doQueueDeclare() should not be called"); 84 } 85 86 boolean doBasicPublish(InputStream is) 87 throws IOException 88 { 89 return fatal("doBasicPublish() should not be called"); 90 } 91 92 boolean doHeader(int classId, int weight, int bodySize, InputStream is) 93 throws IOException 94 { 95 System.out.println("HEADER: " + classId + " " + bodySize); 96 97 _bodySize = bodySize; 98 _contentHead = _contentTail = null; 99 100 return true; 101 } 102 103 void addChunk(Chunk chunk, int offset, int length) 104 throws IOException 105 { 106 _bodySize -= length; 107 108 ChunkItem item = new ChunkItem(chunk, offset, length); 109 110 if (_contentTail != null) { 111 _contentTail.setNext(item); 112 _contentTail = item; 113 } 114 else { 115 _contentHead = _contentTail = item; 116 } 117 } 118 119 void endContentFrame() 120 throws IOException 121 { 122 System.out.println("END_FRAME:" + _bodySize); 123 if (_bodySize <= 0) { 124 ChunkInputStream is = new ChunkInputStream(_contentHead); 125 _contentHead = _contentTail = null; 126 127 doContentEnd(is); 128 } 129 } 130 131 void doContentEnd(InputStream is) 132 throws IOException 133 { 134 throw new IllegalStateException (); 137 } 138 139 boolean fatal(String msg) 140 { 141 log.fine(msg); 142 143 System.out.println(msg); 144 145 close(); 146 147 return false; 148 } 149 150 public void close() 151 { 152 synchronized (this) { 153 boolean isClosed = _isClosed; 154 _isClosed = true; 155 _isOpen = false; 156 157 if (isClosed) 158 return; 159 } 160 } 161 162 static class ChunkItem { 163 private final Chunk _chunk; 164 private final int _offset; 165 private final int _length; 166 167 private ChunkItem _next; 168 169 ChunkItem(Chunk chunk, int offset, int length) 170 { 171 _chunk = chunk; 172 _offset = offset; 173 _length = length; 174 } 175 176 void setNext(ChunkItem next) 177 { 178 _next = next; 179 } 180 } 181 182 static class ChunkInputStream extends InputStream { 183 private ChunkItem _head; 184 private int _offset; 185 186 ChunkInputStream(ChunkItem head) 187 { 188 _head = head; 189 } 190 191 public int read() 192 { 193 return -1; 194 } 195 } 196 } 197 | Popular Tags |