1 18 package org.apache.activemq; 19 20 import java.io.IOException ; 21 import java.io.OutputStream ; 22 import java.util.HashMap ; 23 import java.util.Iterator ; 24 import java.util.Map ; 25 26 import javax.jms.InvalidDestinationException ; 27 import javax.jms.JMSException ; 28 29 import org.apache.activemq.command.ActiveMQBytesMessage; 30 import org.apache.activemq.command.ActiveMQDestination; 31 import org.apache.activemq.command.ActiveMQMessage; 32 import org.apache.activemq.command.MessageId; 33 import org.apache.activemq.command.ProducerId; 34 import org.apache.activemq.command.ProducerInfo; 35 import org.apache.activemq.util.IOExceptionSupport; 36 37 40 public class ActiveMQOutputStream extends OutputStream implements Disposable { 41 42 final byte buffer[] = new byte[64 * 1024]; 44 protected int count; 45 46 private final ActiveMQConnection connection; 47 private final HashMap properties; 48 private final ProducerInfo info; 49 50 private long messageSequence; 51 private boolean closed; 52 private final int deliveryMode; 53 private final int priority; 54 private final long timeToLive; 55 56 public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, 57 Map properties, int deliveryMode, int priority, long timeToLive) throws JMSException { 58 this.connection = connection; 59 this.deliveryMode = deliveryMode; 60 this.priority = priority; 61 this.timeToLive = timeToLive; 62 this.properties = properties==null ? null : new HashMap (properties); 63 64 if (destination == null) { 65 throw new InvalidDestinationException ("Don't understand null destinations"); 66 } 67 68 this.info = new ProducerInfo(producerId); 69 this.info.setDestination(destination); 70 71 this.connection.addOutputStream(this); 72 this.connection.asyncSendPacket(info); 73 } 74 75 public void close() throws IOException { 76 if (closed == false) { 77 flushBuffer(); 78 try { 79 send(new ActiveMQMessage(), true); 81 dispose(); 82 this.connection.asyncSendPacket(info.createRemoveCommand()); 83 } catch (JMSException e) { 84 IOExceptionSupport.create(e); 85 } 86 } 87 } 88 89 public void dispose() { 90 if (closed == false) { 91 this.connection.removeOutputStream(this); 92 closed = true; 93 } 94 } 95 96 public synchronized void write(int b) throws IOException { 97 buffer[count++] = (byte) b; 98 if (count == buffer.length) { 99 flushBuffer(); 100 } 101 } 102 103 public synchronized void write(byte b[], int off, int len) throws IOException { 104 while(len > 0) { 105 int max = Math.min(len, buffer.length-count); 106 System.arraycopy(b, off, buffer, count, max); 107 108 len -= max; 109 count += max; 110 off += max; 111 112 if (count == buffer.length) { 113 flushBuffer(); 114 } 115 } 116 } 117 118 synchronized public void flush() throws IOException { 119 flushBuffer(); 120 } 121 122 private void flushBuffer() throws IOException { 123 try { 124 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 125 msg.writeBytes(buffer, 0, count); 126 send(msg, false); 127 } catch (JMSException e) { 128 throw IOExceptionSupport.create(e); 129 } 130 count=0; 131 } 132 133 137 private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException { 138 if (properties != null) { 139 for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) { 140 String key = (String ) iter.next(); 141 Object value = properties.get(key); 142 msg.setObjectProperty(key, value); 143 } 144 } 145 msg.setType("org.apache.activemq.Stream"); 146 msg.setGroupID(info.getProducerId().toString()); 147 if( eosMessage ) { 148 msg.setGroupSequence(-1); 149 } else { 150 msg.setGroupSequence((int) messageSequence); 151 } 152 MessageId id = new MessageId(info.getProducerId(), messageSequence++); 153 connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage); 154 } 155 156 public String toString() { 157 return "ActiveMQOutputStream { producerId=" +info.getProducerId()+" }"; 158 } 159 160 } 161 | Popular Tags |