KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ActiveMQOutputStream


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import java.io.IOException JavaDoc;
21 import java.io.OutputStream JavaDoc;
22 import java.util.HashMap JavaDoc;
23 import java.util.Iterator JavaDoc;
24 import java.util.Map JavaDoc;
25
26 import javax.jms.InvalidDestinationException JavaDoc;
27 import javax.jms.JMSException JavaDoc;
28
29 import org.apache.activemq.command.ActiveMQBytesMessage;
30 import org.apache.activemq.command.ActiveMQDestination;
31 import org.apache.activemq.command.ActiveMQMessage;
32 import org.apache.activemq.command.MessageId;
33 import org.apache.activemq.command.ProducerId;
34 import org.apache.activemq.command.ProducerInfo;
35 import org.apache.activemq.util.IOExceptionSupport;
36
37 /**
38  * @version $Revision$
39  */

40 public class ActiveMQOutputStream extends OutputStream JavaDoc implements Disposable {
41
42     // Send down 64k messages.
43
final byte buffer[] = new byte[64 * 1024];
44     protected int count;
45
46     private final ActiveMQConnection connection;
47     private final HashMap JavaDoc properties;
48     private final ProducerInfo info;
49
50     private long messageSequence;
51     private boolean closed;
52     private final int deliveryMode;
53     private final int priority;
54     private final long timeToLive;
55
56     public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination,
57             Map JavaDoc properties, int deliveryMode, int priority, long timeToLive) throws JMSException JavaDoc {
58         this.connection = connection;
59         this.deliveryMode = deliveryMode;
60         this.priority = priority;
61         this.timeToLive = timeToLive;
62         this.properties = properties==null ? null : new HashMap JavaDoc(properties);
63
64         if (destination == null) {
65             throw new InvalidDestinationException JavaDoc("Don't understand null destinations");
66         }
67
68         this.info = new ProducerInfo(producerId);
69         this.info.setDestination(destination);
70
71         this.connection.addOutputStream(this);
72         this.connection.asyncSendPacket(info);
73     }
74
75     public void close() throws IOException JavaDoc {
76         if (closed == false) {
77             flushBuffer();
78             try {
79                 // Send an EOS style empty message to signal EOS.
80
send(new ActiveMQMessage(), true);
81                 dispose();
82                 this.connection.asyncSendPacket(info.createRemoveCommand());
83             } catch (JMSException JavaDoc e) {
84                 IOExceptionSupport.create(e);
85             }
86         }
87     }
88
89     public void dispose() {
90         if (closed == false) {
91             this.connection.removeOutputStream(this);
92             closed = true;
93         }
94     }
95
96     public synchronized void write(int b) throws IOException JavaDoc {
97         buffer[count++] = (byte) b;
98         if (count == buffer.length) {
99             flushBuffer();
100         }
101     }
102
103     public synchronized void write(byte b[], int off, int len) throws IOException JavaDoc {
104         while(len > 0) {
105             int max = Math.min(len, buffer.length-count);
106             System.arraycopy(b, off, buffer, count, max);
107             
108             len -= max;
109             count += max;
110             off += max;
111             
112             if (count == buffer.length) {
113                 flushBuffer();
114             }
115         }
116     }
117     
118     synchronized public void flush() throws IOException JavaDoc {
119         flushBuffer();
120     }
121     
122     private void flushBuffer() throws IOException JavaDoc {
123         try {
124             ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
125             msg.writeBytes(buffer, 0, count);
126             send(msg, false);
127         } catch (JMSException JavaDoc e) {
128             throw IOExceptionSupport.create(e);
129         }
130         count=0;
131     }
132
133     /**
134      * @param msg
135      * @throws JMSException
136      */

137     private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException JavaDoc {
138         if (properties != null) {
139             for (Iterator JavaDoc iter = properties.keySet().iterator(); iter.hasNext();) {
140                 String JavaDoc key = (String JavaDoc) iter.next();
141                 Object JavaDoc value = properties.get(key);
142                 msg.setObjectProperty(key, value);
143             }
144         }
145         msg.setType("org.apache.activemq.Stream");
146         msg.setGroupID(info.getProducerId().toString());
147         if( eosMessage ) {
148             msg.setGroupSequence(-1);
149         } else {
150             msg.setGroupSequence((int) messageSequence);
151         }
152         MessageId id = new MessageId(info.getProducerId(), messageSequence++);
153         connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage);
154     }
155     
156     public String JavaDoc toString() {
157         return "ActiveMQOutputStream { producerId=" +info.getProducerId()+" }";
158     }
159
160 }
161
Popular Tags