1 30 31 package org.objectweb.fractal.rmi; 32 33 import org.objectweb.fractal.api.control.BindingController; 34 35 import org.objectweb.jonathan.apis.binding.BindException; 36 import org.objectweb.jonathan.apis.binding.ExportException; 37 import org.objectweb.jonathan.apis.binding.Identifier; 38 import org.objectweb.jonathan.apis.binding.NamingContext; 39 import org.objectweb.jonathan.apis.kernel.Context; 40 import org.objectweb.jonathan.apis.kernel.JonathanException; 41 import org.objectweb.jonathan.apis.presentation.Marshaller; 42 import org.objectweb.jonathan.apis.presentation.MarshallerFactory; 43 import org.objectweb.jonathan.apis.presentation.UnMarshaller; 44 import org.objectweb.jonathan.apis.protocols.Protocol; 45 import org.objectweb.jonathan.apis.protocols.ProtocolGraph; 46 import org.objectweb.jonathan.apis.protocols.ReplyInterface; 47 import org.objectweb.jonathan.apis.protocols.ReplySession; 48 import org.objectweb.jonathan.apis.protocols.RequestSession; 49 import org.objectweb.jonathan.apis.protocols.ServerException; 50 import org.objectweb.jonathan.apis.protocols.SessionIdentifier; 51 import org.objectweb.jonathan.apis.protocols.Session_High; 52 import org.objectweb.jonathan.apis.protocols.Session_Low; 53 import org.objectweb.jonathan.apis.resources.Chunk; 54 import org.objectweb.jonathan.apis.resources.Scheduler; 55 56 import org.objectweb.util.monolog.api.BasicLevel; 57 import org.objectweb.util.monolog.api.Logger; 58 import org.objectweb.util.monolog.api.LoggerFactory; 59 60 import java.util.Properties ; 61 62 99 100 public class RmiProtocol implements Protocol, BindingController { 101 102 106 107 protected NamingContext adapter; 108 109 112 113 protected MarshallerFactory marshallerFactory; 114 115 118 119 protected Scheduler scheduler; 120 121 124 125 protected LoggerFactory loggerFactory; 126 127 130 131 protected Logger logger; 132 133 private ClientSession_Low clientSessionLow; 134 135 private ServerSession_Low serverSessionLow; 136 137 private ReplyHolder[] table; 138 139 private ReplyHolder reusable; 140 141 private int size; 142 143 private int id; 144 145 148 149 public RmiProtocol () { 150 clientSessionLow = new ClientSession_Low(); 151 serverSessionLow = new ServerSession_Low(); 152 table = new ReplyHolder[17]; 153 } 154 155 159 public String [] listFc () { 160 return new String [] { 161 "adapter", 162 "marshaller-factory", 163 "scheduler", 164 "logger-factory" 165 }; 166 } 167 168 public Object lookupFc (final String clientItfName) { 169 if (clientItfName.equals("adapter")) { 170 return adapter; 171 } else if (clientItfName.equals("marshaller-factory")) { 172 return marshallerFactory; 173 } else if (clientItfName.equals("scheduler")) { 174 return scheduler; 175 } else if (clientItfName.equals("logger-factory")) { 176 return loggerFactory; 177 } 178 return null; 179 } 180 181 public void bindFc (final String clientItfName, final Object serverItf) { 182 if (clientItfName.equals("adapter")) { 183 adapter = (NamingContext)serverItf; 184 } else if (clientItfName.equals("marshaller-factory")) { 185 marshallerFactory = (MarshallerFactory)serverItf; 186 } else if (clientItfName.equals("scheduler")) { 187 scheduler = (Scheduler)serverItf; 188 } else if (clientItfName.equals("logger-factory")) { 189 loggerFactory = (LoggerFactory)serverItf; 190 logger = loggerFactory.getLogger(getClass().getName()); 191 } 192 } 193 194 public void unbindFc (final String clientItfName) { 195 if (clientItfName.equals("adapter")) { 196 adapter = null; 197 } else if (clientItfName.equals("marshaller-factory")) { 198 marshallerFactory = null; 199 } else if (clientItfName.equals("scheduler")) { 200 scheduler = null; 201 } else if (clientItfName.equals("logger-factory")) { 202 loggerFactory = null; 203 logger = null; 204 } 205 } 206 207 211 222 223 public boolean isAnInvocationProtocol () { 224 return true; 225 } 226 227 236 237 public ProtocolGraph createProtocolGraph ( 238 final ProtocolGraph[] subgraphs, 239 final Context hints) throws JonathanException 240 { 241 if (subgraphs.length != 1) { 242 throw new JonathanException("Lower layers badly specified in RMIP"); 243 } 244 return new Graph(subgraphs[0]); 245 } 246 247 255 256 public SessionIdentifier createSessionIdentifier ( 257 final Properties info, 258 final SessionIdentifier[] next) throws JonathanException 259 { 260 if (next.length != 1) { 261 throw new JonathanException("Lower layers badly specified in RMIP"); 262 } 263 byte[] key = (byte[])(info.get("object_key")); 264 return new CltSessionId(key, next[0]); 265 } 266 267 271 class Graph implements ProtocolGraph { 272 273 ProtocolGraph next; 274 275 public Graph (final ProtocolGraph next) { 276 this.next = next; 277 } 278 279 302 303 public SessionIdentifier export (final Session_Low ignored) 304 throws JonathanException 305 { 306 if (next == null) { 307 throw new ExportException("Badly specified participants"); 308 } 309 return new SrvSessionId(next.export(serverSessionLow)); 310 } 311 } 312 313 class SrvSessionId implements SessionIdentifier { 314 315 SessionIdentifier next; 316 317 public SrvSessionId (final SessionIdentifier next) { 318 this.next = next; 319 } 320 321 public Session_High bind (final Session_Low ignored) 322 throws JonathanException 323 { 324 throw new BindException("Bad session identifier type"); 325 } 326 327 public void unexport () { 328 next.unexport(); 329 } 330 331 public Protocol getProtocol () { 332 return RmiProtocol.this; 333 } 334 335 public SessionIdentifier[] next () { 336 return new SessionIdentifier[] {next}; 337 } 338 339 363 364 public int getProtocolId () { 365 return 0; 366 } 367 368 public Context getInfo () throws JonathanException { 369 throw new JonathanException("Not implemented"); 370 } 372 373 public boolean isLocal () { 374 return false; } 376 } 377 378 class CltSessionId extends SrvSessionId { 379 380 byte[] key; 381 382 public CltSessionId ( 383 final byte[] key, 384 final SessionIdentifier next) 385 { 386 super(next); 387 this.key = key; 388 } 389 390 public Session_High bind (final Session_Low ignored) 391 throws JonathanException 392 { 393 if (next == null) { 394 throw new BindException("Badly specified participants"); 395 } 396 return new ClientSession_High(key, next.bind(clientSessionLow)); 397 } 398 399 public void unexport () {} 400 401 439 440 public Context getInfo () throws JonathanException { 441 throw new JonathanException("Meaningless"); 442 } 443 444 public boolean isLocal() { 445 return next.isLocal(); 446 } 447 } 448 449 453 class ClientSession_Low implements Session_Low { 454 455 public void send ( 456 final UnMarshaller unmarshaller, 457 final Session_High sender) throws JonathanException 458 { 459 try { 460 int rqId = unmarshaller.readInt(); 461 ReplyHolder reply = getHolder(rqId); 462 if (reply == null) { 463 unmarshaller.close(); 465 if (logger != null && logger.isLoggable(BasicLevel.INFO)) { 466 logger.log( 467 BasicLevel.DEBUG, "RMIP Request #" + rqId + " not found"); 468 } 469 } else { 470 reply.sendReply(unmarshaller); 471 } 472 } catch (JonathanException e) { 473 unmarshaller.close(); 474 send(e, sender); 475 } 476 } 477 478 public void send (final JonathanException e, final Session_High sender) { 479 forwardException(e, sender); 480 } 481 } 482 483 class ServerSession_Low implements Session_Low { 484 485 public void send ( 486 final UnMarshaller unmarshaller, 487 final Session_High sender) throws JonathanException 488 { 489 boolean unmarshallerOpened = true; 490 int rqId = unmarshaller.readInt(); 491 RequestSession requestSession = null; 492 try { 493 int len = unmarshaller.readInt(); 494 byte[] key = new byte[len]; 495 unmarshaller.readByteArray(key,0,len); 496 Identifier id = adapter.decode(key, 0, len); 497 requestSession = (RequestSession)id.bind(null, null); 498 if (requestSession != null) { 499 ServerSession_High replySession = 500 new ServerSession_High(sender, rqId); 501 requestSession.send(unmarshaller, replySession); 502 return; 503 } 504 unmarshallerOpened = false; 505 unmarshaller.close(); 506 } catch (Exception e) { 507 if (logger != null && logger.isLoggable(BasicLevel.INFO)) { 508 logger.log(BasicLevel.DEBUG, "Exception caught in RMIP", e); 509 } 510 if (unmarshallerOpened) { 511 unmarshaller.close(); 512 } 513 sendException(e, rqId, sender); 514 } 515 } 516 517 public void send (final JonathanException e, final Session_High sender) { 518 if (logger != null && logger.isLoggable(BasicLevel.INFO)) { 519 logger.log( 520 BasicLevel.DEBUG, "Exception caught in RMIP related to " + sender, e); 521 } 522 sender.close(); 523 } 524 525 void sendException ( 526 final Exception e, 527 final int rqId, 528 final Session_High session) 529 { 530 try { 531 Marshaller marshaller = prepareReplyMessage(rqId, true); 532 marshaller.writeValue(e); 533 sendMessage(marshaller, session); 534 } catch (Exception ignored) { 535 } finally { 536 session.close(); 537 } 538 } 539 } 540 541 class RMIPSession_High { 542 543 Session_High lower; 544 545 public RMIPSession_High (final Session_High lower) { 546 this.lower = lower; 547 } 548 549 public boolean direct () { 550 return false; 551 } 552 553 public void send (final Marshaller marshaller) throws JonathanException { 554 sendMessage(marshaller, lower); 555 } 556 557 public void close() { 558 lower.close(); 559 } 560 } 561 562 class ClientSession_High extends RMIPSession_High implements Session_High { 563 564 byte[] key; 565 566 570 571 public ClientSession_High (final byte[] key, final Session_High lower) { 572 super(lower); 573 this.key = key; 574 } 575 576 public ReplyInterface prepareInvocation (final Marshaller marshaller) 577 throws JonathanException 578 { 579 ReplyHolder reply = registerHolder(lower); 580 marshaller.writeInt(reply.id); 581 int len = key.length; 582 marshaller.writeInt(len); 583 marshaller.writeByteArray(key, 0, len); 584 return reply; 585 } 586 587 public void prepare (final Marshaller marshaller) throws JonathanException { 588 marshaller.writeInt(0); 589 int len = key.length; 590 marshaller.writeInt(len); 591 marshaller.writeByteArray(key, 0, len); 592 } 593 } 594 595 class ServerSession_High extends RMIPSession_High implements ReplySession { 596 597 int rqId; 598 599 public ServerSession_High (final Session_High lower, final int rqId) { 600 super(lower); 601 this.rqId = rqId; 602 } 603 604 public Marshaller prepareReply () throws JonathanException { 605 return prepareReplyMessage(rqId, false); 606 } 607 608 public Marshaller prepareExceptionReply () throws JonathanException { 609 return prepareReplyMessage(rqId, true); 610 } 611 612 public Marshaller prepareSystemExceptionReply () throws JonathanException { 613 return prepareReplyMessage(rqId, true); 614 } 615 616 public Marshaller prepareLocationForwardReply () throws JonathanException { 617 return prepareReplyMessage(rqId, true); 618 } 619 } 620 621 623 void sendMessage (final Marshaller marshaller, final Session_High lower) 624 throws JonathanException 625 { 626 Chunk first = marshaller.getState(); 627 Chunk c = first; 628 int size = 0; 629 while (c != null) { 630 size += c.top - c.offset; 631 c = c.next; 632 } 633 if (lower.direct()) { 634 lower.send(marshaller); 635 } else { 636 Marshaller m = marshallerFactory.newMarshaller(); 637 lower.prepare(m); 638 m.write(marshaller.getState()); 639 marshaller.reset(); 640 lower.send(m); 641 } 642 } 643 644 Marshaller prepareReplyMessage (final int rqId, final boolean isException) 645 throws JonathanException 646 { 647 Marshaller marshaller = marshallerFactory.newMarshaller(); 648 marshaller.writeInt(rqId); 649 marshaller.writeBoolean(isException); 650 return marshaller; 651 } 652 653 synchronized void forwardException ( 654 final JonathanException e, 655 final Session_High lower) 656 { 657 int len = table.length; 658 ReplyHolder holder; 659 for (int i = 0; i < len; i++) { 660 holder = table[i]; 661 while (holder != null) { 662 if (holder.lower == lower) { 663 holder.sendReply(e); 664 } 665 holder = holder.next; 666 } 667 } 668 } 669 670 674 679 680 synchronized ReplyHolder getHolder (final int id) { 681 int index = (id & 0x7FFFFFFF) % table.length; 682 ReplyHolder holder = table[index]; 683 while (! (holder == null || holder.id == id)) { 684 holder = holder.next; 685 } 686 return holder; 687 } 688 689 synchronized ReplyHolder registerHolder (final Session_High lower) { 690 ReplyHolder holder; 691 if (reusable == null) { 692 holder = new ReplyHolder(lower); 693 id++; 694 holder.id = id; 695 } else { 696 holder = reusable; 697 holder.lower = lower; 698 reusable = reusable.next; 699 } 700 int len = table.length; 701 int index = (holder.id & 0x7FFFFFFF) % len; 702 holder.next = table[index]; 703 table[index] = holder; 704 size++; 705 if (size > len / 2) { 706 rehash(len); 707 } 708 return holder; 709 } 710 711 715 716 synchronized void removeHolder (final int id) { 717 int index = (id & 0x7FFFFFFF) % table.length; 718 ReplyHolder first = table[index]; 719 ReplyHolder holder = first; 720 ReplyHolder prev = null; 721 while (holder.id != id) { 722 prev = holder; 723 holder = holder.next; 724 } 725 if (holder != null) { 726 size--; 727 if (prev != null) { 728 prev.next = holder.next; 729 } else { 730 table[index] = holder.next; 731 } 732 holder.next = reusable; 733 holder.lower = null; 734 reusable = holder; 735 } 736 } 737 738 void rehash (final int len) { 739 int newLen = 2 * len + 1; 740 int index; 741 ReplyHolder holder, next_holder; 742 ReplyHolder[] newTable = new ReplyHolder[newLen]; 743 for (int i = 0; i < len; i++) { 744 holder = table[i]; 745 while (holder != null) { 746 next_holder = holder.next; 747 index = (holder.id & 0x7FFFFFFF) % newLen; 749 holder.next = newTable[index]; 750 newTable[index] = holder; 751 752 holder = next_holder; 753 } 754 } 755 table = newTable; 756 } 757 758 class ReplyHolder implements ReplyInterface { 759 760 Object reply; 761 int id; 762 ReplyHolder next; 763 Session_High lower; 764 765 public ReplyHolder (final Session_High lower) { 766 super(); 767 this.lower = lower; 768 } 769 770 public synchronized UnMarshaller listen () throws JonathanException { 771 try { 772 while (reply == null) { 773 scheduler.wait(this); 774 } 775 UnMarshaller message = (UnMarshaller)reply; 776 boolean isException = message.readBoolean(); 777 if (isException) { 778 throw new ServerException(message); 779 } else { 780 return message; 781 } 782 } catch (InterruptedException e) { 783 throw new JonathanException(e); 784 } catch (ClassCastException e) { 785 throw (JonathanException)reply; 786 } finally { 787 reply = null; 788 removeHolder(id); 789 } 790 } 791 792 public synchronized boolean available () { 793 return reply != null; 794 } 795 796 final synchronized void sendReply (final Object reply) { 797 this.reply = reply; 798 scheduler.notify(this); 799 } 800 } 801 } 802 | Popular Tags |