1 46 52 package org.mr.core.protocol; 53 54 import java.io.IOException ; 55 import java.nio.ByteBuffer ; 56 import java.security.MessageDigest ; 57 import java.util.HashMap ; 58 import java.util.Iterator ; 59 import java.util.Map ; 60 61 import org.apache.commons.logging.LogFactory; 62 import org.mr.MantaAgent; 63 import org.mr.core.net.MantaAddress; 64 import org.mr.core.util.Prioritizeable; 65 import org.mr.core.util.SystemTime; 66 import org.mr.core.util.byteable.Byteable; 67 import org.mr.core.util.byteable.ByteableInputStream; 68 import org.mr.core.util.byteable.ByteableOutputStream; 69 import org.mr.core.util.byteable.ByteableRegistry; 70 import org.mr.kernel.UniqueIDGenerator; 71 import org.mr.kernel.delivery.NetworkModerator; 72 import org.mr.kernel.delivery.PostOfficeBox; 73 74 85 public class MantaBusMessage implements Byteable ,Prioritizeable{ 86 87 88 91 private long messagesId ; 92 95 private byte messageType; 96 99 private RecipientAddress recipient; 100 103 private MantaAddress source; 104 private byte priority ; 106 private byte deliveryMode ; 108 109 private long validUntil = Long.MAX_VALUE; 111 private int deliveryCount = 0; 113 private HashMap elements = new HashMap (); 115 private PayloadContainer payloadContainer; 117 118 private ByteBuffer networkHeaderBuffer; 120 private byte[] messageMD5; 124 private MessageDigest partialMD5; 127 128 private boolean rerouted = false; 132 133 private static boolean lazyParsing = false; 134 135 136 140 private MantaBusMessage(){ 141 142 } 143 144 149 public static MantaBusMessage getInstance(){ 150 MantaBusMessage result =new MantaBusMessage(); 151 result.messagesId =UniqueIDGenerator.getNextMessageID(); return result; 153 } 154 155 public static void setLazyParsing() { 156 MantaBusMessage.lazyParsing = true; 157 } 158 159 public static boolean isLazyParsing() { 160 return MantaBusMessage.lazyParsing; 161 } 162 163 169 public final MantaBusMessage addHeader(String headerName ,String headerVlaue){ 170 elements.put(headerName , headerVlaue); 171 return this; 172 } 173 174 179 public final MantaBusMessage removeHeader(String headerName){ 180 elements.remove(headerName); 181 return this; 182 } 183 184 189 public final String getHeader(String headerName){ 190 return (String )elements.get(headerName); 191 } 192 193 198 public final MantaBusMessage setRecipient(RecipientAddress adress){ 199 recipient = adress; 200 return this; 201 } 202 203 204 207 public final RecipientAddress getRecipient(){ 208 return recipient ; 209 } 210 211 215 public final Byteable getPayload() { 216 if(payloadContainer == null) 217 return null; 218 try { 219 return payloadContainer.getPayloadObject(); 220 } catch (IOException e) { 221 LogFactory.getLog("mantaBusMessage").error("error in geting payload ", e); 222 return null; 223 } 224 } 225 226 232 public final void setPayload(Byteable payload) { 233 this.payloadContainer = new PayloadContainer(payload); 234 } 235 236 240 public final MantaAddress getSource() { 241 return source; 242 } 243 244 248 public final void setSource(MantaAddress source) { 249 this.source = source; 250 } 251 252 256 public final byte getMessageType() { 257 return messageType; } 259 260 263 public final void setMessageType(byte messageType) { 264 this.messageType = messageType; } 266 267 270 public final HashMap getElements() { 271 return elements; 272 } 273 274 275 276 277 278 281 public final String toString(){ 282 StringBuffer buff = new StringBuffer (); 283 buff.append("MANTA MESSAGE : "); 284 buff.append(" id="); 285 buff.append(messagesId); 286 buff.append(" priority="); 287 buff.append(priority); 288 buff.append(" deliveryMode="); 289 buff.append(deliveryMode); 290 buff.append(" deliveryCount="); 291 buff.append(deliveryCount); 292 buff.append(" valid for "); 293 buff.append( validUntil- SystemTime.gmtCurrentTimeMillis()); 294 buff.append(" more millis, "); 295 buff.append(" recipient = "); 296 buff.append(recipient); 297 298 buff.append(" source ="); 299 buff.append(source); 300 buff.append(" Headers ="); 301 buff.append(elements); 302 buff.append(" payloadContainer = "); 303 buff.append(payloadContainer); 304 305 return buff.toString(); 306 307 } 308 309 private String msgIdStr; 310 311 314 public final synchronized String getMessageId(){ 315 if(msgIdStr == null) 316 msgIdStr = String.valueOf(messagesId); 317 return msgIdStr; 318 } 319 320 323 public final long getMessageIdAsLong(){ 324 325 return messagesId; 326 } 327 328 329 333 public final void setLogicalDestination(String destination){ 334 addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION , destination); 335 } 336 337 338 public final String getLogicalDestination(){ 339 return getHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION); 340 } 341 342 343 344 345 346 351 public final byte getDeliveryMode() { 352 return deliveryMode; 353 } 354 355 360 public final void setDeliveryMode(byte deliveryMode) { 361 this.deliveryMode = deliveryMode; 362 } 363 364 367 public final byte getPriority() { 368 return priority; 369 } 370 371 374 public final void setPriority(byte priorety) { 375 this.priority = priorety; 376 } 377 378 381 public final long getValidUntil() { 382 return validUntil; 383 } 384 385 388 public final void setValidUntil(long validUntil) { 389 390 if(validUntil >0) 391 this.validUntil = validUntil; 392 else 393 this.validUntil = Long.MAX_VALUE; 394 } 395 396 static final String ByteableName = "MantaBM"; 397 400 public final String getByteableName() { 401 return ByteableName; 402 } 403 404 407 public final void toBytes(ByteableOutputStream out) throws IOException { 408 writeHeaders(out); 409 if(payloadContainer != null){ 411 ByteBuffer buf = payloadContainer.getSerializedPayload(); 412 out.write(buf.array(), 0,buf.limit() ); 413 payloadContainer.release(); 414 }else{ 415 out.writeByteable(null); 416 } 417 418 419 } 420 421 public final void writeHeaders(ByteableOutputStream out) throws IOException { 422 Map data = this.getElements(); 424 MantaAddress address; 427 out.writeByteable(recipient); 428 429 address = this.getSource(); 431 out.writeByteable(address); 432 433 out.writeLong(messagesId); 436 out.writeByte(messageType); 438 out.writeByte(this.getPriority() ); 440 out.writeByte( this.getDeliveryMode()); 442 out.writeLong(this.getValidUntil()); 444 out.writeInt(this.getDeliveryCount()); 446 byte numberOfHeaders =(byte)data.size(); 448 out.writeByte(numberOfHeaders); 449 Iterator iter = data.keySet().iterator(); 450 while(iter.hasNext()){ 451 String headerName =(String ) iter.next(); 452 String headerValue =(String ) data.get(headerName); 453 out.writeASCIIString(headerName); 454 out.writeASCIIString(headerValue); 455 } 456 } 457 458 461 public final Byteable createInstance(ByteableInputStream in) throws IOException { 462 MantaBusMessage result = MantaBusMessage.getInstance(); 463 464 MantaAddress address; 467 468 result.setRecipient((RecipientAddress) in.readByteable()); 469 address =(MantaAddress)in.readByteable(); 471 result.setSource(address); 472 result.messagesId = in.readLong(); 475 result.messageType = in.readByte(); 477 result.setPriority(in.readByte()); 479 result.setDeliveryMode(in.readByte()) ; 481 result.setValidUntil(in.readLong()) ; 483 result.setDeliveryCount(in.readInt()); 485 486 byte numberOfHeaders =in.readByte(); 488 for(int count =0 ; count < numberOfHeaders ; count++){ 489 String headerName =in.readASCIIString(); 490 String headerValue =in.readASCIIString(); 491 result.addHeader(headerName ,headerValue ); 492 } 493 if (MantaBusMessage.lazyParsing) { 495 result.setPayloadContainer(new PayloadContainer(in.getUnderlying())); 496 } else { 497 result.setPayloadContainer(new PayloadContainer(in.readByteable())); 498 } 499 500 return result; 501 } 502 503 504 507 public void registerToByteableRegistry() { 508 ByteableRegistry.registerByteableFactory(getByteableName() , this); 509 510 } 511 512 public static void register(){ 513 MantaBusMessage instance = new MantaBusMessage(); 514 instance.registerToByteableRegistry(); 515 } 516 517 520 public int getDeliveryCount() { 521 return deliveryCount; 522 } 523 526 public void setDeliveryCount(int deliveryCount) { 527 this.deliveryCount = deliveryCount; 528 } 529 530 public boolean isRedelivered(){ 531 return deliveryCount >1; 532 } 533 534 535 538 public MantaAddress getRealNetAddress() { 539 return recipient; 540 } 541 542 545 public void setMessagesId(long messagesId) { 546 this.messagesId = messagesId; 547 } 548 551 public PayloadContainer getPayloadContainer() { 552 return payloadContainer; 553 } 554 557 public void setPayloadContainer(PayloadContainer payloadContainer) { 558 this.payloadContainer = payloadContainer; 559 } 560 561 567 public ByteBuffer [] getNetBuffers() throws IOException { 568 569 ByteBuffer [] networkBuffers = new ByteBuffer [2]; 570 ByteableOutputStream out = new ByteableOutputStream( MessageTransformer.messageHeaderBufferPool); 571 out.writeASCIIString(getByteableName()); 572 writeHeaders(out); 573 574 networkBuffers[0] = out.getByteBuffer().duplicate(); 575 networkHeaderBuffer = networkBuffers[0]; 576 if(payloadContainer == null){ 578 payloadContainer = new PayloadContainer((Byteable) null); 579 } 580 networkBuffers[1] = payloadContainer.getSerializedPayload(); 581 582 return networkBuffers; 583 } 584 585 public final void release(boolean SentSuccessful) { 586 PostOfficeBox pob = MantaAgent.getInstance().getSingletonRepository() 587 .getPostOffice().getPostOfficeBox(recipient.getId()); 588 if(pob!= null){ 589 NetworkModerator mod = pob.getModerator(); 590 if(SentSuccessful){ 591 mod.messageSentByNetwork(this); 592 }else{ 593 mod.messageSendFailByNetwork(this); 594 } 595 } 596 597 } 599 public synchronized final void releaseBuffers(){ 600 if(networkHeaderBuffer != null){ 601 MessageTransformer.messageHeaderBufferPool.release(networkHeaderBuffer); 602 networkHeaderBuffer = null; 603 604 } 605 if (payloadContainer != null) { 606 this.payloadContainer.release(); 607 } 608 609 } 610 611 616 public void setMessageMD5(byte[] md5) { 617 this.messageMD5 = md5; 618 } 619 620 625 public byte[] getMessageMD5() { 626 return this.messageMD5; 627 } 628 629 633 public void setPartialMD5(MessageDigest partialMD5) { 634 this.partialMD5 = partialMD5; 635 } 636 637 643 public MessageDigest getPartialMD5() { 644 return this.partialMD5; 645 } 646 647 public boolean isRerouted() { 648 return this.rerouted; 649 } 650 651 655 public void setRerouted() { 656 this.rerouted = true; 657 } 658 } 659 | Popular Tags |