KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > api > blocks > MantaInputStream


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 /*
47  * Created on 02/06/2004
48  *
49  * Coridan LTD
50  */

51 package org.mr.api.blocks;
52
53 import java.io.IOException JavaDoc;
54 import java.io.InputStream JavaDoc;
55 import java.nio.ByteBuffer JavaDoc;
56 import java.util.LinkedList JavaDoc;
57
58 import javax.jms.BytesMessage JavaDoc;
59 import javax.jms.JMSException JavaDoc;
60 import javax.jms.Message JavaDoc;
61 import javax.jms.MessageConsumer JavaDoc;
62 import javax.jms.MessageListener JavaDoc;
63 import javax.jms.QueueConnection JavaDoc;
64 import javax.jms.QueueConnectionFactory JavaDoc;
65 import javax.jms.Session JavaDoc;
66 import javax.jms.TopicConnection JavaDoc;
67 import javax.jms.TopicConnectionFactory JavaDoc;
68
69
70 import org.mr.MantaAgent;
71 import org.mr.api.jms.MantaQueueConnectionFactory;
72 import org.mr.api.jms.MantaTopicConnectionFactory;
73 import org.mr.kernel.services.MantaService;
74
75
76
77
78
79 /**
80  *
81  * MantaInputStream in an implementation of an InputStream over JMS queue/topic.
82  *
83  * @author Amir Shevat
84  *
85  *
86  */

87 public class MantaInputStream extends InputStream JavaDoc implements MessageListener JavaDoc{
88     public static byte TOPIC = MantaService.SERVICE_TYPE_TOPIC;
89     public static byte QUEUE = MantaService.SERVICE_TYPE_QUEUE;
90     
91     
92     private MantaAgent agent;
93     private LinkedList JavaDoc buffers = new LinkedList JavaDoc();
94     private ByteBuffer JavaDoc buff = ByteBuffer.allocate(0);
95     
96     private boolean connected = false;
97     private boolean endOfStreamMarker;
98     // JMS objects
99

100     private Session JavaDoc receiveSession = null;
101     private MessageConsumer JavaDoc consumer = null;
102
103     
104     public MantaInputStream(){
105         agent = MantaAgent.getInstance();
106         agent.init();
107     }
108     
109     
110     /* (non-Javadoc)
111      * @see java.io.InputStream#read()
112      */

113     public synchronized int read() throws IOException JavaDoc {
114         
115         if(!connected)
116             throw new IOException JavaDoc("stream no connected - use the connect method");
117         synchronized(buffers){
118             if( !buff.hasRemaining()){
119                 if(buffers.isEmpty()){
120                     //there is no data to read
121
try {
122                         if(endOfStreamMarker){
123                             return -1;
124                         }else{
125                             buffers.wait();
126                         }
127                     } catch (InterruptedException JavaDoc e) {
128                         throw new IOException JavaDoc(e.getLocalizedMessage());
129                     }
130                     return read();
131                 }
132                 
133                 buff = ByteBuffer.wrap(((byte[])buffers.removeFirst()));
134             }
135             int b = buff.get()& 0xff;
136             //System.out.println("{"+b+","+(count++)+"|");
137
return b;
138         }
139     }
140     static int count = 0;
141     
142     public synchronized void connect(String JavaDoc destinationName, byte destinationType) throws IOException JavaDoc{
143         try{
144             if(destinationType == QUEUE){
145                 QueueConnectionFactory JavaDoc conFactory = (QueueConnectionFactory JavaDoc) new MantaQueueConnectionFactory();
146                 QueueConnection JavaDoc con = conFactory.createQueueConnection();
147                 con.start();
148                 receiveSession = con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
149                 consumer =receiveSession.createConsumer(receiveSession.createQueue(destinationName));
150                 consumer.setMessageListener(this);
151             }else{
152                 TopicConnectionFactory JavaDoc conFactory = (TopicConnectionFactory JavaDoc) new MantaTopicConnectionFactory();
153                 TopicConnection JavaDoc con = conFactory.createTopicConnection();
154                 con.start();
155                 receiveSession = con.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
156                 consumer =receiveSession.createConsumer(receiveSession.createTopic(destinationName));
157                 consumer.setMessageListener(this);
158             }
159         }catch(Exception JavaDoc e){
160             e.printStackTrace();
161             throw new IOException JavaDoc(e.getMessage());
162         }
163         
164         connected = true;
165     }
166
167
168     
169     
170     
171      /**
172      * Returns the number of bytes that can be read (or skipped over) from
173      * this input stream without blocking by the next caller of a method for
174      * this input stream. The next caller might be the same thread or or
175      * another thread.
176      *
177      * <p> The <code>available</code> method for class <code>InputStream</code>
178      * always returns <code>0</code>.
179      *
180      * <p> This method should be overridden by subclasses.
181      *
182      * @return the number of bytes that can be read from this input stream
183      * without blocking.
184      * @exception IOException if an I/O error occurs.
185      */

186     public int available() throws IOException JavaDoc {
187         if(!connected)
188             throw new IOException JavaDoc("stream no connected - use the connect method");
189         
190         
191         if( !buff.hasRemaining() && buffers.isEmpty()){
192                 return 0;
193         }
194         if(!buff.hasRemaining())
195             buff = ByteBuffer.wrap(((byte[])buffers.removeFirst()));
196         
197         return buff.remaining();
198                 
199     }
200
201     /**
202      * internal API do not use
203      */

204     public void onMessage(Message JavaDoc arg) {
205         try {
206             BytesMessage JavaDoc msg = (BytesMessage JavaDoc)arg;
207             synchronized(buffers){
208                 endOfStreamMarker = msg.getBooleanProperty("endMarker");
209                 if(endOfStreamMarker){
210                     buffers.notifyAll();
211                     return;
212                 }
213                 int length =(int) msg.getBodyLength();
214                 byte[] body = new byte[length];
215                 msg.readBytes(body);
216                 buffers.addLast(body);
217             
218                 buffers.notifyAll();
219             }
220             
221         } catch (JMSException JavaDoc e) {
222         
223             e.printStackTrace();
224         }
225     }
226 }
227
Popular Tags