KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > core > protocol > MantaBusMessage


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 /*
47  * Created on Dec 10, 2003
48  *
49  * this is the container of the pay load and the Surrounding Headers
50  *
51  */

52 package org.mr.core.protocol;
53
54 import java.io.IOException JavaDoc;
55 import java.nio.ByteBuffer JavaDoc;
56 import java.security.MessageDigest JavaDoc;
57 import java.util.HashMap JavaDoc;
58 import java.util.Iterator JavaDoc;
59 import java.util.Map JavaDoc;
60
61 import org.apache.commons.logging.LogFactory;
62 import org.mr.MantaAgent;
63 import org.mr.core.net.MantaAddress;
64 import org.mr.core.util.Prioritizeable;
65 import org.mr.core.util.SystemTime;
66 import org.mr.core.util.byteable.Byteable;
67 import org.mr.core.util.byteable.ByteableInputStream;
68 import org.mr.core.util.byteable.ByteableOutputStream;
69 import org.mr.core.util.byteable.ByteableRegistry;
70 import org.mr.kernel.UniqueIDGenerator;
71 import org.mr.kernel.delivery.NetworkModerator;
72 import org.mr.kernel.delivery.PostOfficeBox;
73
74 /**
75  *
76  * MantaBusMessage holds the payload to be sent to a remote computer along with data about
77  * this messages and the transport information
78  *
79  * Message are send by there priority (that way it is Prioritizeable)
80  * messages are turned into bytes and sent on the network and kept on the disk if needed (that is way it is Byteable)
81  * @author Amir Shevat
82  *
83  *
84  */

