1 14 package org.compiere.process; 15 16 import java.util.*; 17 import java.sql.*; 18 import javax.naming.*; 19 import javax.sql.*; 20 import java.math.*; 21 22 import org.compiere.db.*; 23 import org.compiere.util.*; 24 import org.compiere.process.*; 25 import org.compiere.model.*; 26 import org.compiere.interfaces.*; 27 import java.rmi.*; 28 29 36 public class ReplicationLocal extends SvrProcess 37 { 38 41 public ReplicationLocal() 42 { 43 m_system = MSystem.get (getCtx()); 44 } 46 47 private MSystem m_system = null; 48 49 private MReplication m_replication = null; 50 51 private MReplicationRun m_replicationRun = null; 52 53 private Boolean m_test = Boolean.FALSE; 54 55 private boolean m_replicated = true; 56 57 private Server m_serverRemote = null; 58 private long m_start = System.currentTimeMillis(); 59 60 private Timestamp m_replicationStart = new Timestamp (m_start); 61 62 63 private static Logger s_log = Logger.getCLogger(ReplicationLocal.class); 64 65 66 private static String REMOTE = "org.compiere.process.ReplicationRemote"; 67 protected static String START = "com.compiere.client.StartReplication"; 68 69 72 public void prepare() 73 { 74 ProcessInfoParameter[] para = getParameter(); 75 for (int i = 0; i < para.length; i++) 76 { 77 String name = para[i].getParameterName(); 78 if (para[i].getParameter() == null) 79 ; 80 else if (name.equals("IsTest")) 81 m_test = Boolean.valueOf("Y".equals (para[i].getParameter())); 82 else 83 log.error("prepare - Unknown Parameter: " + name); 84 } 85 } 87 92 public String doIt() throws Exception 93 { 94 if (!m_system.isValid()) 95 { 96 log.error("System not setup correctly for Migration"); 97 throw new Exception ("SystemNotSetup"); 98 } 99 log.info("doIt - Record_ID=" + getRecord_ID() + ", test=" + m_test); 100 connectRemote(); 101 setupRemote(); 103 mergeData(); 104 sendUpdates(); 105 106 log.info("doIt - Replicated=" + m_replicated + " - " + m_replicationStart); 108 m_replicationRun.setIsReplicated(m_replicated); 109 double sec = (System.currentTimeMillis() - m_start); 110 sec /= 1000; 111 m_replicationRun.setDescription(sec + " s"); 112 m_replicationRun.save(); 113 if (m_replicated) 114 { 115 m_replication.setDateLastRun (m_replicationStart); 116 m_replication.save(); 117 } 118 exit(); 120 return m_replicated ? "Replicated" : "Replication Error"; 121 } 123 124 128 private void connectRemote() throws Exception 129 { 130 m_replication = new MReplication (getCtx(), getRecord_ID()); 132 String AppsHost = m_replication.getHostAddress(); 134 int AppsPort = m_replication.getHostPort(); 135 boolean RMIoverHTTP = m_replication.isRMIoverHTTP(); 136 log.info ("connectRemote - " + AppsHost + ":" + AppsPort + " - HTTP Tunnel=" + RMIoverHTTP); 137 InitialContext ic = CConnection.getInitialContext( 138 CConnection.getInitialEnvironment(AppsHost, AppsPort, RMIoverHTTP)); 139 if (ic == null) 140 throw new Exception ("NoInitialContext"); 141 142 try 143 { 144 ServerHome serverHome = (ServerHome)ic.lookup (ServerHome.JNDI_NAME); 145 if (serverHome == null) 147 throw new Exception ("NoServer"); 148 m_serverRemote = serverHome.create(); 149 } 152 catch (Exception ex) 153 { 154 log.error("connectRemote", ex); 155 throw new Exception (ex); 156 } 157 } 159 160 164 private void setupRemote() throws Exception 165 { 166 log.info("setupRemote"); 167 String sql = "SELECT rt.AD_Table_ID, rt.ReplicationType, t.TableName " 169 + "FROM AD_ReplicationTable rt" 170 + " INNER JOIN AD_Table t ON (rt.AD_Table_ID=t.AD_Table_ID) " 171 + "WHERE rt.IsActive='Y' AND t.IsActive='Y'" 172 + " AND AD_ReplicationStrategy_ID=? " + "ORDER BY t.LoadSeq"; 174 RowSet rowset = getRowSet(sql, new Object []{new Integer (m_replication.getAD_ReplicationStrategy_ID())}); 175 if (rowset == null) 176 throw new Exception ("setupRemote - No RowSet Data"); 177 178 RemoteSetupVO data = new RemoteSetupVO(); 180 data.Test = m_test; 181 data.ReplicationTable = rowset; data.IDRangeStart = m_replication.getIDRangeStart(); 183 data.IDRangeEnd = m_replication.getIDRangeEnd(); 184 data.AD_Client_ID = m_replication.getRemote_Client_ID(); 185 data.AD_Org_ID = m_replication.getRemote_Org_ID(); 186 data.Prefix = m_replication.getPrefix(); 187 data.Suffix = m_replication.getSuffix(); 188 ProcessInfo pi = new ProcessInfo(data.toString(), 0, 0); 190 pi.setClassName (REMOTE); 191 pi.setSerializableObject(data); 192 Object result = doIt(START, "init", new Object []{m_system}); 193 if (result == null || !Boolean.TRUE.equals(result)) 194 throw new Exception ("setupRemote - Init Error - " + result); 195 pi = m_serverRemote.process (new Properties (), pi); 197 ProcessInfoLog[] logs = pi.getLogs(); 198 Timestamp dateRun = null; 199 if (logs != null && logs.length > 0) 200 dateRun = logs[0].getP_Date(); log.info ("setupRemote - " + pi + " - Remote Timestamp = " + dateRun); 203 if (dateRun != null) 204 m_replicationStart = dateRun; 205 m_replicationRun = new MReplicationRun (getCtx(), m_replication.getAD_Replication_ID(), m_replicationStart); 206 m_replicationRun.save(); 207 } 209 210 211 215 private void mergeData() throws Exception 216 { 217 log.info("receiveNewData"); 218 String sql = "SELECT rt.AD_Table_ID, rt.ReplicationType, t.TableName, rt.AD_ReplicationTable_ID " 220 + "FROM AD_ReplicationTable rt" 221 + " INNER JOIN AD_Table t ON (rt.AD_Table_ID=t.AD_Table_ID) " 222 + "WHERE rt.IsActive='Y' AND t.IsActive='Y'" 223 + " AND AD_ReplicationStrategy_ID=?" + " AND rt.ReplicationType='M' " + "ORDER BY t.LoadSeq"; 226 RowSet rowset = getRowSet(sql, new Object []{new Integer (m_replication.getAD_ReplicationStrategy_ID())}); 227 try 228 { 229 while (rowset.next()) 230 mergeDataTable (rowset.getInt(1), rowset.getString(3), rowset.getInt(4)); 231 rowset.close(); 232 rowset = null; 233 } 234 catch (SQLException ex) 235 { 236 log.error("mergeData", ex); 237 m_replicated = false; 238 } 239 } 241 249 private boolean mergeDataTable (int AD_Table_ID, String TableName, int AD_ReplicationTable_ID) throws Exception 250 { 251 RemoteMergeDataVO data = new RemoteMergeDataVO(); 252 data.Test = m_test; 253 data.TableName = TableName; 254 StringBuffer sql = new StringBuffer ("SELECT * FROM ") 256 .append(TableName) 257 .append(" WHERE AD_Client_ID=").append(m_replication.getRemote_Client_ID()); 258 if (m_replication.getRemote_Org_ID() != 0) 259 sql.append(" AND AD_Org_ID IN (0,").append(m_replication.getRemote_Org_ID()).append(")"); 260 if (m_replication.getDateLastRun() != null) 261 sql.append(" AND Updated >= ").append(DB.TO_DATE(m_replication.getDateLastRun(), false)); 262 sql.append(" ORDER BY "); 263 data.KeyColumns = getKeyColumns(AD_Table_ID); 264 if (data.KeyColumns == null || data.KeyColumns.length == 0) 265 { 266 log.error("mergeDataTable - No KeyColumns for " + TableName); 267 m_replicated = false; 268 return false; 269 } 270 for (int i = 0; i < data.KeyColumns.length; i++) 271 { 272 if (i > 0) 273 sql.append(","); 274 sql.append(data.KeyColumns[i]); 275 } 276 data.Sql = sql.toString(); 277 data.CentralData = getRowSet(data.Sql, null); 279 if (data.CentralData == null) 280 { 281 log.debug("mergeDataTable - CentralData is Null - " + TableName); 282 m_replicated = false; 283 return false; 284 } 285 286 ProcessInfo pi = new ProcessInfo("MergeData", 0, 0); 288 pi.setClassName (REMOTE); 289 pi.setSerializableObject(data); 290 pi = m_serverRemote.process (new Properties (), pi); 292 ProcessInfoLog[] logs = pi.getLogs(); 293 String msg = "< "; 294 if (logs != null && logs.length > 0) 295 msg += logs[0].getP_Msg(); log.info("mergeDataTable - " + pi); 297 MReplicationLog rLog = new MReplicationLog (getCtx(), m_replicationRun.getAD_Replication_Run_ID(), AD_ReplicationTable_ID, msg); 299 if (pi.isError()) 300 { 301 log.error ("mergeDataTable Error - " + pi); 302 m_replicated = false; 303 rLog.setIsReplicated(false); 304 } 305 else { 307 RowSet sourceRS = (RowSet)pi.getSerializableObject(); 308 RowSet targetRS = getRowSet(data.Sql, null); 309 Object result = doIt (START, "sync", new Object [] {data.TableName, data.KeyColumns, sourceRS, targetRS, m_test, Boolean.TRUE}); 311 boolean replicated = isReplicated(result); 312 if (replicated) 313 log.debug ("mergeDataTable -> " + TableName + " - " + result); 314 else 315 { 316 m_replicated = false; 317 log.error ("mergeDataTable -> " + TableName + " - " + result); 318 } 319 rLog.setIsReplicated(replicated); 320 if (result != null) 321 rLog.setP_Msg("< " + result.toString()); 322 sourceRS.close(); 323 sourceRS = null; 324 targetRS.close(); 325 targetRS = null; 326 } 327 rLog.save(); 328 return !pi.isError(); 329 } 331 336 public String [] getKeyColumns (int AD_Table_ID) 337 { 338 ArrayList list = new ArrayList(); 339 PreparedStatement pstmt = null; 340 try 341 { 342 String sql = "SELECT ColumnName FROM AD_Column " 344 + "WHERE AD_Table_ID=?" 345 + " AND IsKey='Y'"; 346 pstmt = DB.prepareStatement(sql); 347 pstmt.setInt(1, AD_Table_ID); 348 ResultSet rs = pstmt.executeQuery(); 349 while (rs.next()) 350 list.add(rs.getString(1)); 351 rs.close(); 352 353 if (list.size() == 0) 355 { 356 sql = "SELECT ColumnName FROM AD_Column " 357 + "WHERE AD_Table_ID=?" 358 + " AND IsParent='Y'"; 359 pstmt = DB.prepareStatement(sql); 360 pstmt.setInt(1, AD_Table_ID); 361 rs = pstmt.executeQuery(); 362 while (rs.next()) 363 list.add(rs.getString(1)); 364 rs.close(); 365 } 366 pstmt.close(); 367 pstmt = null; 368 } 369 catch (Exception e) 370 { 371 log.error("getKeyColumns", e); 372 } 373 try 374 { 375 if (pstmt != null) 376 pstmt.close(); 377 } 378 catch (Exception e) 379 { 380 } 381 382 String [] retValue = new String [list.size()]; 384 list.toArray(retValue); 385 return retValue; 386 } 388 389 390 394 private void sendUpdates() throws Exception 395 { 396 log.info("sendUpdates"); 397 String sql = "SELECT rt.AD_Table_ID, rt.ReplicationType, t.TableName, rt.AD_ReplicationTable_ID " 399 + "FROM AD_ReplicationTable rt" 400 + " INNER JOIN AD_Table t ON (rt.AD_Table_ID=t.AD_Table_ID) " 401 + "WHERE rt.IsActive='Y' AND t.IsActive='Y'" 402 + " AND AD_ReplicationStrategy_ID=?" + " AND rt.ReplicationType='R' " + "ORDER BY t.LoadSeq"; 405 RowSet rowset = getRowSet(sql, new Object []{new Integer (m_replication.getAD_ReplicationStrategy_ID())}); 406 try 407 { 408 while (rowset.next()) 409 sendUpdatesTable (rowset.getInt(1), rowset.getString(3), rowset.getInt(4)); 410 rowset.close(); 411 } 412 catch (SQLException ex) 413 { 414 log.error("sendUpdates", ex); 415 m_replicated = false; 416 } 417 } 419 427 private boolean sendUpdatesTable (int AD_Table_ID, String TableName, int AD_ReplicationTable_ID) throws Exception 428 { 429 RemoteUpdateVO data = new RemoteUpdateVO(); 430 data.Test = m_test; 431 data.TableName = TableName; 432 StringBuffer sql = new StringBuffer ("SELECT * FROM ") 434 .append(TableName) 435 .append(" WHERE AD_Client_ID=").append(m_replication.getRemote_Client_ID()); 436 if (m_replication.getRemote_Org_ID() != 0) 437 sql.append(" AND AD_Org_ID IN (0,").append(m_replication.getRemote_Org_ID()).append(")"); 438 if (m_replication.getDateLastRun() != null) 439 sql.append(" AND Updated >= ").append(DB.TO_DATE(m_replication.getDateLastRun(), false)); 440 sql.append(" ORDER BY "); 441 data.KeyColumns = getKeyColumns(AD_Table_ID); 442 if (data.KeyColumns == null || data.KeyColumns.length == 0) 443 { 444 log.error("sendUpdatesTable - No KeyColumns for " + TableName); 445 m_replicated = false; 446 return false; 447 } 448 for (int i = 0; i < data.KeyColumns.length; i++) 449 { 450 if (i > 0) 451 sql.append(","); 452 sql.append(data.KeyColumns[i]); 453 } 454 data.Sql = sql.toString(); 455 data.CentralData = getRowSet(data.Sql, null); 457 if (data.CentralData == null) 458 { 459 log.debug("sendUpdatesTable - Null - " + TableName); 460 m_replicated = false; 461 return false; 462 } 463 int rows = 0; 464 try 465 { 466 if (data.CentralData.last()) 467 rows = data.CentralData.getRow(); 468 data.CentralData.beforeFirst(); } 470 catch (SQLException ex) 471 { 472 log.debug("sendUpdatesTable - RowCheck", ex); 473 m_replicated = false; 474 return false; 475 } 476 if (rows == 0) 477 { 478 log.debug("sendUpdatesTable - No Rows - " + TableName); 479 return true; 480 } 481 else 482 log.debug("sendUpdatesTable - " + TableName + " #" + rows); 483 484 ProcessInfo pi = new ProcessInfo("SendUpdates", 0, 0); 486 pi.setClassName (REMOTE); 487 pi.setSerializableObject(data); 488 pi = m_serverRemote.process (new Properties (), pi); 490 log.info("sendUpdatesTable - " + pi); 491 ProcessInfoLog[] logs = pi.getLogs(); 492 String msg = "> "; 493 if (logs != null && logs.length > 0) 494 msg += logs[0].getP_Msg(); MReplicationLog rLog = new MReplicationLog (getCtx(), m_replicationRun.getAD_Replication_Run_ID(), AD_ReplicationTable_ID, msg); 497 if (pi.isError()) 498 m_replicated = false; 499 rLog.setIsReplicated(!pi.isError()); 500 rLog.save(); 501 return !pi.isError(); 502 } 504 507 private void exit() 508 { 509 log.info ("exit"); 510 Object result = doIt(START, "exit", null); 511 ProcessInfo pi = new ProcessInfo("Exit", 0, 0); 512 pi.setClassName (REMOTE); 513 pi.setSerializableObject(m_replicationStart); 514 try 516 { 517 m_serverRemote.process (new Properties (), pi); 518 } 519 catch (Exception ex) 520 { 521 } 522 } 524 525 526 532 public static RowSet getRowSet (String sql, Object [] args) 533 { 534 RowSet retValue = null; 535 PreparedStatement pstmt = null; 536 try 537 { 538 pstmt = DB.prepareStatement (sql); 539 if (args != null) 540 { 541 for (int i = 0; i < args.length; i++) 542 { 543 if (args[i] == null) 544 s_log.error("getRowSet - NULL Argument " + i); 545 else if (args[i] instanceof Integer ) 546 pstmt.setInt(i+1, ((Integer )args[i]).intValue()); 547 else if (args[i] instanceof Timestamp) 548 pstmt.setTimestamp(i+1, (Timestamp)args[i]); 549 else if (args[i] instanceof BigDecimal) 550 pstmt.setBigDecimal(i+1, (BigDecimal)args[i]); 551 else 552 pstmt.setString(i+1, args[i].toString()); 553 } 554 } 555 ResultSet rs = pstmt.executeQuery (); 556 retValue = DB.getDatabase().getRowSet(rs); 558 rs.close(); 560 pstmt.close (); 561 pstmt = null; 562 } 563 catch (SQLException ex) 564 { 565 s_log.error ("getRowSet - " + sql, ex); 566 } 567 try 568 { 569 if (pstmt != null) 570 pstmt.close (); 571 } 572 catch (SQLException ex1) 573 { 574 } 575 pstmt = null; 576 return retValue; 577 } 579 584 public static boolean isReplicated (Object result) 585 { 586 boolean replicated = result != null && !Boolean.FALSE.equals(result); 587 if (replicated) 588 replicated = result.toString().endsWith("Errors=0"); 589 return replicated; 590 } 592 } | Popular Tags |