1 18 package sync4j.server.engine; 19 20 import java.security.Principal ; 21 import java.sql.Timestamp ; 22 import java.util.*; 23 import java.util.Date ; 24 import java.util.Map ; 25 import java.util.logging.Level ; 26 import java.util.logging.Logger ; 27 28 import sync4j.framework.config.ConfigurationConstants; 29 import sync4j.framework.core.*; 30 import sync4j.framework.database.Database; 31 import sync4j.framework.engine.*; 32 import sync4j.framework.engine.source.CommittableSyncSource; 33 import sync4j.framework.engine.source.MemorySyncSource; 34 import sync4j.framework.engine.source.SyncSource; 35 import sync4j.framework.engine.source.SyncSourceException; 36 import sync4j.framework.logging.Sync4jLogger; 37 import sync4j.framework.protocol.CommandIdGenerator; 38 import sync4j.framework.security.Officer; 39 import sync4j.framework.security.Sync4jPrincipal; 40 import sync4j.framework.server.*; 41 import sync4j.framework.server.session.SyncSourceState; 42 import sync4j.framework.server.store.NotFoundException; 43 import sync4j.framework.server.store.PersistentStore; 44 import sync4j.framework.server.store.PersistentStoreException; 45 import sync4j.framework.tools.Base64; 46 import sync4j.framework.tools.MD5; 47 48 import sync4j.server.config.Configuration; 49 50 51 59 public class Sync4jEngine 60 extends AbstractSyncEngine 61 implements java.io.Serializable , ConfigurationConstants { 62 63 65 public static final String LOG_NAME = "engine"; 66 67 69 72 private Configuration configuration = null; 73 74 public Configuration getConfiguration() { 75 return configuration; 76 } 77 78 81 private Officer officer = null; 82 83 public Officer getOfficer() { 84 return this.officer; 85 } 86 87 90 private PersistentStore store = null; 91 92 96 public PersistentStore getStore() { 97 return store; 98 } 99 100 104 public void setStore(PersistentStore store) { 105 this.store = store; 106 } 107 108 111 private ArrayList clientSources = new ArrayList(); 112 private HashMap serverSources = new HashMap(); 113 114 117 private HashMap operations = null; 118 119 122 private ArrayList status = null; 123 124 127 protected transient Logger log = null; 128 129 133 private String syncMLVerProto = null; 134 public void setSyncMLVerProto(String syncMLVerProto) { 135 this.syncMLVerProto = syncMLVerProto; 136 } 137 138 private boolean isLastMessage = false; 139 public void setLastMessage(final boolean isLastMessage) { 140 this.isLastMessage = isLastMessage; 141 } 142 143 public boolean isLastMessage() { 144 return this.isLastMessage; 145 } 146 147 150 private SyncSourceState syncSourceState; 151 152 155 private Map clientMappings; 156 157 159 162 protected Sync4jEngine() { 163 syncSourceState = new SyncSourceState(); 164 } 165 166 173 public Sync4jEngine(Configuration configuration) { 174 this(); 175 176 log = Sync4jLogger.getLogger(LOG_NAME); 177 178 this.configuration = configuration; 179 180 store = configuration.getStore(); 184 185 officer = configuration.getOfficer(); 189 190 SyncStrategy strategy = configuration.getStrategy(); 194 195 setStrategy(strategy); 196 } 197 198 200 203 private CommandIdGenerator cmdIdGenerator = null; 204 205 210 public void setCommandIdGenerator(CommandIdGenerator cmdIdGenerator) { 211 this.cmdIdGenerator = cmdIdGenerator; 212 } 213 214 215 220 public CommandIdGenerator getCommandIdGenerator() { 221 return cmdIdGenerator; 222 } 223 224 226 229 private HashMap dbs = new HashMap(); 230 231 public void setDbs(HashMap dbs) { 232 if (this.dbs != null) { 233 this.dbs.clear(); 234 } else { 235 this.dbs = new HashMap(dbs.size()); 236 } 237 this.dbs.putAll(dbs); 238 } 239 240 public void setDbs(Database[] dbs) { 241 if (this.dbs == null) { 242 this.dbs = new HashMap(dbs.length); 243 } 244 245 for (int i=0; ((dbs != null) && (i<dbs.length)); ++i) { 246 this.dbs.put(dbs[i].getName(), dbs[i]); 247 } 248 } 249 250 public Database[] getDbs() { 251 if (this.dbs == null) { 252 return new Database[0]; 253 } 254 255 Iterator i = this.dbs.values().iterator(); 256 257 Database[] ret = new Database[this.dbs.size()]; 258 259 int j = 0; 260 while(i.hasNext()) { 261 ret[j++] = (Database)i.next(); 262 } 263 264 return ret; 265 } 266 267 270 private Timestamp syncTimestamp; 271 272 277 public void setSyncTimestamp(Date syncTimestamp) { 278 this.syncTimestamp = new Timestamp (syncTimestamp.getTime()); 282 } 283 284 285 287 294 public void sync(final Sync4jPrincipal principal) 295 throws Sync4jException { 296 if (log.isLoggable(Level.INFO)) { 297 log.info("Starting synchronization ..."); 298 } 299 300 SyncStrategy syncStrategy = getStrategy(); 301 302 MemorySyncSource clientSource = null; 307 SyncSource serverSource = null; 308 Database db = null; 309 310 314 operations = new HashMap(); 315 status = new ArrayList(); 316 317 String uri = null; 318 ListIterator c = clientSources.listIterator(); 319 while(c.hasNext()) { 320 clientSource = (MemorySyncSource)c.next(); 321 uri = clientSource.getSourceURI(); 322 323 if (log.isLoggable(Level.FINEST)) { 324 log.finest( "SyncSource state of '" 325 + uri 326 + "' is " 327 + syncSourceState.getStateName(uri) 328 ); 329 } 330 331 if (syncSourceState.isError(uri)) { 335 continue; 336 } 337 338 serverSource = getServerSource(uri); 339 db = (Database)dbs.get(uri); 340 341 SyncSource[] sources = new SyncSource[] { 342 serverSource, clientSource 343 }; 344 345 Sync4jPrincipal p = (Sync4jPrincipal)db.getPrincipal(); 350 p.setId(principal.getId()); 351 352 if (syncSourceState.isConfigured(uri)) { 357 try { 358 serverSource.beginSync(p, db.getMethod()); 359 syncSourceState.moveToSyncing(uri); 360 } catch (SyncSourceException e) { 361 syncSourceState.moveToError(uri); 362 setDBError(db, e); 363 continue; 364 } 365 } 366 367 try { 368 clientSource.commitSync(); 369 } catch (SyncSourceException e) { 370 syncSourceState.moveToError(uri); 374 setDBError(db, e); 375 continue; 376 } 377 378 if ( (db.getMethod() == AlertCode.SLOW) 388 || (db.getMethod() == AlertCode.REFRESH_FROM_SERVER) 389 || (db.getStatusCode() == StatusCode.REFRESH_REQUIRED)) { 390 391 try { 392 operations.put(uri, syncStrategy.prepareSlowSync(sources, p, syncTimestamp, isLastMessage)); 393 394 status.addAll( 395 Arrays.asList( 396 syncStrategy.sync((SyncOperation[])operations.get(uri)) 397 ) 398 ); 399 } catch (SyncException e) { 400 syncSourceState.moveToError(uri); 401 setDBError(db, e); 402 continue; 403 } 404 } else { 405 LastTimestamp last = new LastTimestamp( 410 p.getId() , 411 db.getName() 412 ); 413 try { 414 store.read(last); 415 } catch (PersistentStoreException e) { 416 throw new Sync4jException("Error reading last timestamp", e); 417 } 418 419 Timestamp since = new Timestamp (last.start); 420 421 try { 422 operations.put(uri, syncStrategy.prepareFastSync(sources, p, since, syncTimestamp, isLastMessage)); 423 status.addAll( 427 Arrays.asList( 428 syncStrategy.sync((SyncOperation[])operations.get(uri)) 429 ) 430 ); 431 } catch (SyncException e) { 432 syncSourceState.moveToError(uri); 433 setDBError(db, e); 434 continue; 435 } 436 } 437 438 if (serverSource instanceof CommittableSyncSource) { 442 try { 443 ((CommittableSyncSource)serverSource).commitSync(); 444 syncSourceState.moveToCommitted(uri); 445 } catch (SyncSourceException e) { 446 syncSourceState.moveToError(uri); 447 setDBError(db, e); 448 continue; 449 } 450 } else { 451 syncSourceState.moveToCommitted(uri); 456 } 457 458 db.setStatusCode(StatusCode.OK); 462 463 } 465 if (isLastMessage) { 469 while (c.hasPrevious()) { 470 clientSource = (MemorySyncSource)c.previous(); 471 uri = clientSource.getSourceURI(); 472 serverSource = getServerSource(uri); 473 474 try { 475 serverSource.endSync(principal); 476 } catch (SyncException e) { 477 log.throwing(getClass().getName(), "sync", e); 478 if (log.isLoggable(Level.SEVERE)) { 479 log.severe(e.getMessage()); 480 } 481 } 482 } 483 } 484 485 log.info("Ending synchronization ..."); 486 487 try { 488 syncStrategy.endSync(); 489 } catch (SyncException e) { 490 log.throwing(getClass().getName(), "sync", e); 491 if (log.isLoggable(Level.SEVERE)) { 492 log.severe(e.getMessage()); 493 } 494 } 495 496 } 497 498 505 public SyncOperation[] getSyncOperations(String uri) { 506 return (SyncOperation[])operations.get(uri); 507 } 508 509 514 public void resetSyncOperations(String uri) { 515 operations.put(uri, new SyncOperation[0]); 516 } 517 518 527 public Status[] getModificationsStatusCommands(String msgId) { 528 529 if ((status == null) || (status.size() == 0)) { 530 return new Status[0]; 531 } 532 533 SyncOperationStatus[] operationStatus = 534 (SyncOperationStatus[])status.toArray(new SyncOperationStatus[status.size()]); 535 536 return EngineHelper.generateStatusCommands( 537 operationStatus, 538 msgId, 539 cmdIdGenerator 540 ); 541 } 542 543 548 public void addClientSource(SyncSource source) { 549 if (log.isLoggable(Level.FINEST)) { 550 log.finest("adding " + source); 551 } 552 553 clientSources.add(source); 554 555 syncSourceState.moveToConfigured(source.getSourceURI()); 556 } 557 558 563 public List getClientSources() { 564 return clientSources; 565 } 566 567 574 public SyncSource getClientSource(String name) { 575 Iterator i = clientSources.iterator(); 576 577 SyncSource s = null; 578 while (i.hasNext()) { 579 s = (SyncSource)i.next(); 580 581 if (s.getSourceURI().equals(name)) { 582 return s; 583 } 584 } 585 586 return null; 587 } 588 589 598 public int getClientSourceStatus(String uri) { 599 Database db = (Database)dbs.get(uri); 600 601 return db.getStatusCode(); 602 } 603 604 613 public String getClientStatusMessage(String uri) { 614 Database db = (Database)dbs.get(uri); 615 616 return db.getStatusMessage(); 617 } 618 619 626 public SyncSource getServerSource(String name) { 627 if (serverSources.containsKey(name)) { 628 return (SyncSource)serverSources.get(name); 629 } 630 631 Sync4jSource s = new Sync4jSource(name); 632 633 try { 634 store.read(s); 635 } catch (PersistentStoreException e) { 636 if (log.isLoggable(Level.SEVERE)) { 637 log.severe( "Error reading source configuration for source " 638 + name 639 + ": " 640 + e.getMessage() 641 ); 642 } 643 644 log.throwing(getClass().getName(), "getServerSource", e); 645 } 646 647 try { 648 SyncSource syncSource = (SyncSource)configuration.getBeanInstanceByName(s.getConfig()); 649 serverSources.put(name, syncSource); 650 return syncSource; 651 652 } catch (Exception e){ 653 String msg = "Unable to create sync source " 654 + s 655 + ": " 656 + e.getMessage(); 657 658 log.severe(msg); 659 log.throwing(getClass().getName(), "getServerSource", e); 660 } 661 662 return null; 663 } 664 665 675 public DataStore databaseToDataStore(Database db) { 676 ContentTypeInfo contentTypeInfo 677 = new ContentTypeInfo(db.getType(), "-" ); 678 679 return new DataStore( 680 new SourceRef(db.getSource()) , 681 null , 682 -1 , 683 contentTypeInfo , 684 new ContentTypeInfo[] { contentTypeInfo } , 685 contentTypeInfo , 686 new ContentTypeInfo[] { contentTypeInfo } , 687 new DSMem(false, -1, -1) , 688 new SyncCap(new SyncType[] { SyncType.SLOW }) 689 ); 690 } 691 692 697 public CTCap[] getContentTypeCapabilities() { 698 return new CTCap[0]; 699 } 700 701 706 public Ext[] getExtensions() { 707 return new Ext[0]; 708 } 709 710 715 public DataStore[] getDatastores() { 716 DataStore[] ds = null; 717 ArrayList al = new ArrayList(); 718 719 Database db = null; 720 String uri = null; 721 Iterator i = dbs.values().iterator(); 722 while(i.hasNext()) { 723 db = (Database)i.next(); 724 uri = db.getName(); 725 726 if (!db.isOkStatusCode()) { 727 continue; 728 } 729 730 SyncSource ss = getServerSource(uri); 731 732 if ( ss != null) { 733 al.add(EngineHelper.toDataStore(uri, ss.getInfo())); 734 } 735 } 736 737 int size = al.size(); 738 if (size == 0) { 739 ds = new DataStore[0]; 740 } else { 741 ds = (DataStore[])al.toArray(new DataStore[size]); 742 } 743 744 745 return ds; 746 } 747 748 749 759 public DevInf getServerCapabilities(VerDTD verDTD) { 760 DevInf devInf = configuration.getServerConfig().getServerInfo(); 761 devInf.setDataStore(this.getDatastores()); 762 devInf.setVerDTD(verDTD); 763 return devInf; 764 } 765 766 786 public void prepareDatabases(Sync4jPrincipal principal, 787 Database[] dbs , 788 SyncTimestamp next ) { 789 for (int i=0; ((dbs != null) && (i < dbs.length)); ++i) { 790 int statusCode = StatusCode.OK; 791 792 if (!checkServerDatabase(dbs[i])) { 793 statusCode = StatusCode.NOT_FOUND; 794 } else if (!checkDatabasePermissions(dbs[i])) { 795 statusCode = StatusCode.FORBIDDEN; 796 } 797 798 dbs[i].setStatusCode(statusCode); 799 800 if (statusCode == StatusCode.OK) { 804 LastTimestamp last = new LastTimestamp( 805 principal.getId(), 806 dbs[i].getName() 807 ); 808 809 try { 818 store.read(last); 819 dbs[i].setServerAnchor(new Anchor(last.tagClient, next.tagClient)); 820 } catch (NotFoundException e) { 821 last.tagServer = next.tagClient; 825 dbs[i].setServerAnchor(new Anchor(last.tagServer, next.tagClient)); 826 } catch(PersistentStoreException e) { 827 log.severe("Unable to retrieve timestamp from store"); 828 log.throwing(getClass().getName(), "prepareDatabases", e); 829 } 830 831 if ( !(last.tagServer.equals(dbs[i].getAnchor().getLast())) 832 && (dbs[i].getMethod() != AlertCode.REFRESH_FROM_SERVER)) { 833 if (log.isLoggable(Level.FINEST)) { 834 log.finest( "Forcing slow sync for database " 835 + dbs[i].getName() 836 ); 837 log.finest( "Server last: " 838 + last.tagServer 839 + "; client last: " 840 + dbs[i].getAnchor().getLast() 841 ); 842 } 843 if (dbs[i].getMethod() != AlertCode.SLOW) { 844 dbs[i].setStatusCode(StatusCode.REFRESH_REQUIRED); 845 } 846 dbs[i].setMethod(AlertCode.SLOW); 847 } 848 } 849 } 850 } 851 852 859 public boolean login(Cred credential) { 860 return officer.authenticate(credential); 861 } 862 863 870 public void logout(Cred credential) { 871 officer.unAuthenticate(credential); 872 } 873 874 883 public boolean authorize(Principal principal, String resource) { 884 return officer.authorize(principal, resource); 885 } 886 887 898 public ItemizedCommand[] operationsToCommands(SyncOperation[] operations, 899 String uri , 900 boolean slow ) { 901 ItemizedCommand[] commands = 902 EngineHelper.operationsToCommands( getMapping(uri), 903 operations , 904 uri , 905 cmdIdGenerator ); 906 try { 907 EngineHelper.updateClientMappings(clientMappings, operations, slow); 908 } catch (Exception e) { 909 log.severe(e.getMessage()); 910 log.throwing(getClass().getName(), "updateClientMappings", e); 911 } 912 913 return commands; 914 } 915 916 917 924 public void updateServerMappings(boolean slowSync) { 925 SyncOperationStatus[] operationStatus = 926 (SyncOperationStatus[])status.toArray(new SyncOperationStatus[status.size()]); 927 928 try { 929 EngineHelper.updateServerMappings(clientMappings, operationStatus, slowSync); 930 } catch (Exception e) { 931 log.throwing(getClass().getName(), "updateServerMappings", e); 932 933 if (log.isLoggable(Level.SEVERE)) { 934 log.severe(e.getMessage()); 935 } 936 } 937 } 938 939 956 public SyncItem[] itemsToSyncItems(SyncSource syncSource , 957 ModificationCommand cmd , 958 char state , 959 long timestamp ) { 960 961 ClientMapping m = getMapping(syncSource.getSourceURI()); 962 return EngineHelper.itemsToSyncItems(m, syncSource, cmd, state, timestamp); 963 } 964 965 966 974 public Cred getServerCredentials(Chal chal, Sync4jDevice device) { 975 String login = configuration.getServerConfig().getServerInfo().getDevID(); 976 977 if (login == null || login.equals("")) { 978 return null; 979 } 980 981 String pwd = device.getServerPassword(); 982 983 String type = chal.getType(); 984 String data = login + ':' + pwd; 985 986 if (Cred.AUTH_TYPE_BASIC.equals(type)) { 987 data = new String (Base64.encode(data.getBytes())); 988 } else if (Cred.AUTH_TYPE_MD5.equals(type)) { 989 990 if (this.syncMLVerProto.indexOf("1.0") != -1) { 991 byte[] serverNonce = chal.getNextNonce().getValue(); 997 byte[] serverNonceDecode = null; 998 if ((serverNonce == null) || (serverNonce.length == 0)) { 999 serverNonceDecode = new byte[0]; 1000 } else { 1001 serverNonceDecode = Constants.FORMAT_B64.equals(chal.getFormat()) 1002 ? Base64.decode(serverNonce) 1003 : serverNonce 1004 ; 1005 } 1006 byte[] dataBytes = data.getBytes(); 1007 byte[] buf = new byte[dataBytes.length + 1 + serverNonceDecode.length]; 1008 System.arraycopy(dataBytes, 0, buf, 0, dataBytes.length); 1009 buf[dataBytes.length] = (byte)':'; 1010 System.arraycopy(serverNonceDecode, 0, buf, dataBytes.length+1, serverNonceDecode.length); 1011 1012 byte[] digestNonce = MD5.digest(buf); 1013 data = new String (Base64.encode(digestNonce)); 1014 1015 } else { 1016 byte[] serverNonce = chal.getNextNonce().getValue(); 1026 byte[] serverNonceDecode = null; 1027 if ((serverNonce == null) || (serverNonce.length == 0)) { 1028 serverNonceDecode = new byte[0]; 1029 } else { 1030 serverNonceDecode = Constants.FORMAT_B64.equals(chal.getFormat()) 1031 ? Base64.decode(serverNonce) 1032 : serverNonce 1033 ; 1034 } 1035 byte[] digest = MD5.digest(data.getBytes()); byte[] b64 = Base64.encode(digest); 1037 byte[] buf = new byte[b64.length + 1 + serverNonceDecode.length]; 1038 1039 System.arraycopy(b64, 0, buf, 0, b64.length); 1040 buf[b64.length] = (byte)':'; 1041 System.arraycopy(serverNonceDecode, 0, buf, b64.length+1, serverNonceDecode.length); 1042 1043 data = new String (Base64.encode(MD5.digest(buf))); 1044 } 1045 } 1046 1047 Meta m = new Meta(); 1048 m.setType(type); 1049 m.setNextNonce(null); 1050 return new Cred(new Authentication(m, data)); 1051 } 1052 1053 1065 public void readPrincipal(Sync4jPrincipal principal) 1066 throws PersistentStoreException { 1067 assert(principal != null); 1068 assert(principal.getUsername() != null); 1069 assert(principal.getDeviceId() != null); 1070 1071 getStore().read(principal); 1072 } 1073 1074 1081 public void readDevice(Sync4jDevice device) 1082 throws PersistentStoreException { 1083 assert(device != null); 1084 assert(device.getDeviceId() != null); 1085 1086 getStore().read(device); 1087 } 1088 1089 1095 public void storeDevice(Sync4jDevice device) { 1096 try{ 1097 getStore().store(device); 1098 } catch (PersistentStoreException e) { 1099 if (log.isLoggable(Level.SEVERE)) { 1100 log.severe("Error storing the device: " + e.getMessage()); 1101 log.throwing(getClass().getName(), "storeDevice", e); 1102 } 1103 } 1104 } 1105 1106 1114 public void updateMapping(String uri, String guid, String luid) { 1115 getMapping(uri).updateMapping(guid, luid); 1116 } 1117 1118 1119 1120 1134 public ClientMapping getMapping(Sync4jPrincipal principal, 1135 String uri , 1136 boolean slow ) { 1137 if (clientMappings == null) { 1138 clientMappings = new HashMap(); 1139 } 1140 1141 ClientMapping ret = (ClientMapping)clientMappings.get(uri); 1142 1143 if (ret == null) { 1144 try { 1145 PersistentStore ps = getStore(); 1146 1147 ret = new ClientMapping(principal, uri); 1148 1149 ps.read(ret); 1150 1151 if (slow) { 1152 ret.clearMappings(); 1153 ps.store(ret); 1154 } 1155 1156 clientMappings.put(uri, ret); 1157 } catch (PersistentStoreException e) { 1158 log.severe("Unable to read clientMappings from the persistent store"); 1159 log.throwing(getClass().getName(), "getMapping", e); 1160 } 1161 } 1162 1163 return ret; 1164 } 1165 1166 1174 public ClientMapping getMapping(String uri) { 1175 return getMapping(null, uri, false); 1176 } 1177 1178 1183 public void storeMappings() { 1184 if (clientMappings == null) { 1185 return; 1186 } 1187 1188 try { 1189 PersistentStore ps = getStore(); 1190 1191 ClientMapping cm = null; 1192 Iterator i = clientMappings.keySet().iterator(); 1193 while (i.hasNext()) { 1194 cm = (ClientMapping)clientMappings.get(i.next()); 1195 if (log.isLoggable(Level.FINEST)) { 1196 log.finest("Saving client mapping: " + cm); 1197 } 1198 ps.store(cm); 1199 1200 cm.resetModifiedKeys(); 1201 } 1202 } catch (PersistentStoreException e) { 1203 log.severe("Unable to save clientMappings to the persistent store"); 1204 log.throwing(getClass().getName(), "storeMappings", e); 1205 } 1206 } 1207 1208 1209 1211 1221 private boolean checkServerDatabase(Database db) { 1222 if (log.isLoggable(Level.FINEST)) { 1223 log.finest( "Checking if the database " 1224 + db 1225 + " is in the server database list." 1226 ); 1227 } 1228 1229 Sync4jSource s = new Sync4jSource(db.getName()); 1230 1231 try { 1232 store.read(s); log.finest("Yes sir!"); 1233 1234 return true; 1235 } catch (NotFoundException e) { 1236 log.finest("Not found sir"); 1237 } catch (Exception e) { 1238 String msg = "Unable to access the perstistent store " 1239 + s 1240 + ": " 1241 + e.getMessage(); 1242 1243 log.severe(msg); 1244 log.throwing(getClass().getName(), "checkServerDatabase", e); 1245 } 1246 1247 return false; 1251 } 1252 1253 1260 private boolean checkDatabasePermissions(Database db) { 1261 return authorize(db.getPrincipal(), db.getName()); 1262 } 1263 1264 1271 private void setDBError(Database db, SyncException cause) { 1272 db.setStatusCode (cause.getStatusCode()); 1273 db.setStatusMessage(cause.getMessage() ); 1274 } 1275 1276 1277 1278 1279 1280 1281 1282 1283} 1284 1285 1286 1287 | Popular Tags |