1 21 package com.db4o; 22 23 import com.db4o.ext.*; 24 import com.db4o.inside.replication.*; 25 import com.db4o.query.*; 26 import com.db4o.replication.*; 27 28 31 public class ReplicationImpl implements ReplicationProcess { 32 33 final YapStream _peerA; 34 35 final Transaction _transA; 36 37 final YapStream _peerB; 38 39 final Transaction _transB; 40 41 final ReplicationConflictHandler _conflictHandler; 42 43 final ReplicationRecord _record; 44 45 private int _direction; 46 47 private static final int IGNORE = 0; 48 49 private static final int TO_B = -1; 50 51 private static final int TO_A = 1; 52 53 private static final int CHECK_CONFLICT = -99; 54 55 public ReplicationImpl(YapStream peerA, ObjectContainer peerB, 56 ReplicationConflictHandler conflictHandler) { 57 58 if(conflictHandler == null){ 59 throw new NullPointerException (); 62 } 63 64 synchronized (peerA.ext().lock()) { 65 synchronized (peerB.ext().lock()) { 66 67 _peerA = peerA; 68 _transA = peerA.checkTransaction(null); 69 70 _peerB = (YapStream) peerB; 71 _transB = _peerB.checkTransaction(null); 72 73 MigrationConnection mgc = new MigrationConnection(_peerA, _peerB); 74 75 _peerA.i_handlers.i_migration = mgc; 76 _peerA.i_handlers.i_replication = this; 77 _peerA._replicationCallState = YapConst.OLD; 78 79 _peerB.i_handlers.i_migration = mgc; 80 _peerB.i_handlers.i_replication = this; 81 _peerB._replicationCallState = YapConst.OLD; 82 83 _conflictHandler = conflictHandler; 84 85 _record = ReplicationRecord.beginReplication(_transA, _transB); 86 } 87 } 88 89 } 90 91 private int bindAndSet(Transaction trans, YapStream peer, YapObject ref, Object sourceObject){ 92 if(sourceObject instanceof Db4oTypeImpl){ 93 Db4oTypeImpl db4oType = (Db4oTypeImpl)sourceObject; 94 if(! db4oType.canBind()){ 95 Db4oTypeImpl targetObject = (Db4oTypeImpl)ref.getObject(); 96 targetObject.replicateFrom(sourceObject); 97 return ref.getID(); 98 } 99 } 100 peer.bind2(ref, sourceObject); 101 return peer.setAfterReplication(trans, sourceObject, 1, true); 102 } 103 104 public void checkConflict(Object obj) { 105 int temp = _direction; 106 _direction = CHECK_CONFLICT; 107 replicate(obj); 108 _direction = temp; 109 } 110 111 public void commit() { 112 synchronized (_peerA.lock()) { 113 synchronized (_peerB.lock()) { 114 115 _peerA.commit(); 116 _peerB.commit(); 117 118 endReplication(); 119 120 long versionA = _peerA.currentVersion(); 121 long versionB = _peerB.currentVersion(); 122 123 _record._version = (versionA > versionB) ? versionA :versionB; 124 125 _peerA.raiseVersion(_record._version + 1); 126 _peerB.raiseVersion(_record._version + 1); 127 128 _record.store(_peerA); 129 _record.store(_peerB); 130 } 131 } 132 } 133 134 private void endReplication() { 135 136 _peerA._replicationCallState = YapConst.NONE; 137 _peerA.i_handlers.i_migration = null; 138 _peerA.i_handlers.i_replication = null; 139 140 _peerA._replicationCallState = YapConst.NONE; 141 _peerB.i_handlers.i_migration = null; 142 _peerB.i_handlers.i_replication = null; 143 } 144 145 private int idInCaller(YapStream caller, YapObject referenceA, YapObject referenceB){ 146 return (caller == _peerA) ? referenceA.getID() : referenceB.getID(); 147 } 148 149 private int ignoreOrCheckConflict() { 150 if (_direction == CHECK_CONFLICT) { 151 return CHECK_CONFLICT; 152 } 153 return IGNORE; 154 } 155 156 private boolean isInConflict(long versionA, long versionB) { 157 if(versionA > _record._version && versionB > _record._version) { 158 return true; 159 } 160 if(versionB > _record._version && _direction == TO_B) { 161 return true; 162 } 163 if(versionA > _record._version && _direction == TO_A) { 164 return true; 165 } 166 return false; 167 } 168 169 private long lastSynchronization() { 170 return _record._version; 171 } 172 173 public ObjectContainer peerA() { 174 return _peerA; 175 } 176 177 public ObjectContainer peerB() { 178 return _peerB; 179 } 180 181 public void replicate(Object obj) { 182 183 186 190 YapStream stream = _peerB; 191 192 if (_peerB.isStored(obj)) { 193 if (!_peerA.isStored(obj)) { 194 stream = _peerA; 195 } 196 } 197 198 stream.set(obj); 199 } 200 201 public void rollback() { 202 _peerA.rollback(); 203 _peerB.rollback(); 204 endReplication(); 205 } 206 207 public void setDirection(ObjectContainer replicateFrom, 208 ObjectContainer replicateTo) { 209 if (replicateFrom == _peerA && replicateTo == _peerB) { 210 _direction = TO_B; 211 } 212 if (replicateFrom == _peerB && replicateTo == _peerA) { 213 _direction = TO_A; 214 } 215 } 216 217 private void shareBinding(YapObject sourceReference, YapObject referenceA, Object objectA, YapObject referenceB, Object objectB) { 218 if(sourceReference == null) { 219 return; 220 } 221 if(objectA instanceof Db4oTypeImpl){ 222 if(! ((Db4oTypeImpl)objectA).canBind() ){ 223 return; 224 } 225 } 226 227 if(sourceReference == referenceA) { 228 _peerB.bind2(referenceB, objectA); 229 }else { 230 _peerA.bind2(referenceA, objectB); 231 } 232 } 233 234 private int toA() { 235 if (_direction == CHECK_CONFLICT) { 236 return CHECK_CONFLICT; 237 } 238 if (_direction != TO_B) { 239 return TO_A; 240 } 241 return IGNORE; 242 } 243 244 private int toB() { 245 if (_direction == CHECK_CONFLICT) { 246 return CHECK_CONFLICT; 247 } 248 if (_direction != TO_A) { 249 return TO_B; 250 } 251 return IGNORE; 252 } 253 254 255 261 int tryToHandle(YapStream caller, Object obj) { 262 263 int notProcessed = 0; 264 YapStream other = null; 265 YapObject sourceReference = null; 266 267 if(caller == _peerA){ 268 other = _peerB; 269 if(_direction == TO_B){ 270 notProcessed = -1; 271 } 272 }else{ 273 other = _peerA; 274 if(_direction == TO_A){ 275 notProcessed = -1; 276 } 277 } 278 279 synchronized (other.i_lock) { 280 281 Object objectA = obj; 282 Object objectB = obj; 283 284 YapObject referenceA = _peerA.getYapObject(obj); 285 YapObject referenceB = _peerB.getYapObject(obj); 286 287 VirtualAttributes attA = null; 288 VirtualAttributes attB = null; 289 290 if (referenceA == null) { 291 if(referenceB == null) { 292 return notProcessed; 293 } 294 295 sourceReference = referenceB; 296 297 attB = referenceB.virtualAttributes(_transB); 298 if(attB == null){ 299 return notProcessed; 300 } 301 302 Object [] arr = _transA.objectAndYapObjectBySignature(attB.i_uuid, 303 attB.i_database.i_signature); 304 if (arr[0] == null) { 305 return notProcessed; 306 } 307 308 referenceA = (YapObject) arr[1]; 309 objectA = arr[0]; 310 311 attA = referenceA.virtualAttributes(_transA); 312 }else { 313 314 attA = referenceA.virtualAttributes(_transA); 315 if(attA == null){ 316 return notProcessed; 317 } 318 319 if(referenceB == null) { 320 321 sourceReference = referenceA; 322 323 Object [] arr = _transB.objectAndYapObjectBySignature(attA.i_uuid, 324 attA.i_database.i_signature); 325 326 if (arr[0] == null) { 327 return notProcessed; 328 } 329 330 referenceB = (YapObject) arr[1]; 331 objectB = arr[0]; 332 333 } 334 335 attB = referenceB.virtualAttributes(_transB); 336 } 337 338 if(attA == null || attB == null){ 339 return notProcessed; 340 } 341 342 if(objectA == objectB) { 343 if(caller == _peerA && _direction == TO_B) { 344 return -1; 345 } 346 if(caller == _peerB && _direction == TO_A) { 347 return -1; 348 } 349 return idInCaller(caller, referenceA, referenceB); 350 } 351 352 _peerA.refresh(objectA, 1); 353 _peerB.refresh(objectB, 1); 354 355 if (attA.i_version <= _record._version 356 && attB.i_version <= _record._version) { 357 358 if (_direction != CHECK_CONFLICT) { 359 shareBinding(sourceReference, referenceA, objectA, referenceB, objectB); 360 } 361 return idInCaller(caller, referenceA, referenceB); 362 } 363 364 int direction = ignoreOrCheckConflict(); 365 366 if (isInConflict(attA.i_version, attB.i_version)) { 367 368 Object prevailing = _conflictHandler.resolveConflict(this, 369 objectA, objectB); 370 371 if (prevailing == objectA) { 372 direction = (_direction == TO_A) ? IGNORE : toB(); 373 } 374 375 if (prevailing == objectB) { 376 direction = (_direction == TO_B) ? IGNORE : toA(); 377 } 378 379 if (direction == IGNORE) { 380 return -1; 381 } 382 383 } else { 384 direction = attB.i_version > _record._version ? toA(): toB(); 385 } 386 387 if (direction == TO_A) { 388 if (!referenceB.isActive()) { 389 referenceB.activate(_transB, objectB, 1, false); 390 } 391 int idA = bindAndSet(_transA, _peerA, referenceA, objectB); 392 if(caller == _peerA){ 393 return idA; 394 } 395 } 396 397 if (direction == TO_B) { 398 if (!referenceA.isActive()) { 399 referenceA.activate(_transA, objectA, 1, false); 400 } 401 int idB = bindAndSet(_transB, _peerB, referenceB, objectA); 402 if(caller == _peerB){ 403 return idB; 404 } 405 } 406 407 return idInCaller(caller, referenceA, referenceB); 408 } 409 410 } 411 412 public void whereModified(Query query) { 413 query.descend(VirtualField.VERSION).constrain( 414 new Long (lastSynchronization())).greater(); 415 } 416 417 } | Popular Tags |