KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > common > datagram > impl > DatagramFactory


1 package com.ubermq.jms.common.datagram.impl;
2
3
4 import com.ubermq.jms.common.datagram.*;
5 import com.ubermq.kernel.*;
6 import java.io.*;
7 import java.nio.*;
8 import org.apache.log4j.*;
9
10 /**
11  * The standard UberMQ datagram protocol handler, aka DatagramFactory. This
12  * class also implements message and control datagram creation as well as protocol
13  * handling.
14  *
15  * <PRE>
16  * UBERMQ DATAGRAM HEADER:
17  * ------
18  * SOH = 0xea 1 byte 1 in 255 chance this is a real datagram.
19  * ------
20  * size 4 bytes the size of the data in addition to header data.
21  * ------
22  * </PRE>
23  *
24  * followed by <code>size - 5</code> bytes of datagram specific data.
25  *
26  */

27 public class DatagramFactory
28     implements IDatagramFactory,
29     IMessageDatagramFactory,
30     IControlDatagramFactory,
31     IAckDatagramFactory,
32     java.io.Serializable JavaDoc
33 {
34     private static final Logger log = Logger.getLogger(DatagramFactory.class);
35     public static final long serialVersionUID = 1L;
36
37     public final static byte UBERMQ_START_OF_HEADER = (byte)0xea;
38     public final static int UBERMQ_HEADER_LENGTH = 6;
39     public static final int UBERMQ_TYPE_POSITION = 5;
40
41     public static final int DGRAM_CONTROL = 1;
42     public static final int DGRAM_ACK = 2;
43     public static final int DGRAM_MSG = 3;
44
45     private static final DatagramFactory theInstance;
46     private static final DatagramFactoryHolder theHolder;
47
48     static {
49         theInstance = new DatagramFactory();
50         theHolder = new DatagramFactoryHolder(theInstance);
51     }
52
53     /**
54      * singleton pattern. this is a stateless object.
55      */

56     DatagramFactory() {}
57     private Object JavaDoc readResolve() {return theInstance;}
58
59     /**
60      * Gets the instance of the UberMQ datagram factory.
61      * @return a datagram factory instance.
62      */

63     public static DatagramFactory getInstance() {return theInstance;}
64
65     /**
66      * Gets a datagram factory holder representing the factory singleton.
67      * @return a factory holder
68      */

69     public static DatagramFactoryHolder getHolder() {return theHolder;}
70
71     public int frame(ByteBuffer bb)
72         throws IOException
73     {
74         if (bb.remaining() < UBERMQ_HEADER_LENGTH)
75             return UBERMQ_HEADER_LENGTH;
76
77         int boundary = bb.position();
78         try {
79             if (bb.get() == UBERMQ_START_OF_HEADER) {
80                 return UBERMQ_HEADER_LENGTH + bb.getInt();
81             } else {
82                 bb.position(boundary);
83                 log.debug(com.ubermq.util.Utility.displayBuffer(bb));
84                 throw new IOException("packet header byte not detected");
85             }
86         } finally {
87             bb.position(boundary);
88         }
89     }
90
91     public IDatagram incoming(ByteBuffer bb)
92         throws IllegalArgumentException JavaDoc
93     {
94         // get the type byte.
95
bb.position(UBERMQ_TYPE_POSITION);
96         int datagramType = bb.get();
97
98         // move past all header data
99
bb.position(UBERMQ_HEADER_LENGTH);
100
101         // read the datagram.
102
try {
103             IDatagram d = createDatagramInstance(datagramType);
104             d.incoming(bb);
105             return d;
106         }
107         catch(IllegalArgumentException JavaDoc iae) {throw iae;}
108         catch(Exception JavaDoc io) {throw new IllegalArgumentException JavaDoc(io.toString());}
109     }
110
111     IDatagram createDatagramInstance(int type)
112     {
113         switch(type)
114         {
115             case DGRAM_ACK:
116                 return new AckDatagram();
117             case DGRAM_CONTROL:
118                 return new ControlDatagram();
119             case DGRAM_MSG:
120                 return new MessageDatagram();
121             default:
122                 return null;
123         }
124     }
125
126     public void outgoing(ByteBuffer bb, IDatagram d)
127     {
128         // output the UBER start byte,
129
// then a zero placeholder for the size,
130
// then the datagram type (this implementation only uses
131
// the low order byte)
132
bb.put(UBERMQ_START_OF_HEADER);
133         bb.putInt(0);
134         bb.put((byte)(0xFF & d.getDatagramType()));
135
136         // write the datagram out
137
d.outgoing(bb);
138
139         // update the length byte
140
int position = bb.position();
141         bb.position(1);
142         bb.putInt(position - UBERMQ_HEADER_LENGTH);
143         bb.position(position);
144     }
145
146     ///// IControlDatagramFactory methods
147

148     /**
149      * Creates or resurrects a durable subscription, with the given name and
150      * topic specification.
151      */

152     public IControlDatagram durableSubscribe(String JavaDoc durable, String JavaDoc topic)
153     {
154         return durableSubscribe(durable, topic, null);
155     }
156
157     /**
158      * Creates or resurrects a durable subscription, with the given name,
159      * topic specification and selector.
160      */

161     public IControlDatagram durableSubscribe(String JavaDoc durable, String JavaDoc topic, String JavaDoc selector)
162     {
163         return new ControlDatagram(ControlDatagram.CONTROL_DURABLE_SUB,
164                                    new ControlDatagram.DurableSubscribeDatagramImpl(durable, topic, selector));
165     }
166
167     /**
168      * Indicates that the durable subscription is switching to disconnected mode.
169      */

170     public IControlDatagram durableGoingAway(String JavaDoc durable)
171     {
172         return new ControlDatagram(ControlDatagram.CONTROL_DURABLE_GOING_AWAY,
173                                    new ControlDatagram.DurableGoingAwayDatagramImpl(durable));
174     }
175
176     /**
177      * Recovers a durable subscription, resending all unacknowledged messages.
178      */

179     public IControlDatagram durableRecover(String JavaDoc durable)
180     {
181         return new ControlDatagram(ControlDatagram.CONTROL_DURABLE_RECOVER,
182                                    new ControlDatagram.DurableRecoverDatagramImpl(durable));
183     }
184
185     /**
186      * Permanently removes the named durable subscription.
187      */

188     public IControlDatagram durableUnsubscribe(String JavaDoc durable)
189     {
190         return new ControlDatagram(ControlDatagram.CONTROL_DURABLE_UNSUB,
191                                    new ControlDatagram.DurableUnSubDatagramImpl(durable));
192     }
193
194     /**
195      * Subscribes to the given topic specification. The topic specification
196      * is not defined here; it is only meaningful to the recipient.
197      */

198     public IControlDatagram subscribe(String JavaDoc topic)
199     {
200         return subscribe(topic, null);
201     }
202
203     /**
204      * Subscribes to the given topic specification and message selector.
205      * Both are interpreted by the peer.
206      *
207      */

208     public IControlDatagram subscribe(String JavaDoc topic, String JavaDoc selector)
209     {
210         return new ControlDatagram(ControlDatagram.CONTROL_SUB,
211                                    new ControlDatagram.SubscribeDatagramImpl(topic, selector));
212     }
213
214     /**
215      * Unsubscribe from the topic specification given. The same specification
216      * should have been given in a prior subscribe() call.
217      */

218     public IControlDatagram unsubscribe(String JavaDoc topic)
219     {
220         return new ControlDatagram(ControlDatagram.CONTROL_UNSUB,
221                                    new ControlDatagram.UnsubscribeDatagramImpl(topic));
222     }
223
224     /**
225      * Informs the peer that this connection should be considered as a
226      * clustering propagation connection, and any messages emerging from it
227      * should be interpreted as repeated.
228      */

229     public IControlDatagram cluster()
230     {
231         return new ControlDatagram(ControlDatagram.CONTROL_CLUSTER,
232                                    new ControlDatagram.ClusterDatagramImpl());
233     }
234
235     /**
236      * Gives the peer the unique identifier of this clustering connection.
237      */

238     public IControlDatagram clusterPeer(String JavaDoc peerId)
239     {
240         return new ControlDatagram(ControlDatagram.CONTROL_CLUSTER_PEER_ID,
241                                    new ControlDatagram.ClusterPeerDatagramImpl(peerId));
242     }
243
244     /**
245      * Asks the connection peer to begin sending messages.
246      */

247     public IControlDatagram start()
248     {
249         return new ControlDatagram(ControlDatagram.CONTROL_START,
250                                    new ControlDatagram.StartDatagramImpl());
251     }
252
253     /**
254      * Asks the connection peer to stop sending messages.
255      */

256     public IControlDatagram stop()
257     {
258         return new ControlDatagram(ControlDatagram.CONTROL_STOP,
259                                    new ControlDatagram.StopDatagramImpl());
260     }
261
262     /**
263      * The null operation. This can be used for connection keep alive.
264      */

265     public IControlDatagram noop()
266     {
267         return new ControlDatagram(ControlDatagram.CONTROL_NOOP,
268                                    new ControlDatagram.NoopDatagramImpl());
269     }
270
271     /**
272      * Starts receiving messages from the named queue, with
273      * the specified message selector (or null for none).
274      * @param queue the name of the queue
275      * @param selector a message selector, or null to allow all messages.
276      * @return a control datagram
277      */

278     public IControlDatagram queueStart(String JavaDoc queue, String JavaDoc selector)
279     {
280         return new ControlDatagram(ControlDatagram.CONTROL_QUEUE_START,
281                                    new ControlDatagram.QueueStartDatagramImpl(queue, selector));
282     }
283
284     /**
285      * Stops receiving messages from the named queue.
286      * @param queue the name of the queue
287      * @return a control datagram
288      */

289     public IControlDatagram queueStop(String JavaDoc queue)
290     {
291         return new ControlDatagram(ControlDatagram.CONTROL_QUEUE_STOP,
292                                    new ControlDatagram.QueueStopDatagramImpl(queue));
293     }
294
295     /**
296      * Deletes a queue.
297      * @param queue the name of the queue.
298      */

299     public IControlDatagram queueDelete(String JavaDoc queue)
300     {
301         return new ControlDatagram(ControlDatagram.CONTROL_QUEUE_DELETE,
302                                    new ControlDatagram.QueueDeleteDatagramImpl(queue));
303     }
304
305     /**
306      * Creates an acknowledgement datagram for a message. The acknowledgement
307      * may be positive or negative.
308      * @param id the message identifier
309      * @param nack true if the Ack is negative, false otherwise
310      */

311     public IAckDatagram ack(MessageId id, boolean nack)
312     {
313         return new AckDatagram(id, nack);
314     }
315
316     /**
317      * Creates an acknowledgement datagram that is unrelated to a message.
318      * @param nack true if the Ack is negative, false otherwise
319      */

320     public IAckDatagram ack(boolean nack)
321     {
322         return new AckDatagram(nack);
323     }
324
325     /////// MessageFactory
326

327     public IMessageDatagram createMessage()
328     {
329         return new MessageDatagram();
330     }
331
332     public IMessageDatagram createMessage(String JavaDoc topic)
333     {
334         return new MessageDatagram(topic);
335     }
336
337     /////// Ack Factory
338

339     /**
340      * Creates a negative acknowledgement (NACK) datagram in reference
341      * to the specified message identifier.
342      * @param id a message identifier
343      */

344     public IAckDatagram nack(MessageId id)
345     {
346         return new AckDatagram(id, true);
347     }
348
349     /**
350      * Creates an acknowledgement datagram for the given message identifier.
351      * This creates a positive ack.
352      * @param id a message identifier
353      */

354     public IAckDatagram ack(MessageId id)
355     {
356         return new AckDatagram(id, false);
357     }
358
359     /**
360      * Creates an acknowledgement datagram with no contextual information.
361      * This creates a positive ack.
362      */

363     public IAckDatagram ack()
364     {
365         return new AckDatagram(false);
366     }
367
368     /**
369      * Creates a negative acknowledgement (NACK) datagram
370      * with no contextual information.
371      */

372     public IAckDatagram nack()
373     {
374         return new AckDatagram(true);
375     }
376
377
378 }
379
Popular Tags