KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > SOFA > SOFAnet > Transport > RMI > RMITransportNodeConnection


1 /*
2  * RMITransportNodeConnection.java
3  *
4  * Created on 8. duben 2004, 15:19
5  */

6
7 package SOFA.SOFAnet.Transport.RMI;
8
9 import SOFA.SOFAnet.Repository.NodeInfo;
10 import SOFA.SOFAnet.Core.NetOps;
11 import SOFA.SOFAnet.Core.CoreException;
12 import SOFA.SOFAnet.Core.Reporter;
13 import SOFA.SOFAnet.Transport.*;
14 import java.rmi.RemoteException JavaDoc;
15 import java.util.*;
16
17 /**
18  * The object of this class implements connection to other nodes (in transport layer).
19  * The connection is kept alive certain time (one minute) after last operation on the connection.
20  * It prevents the system from creating many connections if there are more transport requests.
21  * <p>
22  * The "push" and "pull" operations are performed by extra thread, therefore these "distribution" operations
23  * creates just one RMI connection between two nodes.
24  *
25  * @author Ladislav Sobr
26  */

27 public class RMITransportNodeConnection
28 {
29   private RMITransportClient parent;
30   private NetOps netOps;
31   private RMITransportServerHolder serverHolder;
32   private NodeInfo nodeInfo;
33   private Thread JavaDoc asyncThread;
34   private Thread JavaDoc stopThread;
35   private Object JavaDoc stopEvent;
36   private boolean stopped;
37   private long lastUseTime;
38   private List asyncOps;
39   private String JavaDoc myNodeName;
40   
41   final static private int OP_STOP = 0;
42   final static private int OP_PUSH = 1;
43   final static private int OP_PULL = 2;
44   
45   private static class AsyncOperation
46   {
47     public int op;
48     public IOParams params;
49     
50     AsyncOperation(int op, IOParams params)
51     {
52       this.op = op;
53       this.params = params;
54     }
55   }
56   
57   /** Creates a new instance of RMITransportNodeConnection */
58   public RMITransportNodeConnection(NodeInfo nodeInfo, RMITransportClient parent)
59   {
60     this.parent = parent;
61     
62     serverHolder = new RMITransportServerHolder();
63     serverHolder.setNode(nodeInfo);
64     this.nodeInfo = nodeInfo;
65     
66     stopEvent = new Object JavaDoc();
67     
68     stopped = false;
69     lastUseTime = System.currentTimeMillis();
70     asyncOps = new LinkedList();
71     
72     myNodeName = NodeInfo.getLocalNodeName();
73
74     
75     asyncThread = new Thread JavaDoc()
76                     {
77                       public void run()
78                       {
79                         asyncThreadProcedure();
80                       }
81                     };
82                         
83     stopThread = new Thread JavaDoc()
84                     {
85                       public void run()
86                       {
87                         stopThreadProcedure();
88                       }
89                     };
90     
91     asyncThread.start();
92     stopThread.start();
93
94   }
95   
96   private void asyncThreadProcedure()
97   {
98     for (;;)
99     {
100       AsyncOperation operation;
101       
102       synchronized (asyncOps)
103       {
104         while (asyncOps.isEmpty())
105         {
106           try
107           {
108             asyncOps.wait();
109           }
110           catch (InterruptedException JavaDoc e)
111           {
112           }
113         }
114
115         touch();
116         
117         //go through operation list and choose operation with highest priority
118

119         int index = -1;
120         int priority = -1; //2 == interactive ; 1 == distribution ; 0 == stop
121

122         Iterator it = asyncOps.iterator();
123         int i = 0;
124         while (it.hasNext())
125         {
126           AsyncOperation asyncOp = (AsyncOperation)it.next();
127
128           int p;
129           if (asyncOp.op == OP_STOP) p = 0;
130           else if (asyncOp.params.getPriority() == IOParams.PRIORITY_INTERACTIVE ||
131                    asyncOp.params.getPriority() == IOParams.PRIORITY_BLOCKING) p = 2;
132           else p = 1;
133
134           if (index == -1)
135           {
136             priority = p;
137             index = i;
138           }
139           else if (p > priority)
140           {
141             priority = p;
142             index = i;
143           }
144
145           i++;
146         }
147
148         operation = (AsyncOperation)asyncOps.remove(index);
149       }
150       
151       //perform operation
152

153       switch (operation.op)
154       {
155       case OP_STOP:
156         return;
157         
158       case OP_PUSH:
159         {
160           try
161           {
162             RMITransportInterface server = serverHolder.getServer();
163             LicenceRMI licenceRMI = null;
164             if (operation.params.getLicence() != null) licenceRMI = new LicenceRMI(operation.params.getLicence());
165             byte[] bundle = operation.params.getBundleData().getData();
166             if (bundle != null)
167             {
168               server.push(myNodeName, operation.params.getBundleName(), bundle, licenceRMI, operation.params.isOffer());
169               bundle = null;
170             }
171             else
172             {
173               Reporter.error("Asynchronous push of '" + operation.params.getBundleName() + "' failed: Cannot load bundle from BundleData");
174             }
175           }
176           catch (RemoteException JavaDoc e)
177           {
178             serverHolder.releaseServer();
179             Reporter.error("Asynchronous push of '" + operation.params.getBundleName() + "' failed", e);
180           }
181           catch (RMITransportException e)
182           {
183             Reporter.error("Asynchronous push of '" + operation.params.getBundleName() + "' failed", e);
184           }
185           
186           operation.params.getBundleData().delete();
187         }
188         break;
189         
190       
191       case OP_PULL:
192         {
193           RMIPullOutputHolder outputHolder = null;
194           try
195           {
196             RMITransportInterface server = serverHolder.getServer();
197             outputHolder = server.pull(myNodeName, operation.params.getBundleName(), false, false, operation.params.getContractID());
198           }
199           catch (RemoteException JavaDoc e)
200           {
201             serverHolder.releaseServer();
202             Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed", e);
203           }
204           catch (RMITransportException e)
205           {
206             Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed", e);
207           }
208           
209           touch();
210
211           if (outputHolder != null)
212           {
213             if (outputHolder.errCode == 0)
214             {
215               if (outputHolder.licenceRMI != null) operation.params.setLicence(outputHolder.licenceRMI.getLicence());
216               else operation.params.setLicence(null);
217               operation.params.setBundleData(new BundleData(outputHolder.bundle));
218               operation.params.setOffer(false);
219               operation.params.setSourceNodeName(nodeInfo.getName());
220               
221               try
222               {
223                 parent.getNetOps().deliverBundlePull(operation.params);
224               }
225               catch (CoreException e)
226               {
227                 Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed", e);
228               }
229               
230               operation.params.getBundleData().delete(); //just to be sure
231
}
232             else
233             {
234               switch (outputHolder.errCode)
235               {
236               case 1: Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed: Cannot find binary bundle on remote node"); break;
237               case 2: Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed: Cannot create BundleData for bundle"); break;
238               case 3: Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed: Pull not allowed by remote node"); break;
239               default: Reporter.error("Asynchronous pull of '" + operation.params.getBundleName() + "' failed: Transport layer failed"); break;
240               }
241             }
242             
243             outputHolder = null;
244           }
245
246         }
247         break;
248       }
249       
250       touch();
251     }
252   }
253   
254   private void stopThreadProcedure()
255   {
256     synchronized (stopEvent)
257     {
258       do
259       {
260         try
261         {
262           stopEvent.wait(1000 * 60);
263         }
264         catch (InterruptedException JavaDoc e)
265         {
266         }
267       }
268       while (System.currentTimeMillis() - lastUseTime < 1000 * 60);
269     }
270     
271     parent.removeFromMap(nodeInfo.getAddressAndPort(), this);
272     
273     synchronized (asyncOps)
274     {
275       asyncOps.add(new AsyncOperation(OP_STOP, null));
276       asyncOps.notify();
277       stopped = true;
278     }
279     
280   }
281   
282   private void touch()
283   {
284     lastUseTime = System.currentTimeMillis();
285     synchronized (stopEvent)
286     {
287       stopEvent.notify();
288     }
289   }
290   
291   /**
292    * Test whether 'push' of bundle or bundle offer is accepted by addressee.
293    * <p>
294    * Synchronous (waits for completion).
295    * <p>
296    * Input:
297    * <ul>
298    * <li> bundleName
299    * <li> offer
300    * </ul>
301    * <p>
302    * Output:
303    * <ul>
304    * <li> errCode
305    * </ul>
306    *
307    * @param params Input/Output data for the call
308    */

309   public void testPush(IOParams params) throws TransportException
310   {
311     touch();
312     
313     boolean result = false;
314     try
315     {
316       RMITransportInterface server = serverHolder.getServer();
317       result = server.testPush(myNodeName, params.getBundleName(), params.isOffer());
318     }
319     catch (RemoteException JavaDoc e)
320     {
321       touch();
322       serverHolder.releaseServer();
323       throw new TransportException(e.getMessage(), e);
324     }
325     catch (RMITransportException e)
326     {
327       touch();
328       throw new TransportException(e.getMessage(), e);
329     }
330     
331     if (result) params.setErrCode(0);
332     else params.setErrCode(1);
333
334     touch();
335   }
336   
337   /**
338    * Push bundle or bundle offer to node.
339    * <p>
340    * Asynchronous (doesn't wait for completion).
341    * <p>
342    * Input:
343    * <ul>
344    * <li> bundleName
345    * <li> bundleData
346    * <li> offer
347    * <li> priority
348    * <li> licence &nbsp;&nbsp;<i> (only if offer == false)</i>
349    * </ul>
350    *
351    * Output: none
352    *
353    * @param params Input data for the call
354    * @return false if push cannot be processed any more by this object
355    */

356   public boolean push(IOParams params)
357   {
358     touch();
359     synchronized (asyncOps)
360     {
361       if (stopped) return false;
362       
363       asyncOps.add(new AsyncOperation(OP_PUSH, params));
364       asyncOps.notify();
365     }
366     
367     return true;
368   }
369       
370   /**
371    * Test whether 'pull' of bundle is accepted by source node.
372    * <p>
373    * Synchronous (waits for completion).
374    * <p>
375    * Input:
376    * <ul>
377    * <li> bundleName
378    * <li> contractID
379    * </ul>
380    *
381    * Output:
382    * <ul>
383    * <li> errCode
384    * </ul>
385    *
386    * @param params Input/Output data for the call
387    */

388   public void testPull(IOParams params) throws TransportException
389   {
390     touch();
391     
392     boolean result = false;
393     try
394     {
395       RMITransportInterface server = serverHolder.getServer();
396       result = server.testPull(myNodeName, params.getBundleName(), false, params.getContractID());
397     }
398     catch (RemoteException JavaDoc e)
399     {
400       touch();
401       serverHolder.releaseServer();
402       throw new TransportException(e.getMessage(), e);
403     }
404     catch (RMITransportException e)
405     {
406       touch();
407       throw new TransportException(e.getMessage(), e);
408     }
409
410     if (result) params.setErrCode(0);
411     else params.setErrCode(1);
412     
413     touch();
414   }
415   
416   /**
417    * Pulls bundle from node.
418    * <p>
419    * Asynchronous (doesn't wait for completion).
420    * <p>
421    * Input:
422    * <ul>
423    * <li> bundleName
424    * <li> priority
425    * <li> contractID
426    * </ul>
427    * <p>
428    * Output: none (calls NetOps.deliverBundle)
429    *
430    *
431    * @param params Input data for the call
432    * @return false if pull cannot be processed any more by this object
433    */

434   public boolean pull(IOParams params)
435   {
436     touch();
437     synchronized (asyncOps)
438     {
439       if (stopped) return false;
440       
441       asyncOps.add(new AsyncOperation(OP_PULL, params));
442       asyncOps.notify();
443     }
444     
445     return true;
446   }
447
448   /**
449    * Pulls bundle from node.
450    * <p>
451    * Synchronous (waits for completion).
452    * <p>
453    * Input:
454    * <ul>
455    * <li> bundleName
456    * <li> priority
457    * <li> contractID
458    * </ul>
459    *
460    * Output:
461    * <ul>
462    * <li> errCode
463    * <ul>
464    * <li> 0 == OK
465    * <li> 1 == Cannot find binary bundle
466    * <li> 2 == Cannot create BundleData for bundle
467    * <li> 3 == Pull not allowed
468    * </ul>
469    * <li> bundleData &nbsp;&nbsp;<i> (only if errCode == 0)</i>
470    * <li> licence &nbsp;&nbsp;<i> (only if errCode == 0)</i>
471    * </ul>
472    *
473    *
474    * @param params Input/Output data for the call
475    */

476   public void synchroPull(IOParams params) throws TransportException
477   {
478     touch();
479     
480     RMIPullOutputHolder outputHolder = null;
481     try
482     {
483       RMITransportInterface server = serverHolder.getServer();
484       outputHolder = server.pull(myNodeName, params.getBundleName(), false, false, params.getContractID());
485     }
486     catch (RemoteException JavaDoc e)
487     {
488       touch();
489       serverHolder.releaseServer();
490       throw new TransportException(e.getMessage(), e);
491     }
492     catch (RMITransportException e)
493     {
494       touch();
495       throw new TransportException(e.getMessage(), e);
496     }
497
498     params.setErrCode(outputHolder.errCode);
499     if (outputHolder.licenceRMI != null) params.setLicence(outputHolder.licenceRMI.getLicence());
500     else params.setLicence(null);
501     params.setBundleData(new BundleData(outputHolder.bundle));
502     
503     touch();
504   }
505   
506   /**
507    * Test whether source node's share manager can accept acquire.
508    * <p>
509    * Synchronous (waits for completion).
510    * <p>
511    * Input:
512    * <ul>
513    * <li> bundleName
514    * </ul>
515    * <p>
516    * Output:
517    * <ul>
518    * <li> errCode
519    * </ul>
520    *
521    *
522    * @param params Input/Output data for the call
523    */

524   public void testAcquireShared(IOParams params) throws TransportException
525   {
526     touch();
527     
528     boolean result = false;
529     try
530     {
531       RMITransportInterface server = serverHolder.getServer();
532       result = server.testPull(myNodeName, params.getBundleName(), true, "");
533     }
534     catch (RemoteException JavaDoc e)
535     {
536       touch();
537       serverHolder.releaseServer();
538       throw new TransportException(e.getMessage(), e);
539     }
540     catch (RMITransportException e)
541     {
542       touch();
543       throw new TransportException(e.getMessage(), e);
544     }
545
546     if (result) params.setErrCode(0);
547     else params.setErrCode(1);
548     
549     touch();
550   }
551   
552   /**
553    * Acquire bundle from node's share manager.
554    * <p>
555    * Synchronous (waits for completion).
556    * <p>
557    * Input:
558    * <ul>
559    * <li> bundleName
560    * <li> licenceOnly
561    * <li> priority
562    * </ul>
563    *
564    * Output:
565    * <ul>
566    * <li> errCode
567    * <ul>
568    * <li> 0 == OK
569    * <li> 1 == Not a share manager or access not allowed
570    * <li> 2 == Licence not available
571    * <li> 3 == Data error (binary bundle not available)
572    * <li> 4 == Incosistent state of share manager
573    * </ul>
574    * <li> address &nbsp;&nbsp;<i> (only if errCode == 0 && licenceOnly == false)</i>
575    * <li> bundleData &nbsp;&nbsp;<i> (only if errCode == 0 && licenceOnly == false && address == false)</i>
576    * <li> addressNodeName &nbsp;&nbsp;<i> (only if errCode == 0 && licenceOnly == false && address == true)</i>
577    * <li> licence &nbsp;&nbsp;<i> (only if errCode == 0)</i>
578    * </ul>
579    *
580    *
581    * @param params Input/Output data for the call
582    */

583   public void acquireShared(IOParams params) throws TransportException
584   {
585     touch();
586     
587     RMIPullOutputHolder outputHolder = null;
588     try
589     {
590       RMITransportInterface server = serverHolder.getServer();
591       outputHolder = server.pull(myNodeName, params.getBundleName(), true, params.isLicenceOnly(), "");
592     }
593     catch (RemoteException JavaDoc e)
594     {
595       touch();
596       serverHolder.releaseServer();
597       throw new TransportException(e.getMessage(), e);
598     }
599     catch (RMITransportException e)
600     {
601       touch();
602       throw new TransportException(e.getMessage(), e);
603     }
604
605     params.setErrCode(outputHolder.errCode);
606     if (outputHolder.licenceRMI != null) params.setLicence(outputHolder.licenceRMI.getLicence());
607     else params.setLicence(null);
608     if (params.isLicenceOnly())
609     {
610       //just to be sure, but nobody should read it
611
params.setAddress(false);
612       params.setAddressNodeName("");
613       params.setBundleData(null);
614     }
615     else
616     {
617       if (outputHolder.address == null || outputHolder.address.length() == 0)
618       {
619         params.setAddress(false);
620         params.setAddressNodeName("");
621         params.setBundleData(new BundleData(outputHolder.bundle));
622       }
623       else
624       {
625         params.setAddress(true);
626         params.setAddressNodeName(outputHolder.address);
627         params.setBundleData(null);
628       }
629     }
630     
631     touch();
632   }
633   
634   /**
635    * Tests whether node (share client) can return shared bundle (it's licence) back to share manager.
636    * <p>
637    * Synchronous (waits for completion).
638    * <p>
639    * Input:
640    * <ul>
641    * <li> bundleName
642    * </ul>
643    *
644    * Output:
645    * <ul>
646    * <li> errCode
647    * <ul>
648    * <li> 0 == OK
649    * <li> 1 == Cannot return bundle(licence)
650    * </ul>
651    * </ul>
652    *
653    * @param params Input/Output data for the call
654    */

655   public void canReturnShared(IOParams params) throws TransportException
656   {
657     touch();
658     
659     boolean result = false;
660     try
661     {
662       RMITransportInterface server = serverHolder.getServer();
663       result = server.canReturnShared(myNodeName, params.getBundleName());
664     }
665     catch (RemoteException JavaDoc e)
666     {
667       touch();
668       serverHolder.releaseServer();
669       throw new TransportException(e.getMessage(), e);
670     }
671     catch (RMITransportException e)
672     {
673       touch();
674       throw new TransportException(e.getMessage(), e);
675     }
676
677     if (result) params.setErrCode(0);
678     else params.setErrCode(1);
679     touch();
680   }
681   
682   /**
683    * Return shared bundle (it's licence) back to share manager (and prepare binary bundle for acquire).
684    * <p>
685    * Synchronous (waits for completion).
686    * <p>
687    * Input:
688    * <ul>
689    * <li> bundleName
690    * <li> address
691    * <li> addressNodeName &nbsp;&nbsp;<i> (if address == true)</i>
692    * </ul>
693    *
694    * Output:
695    * <ul>
696    * <li> errCode
697    * <ul>
698    * <li> 0 == OK
699    * <li> 1 == Not a share client
700    * <li> 2 == Cannot return bundle(licence)
701    * <li> 3 == Cannot prepare for acquire
702    * </ul>
703    * </ul>
704    *
705    * @param params Input/Output data for the call
706    */

707   public void returnShared(IOParams params) throws TransportException
708   {
709     touch();
710     
711     int errCode = 0;
712     try
713     {
714       RMITransportInterface server = serverHolder.getServer();
715       errCode = server.returnShared(myNodeName, params.getBundleName(), params.isAddress(), params.getAddressNodeName());
716     }
717     catch (RemoteException JavaDoc e)
718     {
719       touch();
720       serverHolder.releaseServer();
721       throw new TransportException(e.getMessage(), e);
722     }
723     catch (RMITransportException e)
724     {
725       touch();
726       throw new TransportException(e.getMessage(), e);
727     }
728
729     params.setErrCode(errCode);
730     touch();
731   }
732
733   /**
734    * Ask share manager to call returnShared on us.
735    * <p>
736    * Synchronous (waits for completion).
737    * <p>
738    * Input:
739    * <ul>
740    * <li> bundleName
741    * </ul>
742    *
743    * Output:
744    * <ul>
745    * <li> errCode
746    * <ul>
747    * <li> 0 == OK
748    * <li> 1 == Not a share manager or access not allowed
749    * <li> 2 == returnShared failed
750    * </ul>
751    * </ul>
752    *
753    * @param params Input/Output data for the call
754    */

755   public void manualReturnShared(IOParams params) throws TransportException
756   {
757     touch();
758     
759     int errCode = 0;
760     try
761     {
762       RMITransportInterface server = serverHolder.getServer();
763       errCode = server.manualReturnShared(myNodeName, params.getBundleName());
764     }
765     catch (RemoteException JavaDoc e)
766     {
767       touch();
768       serverHolder.releaseServer();
769       throw new TransportException(e.getMessage(), e);
770     }
771     catch (RMITransportException e)
772     {
773       touch();
774       throw new TransportException(e.getMessage(), e);
775     }
776
777     params.setErrCode(errCode);
778     touch();
779   }
780   
781 }
782
Popular Tags