Source of file SearchUpdateCommitJobProcessor.php
Size: 8,065 Bytes - Last Modified: 2021-12-23T10:52:39+00:00
/var/www/docs.ssmods.com/process/src/src/Search/Processors/SearchUpdateCommitJobProcessor.php
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 | <?php namespace SilverStripe\FullTextSearch\Search\Processors; use DateTime; use DateInterval; use SilverStripe\FullTextSearch\Search\FullTextSearch; use SilverStripe\Core\Config\Config; use SilverStripe\Core\Injector\Injector; use SilverStripe\ORM\FieldType\DBDatetime; use stdClass; use Symbiote\QueuedJobs\Services\QueuedJob; use Symbiote\QueuedJobs\Services\QueuedJobService; if (!interface_exists(QueuedJob::class)) { return; } class SearchUpdateCommitJobProcessor implements QueuedJob { /** * The QueuedJob queue to use when processing commits * * @config * @var string */ private static $commit_queue = QueuedJob::QUEUED; /** * List of indexes to commit * * @var array */ protected $indexes = array(); /** * True if this job is skipped to be be re-scheduled in the future * * @var boolean */ protected $skipped = false; /** * List of completed indexes * * @var array */ protected $completed = array(); /** * List of messages * * @var array */ protected $messages = array(); /** * List of dirty indexes to be committed * * @var array */ public static $dirty_indexes = array(); /** * If solrindex::commit has already been performed, but additional commits are necessary, * how long do we wait before attempting to touch the index again? * * {@see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers} * * @var int * @config */ private static $cooldown = 300; /** * True if any commits have been executed this request. If so, any attempts to run subsequent commits * should be delayed until next queuedjob to prevent solr reaching maxWarmingSearchers * * {@see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers} * * @var boolean */ public static $has_run = false; /** * This method is invoked once indexes with dirty ids have been updapted and a commit is necessary * * @param boolean $dirty Marks all indexes as dirty by default. Set to false if there are known comitted and * clean indexes * @param string $startAfter Start date * @return int The ID of the next queuedjob to run. This could be a new one or an existing one. */ public static function queue($dirty = true, $startAfter = null) { $commit = Injector::inst()->create(__CLASS__); $id = singleton(QueuedJobService::class)->queueJob($commit, $startAfter); if ($dirty) { $indexes = FullTextSearch::get_indexes(); static::$dirty_indexes = array_keys($indexes); } return $id; } public function getJobType() { return Config::inst()->get(__CLASS__, 'commit_queue'); } public function getSignature() { return sha1(get_class($this) . time() . mt_rand(0, 100000)); } public function getTitle() { return "FullTextSearch Commit Job"; } /** * Get the list of index names we should process * * @return array */ public function getAllIndexes() { if (empty($this->indexes)) { $indexes = FullTextSearch::get_indexes(); $this->indexes = array_keys($indexes); } return $this->indexes; } public function jobFinished() { // If we've indexed exactly as many as we would like, we are done return $this->skipped || (count($this->getAllIndexes()) <= count($this->completed)); } public function prepareForRestart() { // NOOP } public function afterComplete() { // NOOP } /** * Abort this job, potentially rescheduling a replacement if there is still work to do */ protected function discardJob() { $this->skipped = true; // If we do not have dirty records, then assume that these dirty records were committed // already this request (but probably another job), so we don't need to commit anything else. // This could occur if we completed multiple searchupdate jobs in a prior request, and // we only need one commit job to commit all of them in the current request. if (empty(static::$dirty_indexes)) { $this->addMessage("Indexing already completed this request: Discarding this job"); return; } // If any commit has run, but some (or all) indexes are un-comitted, we must re-schedule this task. // This could occur if we completed a searchupdate job in a prior request, as well as in // the current request $cooldown = Config::inst()->get(__CLASS__, 'cooldown'); $now = new DateTime(DBDatetime::now()->getValue()); $now->add(new DateInterval('PT' . $cooldown . 'S')); $runat = $now->Format('Y-m-d H:i:s'); $this->addMessage("Indexing already run this request, but incomplete. Re-scheduling for {$runat}"); // Queue after the given cooldown static::queue(false, $runat); } public function process() { // If we have already run an instance of SearchUpdateCommitJobProcessor this request, immediately // quit this job to prevent hitting warming search limits in Solr if (static::$has_run) { $this->discardJob(); return true; } // To prevent other commit jobs from running this request static::$has_run = true; // Run all incompleted indexes $indexNames = $this->getAllIndexes(); foreach ($indexNames as $name) { $index = singleton($name); $this->commitIndex($index); } $this->addMessage("All indexes committed"); return true; } /** * Commits a specific index * * @param SolrIndex $index * @throws Exception */ protected function commitIndex($index) { // Skip index if this is already complete $name = get_class($index); if (in_array($name, $this->completed)) { $this->addMessage("Skipping already comitted index {$name}"); return; } // Bypass SolrIndex::commit exception handling so that queuedjobs can handle the error $this->addMessage("Committing index {$name}"); $index->getService()->commit(false, false, false); $this->addMessage("Committing index {$name} was successful"); // If this index is currently marked as dirty, it's now clean if (in_array($name, static::$dirty_indexes)) { static::$dirty_indexes = array_diff(static::$dirty_indexes, array($name)); } // Mark complete $this->completed[] = $name; } public function setup() { // NOOP } public function getJobData() { $data = new stdClass(); $data->totalSteps = count($this->getAllIndexes()); $data->currentStep = count($this->completed); $data->isComplete = $this->jobFinished(); $data->messages = $this->messages; $data->jobData = new stdClass(); $data->jobData->skipped = $this->skipped; $data->jobData->completed = $this->completed; $data->jobData->indexes = $this->getAllIndexes(); return $data; } public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) { $this->isComplete = $isComplete; $this->messages = $messages; $this->skipped = $jobData->skipped; $this->completed = $jobData->completed; $this->indexes = $jobData->indexes; } public function addMessage($message, $severity = 'INFO') { $severity = strtoupper($severity); $this->messages[] = '[' . date('Y-m-d H:i:s') . "][$severity] $message"; } public function getMessages() { return $this->messages; } } |