KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > db4o > ReplicationImpl


1 /* Copyright (C) 2004 - 2006 db4objects Inc. http://www.db4o.com
2
3 This file is part of the db4o open source object database.
4
5 db4o is free software; you can redistribute it and/or modify it under
6 the terms of version 2 of the GNU General Public License as published
7 by the Free Software Foundation and as clarified by db4objects' GPL
8 interpretation policy, available at
9 http://www.db4o.com/about/company/legalpolicies/gplinterpretation/
10 Alternatively you can write to db4objects, Inc., 1900 S Norfolk Street,
11 Suite 350, San Mateo, CA 94403, USA.
12
13 db4o is distributed in the hope that it will be useful, but WITHOUT ANY
14 WARRANTY; without even the implied warranty of MERCHANTABILITY or
15 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 for more details.
17
18 You should have received a copy of the GNU General Public License along
19 with this program; if not, write to the Free Software Foundation, Inc.,
20 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */

21 package com.db4o;
22
23 import com.db4o.ext.*;
24 import com.db4o.inside.replication.*;
25 import com.db4o.query.*;
26 import com.db4o.replication.*;
27
28 /**
29  * @exclude
30  */

31 public class ReplicationImpl implements ReplicationProcess {
32
33     final YapStream _peerA;
34
35     final Transaction _transA;
36
37     final YapStream _peerB;
38
39     final Transaction _transB;
40
41     final ReplicationConflictHandler _conflictHandler;
42
43     final ReplicationRecord _record;
44
45     private int _direction;
46
47     private static final int IGNORE = 0;
48
49     private static final int TO_B = -1;
50
51     private static final int TO_A = 1;
52
53     private static final int CHECK_CONFLICT = -99;
54     
55     public ReplicationImpl(YapStream peerA, ObjectContainer peerB,
56             ReplicationConflictHandler conflictHandler) {
57         
58         if(conflictHandler == null){
59             // We don't allow starting replication without a
60
// conflict handler, so we don't get late failures.
61
throw new NullPointerException JavaDoc();
62         }
63         
64         synchronized (peerA.ext().lock()) {
65             synchronized (peerB.ext().lock()) {
66
67                 _peerA = peerA;
68                 _transA = peerA.checkTransaction(null);
69
70                 _peerB = (YapStream) peerB;
71                 _transB = _peerB.checkTransaction(null);
72
73                 MigrationConnection mgc = new MigrationConnection(_peerA, _peerB);
74
75                 _peerA.i_handlers.i_migration = mgc;
76                 _peerA.i_handlers.i_replication = this;
77                 _peerA._replicationCallState = YapConst.OLD;
78
79                 _peerB.i_handlers.i_migration = mgc;
80                 _peerB.i_handlers.i_replication = this;
81                 _peerB._replicationCallState = YapConst.OLD;
82
83                 _conflictHandler = conflictHandler;
84
85                 _record = ReplicationRecord.beginReplication(_transA, _transB);
86             }
87         }
88         
89     }
90
91     private int bindAndSet(Transaction trans, YapStream peer, YapObject ref, Object JavaDoc sourceObject){
92         if(sourceObject instanceof Db4oTypeImpl){
93             Db4oTypeImpl db4oType = (Db4oTypeImpl)sourceObject;
94             if(! db4oType.canBind()){
95                 Db4oTypeImpl targetObject = (Db4oTypeImpl)ref.getObject();
96                 targetObject.replicateFrom(sourceObject);
97                 return ref.getID();
98             }
99         }
100         peer.bind2(ref, sourceObject);
101         return peer.setAfterReplication(trans, sourceObject, 1, true);
102     }
103
104     public void checkConflict(Object JavaDoc obj) {
105         int temp = _direction;
106         _direction = CHECK_CONFLICT;
107         replicate(obj);
108         _direction = temp;
109     }
110
111     public void commit() {
112         synchronized (_peerA.lock()) {
113             synchronized (_peerB.lock()) {
114         
115                 _peerA.commit();
116                 _peerB.commit();
117         
118                 endReplication();
119         
120                 long versionA = _peerA.currentVersion();
121                 long versionB = _peerB.currentVersion();
122         
123                 _record._version = (versionA > versionB) ? versionA :versionB;
124                 
125                 _peerA.raiseVersion(_record._version + 1);
126                 _peerB.raiseVersion(_record._version + 1);
127         
128                 _record.store(_peerA);
129                 _record.store(_peerB);
130             }
131         }
132     }
133
134     private void endReplication() {
135         
136         _peerA._replicationCallState = YapConst.NONE;
137         _peerA.i_handlers.i_migration = null;
138         _peerA.i_handlers.i_replication = null;
139         
140         _peerA._replicationCallState = YapConst.NONE;
141         _peerB.i_handlers.i_migration = null;
142         _peerB.i_handlers.i_replication = null;
143     }
144     
145     private int idInCaller(YapStream caller, YapObject referenceA, YapObject referenceB){
146         return (caller == _peerA) ? referenceA.getID() : referenceB.getID();
147     }
148
149     private int ignoreOrCheckConflict() {
150         if (_direction == CHECK_CONFLICT) {
151             return CHECK_CONFLICT;
152         }
153         return IGNORE;
154     }
155     
156     private boolean isInConflict(long versionA, long versionB) {
157         if(versionA > _record._version && versionB > _record._version) {
158             return true;
159         }
160         if(versionB > _record._version && _direction == TO_B) {
161             return true;
162         }
163         if(versionA > _record._version && _direction == TO_A) {
164             return true;
165         }
166         return false;
167     }
168
169     private long lastSynchronization() {
170         return _record._version;
171     }
172     
173     public ObjectContainer peerA() {
174         return _peerA;
175     }
176
177     public ObjectContainer peerB() {
178         return _peerB;
179     }
180     
181     public void replicate(Object JavaDoc obj) {
182
183         // When there is an active replication process, the set() method
184
// will call back to the #process() method in this class.
185

186         // This detour is necessary, since #set() has to handle all cases
187
// anyway, for members of the replicated object, especially the
188
// prevention of endless loops in case of circular references.
189

190         YapStream stream = _peerB;
191
192         if (_peerB.isStored(obj)) {
193             if (!_peerA.isStored(obj)) {
194                 stream = _peerA;
195             }
196         }
197
198         stream.set(obj);
199     }
200
201     public void rollback() {
202         _peerA.rollback();
203         _peerB.rollback();
204         endReplication();
205     }
206
207     public void setDirection(ObjectContainer replicateFrom,
208             ObjectContainer replicateTo) {
209         if (replicateFrom == _peerA && replicateTo == _peerB) {
210             _direction = TO_B;
211         }
212         if (replicateFrom == _peerB && replicateTo == _peerA) {
213             _direction = TO_A;
214         }
215     }
216
217     private void shareBinding(YapObject sourceReference, YapObject referenceA, Object JavaDoc objectA, YapObject referenceB, Object JavaDoc objectB) {
218         if(sourceReference == null) {
219             return;
220         }
221         if(objectA instanceof Db4oTypeImpl){
222             if(! ((Db4oTypeImpl)objectA).canBind() ){
223                 return;
224             }
225         }
226         
227         if(sourceReference == referenceA) {
228             _peerB.bind2(referenceB, objectA);
229         }else {
230             _peerA.bind2(referenceA, objectB);
231         }
232     }
233
234     private int toA() {
235         if (_direction == CHECK_CONFLICT) {
236             return CHECK_CONFLICT;
237         }
238         if (_direction != TO_B) {
239             return TO_A;
240         }
241         return IGNORE;
242     }
243
244     private int toB() {
245         if (_direction == CHECK_CONFLICT) {
246             return CHECK_CONFLICT;
247         }
248         if (_direction != TO_A) {
249             return TO_B;
250         }
251         return IGNORE;
252     }
253     
254     
255     /**
256      * called by YapStream.set()
257      * @return id of reference in caller or 0 if not handled or -1
258      * if #set() should stop processing because of a direction
259      * setting.
260      */

261     int tryToHandle(YapStream caller, Object JavaDoc obj) {
262         
263         int notProcessed = 0;
264         YapStream other = null;
265         YapObject sourceReference = null;
266         
267         if(caller == _peerA){
268             other = _peerB;
269             if(_direction == TO_B){
270                 notProcessed = -1;
271             }
272         }else{
273             other = _peerA;
274             if(_direction == TO_A){
275                 notProcessed = -1;
276             }
277         }
278         
279         synchronized (other.i_lock) {
280             
281             Object JavaDoc objectA = obj;
282             Object JavaDoc objectB = obj;
283             
284             YapObject referenceA = _peerA.getYapObject(obj);
285             YapObject referenceB = _peerB.getYapObject(obj);
286             
287             VirtualAttributes attA = null;
288             VirtualAttributes attB = null;
289             
290             if (referenceA == null) {
291                 if(referenceB == null) {
292                     return notProcessed;
293                 }
294                 
295                 sourceReference = referenceB;
296                 
297                 attB = referenceB.virtualAttributes(_transB);
298                 if(attB == null){
299                     return notProcessed;
300                 }
301                 
302                 Object JavaDoc[] arr = _transA.objectAndYapObjectBySignature(attB.i_uuid,
303                         attB.i_database.i_signature);
304                 if (arr[0] == null) {
305                     return notProcessed;
306                 }
307                 
308                 referenceA = (YapObject) arr[1];
309                 objectA = arr[0];
310                 
311                 attA = referenceA.virtualAttributes(_transA);
312             }else {
313                 
314                 attA = referenceA.virtualAttributes(_transA);
315                 if(attA == null){
316                     return notProcessed;
317                 }
318                 
319                 if(referenceB == null) {
320                     
321                     sourceReference = referenceA;
322                     
323                     Object JavaDoc[] arr = _transB.objectAndYapObjectBySignature(attA.i_uuid,
324                             attA.i_database.i_signature);
325                     
326                     if (arr[0] == null) {
327                         return notProcessed;
328                     }
329                     
330                     referenceB = (YapObject) arr[1];
331                     objectB = arr[0];
332                     
333                 }
334                 
335                 attB = referenceB.virtualAttributes(_transB);
336             }
337             
338             if(attA == null || attB == null){
339                 return notProcessed;
340             }
341             
342             if(objectA == objectB) {
343                 if(caller == _peerA && _direction == TO_B) {
344                     return -1;
345                 }
346                 if(caller == _peerB && _direction == TO_A) {
347                     return -1;
348                 }
349                 return idInCaller(caller, referenceA, referenceB);
350             }
351             
352             _peerA.refresh(objectA, 1);
353             _peerB.refresh(objectB, 1);
354             
355             if (attA.i_version <= _record._version
356                     && attB.i_version <= _record._version) {
357
358                 if (_direction != CHECK_CONFLICT) {
359                     shareBinding(sourceReference, referenceA, objectA, referenceB, objectB);
360                 }
361                 return idInCaller(caller, referenceA, referenceB);
362             }
363
364             int direction = ignoreOrCheckConflict();
365
366             if (isInConflict(attA.i_version, attB.i_version)) {
367                 
368                 Object JavaDoc prevailing = _conflictHandler.resolveConflict(this,
369                         objectA, objectB);
370
371                 if (prevailing == objectA) {
372                     direction = (_direction == TO_A) ? IGNORE : toB();
373                 }
374
375                 if (prevailing == objectB) {
376                     direction = (_direction == TO_B) ? IGNORE : toA();
377                 }
378
379                 if (direction == IGNORE) {
380                     return -1;
381                 }
382
383             } else {
384                 direction = attB.i_version > _record._version ? toA(): toB();
385             }
386
387             if (direction == TO_A) {
388                 if (!referenceB.isActive()) {
389                     referenceB.activate(_transB, objectB, 1, false);
390                 }
391                 int idA = bindAndSet(_transA, _peerA, referenceA, objectB);
392                 if(caller == _peerA){
393                     return idA;
394                 }
395             }
396
397             if (direction == TO_B) {
398                 if (!referenceA.isActive()) {
399                     referenceA.activate(_transA, objectA, 1, false);
400                 }
401                 int idB = bindAndSet(_transB, _peerB, referenceB, objectA);
402                 if(caller == _peerB){
403                     return idB;
404                 }
405             }
406
407             return idInCaller(caller, referenceA, referenceB);
408         }
409
410     }
411     
412     public void whereModified(Query query) {
413         query.descend(VirtualField.VERSION).constrain(
414                 new Long JavaDoc(lastSynchronization())).greater();
415     }
416  
417 }
Popular Tags