Source of file FileBasedSqsQueue.php
Size: 2,250 Bytes - Last Modified: 2021-12-23T10:26:51+00:00
/var/www/docs.ssmods.com/process/src/src/Service/FileBasedSqsQueue.php
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 | <?php namespace Symbiote\SqsJobQueue\Service; /** * @author marcus */ class FileBasedSqsQueue { const SYS_KEY = '__src_system'; public $queuePath; public function __construct() { } protected function getQueuePath() { if (!$this->queuePath) { $this->queuePath = __DIR__.'/.queues'; } if (!is_dir($this->queuePath)) { mkdir($this->queuePath, 02770, true); } return $this->queuePath; } public function getQueueUrl() { return new FileBasedSqsMessageList(); } public function sendMessage($message) { $message[self::SYS_KEY] = BASE_PATH; $data = json_encode($message); $path = $this->getQueuePath(); $name = md5($data); file_put_contents($path.'/'.$name, $data); } public function receiveMessage($params = []) { $messages = glob($this->getQueuePath().'/*'); $all = new FileBasedSqsMessageList(); foreach ($messages as $file) { $content = file_get_contents($file); if (strlen($content)) { $data = json_decode($content, true); if (isset($data[self::SYS_KEY]) && $data[self::SYS_KEY] == BASE_PATH) { $message = [ 'Body' => isset($data['MessageBody']) ? (is_string($data['MessageBody']) ? $data['MessageBody'] : json_encode($data['MessageBody'])) : '', 'ReceiptHandle' => $file ]; $all->add($message); } } } return $all; } public function deleteMessage($params) { $file = isset($params['ReceiptHandle']) ? $params['ReceiptHandle'] : null; if ($file && file_exists($file) && strpos($file, $this->getQueuePath()) !== false) { unlink($file); } } } class FileBasedSqsMessageList { protected $messages = []; public function __construct() { } public function add($msg) { $this->messages[] = $msg; } public function get($key) { if ($key === 'Messages') { return $this->messages; } return $key; } } |