85 public class MantaBusMessage implements Byteable ,Prioritizeable{
86     
87     
88     /**
89      * The message id
90      */

91     private long messagesId ;
92     /**
93      * the message type see MantaBusMessageConsts for valid types
94      */

95     private byte messageType;
96     /**
97      * where this message is sent to
98      */

99     private RecipientAddress recipient;
100     /**
101      * where this message was sent From
102      */

103     private MantaAddress source;
104     // PRIORETY
105
private byte priority ;
106     //DELIVERY MODE - persistent/not
107
private byte deliveryMode ;
108     
109     // VALID UNTILL
110
private long validUntil = Long.MAX_VALUE;
111     // delivery count the number of time this message has been delivered
112
private int deliveryCount = 0;
113     //Header container
114
private HashMap JavaDoc elements = new HashMap JavaDoc();
115     // the payload to be moved
116
private PayloadContainer payloadContainer;
117     
118     // after network use we release the buffer
119
private ByteBuffer JavaDoc networkHeaderBuffer;
120     //after network use we release the buffer
121
// private ByteBuffer networkBodyBuffer;
122
// the message's MD5 signature, created by the sender (not always present)
123
private byte[] messageMD5;
124     // the message's partial MD5 signature, created by the receiver,
125
// not including the password (not always present)
126
private MessageDigest JavaDoc partialMD5;
127
128     // if true, this message is rerouted, i.e. this peer is not the
129
// original sender of the message. rerouted messages are handled
130
// differently by the delivery module.
131
private boolean rerouted = false;
132
133     private static boolean lazyParsing = false;
134     
135     
136     /**
137      * use the factory method getInstance
138      *
139      */

140     private MantaBusMessage(){
141         
142     }
143     
144     /**
145      * large number of communications might result in lots of MantaBusMessage
146      * therefore there might be a need to poll this object (it is a short lived object)
147      * @return MantaBusMessage a new or "fresh" MantaBusMessage
148      */

149     public static MantaBusMessage getInstance(){
150         MantaBusMessage result =new MantaBusMessage();
151         result.messagesId =UniqueIDGenerator.getNextMessageID(); //result.addHeader(MantaBusMessageConsts.HEADER_NAME_MESSAGE_ID , );
152
return result;
153     }
154
155     public static void setLazyParsing() {
156         MantaBusMessage.lazyParsing = true;
157     }
158
159     public static boolean isLazyParsing() {
160         return MantaBusMessage.lazyParsing;
161     }
162
163     /**
164      * adds a header to this manta message
165      * @param headerName the key of the header
166      * @param headerVlaue the value of the header
167      * @return this message
168      */

169     public final MantaBusMessage addHeader(String JavaDoc headerName ,String JavaDoc headerVlaue){
170         elements.put(headerName , headerVlaue);
171         return this;
172     }
173     
174     /**
175      * removes the header if found
176      * @param headerName the header name
177      * @return
178      */

179     public final MantaBusMessage removeHeader(String JavaDoc headerName){
180         elements.remove(headerName);
181         return this;
182     }
183     
184     /**
185      * returns the value of a given header or null if not found
186      * @param headerName the name of the header
187      * @return the value of the header (string)
188      */

189     public final String JavaDoc getHeader(String JavaDoc headerName){
190         return (String JavaDoc)elements.get(headerName);
191     }
192     
193     /**
194      *
195      * @param address the address of the destination
196      * @return a reference to this object
197      */

198     public final MantaBusMessage setRecipient(RecipientAddress adress){
199         recipient = adress;
200         return this;
201     }
202     
203     
204     /**
205      * @return the list of mantaAddresses that are the Destination of this payload
206      */

207     public final RecipientAddress getRecipient(){
208         return recipient ;
209     }
210     
211     /**
212      * will return the payload set on the sending peer
213      * @return Returns the payload.
214      */

215     public final Byteable getPayload() {
216         if(payloadContainer == null)
217             return null;
218         try {
219             return payloadContainer.getPayloadObject();
220         } catch (IOException JavaDoc e) {
221             LogFactory.getLog("mantaBusMessage").error("error in geting payload ", e);
222             return null;
223         }
224     }
225
226     /**
227      * sets the payload (byteable object)
228      * if you are not using the class directly (without the JMS) use byteable map as payload
229      * @param payload The payload to set.
230      * @see org.mr.core.util.byteable.ByteableMap
231      */

232     public final void setPayload(Byteable payload) {
233         this.payloadContainer = new PayloadContainer(payload);
234     }
235
236     /**
237      * returns the address of the sender of the message
238      * @return Returns the source.
239      */

240     public final MantaAddress getSource() {
241         return source;
242     }
243
244     /**
245      * sets the address of the sender of the message
246      * @param source The source to set.
247      */

248     public final void setSource(MantaAddress source) {
249         this.source = source;
250     }
251     
252     /**
253      * for client use -> use type MantaBusMessageConsts.MESSAGE_TYPE_CLIENT
254      * @return Returns the messageType.
255      */

256     public final byte getMessageType() {
257         return messageType;//(String) elements.get(MantaBusMessageConsts.HEADER_NAME_MESSAGE_TYPE);
258
}
259
260     /**
261      * @param messageType The messageType to set.
262      */

263     public final void setMessageType(byte messageType) {
264         this.messageType = messageType; //elements.put(MantaBusMessageConsts.HEADER_NAME_MESSAGE_TYPE ,messageType.toString() ) ;
265
}
266
267     /**
268      * @return Returns the elements.
269      */

270     public final HashMap JavaDoc getElements() {
271         return elements;
272     }
273     
274     
275     
276     
277     
278     /**
279      * @return a string representation of the object
280      */

281     public final String JavaDoc toString(){
282         StringBuffer JavaDoc buff = new StringBuffer JavaDoc();
283         buff.append("MANTA MESSAGE : ");
284         buff.append(" id=");
285         buff.append(messagesId);
286         buff.append(" priority=");
287         buff.append(priority);
288         buff.append(" deliveryMode=");
289         buff.append(deliveryMode);
290         buff.append(" deliveryCount=");
291         buff.append(deliveryCount);
292         buff.append(" valid for ");
293         buff.append( validUntil- SystemTime.gmtCurrentTimeMillis());
294         buff.append(" more millis, ");
295         buff.append(" recipient = ");
296         buff.append(recipient);
297
298         buff.append(" source =");
299         buff.append(source);
300         buff.append(" Headers =");
301         buff.append(elements);
302         buff.append(" payloadContainer = ");
303         buff.append(payloadContainer);
304         
305         return buff.toString();
306         
307     }
308         
309     private String JavaDoc msgIdStr;
310     
311     /**
312      * @return the message id (kept as integer) as String
313      */

314     public final synchronized String JavaDoc getMessageId(){
315         if(msgIdStr == null)
316             msgIdStr = String.valueOf(messagesId);
317         return msgIdStr;
318     }
319     
320     /**
321      * @return the message id
322      */

323     public final long getMessageIdAsLong(){
324         
325         return messagesId;
326     }
327     
328     
329     /**
330      * sets the
331      * @param destination
332      */

333     public final void setLogicalDestination(String JavaDoc destination){
334         addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION , destination);
335     }
336     
337     
338     public final String JavaDoc getLogicalDestination(){
339         return getHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION);
340     }
341
342     
343
344     
345
346     /**
347      * see MantaAgentConstants for valid types (PERSISTENT or NON_PERSISTENT)
348      * @param deliveryMode The delivery Mode to set.
349      * @see org.mr.MantaAgentConstants
350      */

351     public final byte getDeliveryMode() {
352         return deliveryMode;
353     }
354
355     /**
356      * see MantaAgentConstants for valid types (PERSISTENT or NON_PERSISTENT)
357      * @param deliveryMode The delivery Mode to set.
358      * @see org.mr.MantaAgentConstants
359      */

