1 package com.ubermq.jms.common.datagram.impl; 2 3 4 import com.ubermq.jms.common.datagram.*; 5 import com.ubermq.kernel.*; 6 import java.io.*; 7 import java.nio.*; 8 import org.apache.log4j.*; 9 10 27 public class DatagramFactory 28 implements IDatagramFactory, 29 IMessageDatagramFactory, 30 IControlDatagramFactory, 31 IAckDatagramFactory, 32 java.io.Serializable 33 { 34 private static final Logger log = Logger.getLogger(DatagramFactory.class); 35 public static final long serialVersionUID = 1L; 36 37 public final static byte UBERMQ_START_OF_HEADER = (byte)0xea; 38 public final static int UBERMQ_HEADER_LENGTH = 6; 39 public static final int UBERMQ_TYPE_POSITION = 5; 40 41 public static final int DGRAM_CONTROL = 1; 42 public static final int DGRAM_ACK = 2; 43 public static final int DGRAM_MSG = 3; 44 45 private static final DatagramFactory theInstance; 46 private static final DatagramFactoryHolder theHolder; 47 48 static { 49 theInstance = new DatagramFactory(); 50 theHolder = new DatagramFactoryHolder(theInstance); 51 } 52 53 56 DatagramFactory() {} 57 private Object readResolve() {return theInstance;} 58 59 63 public static DatagramFactory getInstance() {return theInstance;} 64 65 69 public static DatagramFactoryHolder getHolder() {return theHolder;} 70 71 public int frame(ByteBuffer bb) 72 throws IOException 73 { 74 if (bb.remaining() < UBERMQ_HEADER_LENGTH) 75 return UBERMQ_HEADER_LENGTH; 76 77 int boundary = bb.position(); 78 try { 79 if (bb.get() == UBERMQ_START_OF_HEADER) { 80 return UBERMQ_HEADER_LENGTH + bb.getInt(); 81 } else { 82 bb.position(boundary); 83 log.debug(com.ubermq.util.Utility.displayBuffer(bb)); 84 throw new IOException("packet header byte not detected"); 85 } 86 } finally { 87 bb.position(boundary); 88 } 89 } 90 91 public IDatagram incoming(ByteBuffer bb) 92 throws IllegalArgumentException 93 { 94 bb.position(UBERMQ_TYPE_POSITION); 96 int datagramType = bb.get(); 97 98 bb.position(UBERMQ_HEADER_LENGTH); 100 101 try { 103 IDatagram d = createDatagramInstance(datagramType); 104 d.incoming(bb); 105 return d; 106 } 107 catch(IllegalArgumentException iae) {throw iae;} 108 catch(Exception io) {throw new IllegalArgumentException (io.toString());} 109 } 110 111 IDatagram createDatagramInstance(int type) 112 { 113 switch(type) 114 { 115 case DGRAM_ACK: 116 return new AckDatagram(); 117 case DGRAM_CONTROL: 118 return new ControlDatagram(); 119 case DGRAM_MSG: 120 return new MessageDatagram(); 121 default: 122 return null; 123 } 124 } 125 126 public void outgoing(ByteBuffer bb, IDatagram d) 127 { 128 bb.put(UBERMQ_START_OF_HEADER); 133 bb.putInt(0); 134 bb.put((byte)(0xFF & d.getDatagramType())); 135 136 d.outgoing(bb); 138 139 int position = bb.position(); 141 bb.position(1); 142 bb.putInt(position - UBERMQ_HEADER_LENGTH); 143 bb.position(position); 144 } 145 146 148 152 public IControlDatagram durableSubscribe(String durable, String topic) 153 { 154 return durableSubscribe(durable, topic, null); 155 } 156 157 161 public IControlDatagram durableSubscribe(String durable, String topic, String selector) 162 { 163 return new ControlDatagram(ControlDatagram.CONTROL_DURABLE_SUB, 164 new ControlDatagram.DurableSubscribeDatagramImpl(durable, topic, selector)); 165 } 166 167 170 public IControlDatagram durableGoingAway(String durable) 171 { 172 return new ControlDatagram(ControlDatagram.CONTROL_DURABLE_GOING_AWAY, 173 new ControlDatagram.DurableGoingAwayDatagramImpl(durable)); 174 } 175 176 179 public IControlDatagram durableRecover(String durable) 180 { 181 return new ControlDatagram(ControlDatagram.CONTROL_DURABLE_RECOVER, 182 new ControlDatagram.DurableRecoverDatagramImpl(durable)); 183 } 184 185 188 public IControlDatagram durableUnsubscribe(String durable) 189 { 190 return new ControlDatagram(ControlDatagram.CONTROL_DURABLE_UNSUB, 191 new ControlDatagram.DurableUnSubDatagramImpl(durable)); 192 } 193 194 198 public IControlDatagram subscribe(String topic) 199 { 200 return subscribe(topic, null); 201 } 202 203 208 public IControlDatagram subscribe(String topic, String selector) 209 { 210 return new ControlDatagram(ControlDatagram.CONTROL_SUB, 211 new ControlDatagram.SubscribeDatagramImpl(topic, selector)); 212 } 213 214 218 public IControlDatagram unsubscribe(String topic) 219 { 220 return new ControlDatagram(ControlDatagram.CONTROL_UNSUB, 221 new ControlDatagram.UnsubscribeDatagramImpl(topic)); 222 } 223 224 229 public IControlDatagram cluster() 230 { 231 return new ControlDatagram(ControlDatagram.CONTROL_CLUSTER, 232 new ControlDatagram.ClusterDatagramImpl()); 233 } 234 235 238 public IControlDatagram clusterPeer(String peerId) 239 { 240 return new ControlDatagram(ControlDatagram.CONTROL_CLUSTER_PEER_ID, 241 new ControlDatagram.ClusterPeerDatagramImpl(peerId)); 242 } 243 244 247 public IControlDatagram start() 248 { 249 return new ControlDatagram(ControlDatagram.CONTROL_START, 250 new ControlDatagram.StartDatagramImpl()); 251 } 252 253 256 public IControlDatagram stop() 257 { 258 return new ControlDatagram(ControlDatagram.CONTROL_STOP, 259 new ControlDatagram.StopDatagramImpl()); 260 } 261 262 265 public IControlDatagram noop() 266 { 267 return new ControlDatagram(ControlDatagram.CONTROL_NOOP, 268 new ControlDatagram.NoopDatagramImpl()); 269 } 270 271 278 public IControlDatagram queueStart(String queue, String selector) 279 { 280 return new ControlDatagram(ControlDatagram.CONTROL_QUEUE_START, 281 new ControlDatagram.QueueStartDatagramImpl(queue, selector)); 282 } 283 284 289 public IControlDatagram queueStop(String queue) 290 { 291 return new ControlDatagram(ControlDatagram.CONTROL_QUEUE_STOP, 292 new ControlDatagram.QueueStopDatagramImpl(queue)); 293 } 294 295 299 public IControlDatagram queueDelete(String queue) 300 { 301 return new ControlDatagram(ControlDatagram.CONTROL_QUEUE_DELETE, 302 new ControlDatagram.QueueDeleteDatagramImpl(queue)); 303 } 304 305 311 public IAckDatagram ack(MessageId id, boolean nack) 312 { 313 return new AckDatagram(id, nack); 314 } 315 316 320 public IAckDatagram ack(boolean nack) 321 { 322 return new AckDatagram(nack); 323 } 324 325 327 public IMessageDatagram createMessage() 328 { 329 return new MessageDatagram(); 330 } 331 332 public IMessageDatagram createMessage(String topic) 333 { 334 return new MessageDatagram(topic); 335 } 336 337 339 344 public IAckDatagram nack(MessageId id) 345 { 346 return new AckDatagram(id, true); 347 } 348 349 354 public IAckDatagram ack(MessageId id) 355 { 356 return new AckDatagram(id, false); 357 } 358 359 363 public IAckDatagram ack() 364 { 365 return new AckDatagram(false); 366 } 367 368 372 public IAckDatagram nack() 373 { 374 return new AckDatagram(true); 375 } 376 377 378 } 379 | Popular Tags |