package com.caucho.jms.amq;

import com.caucho.util.ByteBuffer;
import com.caucho.util.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Server-side channel: handles the queue-declare and basic-publish method
 * frames read from the connection and dispatches completed content bodies.
 */
public class AmqServerChannel extends AmqChannel {
  private static final Logger log
    = Logger.getLogger(AmqServerChannel.class.getName());

  // Packed (class << 8) + method key identifying a pending basic.publish.
  private static final int BASIC_PUBLISH
    = (CLASS_BASIC << 8) + ID_BASIC_PUBLISH;

  // Name read from the most recent queue-declare request.
  private String _queue;
  private boolean _isPublishing;

  // Packed key of the method whose content is still expected; 0 when none.
  private int _contentType;

  /**
   * Creates a channel bound to the given connection.
   *
   * @param conn the owning connection
   * @param id the channel id, stored through {@code setId}
   */
  AmqServerChannel(AmqConnection conn, int id)
  {
    super(conn);

    setId(id);
  }

  /**
   * Reads the queue-declare arguments from the stream and replies with a
   * declare-ok method frame that reports zero messages and zero consumers.
   *
   * @param is stream positioned at the method arguments
   * @return always {@code true}
   * @throws IOException if reading the arguments or writing the reply fails
   */
  boolean doQueueDeclare(InputStream is)
    throws IOException
  {
    // Every field is read to advance the stream past the method arguments;
    // only the queue name is used below.
    int ticket = _conn.readShort(is);
    _queue = _conn.readShortString(is);
    boolean passive = is.read() != 0;
    boolean durable = is.read() != 0;
    boolean exclusive = is.read() != 0;
    boolean autoDelete = is.read() != 0;
    HashMap<String,String> arguments = _conn.readTable(is);

    log.fine("QUEUE: " + _queue);

    ByteBuffer packet = new ByteBuffer();

    packet.addShort(CLASS_QUEUE);
    packet.addShort(ID_QUEUE_DECLARE_OK);
    _conn.addShortString(packet, _queue);

    int msgCount = 0;
    packet.addInt(msgCount);

    int consumerCount = 0;
    packet.addInt(consumerCount);

    _conn.writePacket(FRAME_METHOD, getId(), packet);

    return true;
  }

  /**
   * Reads the basic-publish arguments from the stream and records that the
   * next completed content belongs to a publish.
   *
   * @param is stream positioned at the method arguments
   * @return always {@code true}
   * @throws IOException if reading the arguments fails
   */
  boolean doBasicPublish(InputStream is)
    throws IOException
  {
    // Flags are read to consume their bytes; they are not acted on here.
    int ticket = _conn.readShort(is);
    String exchange = _conn.readShortString(is);
    String routing = _conn.readShortString(is);
    boolean mandatory = is.read() != 0;
    boolean immediate = is.read() != 0;

    log.fine("PUBLISH: " + exchange + " " + routing);

    _contentType = BASIC_PUBLISH;

    return true;
  }

  /**
   * Dispatches a completed content body according to the pending method
   * recorded in {@code _contentType}, then clears that pending state.
   *
   * <p>For a publish, the stream is handed to a {@link PublishAction}
   * scheduled on the thread pool, which then closes it. In every other case
   * the stream is closed before this method returns.
   *
   * @param is the content stream; may be {@code null}
   * @throws IOException if closing the stream fails
   */
  void doContentEnd(InputStream is)
    throws IOException
  {
    try {
      int contentType = _contentType;
      _contentType = 0;

      switch (contentType) {
      case BASIC_PUBLISH:
        ThreadPool.getThreadPool().schedule(new PublishAction(is));

        // Release ownership only once scheduling has succeeded, so that a
        // failure in schedule() still closes the stream in the finally block.
        is = null;
        return;

      default:
        // The key is packed as (class << 8) + method, so the method id is
        // the low 8 bits.
        log.warning("UNKNOWN: " + (contentType >> 8)
                    + "." + (contentType & 0xff));
        close();
        return;
      }
    } finally {
      if (is != null)
        is.close();
    }
  }

  /**
   * Thread-pool task that processes one published message body and closes
   * the content stream when finished.
   */
  class PublishAction implements Runnable {
    private final InputStream _is;

    /**
     * @param is the content stream this task closes; may be {@code null}
     */
    PublishAction(InputStream is)
    {
      _is = is;
    }

    /**
     * Handles the publish (currently only logs it) and always closes the
     * stream.
     */
    public void run()
    {
      try {
        log.fine("PUBLISH!");
      } finally {
        if (_is != null) {
          try {
            _is.close();
          } catch (IOException e) {
            // Best-effort close: nothing to recover, but keep it visible.
            log.log(Level.FINER, e.toString(), e);
          }
        }
      }
    }
  }
}