KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > udp > CommandDatagramChannel


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.udp;
19
20 import java.io.DataInputStream JavaDoc;
21 import java.io.DataOutputStream JavaDoc;
22 import java.io.IOException JavaDoc;
23 import java.net.SocketAddress JavaDoc;
24 import java.nio.ByteBuffer JavaDoc;
25 import java.nio.channels.DatagramChannel JavaDoc;
26
27 import org.apache.activemq.command.Command;
28 import org.apache.activemq.command.Endpoint;
29 import org.apache.activemq.command.LastPartialCommand;
30 import org.apache.activemq.command.PartialCommand;
31 import org.apache.activemq.openwire.BooleanStream;
32 import org.apache.activemq.openwire.OpenWireFormat;
33 import org.apache.activemq.transport.reliable.ReplayBuffer;
34 import org.apache.activemq.util.ByteArrayInputStream;
35 import org.apache.activemq.util.ByteArrayOutputStream;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38
39 /**
40  * A strategy for reading datagrams and de-fragmenting them together.
41  *
42  * @version $Revision: 439111 $
43  */

44 public class CommandDatagramChannel extends CommandChannelSupport {
45
46     private static final Log log = LogFactory.getLog(CommandDatagramChannel.class);
47
48     private DatagramChannel JavaDoc channel;
49     private ByteBufferPool bufferPool;
50
51     // reading
52
private Object JavaDoc readLock = new Object JavaDoc();
53     private ByteBuffer JavaDoc readBuffer;
54
55     // writing
56
private Object JavaDoc writeLock = new Object JavaDoc();
57     private int defaultMarshalBufferSize = 64 * 1024;
58
59     public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
60             SocketAddress JavaDoc targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramChannel JavaDoc channel,
61             ByteBufferPool bufferPool) {
62         super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
63         this.channel = channel;
64         this.bufferPool = bufferPool;
65     }
66
67     public void start() throws Exception JavaDoc {
68         bufferPool.setDefaultSize(datagramSize);
69         bufferPool.start();
70         readBuffer = bufferPool.borrowBuffer();
71     }
72
73     public void stop() throws Exception JavaDoc {
74         bufferPool.stop();
75     }
76
77     public Command read() throws IOException JavaDoc {
78         Command answer = null;
79         Endpoint from = null;
80         synchronized (readLock) {
81             while (true) {
82                 readBuffer.clear();
83                 SocketAddress JavaDoc address = channel.receive(readBuffer);
84
85                 readBuffer.flip();
86
87                 if (readBuffer.limit() == 0) {
88                     continue;
89                 }
90                 from = headerMarshaller.createEndpoint(readBuffer, address);
91
92                 int remaining = readBuffer.remaining();
93                 byte[] data = new byte[remaining];
94                 readBuffer.get(data);
95
96                 // TODO could use a DataInput implementation that talks direct
97
// to
98
// the ByteBuffer to avoid object allocation and unnecessary
99
// buffering?
100
DataInputStream JavaDoc dataIn = new DataInputStream JavaDoc(new ByteArrayInputStream(data));
101                 answer = (Command) wireFormat.unmarshal(dataIn);
102                 break;
103             }
104         }
105         if (answer != null) {
106             answer.setFrom(from);
107
108             if (log.isDebugEnabled()) {
109                 log.debug("Channel: " + name + " received from: " + from + " about to process: " + answer);
110             }
111         }
112         return answer;
113     }
114
115     public void write(Command command, SocketAddress JavaDoc address) throws IOException JavaDoc {
116         synchronized (writeLock) {
117
118             ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
119             wireFormat.marshal(command, new DataOutputStream JavaDoc(largeBuffer));
120             byte[] data = largeBuffer.toByteArray();
121             int size = data.length;
122
123             ByteBuffer JavaDoc writeBuffer = bufferPool.borrowBuffer();
124             writeBuffer.clear();
125             headerMarshaller.writeHeader(command, writeBuffer);
126
127             if (size > writeBuffer.remaining()) {
128                 // lets split the command up into chunks
129
int offset = 0;
130                 boolean lastFragment = false;
131                 for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
132                     // write the header
133
if (fragment > 0) {
134                         writeBuffer = bufferPool.borrowBuffer();
135                         writeBuffer.clear();
136                         headerMarshaller.writeHeader(command, writeBuffer);
137                     }
138
139                     int chunkSize = writeBuffer.remaining();
140
141                     // we need to remove the amount of overhead to write the
142
// partial command
143

144                     // lets write the flags in there
145
BooleanStream bs = null;
146                     if (wireFormat.isTightEncodingEnabled()) {
147                         bs = new BooleanStream();
148                         bs.writeBoolean(true); // the partial data byte[] is
149
// never null
150
}
151
152                     // lets remove the header of the partial command
153
// which is the byte for the type and an int for the size of
154
// the byte[]
155
chunkSize -= 1 // the data type
156
+ 4 // the command ID
157
+ 4; // the size of the partial data
158

159                     // the boolean flags
160
if (bs != null) {
161                         chunkSize -= bs.marshalledSize();
162                     }
163                     else {
164                         chunkSize -= 1;
165                     }
166
167                     if (!wireFormat.isSizePrefixDisabled()) {
168                         // lets write the size of the command buffer
169
writeBuffer.putInt(chunkSize);
170                         chunkSize -= 4;
171                     }
172
173                     lastFragment = offset + chunkSize >= length;
174                     if (chunkSize + offset > length) {
175                         chunkSize = length - offset;
176                     }
177
178                     if (lastFragment) {
179                         writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE);
180                     }
181                     else {
182                         writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
183                     }
184
185                     if (bs != null) {
186                         bs.marshal(writeBuffer);
187                     }
188
189                     int commandId = command.getCommandId();
190                     if (fragment > 0) {
191                         commandId = sequenceGenerator.getNextSequenceId();
192                     }
193                     writeBuffer.putInt(commandId);
194                     if (bs == null) {
195                         writeBuffer.put((byte) 1);
196                     }
197
198                     // size of byte array
199
writeBuffer.putInt(chunkSize);
200
201                     // now the data
202
writeBuffer.put(data, offset, chunkSize);
203
204                     offset += chunkSize;
205                     sendWriteBuffer(commandId, address, writeBuffer, false);
206                 }
207             }
208             else {
209                 writeBuffer.put(data);
210                 sendWriteBuffer(command.getCommandId(), address, writeBuffer, false);
211             }
212         }
213     }
214
215     // Properties
216
// -------------------------------------------------------------------------
217

