KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > command > Message


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.command;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;

36 /**
37  * Represents an ActiveMQ message
38  *
39  * @openwire:marshaller
40  * @version $Revision$
41  */

42 abstract public class Message extends BaseCommand implements MarshallAware, MessageReference {
43
44     public static final int AVERAGE_MESSAGE_SIZE_OVERHEAD = 500;
45     
46     protected MessageId messageId;
47     protected ActiveMQDestination originalDestination;
48     protected TransactionId originalTransactionId;
49
50     protected ProducerId producerId;
51     protected ActiveMQDestination destination;
52     protected TransactionId transactionId;
53     
54     protected long expiration;
55     protected long timestamp;
56     protected long arrival;
57     protected String JavaDoc correlationId;
58     protected ActiveMQDestination replyTo;
59     protected boolean persistent;
60     protected String JavaDoc type;
61     protected byte priority;
62     protected String JavaDoc groupID;
63     protected int groupSequence;
64     protected ConsumerId targetConsumerId;
65     protected boolean compressed = false;
66     protected String JavaDoc userID;
67
68     protected ByteSequence content;
69     protected ByteSequence marshalledProperties;
70     protected DataStructure dataStructure;
71     protected int redeliveryCounter;
72
73     protected int size;
74     protected Map JavaDoc properties;
75     protected boolean readOnlyProperties = false;
76     protected boolean readOnlyBody = false;
77     protected transient boolean recievedByDFBridge = false;
78
79     private transient short referenceCount;
80     private transient ActiveMQConnection connection;
81     private transient org.apache.activemq.broker.region.Destination regionDestination;
82
83     private BrokerId [] brokerPath;
84     protected boolean droppable = false;
85
86     abstract public Message copy();
87     
88     protected void copy(Message copy) {
89         super.copy(copy);
90         copy.producerId = producerId;
91         copy.transactionId = transactionId;
92         copy.destination = destination;
93         copy.messageId = messageId != null ? messageId.copy() : null;
94         copy.originalDestination = originalDestination;
95         copy.originalTransactionId = originalTransactionId;
96         copy.expiration = expiration;
97         copy.timestamp = timestamp;
98         copy.correlationId = correlationId;
99         copy.replyTo = replyTo;
100         copy.persistent = persistent;
101         copy.redeliveryCounter = redeliveryCounter;
102         copy.type = type;
103         copy.priority = priority;
104         copy.size = size;
105         copy.groupID = groupID;
106         copy.userID = userID;
107         copy.groupSequence = groupSequence;
108
109         if( properties!=null )
110             copy.properties = new HashMap JavaDoc(properties);
111         else
112             copy.properties = properties;
113
114         copy.content = content;
115         copy.marshalledProperties = marshalledProperties;
116         copy.dataStructure = dataStructure;
117         copy.readOnlyProperties = readOnlyProperties;
118         copy.readOnlyBody = readOnlyBody;
119         copy.compressed = compressed;
120         copy.recievedByDFBridge = recievedByDFBridge;
121         
122         copy.arrival = arrival;
123         copy.connection = connection;
124         copy.regionDestination = regionDestination;
125         //copying the broker path breaks networks - if a consumer re-uses a consumed
126
//message and forwards it on
127
//copy.brokerPath = brokerPath;
128

129         // lets not copy the following fields
130
//copy.targetConsumerId = targetConsumerId;
131
//copy.referenceCount = referenceCount;
132
}
133         
134     public Object JavaDoc getProperty(String JavaDoc name) throws IOException JavaDoc {
135         if( properties == null ) {
136             if( marshalledProperties ==null )
137                 return null;
138             properties = unmarsallProperties(marshalledProperties);
139         }
140         return properties.get(name);
141     }
142     
143     public Map JavaDoc getProperties() throws IOException JavaDoc {
144         if( properties == null ) {
145             if( marshalledProperties==null )
146                 return Collections.EMPTY_MAP;
147             properties = unmarsallProperties(marshalledProperties);
148         }
149         return Collections.unmodifiableMap(properties);
150     }
151     
152     public void clearProperties() {
153         marshalledProperties = null;
154         properties=null;
155     }
156
157
158     public void setProperty(String JavaDoc name, Object JavaDoc value) throws IOException JavaDoc {
159         lazyCreateProperties();
160         properties.put(name, value);
161     }
162
163     protected void lazyCreateProperties() throws IOException JavaDoc {
164         if( properties == null ) {
165             if( marshalledProperties == null ) {
166                 properties = new HashMap JavaDoc();
167             } else {
168                 properties = unmarsallProperties(marshalledProperties);
169                 marshalledProperties = null;
170             }
171         }
172     }
173     
174     private Map JavaDoc unmarsallProperties(ByteSequence marshalledProperties) throws IOException JavaDoc {
175         return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream JavaDoc(new ByteArrayInputStream(marshalledProperties)));
176     }
177
178     public void beforeMarshall(WireFormat wireFormat) throws IOException JavaDoc {
179         // Need to marshal the properties.
180
if( marshalledProperties==null && properties!=null ) {
181             ByteArrayOutputStream baos = new ByteArrayOutputStream();
182             DataOutputStream JavaDoc os = new DataOutputStream JavaDoc(baos);
183             MarshallingSupport.marshalPrimitiveMap(properties, os);
184             os.close();
185             marshalledProperties = baos.toByteSequence();
186         }
187     }
188
189     public void afterMarshall(WireFormat wireFormat) throws IOException JavaDoc {
190     }
191
192     public void beforeUnmarshall(WireFormat wireFormat) throws IOException JavaDoc {
193     }
194
195     public void afterUnmarshall(WireFormat wireFormat) throws IOException JavaDoc {
196     }
197
198
199     ///////////////////////////////////////////////////////////////////
200
//
201
// Simple Field accessors
202
//
203
///////////////////////////////////////////////////////////////////
204

