KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > fractal > rmi > RmiProtocol


1 /***
2  * Fractal RMI: a binder for remote method calls between Fractal components.
3  * Copyright (C) 2003 France Telecom R&D
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Contact: Eric.Bruneton@rd.francetelecom.com
20  *
21  * Author: Eric Bruneton
22  *
23  * adapted from Jonathan:
24  * org.objectweb.david.libs.protocols.giop.GIOPProtocol
25  * (authors: B. Dumant, S. Chambon, H. Piccone, S. Thiebaud)
26  * with some comments copied from:
27  * org.objectweb.jonathan.apis.protocols.Protocol
28  * (author: B. Dumant)
29  */

30
31 package org.objectweb.fractal.rmi;
32
33 import org.objectweb.fractal.api.control.BindingController;
34
35 import org.objectweb.jonathan.apis.binding.BindException;
36 import org.objectweb.jonathan.apis.binding.ExportException;
37 import org.objectweb.jonathan.apis.binding.Identifier;
38 import org.objectweb.jonathan.apis.binding.NamingContext;
39 import org.objectweb.jonathan.apis.kernel.Context;
40 import org.objectweb.jonathan.apis.kernel.JonathanException;
41 import org.objectweb.jonathan.apis.presentation.Marshaller;
42 import org.objectweb.jonathan.apis.presentation.MarshallerFactory;
43 import org.objectweb.jonathan.apis.presentation.UnMarshaller;
44 import org.objectweb.jonathan.apis.protocols.Protocol;
45 import org.objectweb.jonathan.apis.protocols.ProtocolGraph;
46 import org.objectweb.jonathan.apis.protocols.ReplyInterface;
47 import org.objectweb.jonathan.apis.protocols.ReplySession;
48 import org.objectweb.jonathan.apis.protocols.RequestSession;
49 import org.objectweb.jonathan.apis.protocols.ServerException;
50 import org.objectweb.jonathan.apis.protocols.SessionIdentifier;
51 import org.objectweb.jonathan.apis.protocols.Session_High;
52 import org.objectweb.jonathan.apis.protocols.Session_Low;
53 import org.objectweb.jonathan.apis.resources.Chunk;
54 import org.objectweb.jonathan.apis.resources.Scheduler;
55
56 import org.objectweb.util.monolog.api.BasicLevel;
57 import org.objectweb.util.monolog.api.Logger;
58 import org.objectweb.util.monolog.api.LoggerFactory;
59
60 import java.util.Properties JavaDoc;
61
62 /**
63  * Provides a very simple invocation protocol component. This invocation
64  * protocol uses only two kinds of messages:
65  * <ul>
66  * <li>request messages:
67  * <table border>
68  * <tr>
69  * <td colspan="3"><center>HEADER</center></td>
70  * <td><center>PAYLOAD</center></td>
71  * </tr>
72  * <tr>
73  * <td><center>&nbsp;request identifier (4 bytes)&nbsp;</center></td>
74  * <td><center>&nbsp;object key length (4 bytes)&nbsp;</center></td>
75  * <td><center>&nbsp;object key data (variable length)&nbsp;</center></td>
76  * <td><center>&nbsp;payload (variable length)&nbsp;</center></td>
77  * </tr>
78  * </table>
79  * </li>
80  * <li>reply messages:
81  * <table border>
82  * <tr>
83  * <td colspan="2"><center>HEADER</center></td>
84  * <td><center>PAYLOAD</center></td>
85  * </tr>
86  * <tr>
87  * <td><center>&nbsp;request identifier (4 bytes)&nbsp;</center></td>
88  * <td><center>&nbsp;exception flag (1 byte)&nbsp;</center></td>
89  * <td><center>&nbsp;payload (variable length)&nbsp;</center></td>
90  * </tr>
91  * </table>
92  * </li>
93  * </ul>
94  * The request identifier is used to associate request and reply messages
95  * correctly. The object key identifies the remote object on which the
96  * invocation must be performed. The exception flag is used to indicate if a
97  * reply contains a normal result or an exception.
98  */

