package org.mr.api.blocks;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;

import org.mr.MantaAgent;
import org.mr.api.jms.MantaQueueConnectionFactory;
import org.mr.api.jms.MantaTopicConnectionFactory;
import org.mr.kernel.services.MantaService;

/**
 * An {@link InputStream} whose bytes arrive as JMS {@link BytesMessage}s on a
 * queue or topic.
 *
 * <p>After {@link #connect(String, byte)} the stream registers itself as the
 * {@link MessageListener} of a consumer on the destination. Every message body
 * is queued as one chunk and handed out byte by byte through {@link #read()}.
 * A message whose boolean property {@code "endMarker"} is {@code true} marks
 * the end of the stream; its body is not read.
 *
 * <p>Threading: {@link #onMessage(Message)} is called by the JMS provider
 * while {@link #read()} / {@link #available()} are called by the consumer of
 * the stream. All chunk state is guarded by the monitor of the internal chunk
 * list.
 */
public class MantaInputStream extends InputStream implements MessageListener {

    /** Destination type for {@link #connect(String, byte)}: consume from a topic. */
    public static byte TOPIC = MantaService.SERVICE_TYPE_TOPIC;

    /** Destination type for {@link #connect(String, byte)}: consume from a queue. */
    public static byte QUEUE = MantaService.SERVICE_TYPE_QUEUE;

    // Not used by this class; kept because it is visible to the rest of the package.
    static int count = 0;

    private MantaAgent agent;

    /** Received chunks (byte[]) not yet read; also the monitor guarding all chunk state. */
    private final LinkedList buffers = new LinkedList();

    /** Chunk currently being read; guarded by {@code buffers}. */
    private ByteBuffer buff = ByteBuffer.allocate(0);

    /** Volatile because it is read outside the {@code buffers} monitor. */
    private volatile boolean connected = false;

    /** Value of the "endMarker" property of the last message received; guarded by {@code buffers}. */
    private boolean endOfStreamMarker;

    /** Connection opened by the last successful connect(); guarded by {@code buffers}. */
    private Connection connection = null;
    private Session receiveSession = null;
    private MessageConsumer consumer = null;

    /**
     * Creates an unconnected stream and initialises the MantaRay agent
     * singleton. Call {@link #connect(String, byte)} before reading.
     */
    public MantaInputStream() {
        agent = MantaAgent.getInstance();
        agent.init();
    }

    /**
     * Reads the next byte, blocking until one has been received.
     *
     * @return the byte as a value in {@code 0..255}, or {@code -1} once every
     *         queued chunk has been consumed and the last message received
     *         was an end marker
     * @throws IOException if the stream is not connected, is closed while
     *         waiting, or the waiting thread is interrupted (the interrupt
     *         flag is restored in that case)
     */
    public synchronized int read() throws IOException {
        if (!connected) {
            throw new IOException("stream no connected - use the connect method");
        }
        synchronized (buffers) {
            // Loop rather than recurse: wake-ups may be spurious, and a
            // zero-length chunk must be skipped instead of being read from.
            while (!buff.hasRemaining()) {
                if (!buffers.isEmpty()) {
                    buff = ByteBuffer.wrap((byte[]) buffers.removeFirst());
                    continue;
                }
                if (endOfStreamMarker) {
                    return -1;
                }
                if (!connected) {
                    // close() was called while we were waiting for data.
                    throw new IOException("stream closed while waiting for data");
                }
                try {
                    buffers.wait();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller's thread.
                    Thread.currentThread().interrupt();
                    throw ioException(e.getLocalizedMessage(), e);
                }
            }
            return buff.get() & 0xff;
        }
    }