205     /**
206      * @openwire:property version=1 cache=true
207      */

208     public ProducerId getProducerId() {
209         return producerId;
210     }
211     public void setProducerId(ProducerId producerId) {
212         this.producerId = producerId;
213     }
214     
215     /**
216      * @openwire:property version=1 cache=true
217      */

218     public ActiveMQDestination getDestination() {
219         return destination;
220     }
221     public void setDestination(ActiveMQDestination destination) {
222         this.destination = destination;
223     }
224
225     /**
226      * @openwire:property version=1 cache=true
227      */

228     public TransactionId getTransactionId() {
229         return transactionId;
230     }
231     public void setTransactionId(TransactionId transactionId) {
232         this.transactionId = transactionId;
233     }
234
235     public boolean isInTransaction() {
236         return transactionId!=null;
237     }
238
239      
240     /**
241      * @openwire:property version=1 cache=true
242      */

243     public ActiveMQDestination getOriginalDestination() {
244         return originalDestination;
245     }
246     public void setOriginalDestination(ActiveMQDestination destination) {
247         this.originalDestination = destination;
248     }
249
250     /**
251      * @openwire:property version=1
252      */

253     public MessageId getMessageId() {
254         return messageId;
255     }
256        
257     public void setMessageId(MessageId messageId) {
258         this.messageId = messageId;
259     }
260     
261     /**
262      * @openwire:property version=1 cache=true
263      */

264     public TransactionId getOriginalTransactionId() {
265         return originalTransactionId;
266     }
267     public void setOriginalTransactionId(TransactionId transactionId) {
268         this.originalTransactionId = transactionId;
269     }
270     
271     /**
272      * @openwire:property version=1
273      */

274     public String JavaDoc getGroupID() {
275         return groupID;
276     }
277     public void setGroupID(String JavaDoc groupID) {
278         this.groupID = groupID;
279     }
280
281     /**
282      * @openwire:property version=1
283      */

284     public int getGroupSequence() {
285         return groupSequence;
286     }
287     public void setGroupSequence(int groupSequence) {
288         this.groupSequence = groupSequence;
289     }
290
291     /**
292      * @openwire:property version=1
293      */

294     public String JavaDoc getCorrelationId() {
295         return correlationId;
296     }
297     public void setCorrelationId(String JavaDoc correlationId) {
298         this.correlationId = correlationId;
299     }
300     
301     /**
302      * @openwire:property version=1
303      */