99
100 public class RmiProtocol implements Protocol, BindingController {
101
102   /**
103    * The naming context used to decode the object key contained in request
104    * messages.
105    */

106
107   protected NamingContext adapter;
108
109   /**
110    * The marshaller factory used to create request or reply messages.
111    */

112
113   protected MarshallerFactory marshallerFactory;
114
115   /**
116    * The scheduler used to synchronized threads for waiting reply messages.
117    */

118
119   protected Scheduler scheduler;
120
121   /**
122    * The optional logger factory used to get a logger for this component.
123    */

124
125   protected LoggerFactory loggerFactory;
126
127   /**
128    * The logger used to log messages. May be <tt>null</tt>.
129    */

130
131   protected Logger logger;
132
133   private ClientSession_Low clientSessionLow;
134
135   private ServerSession_Low serverSessionLow;
136
137   private ReplyHolder[] table;
138
139   private ReplyHolder reusable;
140
141   private int size;
142
143   private int id;
144
145   /**
146    * Constructs a new {@link RmiProtocol}.
147    */

148
149   public RmiProtocol () {
150     clientSessionLow = new ClientSession_Low();
151     serverSessionLow = new ServerSession_Low();
152     table = new ReplyHolder[17];
153   }
154
155   // --------------------------------------------------------------------------
156
// Implementation of the BindingController interface
157
// --------------------------------------------------------------------------
158

159   public String JavaDoc[] listFc () {
160     return new String JavaDoc[] {
161       "adapter",
162       "marshaller-factory",
163       "scheduler",
164       "logger-factory"
165     };
166   }
167
168   public Object JavaDoc lookupFc (final String JavaDoc clientItfName) {
169     if (clientItfName.equals("adapter")) {
170       return adapter;
171     } else if (clientItfName.equals("marshaller-factory")) {
172       return marshallerFactory;
173     } else if (clientItfName.equals("scheduler")) {
174       return scheduler;
175     } else if (clientItfName.equals("logger-factory")) {
176       return loggerFactory;
177     }
178     return null;
179   }
180
181   public void bindFc (final String JavaDoc clientItfName, final Object JavaDoc serverItf) {
182     if (clientItfName.equals("adapter")) {
183       adapter = (NamingContext)serverItf;
184     } else if (clientItfName.equals("marshaller-factory")) {
185       marshallerFactory = (MarshallerFactory)serverItf;
186     } else if (clientItfName.equals("scheduler")) {
187       scheduler = (Scheduler)serverItf;
188     } else if (clientItfName.equals("logger-factory")) {
189       loggerFactory = (LoggerFactory)serverItf;
190       logger = loggerFactory.getLogger(getClass().getName());
191     }
192   }
193
194   public void unbindFc (final String JavaDoc clientItfName) {
195     if (clientItfName.equals("adapter")) {
196       adapter = null;
197     } else if (clientItfName.equals("marshaller-factory")) {
198       marshallerFactory = null;
199     } else if (clientItfName.equals("scheduler")) {
200       scheduler = null;
201     } else if (clientItfName.equals("logger-factory")) {
202       loggerFactory = null;
203       logger = null;
204     }
205   }
206
207   // --------------------------------------------------------------------------
208
// Implementation of the Protocol interface
209
// --------------------------------------------------------------------------
210

211   /**
212    * Returns true if the target protocol is an invocation protocol. An
213    * invocation protocol is a protocol able to handle <i>invocations</i>, i.e.,
214    * requests expecting a reply. In practice, this means that calls to the
215    * {@link Session_High#prepareInvocation(Marshaller) prepareInvocation} method
216    * on sessions obtained from the target protocol will not raise an
217    * {@link org.objectweb.jonathan.apis.kernel.InternalException}, but perform
218    * the appropriate work.
219    *
220    * @return true if the target protocol is an invocation protocol.
221    */

222
223   public boolean isAnInvocationProtocol () {
224     return true;
225   }
226
227   /**
228    * Creates a new protocol graph with a number of given sub protocol graphs.
229    *
230    * @param subgraphs the lower-level graphs.
231    * @param hints the information required to build the graph.
232    * @return a new ProtocolGraph.
233    * @throws JonathanException if the hints or the subgraphs are invalid for
234    * this protocol.
235    */

236
237   public ProtocolGraph createProtocolGraph (
238     final ProtocolGraph[] subgraphs,
239     final Context hints) throws JonathanException
240   {
241     if (subgraphs.length != 1) {
242       throw new JonathanException("Lower layers badly specified in RMIP");
243     }
244     return new Graph(subgraphs[0]);
245   }
246
247   /**
248    * Creates a new session identifier with the provided info.
249    *
250    * @param info the information to create the session identifier.
251    * @param next lower session identifiers, if any.
252    * @return the created session identifer.
253    * @throws JonathanException if something goes wrong.
254    */

255
256   public SessionIdentifier createSessionIdentifier (
257     final Properties JavaDoc info,
258     final SessionIdentifier[] next) throws JonathanException
259   {
260     if (next.length != 1) {
261       throw new JonathanException("Lower layers badly specified in RMIP");
262     }
263     byte[] key = (byte[])(info.get("object_key"));
264     return new CltSessionId(key, next[0]);
265   }
266
267   // --------------------------------------------------------------------------
268
// ProtocolGraph and Session identifiers
269
// --------------------------------------------------------------------------
270

271   class Graph implements ProtocolGraph {
272
273     ProtocolGraph next;
274
275     public Graph (final ProtocolGraph next) {
276       this.next = next;
277     }
278
279     /*
280     public boolean equals (final Object o) {
281       if (o instanceof Graph) {
282         Graph pgraph = (Graph) o;
283         boolean one = false, two = false;
284         if (next != null) {
285           one = next.equals(pgraph.next);
286         } else {
287           one = pgraph.next == null;
288         }
289         return one;
290       }
291       return false;
292     }
293
294     public int hashCode () {
295       if (next != null) {
296         return next.hashCode();
297       } else {
298         return 0;
299       }
300     }
301     */

302
303     public SessionIdentifier export (final Session_Low ignored)
304       throws JonathanException
305     {
306       if (next == null) {
307         throw new ExportException("Badly specified participants");
308       }
309       return new SrvSessionId(next.export(serverSessionLow));
310     }
311   }
312
313   class SrvSessionId implements SessionIdentifier {
314
315     SessionIdentifier next;
316
317     public SrvSessionId (final SessionIdentifier next) {
318       this.next = next;
319     }
320
321     public Session_High bind (final Session_Low ignored)
322       throws JonathanException
323     {
324       throw new BindException("Bad session identifier type");
325     }
326
327     public void unexport () {
328       next.unexport();
329     }
330
331     public Protocol getProtocol () {
332       return RmiProtocol.this;
333     }
334
335     public SessionIdentifier[] next () {
336       return new SessionIdentifier[] {next};
337     }
338
339     /*
340     public boolean equals (final Object o) {
341       if (o instanceof SrvSessionId) {
342         SrvSessionId sessionId = (SrvSessionId)o;
343         boolean one = false, two = false;
344         if (next != null) {
345           one = next.equals(sessionId.next);
346         } else {
347           one = sessionId.next == null;
348         }
349         return one;
350
351       }
352       return false;
353     }
354
355     public int hashCode () {
356       if (next != null) {
357         return next.hashCode();
358       } else {
359         return 0;
360       }
361     }
362     */

363
364     public int getProtocolId () {
365       return 0;
366     }
367
368     public Context getInfo () throws JonathanException {
369       throw new JonathanException("Not implemented");
370       //return contextFactory.newContext();
371
}
372
373     public boolean isLocal () {
374       return false; // meaningless in this case
375
}
376   }
377
378   class CltSessionId extends SrvSessionId {
379
380     byte[] key;
381
382     public CltSessionId (
383       final byte[] key,
384       final SessionIdentifier next)
385     {
386       super(next);
387       this.key = key;
388     }
389
390     public Session_High bind (final Session_Low ignored)
391       throws JonathanException
392     {
393       if (next == null) {
394         throw new BindException("Badly specified participants");
395       }
396       return new ClientSession_High(key, next.bind(clientSessionLow));
397     }
398
399     public void unexport () {}
400
401     /*
402     public boolean equals (final Object o) {
403       if (o instanceof CltSessionId) {
404         CltSessionId sessionId = (CltSessionId)o;
405         int len = key.length;
406         byte[] otherKey = sessionId.key;
407         if (otherKey.length == len) {
408           for (int i = 0; i < len; i++) {
409             if (otherKey[i] != key[i]) {
410               return false;
411             }
412           }
413           return super.equals(sessionId);
414         }
415       }
416       return false;
417     }
418
419     public int hashCode () {
420       int hash = 0;
421       int len = key.length;
422       for (int i = 0; i < len; i++) {
423         hash += (key[i] << (i % 32));
424       }
425       return hash + super.hashCode();
426     }
427
428     public String toString () {
429       String str = "CltSessionId[key[";
430       if (key.length > 0) {
431         str = str + key[0];
432       }
433       for (int i = 1; i < key.length; i++) {
434         str = str + "," + key[i];
435       }
436       return str;
437     }
438     */

439
440     public Context getInfo () throws JonathanException {
441       throw new JonathanException("Meaningless");
442     }
443
444     public boolean isLocal() {
445       return next.isLocal();
446     }
447   }
448
449   // --------------------------------------------------------------------------
450
// SessionLow and SessionHigh interfaces
451
// --------------------------------------------------------------------------
452

453   class ClientSession_Low implements Session_Low {
454
455     public void send (
456       final UnMarshaller unmarshaller,
457       final Session_High sender) throws JonathanException
458     {
459       try {
460         int rqId = unmarshaller.readInt();
461         ReplyHolder reply = getHolder(rqId);
462         if (reply == null) {
463           // we can't find the request: return silently
464
unmarshaller.close();
465           if (logger != null && logger.isLoggable(BasicLevel.INFO)) {
466             logger.log(
467               BasicLevel.DEBUG, "RMIP Request #" + rqId + " not found");
468           }
469         } else {
470           reply.sendReply(unmarshaller);
471         }
472       } catch (JonathanException e) {
473         unmarshaller.close();
474         send(e, sender);
475       }
476     }
477
478     public void send (final JonathanException e, final Session_High sender) {
479       forwardException(e, sender);
480     }
481   }
482
483   class ServerSession_Low implements Session_Low {
484
485     public void send (
486       final UnMarshaller unmarshaller,
487       final Session_High sender) throws JonathanException
488     {
489       boolean unmarshallerOpened = true;
490       int rqId = unmarshaller.readInt();
491       RequestSession requestSession = null;
492       try {
493         int len = unmarshaller.readInt();
494         byte[] key = new byte[len];
495         unmarshaller.readByteArray(key,0,len);
496         Identifier id = adapter.decode(key, 0, len);
497         requestSession = (RequestSession)id.bind(null, null);
498         if (requestSession != null) {
499           ServerSession_High replySession =
500             new ServerSession_High(sender, rqId);
501           requestSession.send(unmarshaller, replySession);
502           return;
503         }
504         unmarshallerOpened = false;
505         unmarshaller.close();
506       } catch (Exception JavaDoc e) {
507         if (logger != null && logger.isLoggable(BasicLevel.INFO)) {
508           logger.log(BasicLevel.DEBUG, "Exception caught in RMIP", e);
509         }
510         if (unmarshallerOpened) {
511           unmarshaller.close();
512         }
513         sendException(e, rqId, sender);
514       }
515     }
516
517     public void send (final JonathanException e, final Session_High sender) {
518       if (logger != null && logger.isLoggable(BasicLevel.INFO)) {
519         logger.log(
520           BasicLevel.DEBUG, "Exception caught in RMIP related to " + sender, e);
521       }
522       sender.close();
523     }
524
525     void sendException (
526       final Exception JavaDoc e,
527       final int rqId,
528       final Session_High session)
529     {
530       try {
531         Marshaller marshaller = prepareReplyMessage(rqId, true);
532         marshaller.writeValue(e);
533         sendMessage(marshaller, session);
534       } catch (Exception JavaDoc ignored) {
535       } finally {
536         session.close();
537       }
538     }
539   }
540
541   class RMIPSession_High {
542
543     Session_High lower;
544
545     public RMIPSession_High (final Session_High lower) {
546       this.lower = lower;
547     }
548
549     public boolean direct () {
550       return false;
551     }
552
553     public void send (final Marshaller marshaller) throws JonathanException {
554       sendMessage(marshaller, lower);
555     }
556
557     public void close() {
558       lower.close();
559     }
560   }
561
562   class ClientSession_High extends RMIPSession_High implements Session_High {
563
564     byte[] key;
565
566     /**
567      * @param key the target object key.
568      * @param lower the lower protocol session.
569      */

570
571     public ClientSession_High (final byte[] key, final Session_High lower) {
572       super(lower);
573       this.key = key;
574     }
575
576     public ReplyInterface prepareInvocation (final Marshaller marshaller)
577       throws JonathanException
578     {
579       ReplyHolder reply = registerHolder(lower);
580       marshaller.writeInt(reply.id);
581       int len = key.length;
582       marshaller.writeInt(len);
583       marshaller.writeByteArray(key, 0, len);
584       return reply;
585     }
586
587     public void prepare (final Marshaller marshaller) throws JonathanException {
588       marshaller.writeInt(0);
589       int len = key.length;
590       marshaller.writeInt(len);
591       marshaller.writeByteArray(key, 0, len);
592     }
593   }
594
595   class ServerSession_High extends RMIPSession_High implements ReplySession {
596
597     int rqId;
598
599     public ServerSession_High (final Session_High lower, final int rqId) {
600       super(lower);
601       this.rqId = rqId;
602     }
603
604     public Marshaller prepareReply () throws JonathanException {
605       return prepareReplyMessage(rqId, false);
606     }
607
608     public Marshaller prepareExceptionReply () throws JonathanException {
609       return prepareReplyMessage(rqId, true);
610     }
611
612     public Marshaller prepareSystemExceptionReply () throws JonathanException {
613       return prepareReplyMessage(rqId, true);
614     }
615
616     public Marshaller prepareLocationForwardReply () throws JonathanException {
617       return prepareReplyMessage(rqId, true);
618     }
619   }
620
621   // utility methods ----------------------------------------------------------
622

623   void sendMessage (final Marshaller marshaller, final Session_High lower)
624     throws JonathanException
625   {
626     Chunk first = marshaller.getState();
627     Chunk c = first;
628     int size = 0;
629     while (c != null) {
630       size += c.top - c.offset;
631       c = c.next;
632     }
633     if (lower.direct()) {
634       lower.send(marshaller);
635     } else {
636       Marshaller m = marshallerFactory.newMarshaller();
637       lower.prepare(m);
638       m.write(marshaller.getState());
639       marshaller.reset();
640       lower.send(m);
641     }
642   }
643
644   Marshaller prepareReplyMessage (final int rqId, final boolean isException)
645     throws JonathanException
646   {
647     Marshaller marshaller = marshallerFactory.newMarshaller();
648     marshaller.writeInt(rqId);
649     marshaller.writeBoolean(isException);
650     return marshaller;
651   }
652
653   synchronized void forwardException (
654     final JonathanException e,
655     final Session_High lower)
656   {
657     int len = table.length;
658     ReplyHolder holder;
659     for (int i = 0; i < len; i++) {
660       holder = table[i];
661       while (holder != null) {
662         if (holder.lower == lower) {
663           holder.sendReply(e);
664         }
665         holder = holder.next;
666       }
667     }
668   }
669
670   // --------------------------------------------------------------------------
671
// Reply holders management
672
// --------------------------------------------------------------------------
673

674   /**
675    * Returns the holder identified by id, or null if none exists.
676    * @param id a holder identifier.
677    * @return the corresponding holder.
678    */

679
680   synchronized ReplyHolder getHolder (final int id) {
681     int index = (id & 0x7FFFFFFF) % table.length;
682     ReplyHolder holder = table[index];
683     while (! (holder == null || holder.id == id)) {
684       holder = holder.next;
685     }
686     return holder;
687   }
688
689   synchronized ReplyHolder registerHolder (final Session_High lower) {
690     ReplyHolder holder;
691     if (reusable == null) {
692       holder = new ReplyHolder(lower);
693       id++;
694       holder.id = id;
695     } else {
696       holder = reusable;
697       holder.lower = lower;
698       reusable = reusable.next;
699     }
700     int len = table.length;
701     int index = (holder.id & 0x7FFFFFFF) % len;
702     holder.next = table[index];
703     table[index] = holder;
704     size++;
705     if (size > len / 2) {
706       rehash(len);
707     }
708     return holder;
709   }
710
711   /**
712    * Removes the holder identified by the provided id from the table.
713    * @param id the holder identifier.
714    */

715
716   synchronized void removeHolder (final int id) {
717     int index = (id & 0x7FFFFFFF) % table.length;
718     ReplyHolder first = table[index];
719     ReplyHolder holder = first;
720     ReplyHolder prev = null;
721     while (holder.id != id) {
722       prev = holder;
723       holder = holder.next;
724     }
725     if (holder != null) {
726       size--;
727       if (prev != null) {
728         prev.next = holder.next;
729       } else {
730         table[index] = holder.next;
731       }
732       holder.next = reusable;
733       holder.lower = null;
734       reusable = holder;
735     }
736   }
737
738   void rehash (final int len) {
739     int newLen = 2 * len + 1;
740     int index;
741     ReplyHolder holder, next_holder;
742     ReplyHolder[] newTable = new ReplyHolder[newLen];
743     for (int i = 0; i < len; i++) {
744       holder = table[i];
745       while (holder != null) {
746         next_holder = holder.next;
747         // rehashing
748
index = (holder.id & 0x7FFFFFFF) % newLen;
749         holder.next = newTable[index];
750         newTable[index] = holder;
751
752         holder = next_holder;
753       }
754     }
755     table = newTable;
756   }
757
758   class ReplyHolder implements ReplyInterface {
759
760     Object JavaDoc reply;
761     int id;
762     ReplyHolder next;
763     Session_High lower;
764
765     public ReplyHolder (final Session_High lower) {
766       super();
767       this.lower = lower;
768     }
769
770     public synchronized UnMarshaller listen () throws JonathanException {
771       try {
772         while (reply == null) {
773           scheduler.wait(this);
774         }
775         UnMarshaller message = (UnMarshaller)reply;
776         boolean isException = message.readBoolean();
777         if (isException) {
778           throw new ServerException(message);
779         } else {
780           return message;
781         }
782       } catch (InterruptedException JavaDoc e) {
783         throw new JonathanException(e);
784       } catch (ClassCastException JavaDoc e) {
785         throw (JonathanException)reply;
786       } finally {
787         reply = null;
788         removeHolder(id);
789       }
790     }
791
792     public synchronized boolean available () {
793       return reply != null;
794     }
795
796     final synchronized void sendReply (final Object JavaDoc reply) {
797       this.reply = reply;
798       scheduler.notify(this);
799     }
800   }
801 }
802
Popular Tags