package org.mr.api.blocks;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;

import org.mr.MantaAgent;
import org.mr.MantaAgentConstants;
import org.mr.MantaException;
import org.mr.api.jms.MantaBytesMessage;
import org.mr.api.jms.MantaQueueConnectionFactory;
import org.mr.api.jms.MantaTopicConnectionFactory;
import org.mr.kernel.services.MantaService;

/**
 * An {@link OutputStream} that buffers written bytes and sends them as JMS
 * {@link BytesMessage}s to a queue or topic.
 *
 * <p>Bytes are collected in a fixed-size buffer. Whenever the buffer fills up,
 * or {@link #flush()} is called, the buffered bytes are sent as one message
 * whose boolean property {@code "endMarker"} is {@code false}. {@link #close()}
 * flushes any remaining bytes and then sends one more message whose
 * {@code "endMarker"} property is {@code true}.
 *
 * <p>The stream must be attached to a destination with
 * {@link #connect(String, byte)} before any byte is written. All state-changing
 * methods are {@code synchronized} on this instance.
 */
public class MantaOutputStream extends OutputStream {

    /** Destination type selecting a topic in {@link #connect(String, byte)}. */
    public static final byte TOPIC = MantaService.SERVICE_TYPE_TOPIC;

    /** Destination type selecting a queue in {@link #connect(String, byte)}. */
    public static final byte QUEUE = MantaService.SERVICE_TYPE_QUEUE;

    /** Buffer size, in bytes, used by the no-argument constructor. */
    private static final int DEFAULT_PACKET_SIZE = 1024;

    /** Name of the boolean message property that flags the final message. */
    private static final String END_MARKER_PROPERTY = "endMarker";

    /** Priority argument passed to every {@code MessageProducer.send} call. */
    private static final int SEND_PRIORITY = 9;

    /** Time-to-live argument (milliseconds) passed to every send call. */
    private static final long SEND_TIME_TO_LIVE_MS = 600000L;

    private MantaAgent agent;
    private int packetBuffSize = DEFAULT_PACKET_SIZE;
    private ByteBuffer buff;

    /** Connection backing the current session; kept so it can be closed. */
    private Connection connection = null;
    private Session sendSession = null;
    private MessageProducer producer = null;

    private boolean connected = false;

    /**
     * Creates a stream with the default buffer size of 1024 bytes.
     */
    public MantaOutputStream() {
        this(DEFAULT_PACKET_SIZE);
    }

    /**
     * Creates a stream whose messages carry at most {@code packetBuffSize} bytes.
     *
     * @param packetBuffSize capacity of the internal buffer in bytes; must be positive
     * @throws IllegalArgumentException if {@code packetBuffSize} is zero or negative
     */
    public MantaOutputStream(int packetBuffSize) {
        // A zero-capacity buffer would make the very first write() overflow,
        // so reject it up front instead of failing later.
        if (packetBuffSize <= 0) {
            throw new IllegalArgumentException(
                    "packetBuffSize must be positive: " + packetBuffSize);
        }
        agent = MantaAgent.getInstance();
        agent.init();
        this.packetBuffSize = packetBuffSize;
        buff = ByteBuffer.allocate(packetBuffSize);
    }

    /**
     * Attaches this stream to a destination. Any destination type other than
     * {@link #QUEUE} is treated as a topic.
     *
     * <p>If the stream was already connected, the previous connection is closed
     * once the new one has been established. If establishing the new connection
     * fails, the partially created connection is closed and the previous state
     * of the stream is left untouched.
     *
     * @param destinationName name of the queue or topic to send to
     * @param destinationType {@link #QUEUE} or {@link #TOPIC}
     * @throws IOException if the connection, session or producer cannot be created;
     *         the original failure is available as the cause
     * @throws MantaException declared for interface compatibility
     */
    public synchronized void connect(String destinationName, byte destinationType)
            throws IOException, MantaException {
        Connection newConnection = null;
        Session newSession;
        MessageProducer newProducer;
        try {
            if (destinationType == QUEUE) {
                QueueConnectionFactory conFactory =
                        (QueueConnectionFactory) new MantaQueueConnectionFactory();
                QueueConnection con = conFactory.createQueueConnection();
                newConnection = con;
                con.start();
                newSession = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                newProducer = newSession.createProducer(newSession.createQueue(destinationName));
            } else {
                TopicConnectionFactory conFactory =
                        (TopicConnectionFactory) new MantaTopicConnectionFactory();
                TopicConnection con = conFactory.createTopicConnection();
                newConnection = con;
                con.start();
                newSession = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                newProducer = newSession.createProducer(newSession.createTopic(destinationName));
            }
        } catch (Exception e) {
            // Do not leak a started connection when a later step fails.
            closeQuietly(newConnection);
            throw toIOException(e);
        }

        // Only swap in the new resources once everything succeeded.
        releaseConnection();
        connection = newConnection;
        sendSession = newSession;
        producer = newProducer;
        connected = true;
    }