360     public final void setDeliveryMode(byte deliveryMode) {
361         this.deliveryMode = deliveryMode;
362     }
363
364     /**
365      * @return Returns the priority.
366      */

367     public final byte getPriority() {
368         return priority;
369     }
370
371     /**
372      * @param priority The priority to set.
373      */

374     public final void setPriority(byte priorety) {
375         this.priority = priorety;
376     }
377
378     /**
379      * @return Returns the validUntil.
380      */

381     public final long getValidUntil() {
382         return validUntil;
383     }
384
385     /**
386      * @param validUntil The validUntil to set.
387      */

388     public final void setValidUntil(long validUntil) {
389                     
390         if(validUntil >0)
391             this.validUntil = validUntil;
392         else
393             this.validUntil = Long.MAX_VALUE;
394     }
395     
396     static final String JavaDoc ByteableName = "MantaBM";
397     /* (non-Javadoc)
398      * @see org.mr.core.util.byteable.Byteable#getByteableName()
399      */

400     public final String JavaDoc getByteableName() {
401         return ByteableName;
402     }
403
404     /* (non-Javadoc)
405      * @see org.mr.core.util.byteable.Byteable#toBytes(org.mr.core.util.byteable.ByteableOutputStream)
406      */

407     public final void toBytes(ByteableOutputStream out) throws IOException JavaDoc {
408         writeHeaders(out);
409         //write the payloadContainer
410
if(payloadContainer != null){
411             ByteBuffer JavaDoc buf = payloadContainer.getSerializedPayload();
412             out.write(buf.array(), 0,buf.limit() );
413             payloadContainer.release();
414         }else{
415             out.writeByteable(null);
416         }
417         
418     
419     }
420     
421     public final void writeHeaders(ByteableOutputStream out) throws IOException JavaDoc {
422 // start the parse
423
Map JavaDoc data = this.getElements();
424         // write the manta addresses
425
//destinations:
426
MantaAddress address;
427         out.writeByteable(recipient);
428         
429         // source :
430
address = this.getSource();
431         out.writeByteable(address);
432         
433         // headers
434
//ID
435
out.writeLong(messagesId);
436         // type
437
out.writeByte(messageType);
438         // PRIORETY
439
out.writeByte(this.getPriority() );
440         //DELIVERY MODE
441
out.writeByte( this.getDeliveryMode());
442         // VALID UNTILL
443
out.writeLong(this.getValidUntil());
444         // delivery count
445
out.writeInt(this.getDeliveryCount());
446         // write the Optional headers
447
byte numberOfHeaders =(byte)data.size();
448         out.writeByte(numberOfHeaders);
449         Iterator JavaDoc iter = data.keySet().iterator();
450         while(iter.hasNext()){
451             String JavaDoc headerName =(String JavaDoc) iter.next();
452             String JavaDoc headerValue =(String JavaDoc) data.get(headerName);
453             out.writeASCIIString(headerName);
454             out.writeASCIIString(headerValue);
455         }
456     }
457
458     /* (non-Javadoc)
459      * @see org.mr.core.util.byteable.Byteable#createInstance(org.mr.core.util.byteable.ByteableInputStream)
460      */

461     public final Byteable createInstance(ByteableInputStream in) throws IOException JavaDoc {
462         MantaBusMessage result = MantaBusMessage.getInstance();
463         
464         // read the manta addresses
465
//destinations:
466
MantaAddress address;
467         
468         result.setRecipient((RecipientAddress) in.readByteable());
469         // source :
470
address =(MantaAddress)in.readByteable();
471         result.setSource(address);
472         // headers
473
//ID
474
result.messagesId = in.readLong();
475         // type
476
result.messageType = in.readByte();
477         // PRIORETY
478
result.setPriority(in.readByte());
479         //DELIVERY MODE
480
result.setDeliveryMode(in.readByte()) ;
481         // VALID UNTILL
482
result.setValidUntil(in.readLong()) ;
483         // delivery count
484
result.setDeliveryCount(in.readInt());
485         
486         // read the Optional headers
487
byte numberOfHeaders =in.readByte();
488         for(int count =0 ; count < numberOfHeaders ; count++){
489             String JavaDoc headerName =in.readASCIIString();
490             String JavaDoc headerValue =in.readASCIIString();
491             result.addHeader(headerName ,headerValue );
492         }
493         // read the payloadContainer
494
if (MantaBusMessage.lazyParsing) {
495             result.setPayloadContainer(new PayloadContainer(in.getUnderlying()));
496         } else {
497             result.setPayloadContainer(new PayloadContainer(in.readByteable()));
498         }
499         
500         return result;
501     }
502     
503     
504     /* (non-Javadoc)
505      * @see org.mr.core.util.byteable.Byteable#registerToByteableRegistry()
506      */