304     public boolean isPersistent() {
305         return persistent;
306     }
307     public void setPersistent(boolean deliveryMode) {
308         this.persistent = deliveryMode;
309     }
310
311     /**
312      * @openwire:property version=1
313      */

314     public long getExpiration() {
315         return expiration;
316     }
317     public void setExpiration(long expiration) {
318         this.expiration = expiration;
319     }
320
321     /**
322      * @openwire:property version=1
323      */

324     public byte getPriority() {
325         return priority;
326     }
327     public void setPriority(byte priority) {
328         this.priority = priority;
329     }
330
331     /**
332      * @openwire:property version=1
333      */

334     public ActiveMQDestination getReplyTo() {
335         return replyTo;
336     }
337     public void setReplyTo(ActiveMQDestination replyTo) {
338         this.replyTo = replyTo;
339     }
340
341     /**
342      * @openwire:property version=1
343      */

344     public long getTimestamp() {
345         return timestamp;
346     }
347     public void setTimestamp(long timestamp) {
348         this.timestamp = timestamp;
349     }
350
351     /**
352      * @openwire:property version=1
353      */

354     public String JavaDoc getType() {
355         return type;
356     }
357     public void setType(String JavaDoc type) {
358         this.type = type;
359     }
360
361     /**
362      * @openwire:property version=1
363      */

364     public ByteSequence getContent() {
365         return content;
366     }
367     public void setContent(ByteSequence content) {
368         this.content = content;
369     }
370
371     /**
372      * @openwire:property version=1
373      */

374     public ByteSequence getMarshalledProperties() {
375         return marshalledProperties;
376     }
377     public void setMarshalledProperties(ByteSequence marshalledProperties) {
378         this.marshalledProperties = marshalledProperties;
379     }
380
381     /**
382      * @openwire:property version=1
383      */

384     public DataStructure getDataStructure() {
385         return dataStructure;
386     }
387     public void setDataStructure(DataStructure data) {
388         this.dataStructure = data;
389     }
390
391     /**
392      * Can be used to route the message to a specific consumer. Should
393      * be null to allow the broker use normal JMS routing semantics. If
394      * the target consumer id is an active consumer on the broker, the message
395      * is dropped. Used by the AdvisoryBroker to replay advisory messages
396      * to a specific consumer.
397      *
398      * @openwire:property version=1 cache=true
399      */

400     public ConsumerId getTargetConsumerId() {
401         return targetConsumerId;
402     }
403     public void setTargetConsumerId(ConsumerId targetConsumerId) {
404         this.targetConsumerId = targetConsumerId;
405     }
406
407     public boolean isExpired() {
408         long expireTime = getExpiration();
409         if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
410             return true;
411         }
412         return false;
413     }
414     
415     public boolean isAdvisory(){
416         return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
417     }
418     
419     /**
420      * @openwire:property version=1
421      */

422     public boolean isCompressed() {
423         return compressed;
424     }
425     public void setCompressed(boolean compressed) {
426         this.compressed = compressed;
427     }
428     
429     public boolean isRedelivered() {
430         return redeliveryCounter>0;
431     }
432     
433     public void setRedelivered(boolean redelivered) {
434         if( redelivered ) {
435             if( !isRedelivered() ) {
436                 setRedeliveryCounter(1);
437             }
438         } else {
439             if( isRedelivered() ) {
440                 setRedeliveryCounter(0);
441             }
442         }
443     }
444
445     public void incrementRedeliveryCounter() {
446         redeliveryCounter++;
447     }
448
449     /**
450      * @openwire:property version=1
451      */

452     public int getRedeliveryCounter() {
453         return redeliveryCounter;
454     }
455     public void setRedeliveryCounter(int deliveryCounter) {
456         this.redeliveryCounter = deliveryCounter;
457     }
458
459     /**
460      * The route of brokers the command has moved through.
461      *
462      * @openwire:property version=1 cache=true
463      */