    /**
     * Buffers a single byte and sends the buffer as a message once it is full.
     *
     * @param b the byte to write; only the low-order eight bits are used
     * @throws IOException if the stream is not connected or sending fails
     */
    public synchronized void write(int b) throws IOException {
        if (!connected) {
            throw new IOException("stream no connected - use the connect method");
        }
        buff.put((byte) b);
        if (!buff.hasRemaining()) {
            flush();
        }
    }

    /**
     * Sends all currently buffered bytes as one message with
     * {@code endMarker == false}. Does nothing when the buffer is empty.
     * When sending fails the buffered bytes are kept.
     *
     * @throws IOException if the message cannot be created or sent;
     *         the original failure is available as the cause
     */
    public synchronized void flush() throws IOException {
        if (buff.position() == 0) {
            return;
        }
        sendBufferedBytes(false);
    }

    /**
     * Flushes pending bytes, sends a final message with {@code endMarker == true},
     * and closes the underlying JMS connection. Does nothing when the stream is
     * not connected. After this call the stream is no longer connected, even if
     * sending failed, and any bytes that could not be sent are discarded.
     *
     * @throws IOException if flushing or sending the end marker fails
     */
    public synchronized void close() throws IOException {
        if (!connected) {
            return;
        }
        try {
            flush();
            // The buffer is empty after a successful flush, so the end-marker
            // message carries no payload.
            sendBufferedBytes(true);
        } finally {
            connected = false;
            buff.clear();
            releaseConnection();
        }
    }

    /**
     * Sends the buffered bytes as one message and empties the buffer on success.
     *
     * @param endMarker value of the {@code "endMarker"} property on the message
     * @throws IOException if creating or sending the message fails
     */
    private void sendBufferedBytes(boolean endMarker) throws IOException {
        try {
            BytesMessage msg = generateMessage();
            msg.setBooleanProperty(END_MARKER_PROPERTY, endMarker);
            producer.send(msg, MantaAgentConstants.NON_PERSISTENT,
                    SEND_PRIORITY, SEND_TIME_TO_LIVE_MS);
        } catch (Exception e) {
            throw toIOException(e);
        }
        buff.clear();
    }

    /**
     * Creates a bytes message containing the bytes buffered so far
     * (from index 0 up to the buffer's current position).
     */
    private synchronized BytesMessage generateMessage() throws JMSException {
        BytesMessage result = sendSession.createBytesMessage();
        result.writeBytes(buff.array(), 0, buff.position());
        return result;
    }

    /** Closes the current connection, if any, and forgets session and producer. */
    private void releaseConnection() {
        Connection old = connection;
        connection = null;
        sendSession = null;
        producer = null;
        closeQuietly(old);
    }

    /** Closes {@code con} if it is not null, ignoring failures (best-effort cleanup). */
    private static void closeQuietly(Connection con) {
        if (con == null) {
            return;
        }
        try {
            con.close();
        } catch (JMSException ignored) {
            // Cleanup only: a failure here must not mask the caller's own outcome.
        }
    }

    /** Wraps {@code cause} in an IOException with the same message, keeping the cause. */
    private static IOException toIOException(Exception cause) {
        IOException wrapped = new IOException(cause.getLocalizedMessage());
        wrapped.initCause(cause);
        return wrapped;
    }
}