KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > compiere > process > ReplicationLocal


1 /******************************************************************************
2  * The contents of this file are subject to the Compiere License Version 1.1
3  * ("License"); You may not use this file except in compliance with the License
4  * You may obtain a copy of the License at http://www.compiere.org/license.html
5  * Software distributed under the License is distributed on an "AS IS" basis,
6  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
7  * the specific language governing rights and limitations under the License.
8  * The Original Code is Compiere ERP & CRM Smart Business Solution
9  * The Initial Developer of the Original Code is Jorg Janke and ComPiere, Inc.
10  * Portions created by Jorg Janke are Copyright (C) 1999-2003 Jorg Janke, parts
11  * created by ComPiere are Copyright (C) ComPiere, Inc.; All Rights Reserved.
12  * Contributor(s): ______________________________________.
13  *****************************************************************************/

14 package org.compiere.process;
15
16 import java.util.*;
17 import java.sql.*;
18 import javax.naming.*;
19 import javax.sql.*;
20 import java.math.*;
21
22 import org.compiere.db.*;
23 import org.compiere.util.*;
24 import org.compiere.process.*;
25 import org.compiere.model.*;
26 import org.compiere.interfaces.*;
27 import java.rmi.*;
28
29 /**
30  * Local (Central) Data Replication.
31  * Note: requires migration technology
32  *
33  * @author Jorg Janke
34  * @version $Id: ReplicationLocal.java,v 1.6 2003/10/04 03:51:51 jjanke Exp $
35  */