464     public BrokerId[] getBrokerPath() {
465         return brokerPath;
466     }
467     public void setBrokerPath(BrokerId[] brokerPath) {
468         this.brokerPath = brokerPath;
469     }
470     
471     public boolean isReadOnlyProperties() {
472         return readOnlyProperties;
473     }
474     public void setReadOnlyProperties(boolean readOnlyProperties) {
475         this.readOnlyProperties = readOnlyProperties;
476     }
477     public boolean isReadOnlyBody() {
478         return readOnlyBody;
479     }
480     public void setReadOnlyBody(boolean readOnlyBody) {
481         this.readOnlyBody = readOnlyBody;
482     }
483
484     public ActiveMQConnection getConnection() {
485         return this.connection;
486     }
487     public void setConnection(ActiveMQConnection connection) {
488         this.connection = connection;
489     }
490
491     /**
492      * Used to schedule the arrival time of a message to a broker. The broker will
493      * not dispatch a message to a consumer until it's arrival time has elapsed.
494      *
495      * @openwire:property version=1
496      */

497     public long getArrival() {
498         return arrival;
499     }
500     public void setArrival(long arrival) {
501         this.arrival = arrival;
502     }
503
504     
505     /**
506      * Only set by the broker and defines the userID of the producer connection who
507      * sent this message. This is an optional field, it needs to be enabled on the
508      * broker to have this field populated.
509      *
510      * @openwire:property version=1
511      */

512     public String JavaDoc getUserID() {
513         return userID;
514     }
515
516     public void setUserID(String JavaDoc jmsxUserID) {
517         this.userID = jmsxUserID;
518     }
519
520     public int getReferenceCount() {
521         return referenceCount;
522     }
523     
524     public Message getMessageHardRef() {
525         return this;
526     }
527
528     public Message getMessage() throws IOException JavaDoc {
529         return this;
530     }
531
532     public org.apache.activemq.broker.region.Destination getRegionDestination() {
533         return regionDestination;
534     }
535
536     public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
537         this.regionDestination = destination;
538     }
539
540     public boolean isMarshallAware() {
541         return true;
542     }
543         
544     public int incrementReferenceCount() {
545         int rc;
546         int size;
547         synchronized (this) {
548             rc = ++referenceCount;
549             size = getSize();
550         }
551         
552         if( rc==1 && regionDestination!=null )
553             regionDestination.getUsageManager().increaseUsage(size);
554         
555 // System.out.println(" + "+getDestination()+" :::: "+getMessageId()+" "+rc);
556
return rc;
557     }
558     
559     synchronized public int decrementReferenceCount() {
560         int rc;
561         int size;
562         synchronized (this) {
563             rc = --referenceCount;
564             size = getSize();
565         }
566         
567         if( rc==0 && regionDestination!=null )
568             regionDestination.getUsageManager().decreaseUsage(size);
569
570 // System.out.println(" - "+getDestination()+" :::: "+getMessageId()+" "+rc);
571

572         return rc;
573     }
574
575     public int getSize() {
576         if( size <=AVERAGE_MESSAGE_SIZE_OVERHEAD ) {
577             size = AVERAGE_MESSAGE_SIZE_OVERHEAD;
578             if( marshalledProperties!=null )
579                 size += marshalledProperties.getLength();
580             if( content!=null )
581                 size += content.getLength();
582         }
583         return size;
584     }
585
586     /**
587      * @openwire:property version=1
588      * @return Returns the recievedByDFBridge.
589      */

590     public boolean isRecievedByDFBridge(){
591         return recievedByDFBridge;
592     }
593
594     /**
595      * @param recievedByDFBridge The recievedByDFBridge to set.
596      */

597     public void setRecievedByDFBridge(boolean recievedByDFBridge){
598         this.recievedByDFBridge=recievedByDFBridge;
599     }
600
601     public void onMessageRolledBack() {
602         incrementRedeliveryCounter();
603     }
604
605     /**
606      * @openwire:property version=2 cache=true
607      */

608     public boolean isDroppable() {
609         return droppable;
610     }
611     public void setDroppable(boolean droppable) {
612         this.droppable = droppable;
613     }
614 }
615
Popular Tags