218     public ByteBufferPool getBufferPool() {
219         return bufferPool;
220     }
221
222     /**
223      * Sets the implementation of the byte buffer pool to use
224      */

225     public void setBufferPool(ByteBufferPool bufferPool) {
226         this.bufferPool = bufferPool;
227     }
228
229     // Implementation methods
230
// -------------------------------------------------------------------------
231
protected void sendWriteBuffer(int commandId, SocketAddress JavaDoc address, ByteBuffer JavaDoc writeBuffer, boolean redelivery)
232             throws IOException JavaDoc {
233         // lets put the datagram into the replay buffer first to prevent timing
234
// issues
235
ReplayBuffer bufferCache = getReplayBuffer();
236         if (bufferCache != null && !redelivery) {
237             bufferCache.addBuffer(commandId, writeBuffer);
238         }
239         
240         writeBuffer.flip();
241
242         if (log.isDebugEnabled()) {
243             String JavaDoc text = (redelivery) ? "REDELIVERING" : "sending";
244             log.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
245         }
246         channel.send(writeBuffer, address);
247     }
248
249     public void sendBuffer(int commandId, Object JavaDoc buffer) throws IOException JavaDoc {
250         if (buffer != null) {
251             ByteBuffer JavaDoc writeBuffer = (ByteBuffer JavaDoc) buffer;
252             sendWriteBuffer(commandId, getReplayAddress(), writeBuffer, true);
253         }
254         else {
255             if (log.isWarnEnabled()) {
256                 log.warn("Request for buffer: " + commandId + " is no longer present");
257             }
258         }
259     }
260
261 }
262
Popular Tags