1 18 package org.apache.activemq; 19 20 import java.io.IOException ; 21 import java.io.InputStream ; 22 import java.util.HashMap ; 23 24 import javax.jms.IllegalStateException ; 25 import javax.jms.InvalidDestinationException ; 26 import javax.jms.JMSException ; 27 28 import org.apache.activemq.command.ActiveMQBytesMessage; 29 import org.apache.activemq.command.ActiveMQDestination; 30 import org.apache.activemq.command.ActiveMQMessage; 31 import org.apache.activemq.command.CommandTypes; 32 import org.apache.activemq.command.ConsumerId; 33 import org.apache.activemq.command.ConsumerInfo; 34 import org.apache.activemq.command.MessageAck; 35 import org.apache.activemq.command.MessageDispatch; 36 import org.apache.activemq.command.ProducerId; 37 import org.apache.activemq.selector.SelectorParser; 38 import org.apache.activemq.util.IOExceptionSupport; 39 import org.apache.activemq.util.IntrospectionSupport; 40 import org.apache.activemq.util.JMSExceptionSupport; 41 42 46 public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher { 47 48 private final ActiveMQConnection connection; 49 private final ConsumerInfo info; 50 private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel(); 52 53 private int deliveredCounter = 0; 54 private MessageDispatch lastDelivered; 55 private boolean eosReached; 56 private byte buffer[]; 57 private int pos; 58 59 private ProducerId producerId; 60 private long nextSequenceId=0; 61 62 public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, 63 String selector, boolean noLocal, String name, int prefetch) throws JMSException { 64 this.connection = connection; 65 66 if (dest == null) { 67 throw new InvalidDestinationException ("Don't understand null destinations"); 68 } else if (dest.isTemporary()) { 69 String physicalName = dest.getPhysicalName(); 70 71 if (physicalName == null) { 72 throw new IllegalArgumentException ("Physical name of Destination should be valid: " + dest); 73 } 74 75 String connectionID = connection.getConnectionInfo().getConnectionId().getValue(); 76 77 if (physicalName.indexOf(connectionID) < 0) { 78 throw new InvalidDestinationException ("Cannot use a Temporary destination from another Connection"); 79 } 80 81 if (connection.isDeleted(dest)) { 82 throw new InvalidDestinationException ("Cannot use a Temporary destination that has been deleted"); 83 } 84 } 85 86 this.info = new ConsumerInfo(consumerId); 87 this.info.setSubscriptionName(name); 88 89 if (selector != null && selector.trim().length() != 0) { 90 selector = "JMSType='org.apache.activemq.Stream' AND ( "+selector+" ) "; 91 } else { 92 selector = "JMSType='org.apache.activemq.Stream'"; 93 } 94 95 new SelectorParser().parse(selector); 96 this.info.setSelector(selector); 97 98 this.info.setPrefetchSize(prefetch); 99 this.info.setNoLocal(noLocal); 100 this.info.setBrowser(false); 101 this.info.setDispatchAsync(false); 102 103 if (dest.getOptions() != null) { 105 HashMap options = new HashMap (dest.getOptions()); 106 IntrospectionSupport.setProperties(this.info, options, "consumer."); 107 } 108 109 this.info.setDestination(dest); 110 111 this.connection.addInputStream(this); 112 this.connection.addDispatcher(info.getConsumerId(), this); 113 this.connection.syncSendPacket(info); 114 unconsumedMessages.start(); 115 } 116 117 public void close() throws IOException { 118 if (!unconsumedMessages.isClosed()) { 119 try { 120 if (lastDelivered != null) { 121 MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); 122 connection.asyncSendPacket(ack); 123 } 124 dispose(); 125 this.connection.syncSendPacket(info.createRemoveCommand()); 126 } catch (JMSException e) { 127 throw IOExceptionSupport.create(e); 128 } 129 } 130 } 131 132 public void dispose() { 133 if (!unconsumedMessages.isClosed()) { 134 unconsumedMessages.close(); 135 this.connection.removeDispatcher(info.getConsumerId()); 136 this.connection.removeInputStream(this); 137 } 138 } 139 140 public ActiveMQMessage receive() throws JMSException { 141 checkClosed(); 142 MessageDispatch md; 143 try { 144 md = unconsumedMessages.dequeue(-1); 145 } catch (InterruptedException e) { 146 Thread.currentThread().interrupt(); 147 throw JMSExceptionSupport.create(e); 148 } 149 150 if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) 151 return null; 152 153 deliveredCounter++; 154 if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) { 155 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); 156 connection.asyncSendPacket(ack); 157 deliveredCounter = 0; 158 lastDelivered = null; 159 } else { 160 lastDelivered = md; 161 } 162 163 return (ActiveMQMessage) md.getMessage(); 164 } 165 166 169 protected void checkClosed() throws IllegalStateException { 170 if (unconsumedMessages.isClosed()) { 171 throw new IllegalStateException ("The Consumer is closed"); 172 } 173 } 174 175 public int read() throws IOException { 176 fillBuffer(); 177 if( eosReached ) 178 return -1; 179 return buffer[pos++] & 0xff; 180 } 181 182 public int read(byte[] b, int off, int len) throws IOException { 183 fillBuffer(); 184 if( eosReached ) 185 return -1; 186 187 int max = Math.min(len, buffer.length-pos); 188 System.arraycopy(buffer, pos, b, off, max); 189 190 pos += max; 191 return max; 192 } 193 194 private void fillBuffer() throws IOException { 195 if( eosReached || (buffer!=null && buffer.length > pos) ) 196 return; 197 try { 198 while(true) { 199 ActiveMQMessage m = receive(); 200 if( m!=null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE ) { 201 long producerSequenceId = m.getMessageId().getProducerSequenceId(); 203 if( producerId == null ) { 204 if( producerSequenceId!=0 ) { 206 continue; 207 } 208 nextSequenceId++; 209 producerId = m.getMessageId().getProducerId(); 210 } else { 211 if( !m.getMessageId().getProducerId().equals(producerId) ) { 213 throw new IOException ("Received an unexpected message: invalid producer: "+m); 214 } 215 if( producerSequenceId!=nextSequenceId++ ) { 216 throw new IOException ("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: "+m); 217 } 218 } 219 220 ActiveMQBytesMessage bm = (ActiveMQBytesMessage) m; 222 buffer = new byte[(int) bm.getBodyLength()]; 223 bm.readBytes(buffer); 224 pos=0; 225 } else { 226 eosReached=true; 227 } 228 return; 229 } 230 } catch (JMSException e) { 231 eosReached = true; 232 throw IOExceptionSupport.create(e); 233 } 234 } 235 236 public void dispatch(MessageDispatch md) { 237 unconsumedMessages.enqueue(md); 238 } 239 240 public String toString() { 241 return "ActiveMQInputStream { value="+info.getConsumerId()+", producerId=" +producerId+" }"; 242 } 243 244 } 245 | Popular Tags |