KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ActiveMQInputStream


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import java.io.IOException JavaDoc;
21 import java.io.InputStream JavaDoc;
22 import java.util.HashMap JavaDoc;
23
24 import javax.jms.IllegalStateException JavaDoc;
25 import javax.jms.InvalidDestinationException JavaDoc;
26 import javax.jms.JMSException JavaDoc;
27
28 import org.apache.activemq.command.ActiveMQBytesMessage;
29 import org.apache.activemq.command.ActiveMQDestination;
30 import org.apache.activemq.command.ActiveMQMessage;
31 import org.apache.activemq.command.CommandTypes;
32 import org.apache.activemq.command.ConsumerId;
33 import org.apache.activemq.command.ConsumerInfo;
34 import org.apache.activemq.command.MessageAck;
35 import org.apache.activemq.command.MessageDispatch;
36 import org.apache.activemq.command.ProducerId;
37 import org.apache.activemq.selector.SelectorParser;
38 import org.apache.activemq.util.IOExceptionSupport;
39 import org.apache.activemq.util.IntrospectionSupport;
40 import org.apache.activemq.util.JMSExceptionSupport;
41
42 /**
43  *
44  * @version $Revision$
45  */

46 public class ActiveMQInputStream extends InputStream JavaDoc implements ActiveMQDispatcher {
47
48     private final ActiveMQConnection connection;
49     private final ConsumerInfo info;
50     // These are the messages waiting to be delivered to the client
51
private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
52
53     private int deliveredCounter = 0;
54     private MessageDispatch lastDelivered;
55     private boolean eosReached;
56     private byte buffer[];
57     private int pos;
58     
59     private ProducerId producerId;
60     private long nextSequenceId=0;
61
62     public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest,
63             String JavaDoc selector, boolean noLocal, String JavaDoc name, int prefetch) throws JMSException JavaDoc {
64         this.connection = connection;
65
66         if (dest == null) {
67             throw new InvalidDestinationException JavaDoc("Don't understand null destinations");
68         } else if (dest.isTemporary()) {
69             String JavaDoc physicalName = dest.getPhysicalName();
70
71             if (physicalName == null) {
72                 throw new IllegalArgumentException JavaDoc("Physical name of Destination should be valid: " + dest);
73             }
74
75             String JavaDoc connectionID = connection.getConnectionInfo().getConnectionId().getValue();
76
77             if (physicalName.indexOf(connectionID) < 0) {
78                 throw new InvalidDestinationException JavaDoc("Cannot use a Temporary destination from another Connection");
79             }
80
81             if (connection.isDeleted(dest)) {
82                 throw new InvalidDestinationException JavaDoc("Cannot use a Temporary destination that has been deleted");
83             }
84         }
85
86         this.info = new ConsumerInfo(consumerId);
87         this.info.setSubscriptionName(name);
88
89         if (selector != null && selector.trim().length() != 0) {
90             selector = "JMSType='org.apache.activemq.Stream' AND ( "+selector+" ) ";
91         } else {
92             selector = "JMSType='org.apache.activemq.Stream'";
93         }
94         
95         new SelectorParser().parse(selector);
96         this.info.setSelector(selector);
97         
98         this.info.setPrefetchSize(prefetch);
99         this.info.setNoLocal(noLocal);
100         this.info.setBrowser(false);
101         this.info.setDispatchAsync(false);
102         
103         // Allows the options on the destination to configure the consumerInfo
104
if (dest.getOptions() != null) {
105             HashMap JavaDoc options = new HashMap JavaDoc(dest.getOptions());
106             IntrospectionSupport.setProperties(this.info, options, "consumer.");
107         }
108         
109         this.info.setDestination(dest);
110
111         this.connection.addInputStream(this);
112         this.connection.addDispatcher(info.getConsumerId(), this);
113         this.connection.syncSendPacket(info);
114         unconsumedMessages.start();
115     }
116
117     public void close() throws IOException JavaDoc {
118         if (!unconsumedMessages.isClosed()) {
119             try {
120                 if (lastDelivered != null) {
121                     MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
122                     connection.asyncSendPacket(ack);
123                 }
124                 dispose();
125                 this.connection.syncSendPacket(info.createRemoveCommand());
126             } catch (JMSException JavaDoc e) {
127                 throw IOExceptionSupport.create(e);
128             }
129         }
130     }
131
132     public void dispose() {
133         if (!unconsumedMessages.isClosed()) {
134             unconsumedMessages.close();
135             this.connection.removeDispatcher(info.getConsumerId());
136             this.connection.removeInputStream(this);
137         }
138     }
139
140     public ActiveMQMessage receive() throws JMSException JavaDoc {
141         checkClosed();
142         MessageDispatch md;
143         try {
144             md = unconsumedMessages.dequeue(-1);
145         } catch (InterruptedException JavaDoc e) {
146             Thread.currentThread().interrupt();
147             throw JMSExceptionSupport.create(e);
148         }
149
150         if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired())
151             return null;
152
153         deliveredCounter++;
154         if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
155             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
156             connection.asyncSendPacket(ack);
157             deliveredCounter = 0;
158             lastDelivered = null;
159         } else {
160             lastDelivered = md;
161         }
162
163         return (ActiveMQMessage) md.getMessage();
164     }
165
166     /**
167      * @throws IllegalStateException
168      */