36 public class ReplicationLocal extends SvrProcess
37 {
/**
 * Create the local replication process and look up the system record
 * for the current context.
 */
public ReplicationLocal()
{
    super();
    this.m_system = MSystem.get (getCtx());
}   //  ReplicationLocal
45

46     /** System Record */
47     private MSystem m_system = null;
48     /** Replication Info */
49     private MReplication m_replication = null;
50     /** Replication Run */
51     private MReplicationRun m_replicationRun = null;
52     /** Test Flag */
53     private Boolean JavaDoc m_test = Boolean.FALSE;
54     /** Replication Flag */
55     private boolean m_replicated = true;
56     /** Remote Server */
57     private Server m_serverRemote = null;
58     private long m_start = System.currentTimeMillis();
59     /** Date Run on Remote */
60     private Timestamp m_replicationStart = new Timestamp (m_start);
61
62     /** Logger */
63     private static Logger s_log = Logger.getCLogger(ReplicationLocal.class);
64
65     /** Remote class */
66     private static String JavaDoc REMOTE = "org.compiere.process.ReplicationRemote";
67     protected static String JavaDoc START = "com.compiere.client.StartReplication";
68
69     /**
70      * Prepare - e.g., get Parameters.
71      */

72     public void prepare()
73     {
74         ProcessInfoParameter[] para = getParameter();
75         for (int i = 0; i < para.length; i++)
76         {
77             String JavaDoc name = para[i].getParameterName();
78             if (para[i].getParameter() == null)
79                 ;
80             else if (name.equals("IsTest"))
81                 m_test = Boolean.valueOf("Y".equals (para[i].getParameter()));
82             else
83                 log.error("prepare - Unknown Parameter: " + name);
84         }
85     } // prepare
86

87     /**
88      * Perrform process.
89      * @return Message
90      * @throws Exception if not successful
91      */

92     public String JavaDoc doIt() throws Exception JavaDoc
93     {
94         if (!m_system.isValid())
95         {
96             log.error("System not setup correctly for Migration");
97             throw new Exception JavaDoc ("SystemNotSetup");
98         }
99         log.info("doIt - Record_ID=" + getRecord_ID() + ", test=" + m_test);
100         connectRemote();
101         //
102
setupRemote();
103         mergeData();
104         sendUpdates();
105
106         // Save Info
107
log.info("doIt - Replicated=" + m_replicated + " - " + m_replicationStart);
108         m_replicationRun.setIsReplicated(m_replicated);
109         double sec = (System.currentTimeMillis() - m_start);
110         sec /= 1000;
111         m_replicationRun.setDescription(sec + " s");
112         m_replicationRun.save();
113         if (m_replicated)
114         {
115             m_replication.setDateLastRun (m_replicationStart);
116             m_replication.save();
117         }
118         //
119
exit();
120         return m_replicated ? "Replicated" : "Replication Error";
121     } // doIt
122

123
124     /**
125      * Connect to Remote Server
126      * @throws java.lang.Exception
127      */

128     private void connectRemote() throws Exception JavaDoc
129     {
130         // Replication Info
131
m_replication = new MReplication (getCtx(), getRecord_ID());
132         //
133
String JavaDoc AppsHost = m_replication.getHostAddress();
134         int AppsPort = m_replication.getHostPort();
135         boolean RMIoverHTTP = m_replication.isRMIoverHTTP();
136         log.info ("connectRemote - " + AppsHost + ":" + AppsPort + " - HTTP Tunnel=" + RMIoverHTTP);
137         InitialContext ic = CConnection.getInitialContext(
138             CConnection.getInitialEnvironment(AppsHost, AppsPort, RMIoverHTTP));
139         if (ic == null)
140             throw new Exception JavaDoc ("NoInitialContext");
141
142         try
143         {
144             ServerHome serverHome = (ServerHome)ic.lookup (ServerHome.JNDI_NAME);
145         // log.debug("- ServerHome: " + serverHome);
146
if (serverHome == null)
147                 throw new Exception JavaDoc ("NoServer");
148             m_serverRemote = serverHome.create();
149         // log.debug("- Server: " + m_serverRemote);
150
// log.debug("- Remote Status = " + m_serverRemote.getStatus());
151
}
152         catch (Exception JavaDoc ex)
153         {
154             log.error("connectRemote", ex);
155             throw new Exception JavaDoc (ex);
156         }
157     } // connectRemote
158

159
160     /**
161      * Setup Remote AD_System/AD_Table/AD_Sequence for Remote Management.
162      * @throws Exception
163      */

164     private void setupRemote() throws Exception JavaDoc
165     {
166         log.info("setupRemote");
167         //
168
String JavaDoc sql = "SELECT rt.AD_Table_ID, rt.ReplicationType, t.TableName "
169             + "FROM AD_ReplicationTable rt"
170             + " INNER JOIN AD_Table t ON (rt.AD_Table_ID=t.AD_Table_ID) "
171             + "WHERE rt.IsActive='Y' AND t.IsActive='Y'"
172             + " AND AD_ReplicationStrategy_ID=? " // #1
173
+ "ORDER BY t.LoadSeq";
174         RowSet rowset = getRowSet(sql, new Object JavaDoc[]{new Integer JavaDoc(m_replication.getAD_ReplicationStrategy_ID())});
175         if (rowset == null)
176             throw new Exception JavaDoc("setupRemote - No RowSet Data");
177
178         // Data Info
179
RemoteSetupVO data = new RemoteSetupVO();
180         data.Test = m_test;
181         data.ReplicationTable = rowset; // RowSet
182
data.IDRangeStart = m_replication.getIDRangeStart();
183         data.IDRangeEnd = m_replication.getIDRangeEnd();
184         data.AD_Client_ID = m_replication.getRemote_Client_ID();
185         data.AD_Org_ID = m_replication.getRemote_Org_ID();
186         data.Prefix = m_replication.getPrefix();
187         data.Suffix = m_replication.getSuffix();
188         // Process Info
189
ProcessInfo pi = new ProcessInfo(data.toString(), 0, 0);
190         pi.setClassName (REMOTE);
191         pi.setSerializableObject(data);
192         Object JavaDoc result = doIt(START, "init", new Object JavaDoc[]{m_system});
193         if (result == null || !Boolean.TRUE.equals(result))
194             throw new Exception JavaDoc("setupRemote - Init Error - " + result);
195         // send it
196
pi = m_serverRemote.process (new Properties (), pi);
197         ProcessInfoLog[] logs = pi.getLogs();
198         Timestamp dateRun = null;
199         if (logs != null && logs.length > 0)
200             dateRun = logs[0].getP_Date(); // User Remote Timestamp!
201
//
202
log.info ("setupRemote - " + pi + " - Remote Timestamp = " + dateRun);
203         if (dateRun != null)
204             m_replicationStart = dateRun;
205         m_replicationRun = new MReplicationRun (getCtx(), m_replication.getAD_Replication_ID(), m_replicationStart);
206         m_replicationRun.save();
207     } // setupRemote
208

209     /*************************************************************************/
210
211     /**
212      * Receive new Data from Remote.
213      * @throws Exception
214      */

215     private void mergeData() throws Exception JavaDoc
216     {
217         log.info("receiveNewData");
218         //
219
String JavaDoc sql = "SELECT rt.AD_Table_ID, rt.ReplicationType, t.TableName, rt.AD_ReplicationTable_ID "
220             + "FROM AD_ReplicationTable rt"
221             + " INNER JOIN AD_Table t ON (rt.AD_Table_ID=t.AD_Table_ID) "
222             + "WHERE rt.IsActive='Y' AND t.IsActive='Y'"
223             + " AND AD_ReplicationStrategy_ID=?" // #1
224
+ " AND rt.ReplicationType='M' " // Merge
225
+ "ORDER BY t.LoadSeq";
226         RowSet rowset = getRowSet(sql, new Object JavaDoc[]{new Integer JavaDoc(m_replication.getAD_ReplicationStrategy_ID())});
227         try
228         {
229             while (rowset.next())
230                 mergeDataTable (rowset.getInt(1), rowset.getString(3), rowset.getInt(4));
231             rowset.close();
232             rowset = null;
233         }
234         catch (SQLException ex)
235         {
236             log.error("mergeData", ex);
237             m_replicated = false;
238         }
239     } // mergeData
240

241     /**
242      * Receive New Data from Remote (and send local updates)
243      * @param AD_Table_ID table id
244      * @param TableName table name
245      * @param AD_ReplicationTable_ID id
246      * @return true if success
247      * @throws java.lang.Exception
248      */

249     private boolean mergeDataTable (int AD_Table_ID, String JavaDoc TableName, int AD_ReplicationTable_ID) throws Exception JavaDoc
250     {
251         RemoteMergeDataVO data = new RemoteMergeDataVO();
252         data.Test = m_test;
253         data.TableName = TableName;
254         // Create SQL
255
StringBuffer JavaDoc sql = new StringBuffer JavaDoc("SELECT * FROM ")
256             .append(TableName)
257             .append(" WHERE AD_Client_ID=").append(m_replication.getRemote_Client_ID());
258         if (m_replication.getRemote_Org_ID() != 0)
259             sql.append(" AND AD_Org_ID IN (0,").append(m_replication.getRemote_Org_ID()).append(")");
260         if (m_replication.getDateLastRun() != null)
261             sql.append(" AND Updated >= ").append(DB.TO_DATE(m_replication.getDateLastRun(), false));
262         sql.append(" ORDER BY ");
263         data.KeyColumns = getKeyColumns(AD_Table_ID);
264         if (data.KeyColumns == null || data.KeyColumns.length == 0)
265         {
266             log.error("mergeDataTable - No KeyColumns for " + TableName);
267             m_replicated = false;
268             return false;
269         }
270         for (int i = 0; i < data.KeyColumns.length; i++)
271         {
272             if (i > 0)
273                 sql.append(",");
274             sql.append(data.KeyColumns[i]);
275         }
276         data.Sql = sql.toString();
277         // New Central Data
278
data.CentralData = getRowSet(data.Sql, null);
279         if (data.CentralData == null)
280         {
281             log.debug("mergeDataTable - CentralData is Null - " + TableName);
282             m_replicated = false;
283             return false;
284         }
285
286         // Process Info
287
ProcessInfo pi = new ProcessInfo("MergeData", 0, 0);
288         pi.setClassName (REMOTE);
289         pi.setSerializableObject(data);
290         // send it
291
pi = m_serverRemote.process (new Properties (), pi);
292         ProcessInfoLog[] logs = pi.getLogs();
293         String JavaDoc msg = "< ";
294         if (logs != null && logs.length > 0)
295             msg += logs[0].getP_Msg(); // Remote Message
296
log.info("mergeDataTable - " + pi);
297         //
298
MReplicationLog rLog = new MReplicationLog (getCtx(), m_replicationRun.getAD_Replication_Run_ID(), AD_ReplicationTable_ID, msg);
299         if (pi.isError())
300         {
301             log.error ("mergeDataTable Error - " + pi);
302             m_replicated = false;
303             rLog.setIsReplicated(false);
304         }
305         else // import data fom remote
306
{
307             RowSet sourceRS = (RowSet)pi.getSerializableObject();
308             RowSet targetRS = getRowSet(data.Sql, null);
309             Object JavaDoc result = doIt (START, "sync", new Object JavaDoc[] // Merge
310
{data.TableName, data.KeyColumns, sourceRS, targetRS, m_test, Boolean.TRUE});
311             boolean replicated = isReplicated(result);
312             if (replicated)
313                 log.debug ("mergeDataTable -> " + TableName + " - " + result);
314             else
315             {
316                 m_replicated = false;
317                 log.error ("mergeDataTable -> " + TableName + " - " + result);
318             }
319             rLog.setIsReplicated(replicated);
320             if (result != null)
321                 rLog.setP_Msg("< " + result.toString());
322             sourceRS.close();
323             sourceRS = null;
324             targetRS.close();
325             targetRS = null;
326         }
327         rLog.save();
328         return !pi.isError();
329     } // mergeDataTable
330

331     /**
332      * Get Key Columns (PK or FK) of Table
333      * @param AD_Table_ID id
334      * @return Key Columns
335      */

336     public String JavaDoc[] getKeyColumns (int AD_Table_ID)
337     {
338         ArrayList list = new ArrayList();
339         PreparedStatement pstmt = null;
340         try
341         {
342             // Get Keys
343
String JavaDoc sql = "SELECT ColumnName FROM AD_Column "
344                 + "WHERE AD_Table_ID=?"
345                 + " AND IsKey='Y'";
346             pstmt = DB.prepareStatement(sql);
347             pstmt.setInt(1, AD_Table_ID);
348             ResultSet rs = pstmt.executeQuery();
349             while (rs.next())
350                 list.add(rs.getString(1));
351             rs.close();
352
353             // no keys - search for parents
354
if (list.size() == 0)
355             {
356                 sql = "SELECT ColumnName FROM AD_Column "
357                     + "WHERE AD_Table_ID=?"
358                     + " AND IsParent='Y'";
359                 pstmt = DB.prepareStatement(sql);
360                 pstmt.setInt(1, AD_Table_ID);
361                 rs = pstmt.executeQuery();
362                 while (rs.next())
363                     list.add(rs.getString(1));
364                 rs.close();
365             }
366             pstmt.close();
367             pstmt = null;
368         }
369         catch (Exception JavaDoc e)
370         {
371             log.error("getKeyColumns", e);
372         }
373         try
374         {
375             if (pstmt != null)
376                 pstmt.close();
377         }
378         catch (Exception JavaDoc e)
379         {
380         }
381
382         // Convert to Array
383
String JavaDoc[] retValue = new String JavaDoc[list.size()];
384         list.toArray(retValue);
385         return retValue;
386     } // getKeyColumns
387

388     /*************************************************************************/
389
390     /**
391      * Send Updates to Remote (i.e. r/o on remote)
392      * @throws Exception
393      */

394     private void sendUpdates() throws Exception JavaDoc
395     {
396         log.info("sendUpdates");
397         //
398
String JavaDoc sql = "SELECT rt.AD_Table_ID, rt.ReplicationType, t.TableName, rt.AD_ReplicationTable_ID "
399             + "FROM AD_ReplicationTable rt"
400             + " INNER JOIN AD_Table t ON (rt.AD_Table_ID=t.AD_Table_ID) "
401             + "WHERE rt.IsActive='Y' AND t.IsActive='Y'"
402             + " AND AD_ReplicationStrategy_ID=?" // #1
403
+ " AND rt.ReplicationType='R' " // Reference
404
+ "ORDER BY t.LoadSeq";
405         RowSet rowset = getRowSet(sql, new Object JavaDoc[]{new Integer JavaDoc(m_replication.getAD_ReplicationStrategy_ID())});
406         try
407         {
408             while (rowset.next())
409                 sendUpdatesTable (rowset.getInt(1), rowset.getString(3), rowset.getInt(4));
410             rowset.close();
411         }
412         catch (SQLException ex)
413         {
414             log.error("sendUpdates", ex);
415             m_replicated = false;
416         }
417     } // sendUpdates
418

419     /**
420      * Send UPdates to Remote
421      * @param AD_Table_ID table id
422      * @param TableName table
423      * @param AD_ReplicationTable_ID id
424      * @return true if success
425      * @throws Exception
426      */

427     private boolean sendUpdatesTable (int AD_Table_ID, String JavaDoc TableName, int AD_ReplicationTable_ID) throws Exception JavaDoc
428     {
429         RemoteUpdateVO data = new RemoteUpdateVO();
430         data.Test = m_test;
431         data.TableName = TableName;
432         // Create SQL
433
StringBuffer JavaDoc sql = new StringBuffer JavaDoc ("SELECT * FROM ")
434             .append(TableName)
435             .append(" WHERE AD_Client_ID=").append(m_replication.getRemote_Client_ID());
436         if (m_replication.getRemote_Org_ID() != 0)
437             sql.append(" AND AD_Org_ID IN (0,").append(m_replication.getRemote_Org_ID()).append(")");
438         if (m_replication.getDateLastRun() != null)
439             sql.append(" AND Updated >= ").append(DB.TO_DATE(m_replication.getDateLastRun(), false));
440         sql.append(" ORDER BY ");
441         data.KeyColumns = getKeyColumns(AD_Table_ID);
442         if (data.KeyColumns == null || data.KeyColumns.length == 0)
443         {
444             log.error("sendUpdatesTable - No KeyColumns for " + TableName);
445             m_replicated = false;
446             return false;
447         }
448         for (int i = 0; i < data.KeyColumns.length; i++)
449         {
450             if (i > 0)
451                 sql.append(",");
452             sql.append(data.KeyColumns[i]);
453         }
454         data.Sql = sql.toString();
455         // New Data
456
data.CentralData = getRowSet(data.Sql, null);
457         if (data.CentralData == null)
458         {
459             log.debug("sendUpdatesTable - Null - " + TableName);
460             m_replicated = false;
461             return false;
462         }
463         int rows = 0;
464         try
465         {
466             if (data.CentralData.last())
467                 rows = data.CentralData.getRow();
468             data.CentralData.beforeFirst(); // rewind
469
}
470         catch (SQLException ex)
471         {
472             log.debug("sendUpdatesTable - RowCheck", ex);
473             m_replicated = false;
474             return false;
475         }
476         if (rows == 0)
477         {
478             log.debug("sendUpdatesTable - No Rows - " + TableName);
479             return true;
480         }
481         else
482             log.debug("sendUpdatesTable - " + TableName + " #" + rows);
483
484         // Process Info
485
ProcessInfo pi = new ProcessInfo("SendUpdates", 0, 0);
486         pi.setClassName (REMOTE);
487         pi.setSerializableObject(data);
488         // send it
489
pi = m_serverRemote.process (new Properties (), pi);
490         log.info("sendUpdatesTable - " + pi);
491         ProcessInfoLog[] logs = pi.getLogs();
492         String JavaDoc msg = "> ";
493         if (logs != null && logs.length > 0)
494             msg += logs[0].getP_Msg(); // Remote Message
495
//
496
MReplicationLog rLog = new MReplicationLog (getCtx(), m_replicationRun.getAD_Replication_Run_ID(), AD_ReplicationTable_ID, msg);
497         if (pi.isError())
498             m_replicated = false;
499         rLog.setIsReplicated(!pi.isError());
500         rLog.save();
501         return !pi.isError();
502     } // sendUpdatesTable
503

504     /**
505      * Clean up resources (connections)
506      */

507     private void exit()
508     {
509         log.info ("exit");
510         Object JavaDoc result = doIt(START, "exit", null);
511         ProcessInfo pi = new ProcessInfo("Exit", 0, 0);
512         pi.setClassName (REMOTE);
513         pi.setSerializableObject(m_replicationStart);
514         // send it
515
try
516         {
517             m_serverRemote.process (new Properties (), pi);
518         }
519         catch (Exception JavaDoc ex)
520         {
521         }
522     } // exit
523

524     /*************************************************************************/
525
526     /**
527      * Get RowSet of Local Connection
528      * @param sql sql
529      * @param args optional argument array - supported: Integer, Timestamp, BigDecimal - rest is concerted to String
530      * @return row set
531      */

532     public static RowSet getRowSet (String JavaDoc sql, Object JavaDoc[] args)
533     {
534         RowSet retValue = null;
535         PreparedStatement pstmt = null;
536         try
537         {
538             pstmt = DB.prepareStatement (sql);
539             if (args != null)
540             {
541                 for (int i = 0; i < args.length; i++)
542                 {
543                     if (args[i] == null)
544                         s_log.error("getRowSet - NULL Argument " + i);
545                     else if (args[i] instanceof Integer JavaDoc)
546                         pstmt.setInt(i+1, ((Integer JavaDoc)args[i]).intValue());
547                     else if (args[i] instanceof Timestamp)
548                         pstmt.setTimestamp(i+1, (Timestamp)args[i]);
549                     else if (args[i] instanceof BigDecimal)
550                         pstmt.setBigDecimal(i+1, (BigDecimal)args[i]);
551                     else
552                         pstmt.setString(i+1, args[i].toString());
553                 }
554             }
555             ResultSet rs = pstmt.executeQuery ();
556             //
557
retValue = DB.getDatabase().getRowSet(rs);
558             //
559
rs.close();
560             pstmt.close ();
561             pstmt = null;
562         }
563         catch (SQLException ex)
564         {
565             s_log.error ("getRowSet - " + sql, ex);
566         }
567         try
568         {
569             if (pstmt != null)
570                 pstmt.close ();
571         }
572         catch (SQLException ex1)
573         {
574         }
575         pstmt = null;
576         return retValue;
577     } // getRowSet
578

579     /**
580      * Is data successful replicated
581      * @param result sync return value
582      * @return true if replicated
583      */

584     public static boolean isReplicated (Object JavaDoc result)
585     {
586         boolean replicated = result != null && !Boolean.FALSE.equals(result);
587         if (replicated)
588             replicated = result.toString().endsWith("Errors=0");
589         return replicated;
590     } // isReplicated
591

592 } // ReplicationLocal
593
Popular Tags