Source of file ElasticaService.php
Size: 6,970 Bytes - Last Modified: 2022-02-21T10:00:52+00:00
/var/www/docs.ssmods.com/process/src/src/Symbiote/Elastica/ElasticaService.php
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 | <?php namespace Symbiote\Elastica; use Elastica\Exception\Connection\HttpException; use Elastica\Exception\ResponseException; use Elastica\Client; use Elastica\Query; use Elastica\Type\Mapping; /** * A service used to interact with elastic search. */ class ElasticaService { /** * Custom mapping definitions * * Format of array( * 'type' => array( * 'FieldA' => array('type' => elastictype, 'etc' => other) * 'FieldB' => array('type' => elastictype, 'etc' => other) * ) * ) * * @var array */ public $mappings = array(); /** * @var \Elastica\Document[] */ protected $buffer = array(); /** * @var bool controls whether indexing operations are buffered or not */ protected $buffered = false; /** * @var Elastica\Client */ private $client; /** * @var string */ private $index; public $enabled = true; protected $connected = true; /** * @param \Elastica\Client $client * @param string $index */ public function __construct(Client $client, $index) { $this->client = $client; $this->index = $index; } /** * @return \Elastica\Client */ public function getClient() { return $this->client; } /** * @return \Elastica\Index */ public function getIndex() { return $this->getClient()->getIndex($this->index); } /** * Performs a search query and returns a result list. * * @param \Elastica\Query|string|array $query * @return ResultList */ public function search($query) { return new ResultList($this->getIndex(), Query::create($query)); } /** * Either creates or updates a record in the index. * * @param Searchable $record */ public function index($record, $stage = 'Stage') { if (!$this->enabled) { return; } $document = $record->getElasticaDocument($stage); $type = $record->getElasticaType(); $this->indexDocument($document, $type); } public function indexDocument($document, $type) { if (!$this->enabled) { return; } if (!$this->connected) { return; } if ($this->buffered) { if (array_key_exists($type, $this->buffer)) { $this->buffer[$type][] = $document; } else { $this->buffer[$type] = array($document); } } else { $index = $this->getIndex(); try { $index->getType($type)->addDocument($document); $index->refresh(); } catch (HttpException $ex) { $this->connected = false; // TODO LOG THIS ERROR \SS_Log::log($ex, \SS_Log::ERR); } catch (ResponseException $re) { // if it's a failure to parse, we can continue \SS_Log::log($re, \SS_Log::ERR); if (strpos($re->getMessage(), 'failed to parse') === false) { throw $re; } } } } /** * Begins a bulk indexing operation where documents are buffered rather than * indexed immediately. */ public function startBulkIndex() { $this->buffered = true; } /** * Ends the current bulk index operation and indexes the buffered documents. */ public function endBulkIndex() { if (!$this->connected) { return; } $index = $this->getIndex(); try { foreach ($this->buffer as $type => $documents) { $index->getType($type)->addDocuments($documents); $index->refresh(); } } catch (HttpException $ex) { $this->connected = false; // TODO LOG THIS ERROR \SS_Log::log($ex, \SS_Log::ERR); } catch (\Elastica\Exception\BulkException $be) { \SS_Log::log($be, \SS_Log::ERR); throw $be; } $this->buffered = false; $this->buffer = array(); } /** * Deletes a record from the index. * * @param Searchable $record */ public function remove($record, $stage = 'Stage') { $index = $this->getIndex(); $type = $index->getType($record->getElasticaType()); try { $type->deleteDocument($record->getElasticaDocument($stage)); } catch (\Exception $ex) { \SS_Log::log($ex, \SS_Log::WARN); return false; } return true; } /** * Creates the index and the type mappings. */ public function define() { $index = $this->getIndex(); if (!$index->exists()) { $index->create(); } try { $this->createMappings($index); } catch (\Elastica\Exception\ResponseException $ex) { \SS_Log::log($ex, \SS_Log::WARN); } } /** * Define all known mappings */ protected function createMappings(\Elastica\Index $index) { foreach ($this->getIndexedClasses() as $class) { /** @var $sng Searchable */ $sng = singleton($class); $type = $sng->getElasticaType(); if (isset($this->mappings[$type])) { // captured later continue; } $mapping = $sng->getElasticaMapping(); $mapping->setType($index->getType($type)); $mapping->send(); } if ($this->mappings) { foreach ($this->mappings as $type => $fields) { $mapping = new Mapping(); $mapping->setProperties($fields); $mapping->setParam('date_detection', false); $mapping->setType($index->getType($type)); $mapping->send(); } } } /** * Re-indexes each record in the index. */ public function refresh($logFunc = null) { $index = $this->getIndex(); if (!$logFunc) { $logFunc = function ($msg) { }; } foreach ($this->getIndexedClasses() as $class) { $logFunc("Indexing items of type $class"); $this->startBulkIndex(); foreach ($class::get() as $record) { $logFunc("Indexing " . $record->Title); $this->index($record); } if (\Object::has_extension($class, 'Versioned')) { $live = \Versioned::get_by_stage($class, 'Live'); foreach ($live as $liveRecord) { $logFunc("Indexing Live record " . $liveRecord->Title); $this->index($liveRecord, 'Live'); } } $this->endBulkIndex(); } } /** * Gets the classes which are indexed (i.e. have the extension applied). * * @return array */ public function getIndexedClasses() { $classes = array(); foreach (\ClassInfo::subclassesFor('DataObject') as $candidate) { if (singleton($candidate)->hasExtension('Symbiote\\Elastica\\Searchable')) { $classes[] = $candidate; } } return $classes; } } |