1 29 30 package com.caucho.jms.amq; 31 32 import com.caucho.util.ByteBuffer; 33 34 import java.io.IOException ; 35 import java.io.InputStream ; 36 import java.util.logging.Level ; 37 import java.util.logging.Logger ; 38 39 42 public class AmqClientChannel extends AmqChannel { 43 private static final Logger log 44 = Logger.getLogger(AmqClientChannel.class.getName()); 45 46 private String _queue; 47 private boolean _isQueue; 48 49 private long _publishCount; 50 51 AmqClientChannel(AmqConnection conn) 52 { 53 super(conn); 54 } 55 56 boolean openQueue(String queueName) 57 throws IOException 58 { 59 ByteBuffer packet = new ByteBuffer(); 60 61 packet.addShort(CLASS_QUEUE); 62 packet.addShort(ID_QUEUE_DECLARE); 63 int ticket = 0; 64 packet.addShort(ticket); 65 _conn.addShortString(packet, queueName); 66 boolean passive = false; 67 packet.add(passive ? 1 : 0); 68 boolean durable = false; 69 packet.add(durable ? 1 : 0); 70 boolean exclusive = false; 71 packet.add(exclusive ? 1 : 0); 72 boolean autoDelete = false; 73 packet.add(autoDelete ? 1 : 0); 74 _conn.addTable(packet, null); 75 76 _conn.writePacket(FRAME_METHOD, getId(), packet); 77 78 synchronized (this) { 79 if (_isQueue) 80 return true; 81 else if (_isClosed) 82 return false; 83 84 try { 85 this.wait(10000); 86 } catch (Exception e) { 87 log.log(Level.FINER, e.toString(), e); 88 } 89 } 90 91 return _isQueue; 92 } 93 94 boolean publish(long length, InputStream data) 95 throws IOException 96 { 97 ByteBuffer packet = new ByteBuffer(); 98 99 packet.addShort(CLASS_BASIC); 100 packet.addShort(ID_BASIC_PUBLISH); 101 int ticket = 0; 102 packet.addShort(ticket); 103 String exchange = "test"; 104 _conn.addShortString(packet, exchange); 105 String routing = "test-router"; 106 _conn.addShortString(packet, routing); 107 boolean mandatory = false; 108 packet.add(mandatory ? 1 : 0); 109 boolean immediate = false; 110 packet.add(immediate ? 1 : 0); 111 112 _conn.writePacket(FRAME_METHOD, getId(), packet); 113 114 synchronized (this) { 115 _publishCount++; 116 } 117 118 packet.clear(); 119 packet.addShort(CLASS_BASIC); 120 int weight = 0; 121 packet.addShort(weight); 122 packet.addLong(length); 123 System.out.println("LENGTH: " + length); 124 int propFlags = 0; 125 packet.addShort(propFlags); 126 127 _conn.writePacket(FRAME_HEADER, getId(), packet); 128 _conn.writeData(getId(), length, data); 129 130 try { 131 Thread.sleep(2000); 132 } catch (Throwable e) { 133 } 134 135 return true; 136 } 137 138 boolean waitOpen() 139 { 140 synchronized (this) { 141 if (_isOpen) 142 return true; 143 else if (_isClosed) 144 return false; 145 146 try { 147 this.wait(10000); 148 } catch (Exception e) { 149 log.log(Level.FINER, e.toString(), e); 150 } 151 } 152 153 return _isOpen; 154 } 155 156 boolean doOpenOk(InputStream is) 157 { 158 synchronized (this) { 159 if (! _isClosed) 160 _isOpen = true; 161 162 this.notifyAll(); 163 164 } 165 166 return true; 167 } 168 169 boolean doQueueDeclareOk(InputStream is) 170 throws IOException 171 { 172 String queue = _conn.readShortString(is); 173 int messageCount = _conn.readInt(is); 174 int consumerCount = _conn.readInt(is); 175 176 System.out.println("OK! " + queue); 177 178 synchronized (this) { 179 if (! _isClosed) 180 _isQueue = true; 181 182 this.notifyAll(); 183 } 184 185 return true; 186 } 187 } 188 | Popular Tags |