507     public void registerToByteableRegistry() {
508         ByteableRegistry.registerByteableFactory(getByteableName() , this);
509         
510     }
511     
512     public static void register(){
513         MantaBusMessage instance = new MantaBusMessage();
514         instance.registerToByteableRegistry();
515     }
516
517     /**
518      * @return Returns the deliveryCount.
519      */

520     public int getDeliveryCount() {
521         return deliveryCount;
522     }
523     /**
524      * @param deliveryCount The deliveryCount to set.
525      */

526     public void setDeliveryCount(int deliveryCount) {
527         this.deliveryCount = deliveryCount;
528     }
529     
530     public boolean isRedelivered(){
531         return deliveryCount >1;
532     }
533     
534     
535     /**
536      * @return Returns the realNetAddress.
537      */

538     public MantaAddress getRealNetAddress() {
539         return recipient;
540     }
541     
542     /**
543      * @param messagesId The messagesId to set.
544      */

545     public void setMessagesId(long messagesId) {
546         this.messagesId = messagesId;
547     }
548     /**
549      * @return Returns the payloadContainer.
550      */

551     public PayloadContainer getPayloadContainer() {
552         return payloadContainer;
553     }
554     /**
555      * @param payloadContainer The payloadContainer to set.
556      */

557     public void setPayloadContainer(PayloadContainer payloadContainer) {
558         this.payloadContainer = payloadContainer;
559     }
560     
561     /**
562      * used by the network to decouple the message header from the payload
563      * this is done Because sometimes the payload is shared and has already been serialized
564      * @return 2 byte buffers 1 for headers and 1 for payload
565      * @throws IOException if serializetion fails
566      */

567     public ByteBuffer JavaDoc[] getNetBuffers() throws IOException JavaDoc{
568         
569         ByteBuffer JavaDoc[] networkBuffers = new ByteBuffer JavaDoc[2];
570         ByteableOutputStream out = new ByteableOutputStream( MessageTransformer.messageHeaderBufferPool);
571         out.writeASCIIString(getByteableName());
572         writeHeaders(out);
573          
574         networkBuffers[0] = out.getByteBuffer().duplicate();
575         networkHeaderBuffer = networkBuffers[0];
576         //if payload container was not set create it now (we might need to change this)
577
if(payloadContainer == null){
578             payloadContainer = new PayloadContainer((Byteable) null);
579         }
580         networkBuffers[1] = payloadContainer.getSerializedPayload();
581
582         return networkBuffers;
583     }
584  
585     public final void release(boolean SentSuccessful) {
586         PostOfficeBox pob = MantaAgent.getInstance().getSingletonRepository()
587         .getPostOffice().getPostOfficeBox(recipient.getId());
588         if(pob!= null){
589             NetworkModerator mod = pob.getModerator();
590             if(SentSuccessful){
591                 mod.messageSentByNetwork(this);
592             }else{
593                 mod.messageSendFailByNetwork(this);
594             }
595         }
596         
597     }//release
598

599     public synchronized final void releaseBuffers(){
600         if(networkHeaderBuffer != null){
601             MessageTransformer.messageHeaderBufferPool.release(networkHeaderBuffer);
602             networkHeaderBuffer = null;
603             
604         }
605         if (payloadContainer != null) {
606             this.payloadContainer.release();
607         }
608         
609     }
610
611     /**
612      * Set the MD5 signature that was calculated by the message
613      * sender.
614      * @param md5 the message's MD5 signature
615      */

616     public void setMessageMD5(byte[] md5) {
617         this.messageMD5 = md5;
618     }
619     
620     /**
621      * Get the MD5 signature that was calculated by the message
622      * sender.
623      * @param md5 the message's MD5 signature
624      */

625     public byte[] getMessageMD5() {
626         return this.messageMD5;
627     }
628
629     /**
630      * Set the partial MD5 signature of the message. This object is
631      * created by the network layer upon receiving the message.
632      */

633     public void setPartialMD5(MessageDigest JavaDoc partialMD5) {
634         this.partialMD5 = partialMD5;
635     }
636
637     /**
638      * Get the partial MD5 signature of the message. This object is
639      * created by the network layer upon receiving the message. To
640      * complete the signature, the digest needs to be updated with the
641      * password the sending agent used to sign the message with.
642      */

643     public MessageDigest JavaDoc getPartialMD5() {
644         return this.partialMD5;
645     }
646
647     public boolean isRerouted() {
648         return this.rerouted;
649     }
650
651     /**
652      * this is set if the message is rerouted, that is, not originated
653      * from this peer. currently used by the MWB.
654      */

655     public void setRerouted() {
656         this.rerouted = true;
657     }
658 }
659
Popular Tags