1 17 package org.alfresco.repo.node.index; 18 19 import java.util.ArrayList ; 20 import java.util.List ; 21 22 import org.alfresco.error.AlfrescoRuntimeException; 23 import org.alfresco.model.ContentModel; 24 import org.alfresco.repo.domain.NodeStatus; 25 import org.alfresco.repo.search.Indexer; 26 import org.alfresco.repo.search.impl.lucene.LuceneIndexerImpl; 27 import org.alfresco.repo.search.impl.lucene.fts.FullTextSearchIndexer; 28 import org.alfresco.repo.transaction.TransactionUtil; 29 import org.alfresco.repo.transaction.TransactionUtil.TransactionWork; 30 import org.alfresco.service.cmr.repository.ChildAssociationRef; 31 import org.alfresco.service.cmr.repository.NodeRef; 32 import org.alfresco.service.cmr.repository.NodeService; 33 import org.alfresco.service.cmr.repository.StoreRef; 34 import org.alfresco.service.cmr.search.ResultSet; 35 import org.alfresco.service.cmr.search.SearchParameters; 36 import org.alfresco.service.cmr.search.SearchService; 37 import org.alfresco.service.transaction.TransactionService; 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 import org.hibernate.CacheMode; 41 import org.hibernate.Query; 42 import org.hibernate.Session; 43 import org.springframework.orm.hibernate3.HibernateCallback; 44 import org.springframework.orm.hibernate3.support.HibernateDaoSupport; 45 46 79 public class FullIndexRecoveryComponent extends HibernateDaoSupport implements IndexRecovery 80 { 81 public static final String QUERY_GET_NEXT_CHANGE_TXN_IDS = "node.GetNextChangeTxnIds"; 82 public static final String QUERY_GET_CHANGED_NODE_STATUSES = "node.GetChangedNodeStatuses"; 83 public static final String QUERY_GET_CHANGED_NODE_STATUSES_COUNT = "node.GetChangedNodeStatusesCount"; 84 85 private static final String START_TXN_ID = "000"; 86 87 private static Log logger = LogFactory.getLog(FullIndexRecoveryComponent.class); 88 89 90 private static boolean started = false; 91 92 private static String currentTxnId = START_TXN_ID; 93 94 private boolean killThread = false; 95 96 97 private TransactionService transactionService; 98 99 private Indexer indexer; 100 101 private FullTextSearchIndexer ftsIndexer; 102 103 private SearchService searcher; 104 105 private NodeService nodeService; 106 107 private List <StoreRef> storeRefs; 108 109 private boolean executeFullRecovery; 110 111 private boolean runContinuously; 112 113 private long waitTime; 114 115 private CacheMode l2CacheMode; 116 117 120 public static String getCurrentTransactionId() 121 { 122 return currentTxnId; 123 } 124 125 public FullIndexRecoveryComponent() 126 { 127 this.storeRefs = new ArrayList <StoreRef>(2); 128 129 this.killThread = false; 130 this.executeFullRecovery = false; 131 this.runContinuously = false; 132 this.waitTime = 1000L; 133 this.l2CacheMode = CacheMode.REFRESH; 134 135 Runnable shutdownRunnable = new Runnable () 137 { 138 public void run() 139 { 140 killThread = true; 141 }; 142 }; 143 Thread shutdownThread = new Thread (shutdownRunnable); 144 Runtime.getRuntime().addShutdownHook(shutdownThread); 145 } 146 147 150 public static boolean isStarted() 151 { 152 return started; 153 } 154 155 158 public void setTransactionService(TransactionService transactionService) 159 { 160 this.transactionService = transactionService; 161 } 162 163 166 public void setIndexer(Indexer indexer) 167 { 168 this.indexer = indexer; 169 } 170 171 174 public void setFtsIndexer(FullTextSearchIndexer ftsIndexer) 175 { 176 this.ftsIndexer = ftsIndexer; 177 } 178 179 182 public void setSearcher(SearchService searcher) 183 { 184 this.searcher = searcher; 185 } 186 187 190 public void setNodeService(NodeService nodeService) 191 { 192 this.nodeService = nodeService; 193 } 194 195 200 public void setStores(List <String > storeRefStrings) 201 { 202 storeRefs.clear(); 203 for (String storeRefStr : storeRefStrings) 204 { 205 StoreRef storeRef = new StoreRef(storeRefStr); 206 storeRefs.add(storeRef); 207 } 208 } 209 210 219 public void setExecuteFullRecovery(boolean executeFullRecovery) 220 { 221 this.executeFullRecovery = executeFullRecovery; 222 } 223 224 231 public void setRunContinuously(boolean runContinuously) 232 { 233 this.runContinuously = runContinuously; 234 } 235 236 241 public void setWaitTime(long waitTime) 242 { 243 this.waitTime = waitTime; 244 } 245 246 251 public void setL2CacheMode(String l2CacheModeStr) 252 { 253 if (l2CacheModeStr.equals("GET")) 254 { 255 l2CacheMode = CacheMode.GET; 256 } 257 else if (l2CacheModeStr.equals("IGNORE")) 258 { 259 l2CacheMode = CacheMode.IGNORE; 260 } 261 else if (l2CacheModeStr.equals("NORMAL")) 262 { 263 l2CacheMode = CacheMode.NORMAL; 264 } 265 else if (l2CacheModeStr.equals("PUT")) 266 { 267 l2CacheMode = CacheMode.PUT; 268 } 269 else if (l2CacheModeStr.equals("REFRESH")) 270 { 271 l2CacheMode = CacheMode.REFRESH; 272 } 273 else 274 { 275 throw new IllegalArgumentException ("Unrecognised Hibernate L2 cache mode: " + l2CacheModeStr); 276 } 277 } 278 279 285 public synchronized void reindex() 286 { 287 if (FullIndexRecoveryComponent.started) 288 { 289 throw new AlfrescoRuntimeException 290 ("Only one FullIndexRecoveryComponent may be used per VM and it may only be called once"); 291 } 292 293 FullIndexRecoveryComponent.started = true; 295 296 TransactionWork<Object > ftsReindexWork = new TransactionWork<Object >() 298 { 299 public Object doWork() 300 { 301 for (StoreRef storeRef : storeRefs) 303 { 304 if (!nodeService.exists(storeRef)) 306 { 307 if (logger.isDebugEnabled()) 309 { 310 logger.debug("Skipping reindex of non-existent store: " + storeRef); 311 } 312 continue; 313 } 314 315 ftsIndexer.requiresIndex(storeRef); 317 } 318 if (logger.isDebugEnabled()) 320 { 321 logger.debug("Prompted FTS index on stores: " + storeRefs); 322 } 323 return null; 324 } 325 }; 326 TransactionUtil.executeInNonPropagatingUserTransaction(transactionService, ftsReindexWork); 327 328 if (!this.executeFullRecovery) 330 { 331 if (logger.isDebugEnabled()) 332 { 333 logger.debug("Full index recovery is off - quitting"); 334 } 335 } 336 else 337 { 338 FullIndexRecoveryComponent.currentTxnId = START_TXN_ID; 340 341 Runnable runnable = new ReindexRunner(); 343 Thread reindexThread = new Thread (runnable); 344 reindexThread.setDaemon(true); 346 reindexThread.setPriority(Thread.MIN_PRIORITY); 348 reindexThread.start(); 350 351 if (logger.isDebugEnabled()) 352 { 353 logger.debug("Full index recovery thread started: \n" + 354 " continuous: " + runContinuously + "\n" + 355 " stores: " + storeRefs); 356 } 357 } 358 } 359 360 367 private class ReindexRunner implements Runnable 368 { 369 public void run() 370 { 371 while (!killThread) 373 { 374 try 375 { 376 List <String > txnsIndexed = FullIndexRecoveryComponent.this.reindexNodes(); 378 int missingContentCount = FullIndexRecoveryComponent.this.reindexMissingContent(); 380 if (txnsIndexed.size() == 0 && !runContinuously) 382 { 383 if (logger.isDebugEnabled()) 386 { 387 logger.debug("Thread quitting - no more available indexing to do: \n" + 388 " last txn: " + FullIndexRecoveryComponent.getCurrentTransactionId()); 389 } 390 break; 391 } 392 synchronized(FullIndexRecoveryComponent.this) 394 { 395 FullIndexRecoveryComponent.this.wait(waitTime); 396 } 397 } 398 catch (InterruptedException e) 399 { 400 } 402 catch (Throwable e) 403 { 404 if (killThread) 405 { 406 } 408 else 409 { 410 logger.error("Reindex failure", e); 412 } 413 } 414 } 415 } 416 } 417 418 421 private int reindexMissingContent() 422 { 423 int count = 0; 424 for (StoreRef storeRef : storeRefs) 425 { 426 count += reindexMissingContent(storeRef); 427 } 428 return count; 429 } 430 431 435 private int reindexMissingContent(StoreRef storeRef) 436 { 437 SearchParameters sp = new SearchParameters(); 438 sp.addStore(storeRef); 439 440 String query = "TEXT:" + LuceneIndexerImpl.NOT_INDEXED_CONTENT_MISSING; 442 sp.setLanguage(SearchService.LANGUAGE_LUCENE); 443 sp.setQuery(query); 444 ResultSet results = null; 445 try 446 { 447 results = searcher.query(sp); 448 449 int count = 0; 450 List <ChildAssociationRef> assocRefs = results.getChildAssocRefs(); 452 for (ChildAssociationRef assocRef : assocRefs) 453 { 454 final NodeRef childNodeRef = assocRef.getChildRef(); 455 TransactionWork<Object > reindexWork = new TransactionWork<Object >() 457 { 458 public Object doWork() 459 { 460 indexer.updateNode(childNodeRef); 461 return null; 462 } 463 }; 464 TransactionUtil.executeInNonPropagatingUserTransaction(transactionService, reindexWork); 465 count++; 466 } 467 if (logger.isDebugEnabled()) 469 { 470 logger.debug("Reindexed missing content: \n" + 471 " store: " + storeRef + "\n" + 472 " node count: " + count); 473 } 474 return count; 475 } 476 finally 477 { 478 if (results != null) 479 { 480 results.close(); 481 } 482 } 483 } 484 485 488 private List <String > reindexNodes() 489 { 490 List <String > txnsToCheck = getNextChangeTxnIds(FullIndexRecoveryComponent.currentTxnId); 492 493 for (String changeTxnId : txnsToCheck) 495 { 496 reindexNodes(changeTxnId); 497 } 498 499 return txnsToCheck; 501 } 502 503 508 private void reindexNodes(final String changeTxnId) 509 { 510 514 TransactionWork<Object > reindexWork = new TransactionWork<Object >() 515 { 516 public Object doWork() throws Exception 517 { 518 HibernateCallback callback = new ReindexCallback(changeTxnId); 520 getHibernateTemplate().execute(callback); 521 return null; 523 } 524 }; 525 try 526 { 527 TransactionUtil.executeInNonPropagatingUserTransaction(transactionService, reindexWork); 528 } 529 catch (Throwable e) 530 { 531 logger.error("Transaction reindex failed: \n" + 532 " txn: " + changeTxnId, 533 e); 534 } 535 finally 536 { 537 currentTxnId = changeTxnId; 542 } 543 } 544 545 553 private class ReindexCallback implements HibernateCallback 554 { 555 private final String changeTxnId; 556 557 public ReindexCallback(String changeTxnId) 558 { 559 this.changeTxnId = changeTxnId; 560 } 561 562 567 public Object doInHibernate(Session session) 568 { 569 getSession().setCacheMode(l2CacheMode); 571 572 for (StoreRef storeRef : storeRefs) 574 { 575 if (!nodeService.exists(storeRef)) 576 { 577 continue; 579 } 580 reindexNodes(storeRef, changeTxnId); 582 } 583 return null; 585 } 586 587 private void reindexNodes(StoreRef storeRef, String changeTxnId) 588 { 589 SearchParameters sp = new SearchParameters(); 591 sp.addStore(storeRef); 592 593 String query = "TX:\"" + changeTxnId + "\""; 595 sp.setLanguage(SearchService.LANGUAGE_LUCENE); 596 sp.setQuery(query); 597 ResultSet results = null; 598 try 599 { 600 results = searcher.query(sp); 601 if (results.length() > 0) 603 { 604 if (logger.isDebugEnabled()) 607 { 608 logger.debug("Transaction present in index - no indexing required: \n" + 609 " store: " + storeRef + "\n" + 610 " txn: " + changeTxnId); 611 } 612 return; 613 } 614 } 615 finally 616 { 617 if (results != null) 618 { 619 results.close(); 620 } 621 } 622 int changedCount = getChangedNodeStatusesCount(storeRef, changeTxnId); 625 if (changedCount == 0) 626 { 627 if (logger.isDebugEnabled()) 630 { 631 logger.debug("Transaction only has deletions - no indexing required: \n" + 632 " store: " + storeRef + "\n" + 633 " txn: " + changeTxnId); 634 } 635 return; 636 } 637 638 List <NodeStatus> deletedNodeStatuses = getDeletedNodeStatuses(storeRef, changeTxnId); 640 for (NodeStatus status : deletedNodeStatuses) 641 { 642 NodeRef nodeRef = new NodeRef(storeRef, status.getKey().getGuid()); 643 ChildAssociationRef assocRef = new ChildAssociationRef( 645 ContentModel.ASSOC_CHILDREN, 646 null, 647 null, 648 nodeRef); 649 indexer.deleteNode(assocRef); 650 } 651 652 List <NodeStatus> changedNodeStatuses = getChangedNodeStatuses(storeRef, changeTxnId); 654 for (NodeStatus status : changedNodeStatuses) 655 { 656 NodeRef nodeRef = new NodeRef(storeRef, status.getKey().getGuid()); 657 ChildAssociationRef primaryAssocRef = nodeService.getPrimaryParent(nodeRef); 659 indexer.createNode(primaryAssocRef); 661 } 662 663 if (logger.isDebugEnabled()) 665 { 666 logger.debug("Transaction reindexed: \n" + 667 " store: " + storeRef + "\n" + 668 " txn: " + changeTxnId + "\n" + 669 " deletions: " + deletedNodeStatuses.size() + "\n" + 670 " modifications: " + changedNodeStatuses.size()); 671 } 672 } 673 }; 674 675 681 @SuppressWarnings ("unchecked") 682 public List <String > getNextChangeTxnIds(final String currentTxnId) 683 { 684 HibernateCallback callback = new HibernateCallback() 685 { 686 public Object doInHibernate(Session session) 687 { 688 Query query = session.getNamedQuery(QUERY_GET_NEXT_CHANGE_TXN_IDS); 689 query.setString("currentTxnId", currentTxnId) 690 .setReadOnly(true); 691 return query.list(); 692 } 693 }; 694 List <String > queryResults = (List <String >) getHibernateTemplate().execute(callback); 695 return queryResults; 697 } 698 699 @SuppressWarnings ("unchecked") 700 public int getChangedNodeStatusesCount(final StoreRef storeRef, final String changeTxnId) 701 { 702 HibernateCallback callback = new HibernateCallback() 703 { 704 public Object doInHibernate(Session session) 705 { 706 Query query = session.getNamedQuery(QUERY_GET_CHANGED_NODE_STATUSES_COUNT); 707 query.setBoolean("deleted", false) 708 .setString("storeProtocol", storeRef.getProtocol()) 709 .setString("storeIdentifier", storeRef.getIdentifier()) 710 .setString("changeTxnId", changeTxnId) 711 .setReadOnly(true); 712 return query.uniqueResult(); 713 } 714 }; 715 Integer changeCount = (Integer ) getHibernateTemplate().execute(callback); 716 return changeCount.intValue(); 718 } 719 720 @SuppressWarnings ("unchecked") 721 public List <NodeStatus> getChangedNodeStatuses(final StoreRef storeRef, final String changeTxnId) 722 { 723 HibernateCallback callback = new HibernateCallback() 724 { 725 public Object doInHibernate(Session session) 726 { 727 Query query = session.getNamedQuery(QUERY_GET_CHANGED_NODE_STATUSES); 728 query.setBoolean("deleted", false) 729 .setString("storeProtocol", storeRef.getProtocol()) 730 .setString("storeIdentifier", storeRef.getIdentifier()) 731 .setString("changeTxnId", changeTxnId) 732 .setReadOnly(true); 733 return query.list(); 734 } 735 }; 736 List <NodeStatus> queryResults = (List ) getHibernateTemplate().execute(callback); 737 return queryResults; 739 } 740 741 @SuppressWarnings ("unchecked") 742 public List <NodeStatus> getDeletedNodeStatuses(final StoreRef storeRef, final String changeTxnId) 743 { 744 HibernateCallback callback = new HibernateCallback() 745 { 746 public Object doInHibernate(Session session) 747 { 748 Query query = session.getNamedQuery(QUERY_GET_CHANGED_NODE_STATUSES); 749 query.setBoolean("deleted", true) 750 .setString("storeProtocol", storeRef.getProtocol()) 751 .setString("storeIdentifier", storeRef.getIdentifier()) 752 .setString("changeTxnId", changeTxnId) 753 .setReadOnly(true); 754 return query.list(); 755 } 756 }; 757 List <NodeStatus> queryResults = (List ) getHibernateTemplate().execute(callback); 758 return queryResults; 760 } 761 } | Popular Tags |