169     protected void checkClosed() throws IllegalStateException JavaDoc {
170         if (unconsumedMessages.isClosed()) {
171             throw new IllegalStateException JavaDoc("The Consumer is closed");
172         }
173     }
174
175     public int read() throws IOException JavaDoc {
176         fillBuffer();
177         if( eosReached )
178             return -1;
179         return buffer[pos++] & 0xff;
180     }
181
182     public int read(byte[] b, int off, int len) throws IOException JavaDoc {
183         fillBuffer();
184         if( eosReached )
185             return -1;
186
187         int max = Math.min(len, buffer.length-pos);
188         System.arraycopy(buffer, pos, b, off, max);
189         
190         pos += max;
191         return max;
192     }
193
194     private void fillBuffer() throws IOException JavaDoc {
195         if( eosReached || (buffer!=null && buffer.length > pos) )
196             return;
197         try {
198             while(true) {
199                 ActiveMQMessage m = receive();
200                 if( m!=null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE ) {
201                     // First message.
202
long producerSequenceId = m.getMessageId().getProducerSequenceId();
203                     if( producerId == null ) {
204                         // We have to start a stream at sequence id = 0
205
if( producerSequenceId!=0 ) {
206                             continue;
207                         }
208                         nextSequenceId++;
209                         producerId = m.getMessageId().getProducerId();
210                     } else {
211                         // Verify it's the next message of the sequence.
212
if( !m.getMessageId().getProducerId().equals(producerId) ) {
213                             throw new IOException JavaDoc("Received an unexpected message: invalid producer: "+m);
214                         }
215                         if( producerSequenceId!=nextSequenceId++ ) {
216                             throw new IOException JavaDoc("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: "+m);
217                         }
218                     }
219                     
220                     // Read the buffer in.
221
ActiveMQBytesMessage bm = (ActiveMQBytesMessage) m;
222                     buffer = new byte[(int) bm.getBodyLength()];
223                     bm.readBytes(buffer);
224                     pos=0;
225                 } else {
226                     eosReached=true;
227                 }
228                 return;
229             }
230         } catch (JMSException JavaDoc e) {
231             eosReached = true;
232             throw IOExceptionSupport.create(e);
233         }
234     }
235
236     public void dispatch(MessageDispatch md) {
237         unconsumedMessages.enqueue(md);
238     }
239
240     public String JavaDoc toString() {
241         return "ActiveMQInputStream { value="+info.getConsumerId()+", producerId=" +producerId+" }";
242     }
243
244 }
245
Popular Tags