    /**
     * Connects this stream to a JMS destination and starts receiving.
     *
     * <p>Calling this method again opens an additional connection; the
     * earlier one is not closed by this method.
     *
     * @param destinationName name of the queue or topic to consume from
     * @param destinationType {@link #QUEUE} for a queue; any other value is
     *        treated as a topic
     * @throws IOException if the connection, session or consumer cannot be
     *         created; the underlying exception is attached as the cause
     */
    public synchronized void connect(String destinationName, byte destinationType) throws IOException {
        Connection opened = null;
        Session session;
        MessageConsumer messageConsumer;
        try {
            if (destinationType == QUEUE) {
                QueueConnectionFactory conFactory = (QueueConnectionFactory) new MantaQueueConnectionFactory();
                QueueConnection con = conFactory.createQueueConnection();
                opened = con;
                con.start();
                session = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                messageConsumer = session.createConsumer(session.createQueue(destinationName));
            } else {
                TopicConnectionFactory conFactory = (TopicConnectionFactory) new MantaTopicConnectionFactory();
                TopicConnection con = conFactory.createTopicConnection();
                opened = con;
                con.start();
                session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                messageConsumer = session.createConsumer(session.createTopic(destinationName));
            }
            messageConsumer.setMessageListener(this);
        } catch (Exception e) {
            // Do not leak a half-initialised connection.
            closeQuietly(opened);
            throw ioException(e.getMessage(), e);
        }

        synchronized (buffers) {
            connection = opened;
            receiveSession = session;
            consumer = messageConsumer;
            connected = true;
        }
    }

    /**
     * Returns the number of bytes that can be read without blocking.
     *
     * <p>Only the current chunk is counted: if it is exhausted the next
     * non-empty queued chunk becomes current and its size is returned.
     *
     * @return bytes remaining in the current chunk, or {@code 0} if nothing
     *         is buffered
     * @throws IOException if the stream is not connected
     */
    public int available() throws IOException {
        if (!connected) {
            throw new IOException("stream no connected - use the connect method");
        }
        // Same monitor as read()/onMessage(): this method mutates buff and buffers.
        synchronized (buffers) {
            while (!buff.hasRemaining() && !buffers.isEmpty()) {
                buff = ByteBuffer.wrap((byte[]) buffers.removeFirst());
            }
            return buff.remaining();
        }
    }

    /**
     * Closes the JMS connection opened by the last successful
     * {@link #connect(String, byte)} and wakes any reader blocked in
     * {@link #read()}. Closing the connection also closes its session and
     * consumer (JMS {@code Connection.close()} contract). Calling this method
     * on an unconnected or already closed stream has no effect.
     *
     * @throws IOException if the JMS provider fails to close the connection
     */
    public void close() throws IOException {
        Connection toClose;
        synchronized (buffers) {
            connected = false;
            toClose = connection;
            connection = null;
            receiveSession = null;
            consumer = null;
            buffers.notifyAll();
        }
        if (toClose != null) {
            try {
                toClose.close();
            } catch (JMSException e) {
                throw ioException(e.getMessage(), e);
            }
        }
    }

    /**
     * JMS callback: queues the body of a {@link BytesMessage} as the next
     * chunk, or records the end of the stream when the message carries a
     * {@code true} "endMarker" property. Waiting readers are notified in both
     * cases. Messages of any other type are reported on stderr and ignored,
     * because a listener must not throw.
     *
     * @param arg the message delivered by the JMS provider
     */
    public void onMessage(Message arg) {
        if (!(arg instanceof BytesMessage)) {
            System.err.println("MantaInputStream: ignoring message that is not a BytesMessage: "
                    + (arg == null ? "null" : arg.getClass().getName()));
            return;
        }
        BytesMessage msg = (BytesMessage) arg;
        try {
            synchronized (buffers) {
                endOfStreamMarker = msg.getBooleanProperty("endMarker");
                if (!endOfStreamMarker) {
                    byte[] body = new byte[(int) msg.getBodyLength()];
                    msg.readBytes(body);
                    buffers.addLast(body);
                }
                buffers.notifyAll();
            }
        } catch (JMSException e) {
            // No logger is available in this class; report and drop the message.
            e.printStackTrace();
        }
    }

    /** Builds an IOException that carries {@code cause} (initCause keeps pre-Java-6 compatibility). */
    private static IOException ioException(String message, Throwable cause) {
        IOException wrapped = new IOException(message);
        wrapped.initCause(cause);
        return wrapped;
    }

    /** Closes {@code con} if non-null, ignoring failures so the original error is the one reported. */
    private static void closeQuietly(Connection con) {
        if (con == null) {
            return;
        }
        try {
            con.close();
        } catch (JMSException ignored) {
            // The caller is already propagating the failure that led here.
        }
    }
}