KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > jndi2 > distributed > ReplicationManager


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - 2003 ScalAgent Distributed Technologies
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA.
19  *
20  * Initial developer(s): David Feliot
21  */

22 package fr.dyade.aaa.jndi2.distributed;
23
24 import java.io.IOException JavaDoc;
25 import java.util.Enumeration JavaDoc;
26 import java.util.Hashtable JavaDoc;
27 import java.util.Vector JavaDoc;
28
29 import javax.naming.CompositeName JavaDoc;
30 import javax.naming.NamingException JavaDoc;
31
32 import org.objectweb.util.monolog.api.BasicLevel;
33
34 import fr.dyade.aaa.agent.AgentId;
35 import fr.dyade.aaa.agent.AgentServer;
36 import fr.dyade.aaa.jndi2.impl.BindEvent;
37 import fr.dyade.aaa.jndi2.impl.CreateSubcontextEvent;
38 import fr.dyade.aaa.jndi2.impl.DestroySubcontextEvent;
39 import fr.dyade.aaa.jndi2.impl.MissingContextException;
40 import fr.dyade.aaa.jndi2.impl.MissingRecordException;
41 import fr.dyade.aaa.jndi2.impl.NamingContext;
42 import fr.dyade.aaa.jndi2.impl.NamingContextInfo;
43 import fr.dyade.aaa.jndi2.impl.NotOwnerException;
44 import fr.dyade.aaa.jndi2.impl.RebindEvent;
45 import fr.dyade.aaa.jndi2.impl.UnbindEvent;
46 import fr.dyade.aaa.jndi2.impl.UpdateEvent;
47 import fr.dyade.aaa.jndi2.impl.UpdateListener;
48 import fr.dyade.aaa.jndi2.msg.ChangeOwnerRequest;
49 import fr.dyade.aaa.jndi2.msg.CreateSubcontextRequest;
50 import fr.dyade.aaa.jndi2.msg.JndiError;
51 import fr.dyade.aaa.jndi2.msg.JndiReply;
52 import fr.dyade.aaa.jndi2.msg.JndiRequest;
53 import fr.dyade.aaa.jndi2.server.JndiReplyNot;
54 import fr.dyade.aaa.jndi2.server.JndiScriptReplyNot;
55 import fr.dyade.aaa.jndi2.server.JndiScriptRequestNot;
56 import fr.dyade.aaa.jndi2.server.RequestContext;
57 import fr.dyade.aaa.jndi2.server.RequestManager;
58 import fr.dyade.aaa.jndi2.server.Trace;
59
60 public class ReplicationManager
61     extends RequestManager implements UpdateListener {
62
63   public final static String JavaDoc INIT_REQUEST_TABLE = "initRequestTable";
64   public final static String JavaDoc SYNC_REQUEST_TABLE = "syncRequestTable";
65   public final static String JavaDoc WRITE_REQUEST_TABLE = "writeRequestTable";
66   public final static String JavaDoc SERVER_LIST = "serverList";
67
68   /**
69    * Identifier of the server that owns
70    * the root naming context.
71    */

72   private AgentId rootOwnerId;
73
74   /**
75    * List of the initially known
76    * servers.
77    */

78   private short[] serverIds;
79
80   /**
81    * List of the JNDI servers discovered by
82    * this server. These servers first receive
83    * an initialization notification that contain
84    * the naming data owned by this server. Then they
85    * receive the update notifications about
86    * the contexts owned by this server.
87    */

88   private transient Vector JavaDoc servers;
89
90   /**
91    * Table that contains the write requests
92    * forwarded to the owner.
93    * key = owner identifier (AgentId)
94    * value = requests list (RequestContextList)
95    */

96   private transient Hashtable JavaDoc writeRequestContextLists;
97
98   /**
99    * Table that contains the requests (read or write)
100    * waiting for the initialization of the context.
101    * key = id of the missing context
102    * value = requests list (RequestContextList)
103    */

104   private transient Hashtable JavaDoc initRequestContextLists;
105
106   /**
107    * Table that contains the requests (read or write)
108    * waiting for the synchronization of the context.
109    * key = owner identifier (AgentId)
110    * value = requests list (RequestContextList)
111    */

112   private transient Hashtable JavaDoc syncRequestContextLists;
113
114   public ReplicationManager(short[] serverIds) {
115     this.serverIds = serverIds;
116   }
117
118   /**
119    * Overrides the <code>JndiServer</code> behavior.
120    */

121   protected AgentId getRootOwnerId() {
122     return rootOwnerId;
123   }
124
125   public void agentInitialize(boolean firstTime) throws Exception JavaDoc {
126     if (firstTime) {
127       if (serverIds.length > 0) {
128         rootOwnerId =
129           DistributedJndiServer.getDefault(serverIds[0]);
130       } else {
131         rootOwnerId = getId();
132       }
133     }
134
135     super.agentInitialize(firstTime);
136
137     writeRequestContextLists =
138       (Hashtable JavaDoc)AgentServer.getTransaction().load(
139         WRITE_REQUEST_TABLE);
140     if (writeRequestContextLists == null) {
141       writeRequestContextLists = new Hashtable JavaDoc();
142     }
143
144     initRequestContextLists =
145       (Hashtable JavaDoc)AgentServer.getTransaction().load(
146         INIT_REQUEST_TABLE);
147     if (initRequestContextLists == null) {
148       initRequestContextLists = new Hashtable JavaDoc();
149     }
150
151     syncRequestContextLists =
152       (Hashtable JavaDoc)AgentServer.getTransaction().load(
153         SYNC_REQUEST_TABLE);
154     if (syncRequestContextLists == null) {
155       syncRequestContextLists = new Hashtable JavaDoc();
156     }
157
158     servers =
159       (Vector JavaDoc)AgentServer.getTransaction().load(
160         SERVER_LIST);
161     if (servers == null) {
162       servers = new Vector JavaDoc();
163       for (int i = 0; i < serverIds.length; i++) {
164         AgentId aid = DistributedJndiServer.getDefault(serverIds[i]);
165         servers.addElement(aid);
166         sendTo(aid, new InitJndiServerNot(
167           null, null, true));
168       }
169       saveServers();
170     }
171
172     getServerImpl().setUpdateListener(this);
173   }
174
175   /**
176    * Reacts to an update notification from an other JNDI server.
177    */

178   void doReact(AgentId from, JndiUpdateNot not) throws Exception JavaDoc {
179     if (Trace.logger.isLoggable(BasicLevel.DEBUG))
180       Trace.logger.log(BasicLevel.DEBUG,
181                        "ReplicationManager[" + getId() + "].doReact(" +
182                        from + ',' + not + ')');
183     UpdateEvent updateEvent = not.getUpdateEvent();
184     try {
185       if (updateEvent instanceof BindEvent) {
186         onUpdateEvent(from, (BindEvent)updateEvent);
187       } else if (updateEvent instanceof RebindEvent) {
188         onUpdateEvent(from, (RebindEvent)updateEvent);
189       } else if (updateEvent instanceof UnbindEvent) {
190         onUpdateEvent(from, (UnbindEvent)updateEvent);
191       } else if (updateEvent instanceof CreateSubcontextEvent) {
192         onUpdateEvent(from, (CreateSubcontextEvent)updateEvent);
193       } else if (updateEvent instanceof DestroySubcontextEvent) {
194         onUpdateEvent(from, (DestroySubcontextEvent)updateEvent);
195 // } else if (updateEvent instanceof ChangeOwnerEvent) {
196
// onUpdateEvent(from, (ChangeOwnerEvent)updateEvent);
197
}
198     } catch (NotOwnerException exc) {
199       // This may happen after a change owner event
200
Trace.logger.log(BasicLevel.WARN,
201                        "Distributed jndi update warn:",
202                        exc);
203     } catch (NamingException JavaDoc exc) {
204       Trace.logger.log(BasicLevel.ERROR,
205                        "Distributed jndi update error:",
206                        exc);
207       throw new Error JavaDoc(exc.toString());
208     }
209   }
210
211   private void onUpdateEvent(AgentId from, BindEvent evt)
212     throws NamingException JavaDoc {
213     getServerImpl().bind(
214       getServerImpl().getNamingContext(
215         evt.getUpdatedContextId()),
216       evt.getName(),
217       evt.getObject(),
218       from);
219   }
220
221   private void onUpdateEvent(AgentId from, RebindEvent evt)
222     throws NamingException JavaDoc {
223     getServerImpl().rebind(
224       getServerImpl().getNamingContext(
225         evt.getUpdatedContextId()),
226       evt.getName(),
227       evt.getObject(),
228       from);
229   }
230
231   private void onUpdateEvent(AgentId from, UnbindEvent evt)
232     throws NamingException JavaDoc {
233     getServerImpl().unbind(
234       getServerImpl().getNamingContext(
235         evt.getUpdatedContextId()),
236       evt.getName(),
237       from);
238   }
239
240   private void onUpdateEvent(AgentId from, CreateSubcontextEvent evt)
241     throws NamingException JavaDoc {
242     getServerImpl().createSubcontext(
243       getServerImpl().getNamingContext(
244         evt.getUpdatedContextId()),
245       evt.getName(),
246       evt.getPath(),
247       evt.getContextId(),
248       evt.getOwnerId(),
249       from);
250   }
251
252   private void onUpdateEvent(AgentId from, DestroySubcontextEvent evt)
253     throws NamingException JavaDoc {
254     getServerImpl().destroySubcontext(
255       getServerImpl().getNamingContext(
256         evt.getUpdatedContextId()),
257       evt.getName(),
258       evt.getPath(),
259       from);
260   }
261
262   // private void onUpdateEvent(AgentId from, ChangeOwnerEvent evt)
263
// throws NamingException {
264
// NamingContextInfo[] contexts = evt.getNamingContexts();
265
// for (int i = 0; i < contexts.length; i++) {
266
// NamingContext nc = getServerImpl().getNamingContext(
267
// contexts[i].getNamingContext().getId());
268
// if (nc == null) {
269
// // The InitJndiServerNot sent by
270
// // the server that created this context may not
271
// // have been received.
272
// getServerImpl().addNamingContext(contexts[i]);
273
// retryRequestsWaitingForMissingContext(
274
// contexts[i].getNamingContext().getId());
275
// } else {
276
// getServerImpl().resetNamingContext(
277
// contexts[i].getNamingContext());
278
// // DF: must retry the sync and write
279
// // requests to the new owner.
280
// }
281
// }
282
// }
283

284   /**
285    * Overrides the <code>JndiServer</code> behavior.
286    * Send a JNDI request to the owner (JNDI server).
287    * Waits for the asynchronous reply.
288    */

289   protected JndiReply invokeOwner(AgentId owner,
290                                   RequestContext reqCtx) {
291     if (Trace.logger.isLoggable(BasicLevel.DEBUG))
292       Trace.logger.log(BasicLevel.DEBUG,
293                        "ReplicationManager.invokeOwner(" +
294                        owner + ',' + reqCtx + ')');
295     JndiRequest request = reqCtx.getRequest();
296     if (request instanceof CreateSubcontextRequest) {
297       CreateSubcontextRequest csr =
298         (CreateSubcontextRequest)request;
299       request = new CreateRemoteSubcontextRequest(
300         csr.getName(),
301         getId());
302     }
303
304     sendTo(owner,
305            new JndiScriptRequestNot(
306              new JndiRequest[]{request}, true));
307     RequestContextList list =
308       (RequestContextList)writeRequestContextLists.get(owner);
309     if (list == null) {
310       list = new RequestContextList();
311       writeRequestContextLists.put(owner, list);
312     }
313     list.put(reqCtx);
314     saveWriteRequestTable();
315     return null;
316   }
317
318   void doReact(AgentId from, JndiScriptReplyNot not)
319     throws Exception JavaDoc {
320     onReply(from, not.getReplies()[0]);
321   }
322
323   void doReact(AgentId from, JndiReplyNot not)
324     throws Exception JavaDoc {
325     onReply(from, not.getReply());
326   }
327   
328   /**
329    * Reacts to the reply of a JNDI server that has been called
330    * as it is the owner of a naming context.
331    */

332   private void onReply(AgentId from, JndiReply reply) throws Exception JavaDoc {
333     if (Trace.logger.isLoggable(BasicLevel.DEBUG))
334       Trace.logger.log(BasicLevel.DEBUG,
335                        "ReplicationManager[" +
336                        getId() + "].onReply(" +
337                        from + ',' + reply + ')');
338     RequestContextList ctxList =
339       (RequestContextList)writeRequestContextLists.get(from);
340     RequestContext ctx = ctxList.get();
341     ctxList.pop();
342     if (ctxList.getSize() == 0) {
343       writeRequestContextLists.remove(from);
344     }
345     if (ctx != null) {
346       ctx.reply(reply);
347       saveWriteRequestTable();
348     } else {
349       Trace.logger.log(BasicLevel.ERROR,
350                        "Reply context not found: " +
351                        from + ", " + reply);
352     }
353   }
354
355   void doReact(AgentId from,
356                InitJndiServerNot not) throws Exception JavaDoc {
357     if (Trace.logger.isLoggable(BasicLevel.DEBUG))
358       Trace.logger.log(BasicLevel.DEBUG,
359                        "ReplicationManager.doReact(" +
360                        from + ',' + not + ')');
361     AgentId[] jndiServerIds = not.getJndiServerIds();
362     Vector JavaDoc initServers = new Vector JavaDoc();
363     if (jndiServerIds != null) {
364       for (int i = 0; i < jndiServerIds.length; i++) {
365         if (servers.indexOf(jndiServerIds[i]) < 0) {
366           initServers.addElement(jndiServerIds[i]);
367         }
368       }
369     }
370     
371     // Send back an init notif if:
372
// - the init notif is a request
373
// - or the server 'from' is unknown
374
if (not.isRequest() || servers.indexOf(from) < 0) {
375       initServers.addElement(from);
376     }
377
378     if (Trace.logger.isLoggable(BasicLevel.DEBUG))
379       Trace.logger.log(BasicLevel.DEBUG,
380                        " -> initServers = " + initServers);
381
382     if (initServers.size() > 0) {
383       AgentId[] localJndiServerIds = new AgentId[servers.size()];
384       servers.copyInto(localJndiServerIds);
385       NamingContextInfo[] localContexts =
386         getServerImpl().copyNamingContexts(getId());
387       int serversInitialLength = servers.size();
388       for (int i = 0; i < initServers.size(); i++) {
389         AgentId newServerId =
390           (AgentId)initServers.elementAt(i);
391         sendTo(newServerId, new InitJndiServerNot(
392           localJndiServerIds,
393           localContexts,
394           (!from.equals(newServerId))));
395         if (servers.indexOf(newServerId) < 0) {
396           servers.addElement(newServerId);
397         }
398         // else the server has already been registered.
399
// (it is a recovering server)
400
}
401       if (servers.size() > serversInitialLength) {
402         saveServers();
403       }
404     }
405
406     NamingContextInfo[] contexts = not.getContexts();
407     if (contexts != null) {
408       Vector JavaDoc newNames = new Vector JavaDoc();
409       for (int i = 0; i < contexts.length; i++) {
410         NamingContext nc = getServerImpl().getNamingContext(
411           contexts[i].getNamingContext().getId());
412         if (nc == null) {
413           getServerImpl().addNamingContext(contexts[i]);
414           newNames.addElement(contexts[i].getCompositeName());
415         }
416         // Else the naming context has already been
417
// added by an other server that is the (new)
418
// owner of this context.
419
}
420       
421       Vector JavaDoc retryNames = new Vector JavaDoc();
422       Vector JavaDoc retryLists = new Vector JavaDoc();
423       Enumeration JavaDoc names = initRequestContextLists.keys();
424       Enumeration JavaDoc lists = initRequestContextLists.elements();
425       while (lists.hasMoreElements()) {
426         CompositeName JavaDoc name =
427           (CompositeName JavaDoc)names.nextElement();
428         RequestContextList ctxList =
429           (RequestContextList)lists.nextElement();
430         boolean retry = false;
431         for (int i = 0; i < newNames.size(); i++) {
432           CompositeName JavaDoc newName =
433             (CompositeName JavaDoc)newNames.elementAt(i);
434           if (name.startsWith(newName)) {
435             retry = true;
436             break;
437           }
438         }
439         if (retry) {
440           retryNames.addElement(name);
441           retryLists.addElement(ctxList);
442         }
443       }
444       
445       for (int i = 0; i < retryNames.size(); i++) {
446         CompositeName JavaDoc name =
447           (CompositeName JavaDoc)retryNames.elementAt(i);
448         RequestContextList ctxList =
449           (RequestContextList)retryLists.elementAt(i);
450         initRequestContextLists.remove(name);
451         while (ctxList.getSize() > 0) {
452           RequestContext reqCtx = ctxList.get();
453           JndiReply reply = invoke(reqCtx);
454           if (reply != null) {
455             reqCtx.reply(reply);
456           }
457           ctxList.pop();
458         }
459       }
460       saveInitRequestTable();
461     }
462   }
463
464   protected JndiReply onMissingContext(MissingContextException mce,
465                                        RequestContext reqCtx) {
466     if (Trace.logger.isLoggable(BasicLevel.DEBUG))
467       Trace.logger.log(BasicLevel.DEBUG,
468                        "ReplicationManager.onMissingContext(" +
469                        mce + ',' + reqCtx + ')');
470     RequestContextList ctxList =
471       (RequestContextList)initRequestContextLists.get(
472         mce.getName());
473     if (ctxList == null) {
474       ctxList = new RequestContextList();
475       if (Trace.logger.isLoggable(BasicLevel.DEBUG))
476         Trace.logger.log(BasicLevel.DEBUG,
477                          " -> add a waiting request context: " +
478                          mce.getName());
479       initRequestContextLists.put(
480         mce.getName(), ctxList);
481     }
482     ctxList.put(reqCtx);
483     saveInitRequestTable();
484     return null;
485   }
486
487   protected JndiReply onMissingRecord(MissingRecordException mre,
488                                       RequestContext reqCtx) {
489     if (Trace.logger.isLoggable(BasicLevel.DEBUG))
490       Trace.logger.log(BasicLevel.DEBUG,
491                        "ReplicationManager.onMissingRecord(" +
492                        mre + ',' + reqCtx + ')');
493     CompositeName JavaDoc resolvedName =
494       (CompositeName JavaDoc)mre.getNameNotFoundException().getResolvedName();
495     if (mre.getOwnerId().equals(getId()) ||
496         resolvedName.equals(reqCtx.getResolvedName())) {
497       // The resolved context has already been updated.
498
return new JndiError(mre.getNameNotFoundException());
499     } else {
500       reqCtx.setResolvedName(resolvedName);
501       synchronizeRequest(
502         (AgentId)mre.getOwnerId(), reqCtx);
503       return null;
504     }
505   }
506
507   private void synchronizeRequest(AgentId owner,
508                                   RequestContext reqCtx) {
509     if (Trace.logger.isLoggable(BasicLevel.DEBUG))
510       Trace.logger.log(
511         BasicLevel.DEBUG,
512         "ReplicationManager.synchronizeRequest(" +
513         owner + ',' + reqCtx + ')');
514     sendTo(owner, new SyncRequestNot());
515     RequestContextList list =
516       (RequestContextList)syncRequestContextLists.get(owner);
517     if (list == null) {
518       list = new RequestContextList();
519       syncRequestContextLists.put(owner, list);
520     }
521     list.put(reqCtx);
522     saveSyncRequestTable();
523   }
524
525   void doReact(AgentId from, SyncRequestNot not) {
526     sendTo(from, new SyncReplyNot());
527   }
528
529   void doReact(AgentId from, SyncReplyNot not) {
530     RequestContextList ctxList =
531       (RequestContextList)syncRequestContextLists.get(from);
532     RequestContext ctx = ctxList.get();
533     ctxList.pop();
534     if (ctxList.getSize() == 0) {
535       syncRequestContextLists.remove(from);
536     }
537     if (ctx != null) {
538       JndiReply reply = invoke(ctx);
539       if (reply != null) {
540         ctx.reply(reply);
541         saveSyncRequestTable();
542       }
543     }
544   }
545
546   public void onUpdate(UpdateEvent event) {
547     for (int i = 0; i < servers.size(); i++) {
548       AgentId aid = (AgentId)servers.elementAt(i);
549       sendTo(aid, new JndiUpdateNot(event));
550     }
551   }
552
553   protected void createSubcontext(CreateSubcontextRequest request)
554     throws NamingException JavaDoc {
555     if (request instanceof CreateRemoteSubcontextRequest) {
556       createRemoteSubcontext((CreateRemoteSubcontextRequest)request);
557     } else {
558       super.createSubcontext(request);
559     }
560   }
561
562   private void createRemoteSubcontext(CreateRemoteSubcontextRequest request)
563     throws NamingException JavaDoc {
564     getServerImpl().createSubcontext(
565       request.getName(), request.getOwnerId());
566   }
567
568   protected void changeOwner(ChangeOwnerRequest request)
569     throws NamingException JavaDoc {
570     super.changeOwner(request);
571
572     writeRequestContextLists.remove(request.getOwnerId());
573     syncRequestContextLists.remove(request.getOwnerId());
574     // DF: must reply to those requests because
575
// this server is the new owner.
576
}
577
578   private void saveInitRequestTable() {
579     try {
580       AgentServer.getTransaction().save(
581         initRequestContextLists, INIT_REQUEST_TABLE);
582     } catch (IOException JavaDoc exc) {
583       throw new Error JavaDoc(exc.toString());
584     }
585   }
586
587   private void saveWriteRequestTable() {
588     try {
589       AgentServer.getTransaction().save(
590         writeRequestContextLists, WRITE_REQUEST_TABLE);
591     } catch (IOException JavaDoc exc) {
592       throw new Error JavaDoc(exc.toString());
593     }
594   }
595
596   private void saveSyncRequestTable() {
597     try {
598       AgentServer.getTransaction().save(
599         syncRequestContextLists, SYNC_REQUEST_TABLE);
600     } catch (IOException JavaDoc exc) {
601       throw new Error JavaDoc(exc.toString());
602     }
603   }
604
605   private void saveServers() {
606     try {
607       AgentServer.getTransaction().save(servers, SERVER_LIST);
608     } catch (IOException JavaDoc exc) {
609       throw new Error JavaDoc(exc.toString());
610     }
611   }
612
613   static class RequestContextList
614       implements java.io.Serializable JavaDoc {
615     private Vector JavaDoc list;
616
617     RequestContextList() {
618       this.list = new Vector JavaDoc();
619     }
620
621     void put(RequestContext ctx) {
622       list.addElement(ctx);
623     }
624
625     RequestContext get() {
626       if( list.size() > 0) {
627         return (RequestContext)list.elementAt(0);
628       } else {
629         return null;
630       }
631     }
632
633     void pop() {
634       if( list.size() > 0) {
635         list.removeElementAt(0);
636       }
637     }
638
639     int getSize() {
640       return list.size();
641     }
642
643     public String JavaDoc toString() {
644       return '(' + super.toString() +
645         ",list=" + list + ')';
646     }
647   }
648 }
649
Popular Tags