Source of file StompMQ.php
Size: 2,577 Bytes - Last Modified: 2021-12-23T10:32:53+00:00
/var/www/docs.ssmods.com/process/src/code/StompMQ.php
<?php

/**
 * Message queueing implementation class that uses Stomp to exchange messages with an
 * external message system.
 *
 * NOTE: THIS IS NOT COMPLETE OR TESTED IN ANY WAY, SO DON'T USE IT!
 *
 * Requires Stomp.php library.
 *
 * @TODO:
 * * include stomp redistributables if legal.
 * * complete the implementation and test it against ApacheMQ
 *
 * @author Mark Stephens <mark@silverstripe.com>
 */
class StompMQ implements MessageQueueImplementation
{
    /**
     * Stomp connection shared by every instance of this class. It is created
     * lazily by init() and reused for all subsequent send/receive calls.
     *
     * @var Stomp|null
     */
    public static $conn = null;

    // Constructor needed for singleton()
    public function __construct()
    {
    }

    /**
     * Set up for interacting with Stomp, including creating the connection. Configuration
     * info is taken from the interface configuration.
     *
     * Does nothing if a connection has already been established.
     *
     * @param array $config Interface configuration. Must contain a "stomp" entry with a
     *                      "server" value; an optional "durableClientId" value is
     *                      assigned to the connection's clientId before connecting.
     * @return void
     */
    protected function init($config)
    {
        if (self::$conn) {
            return;
        }

        require_once(Director::getAbsFile("messagequeue/thirdparty/stomp-php-1.0.0/Stomp.php"));

        $conf = $config["stomp"];

        // Build and connect a local object first, and only publish it to the shared
        // static once that has succeeded. If anything here throws, self::$conn stays
        // null so the next call retries instead of reusing an unconnected client.
        $conn = new Stomp($conf["server"]);
        if (isset($conf["durableClientId"])) {
            $conn->clientId = $conf["durableClientId"];
        }

        // @TODO: handle authentication and any other connection properties
        $conn->connect();

        self::$conn = $conn;
    }

    /**
     * Build the Stomp destination for a queue name.
     *
     * Kept in one place so that send, subscribe and unsubscribe always agree on the
     * destination string.
     *
     * @param String $queue
     * @return String
     */
    protected function destination($queue)
    {
        return "/queue/" . $queue;
    }

    /**
     * Send a single message to a queue via Stomp.
     *
     * @param String $queue           Queue name (without the "/queue/" prefix).
     * @param Object $msgframe        Message to send; its body and header properties
     *                                are passed through to the Stomp connection.
     * @param Map    $interfaceConfig Interface configuration, passed to init().
     * @return void
     */
    public function send($queue, $msgframe, $interfaceConfig)
    {
        $this->init($interfaceConfig);
        self::$conn->send($this->destination($queue), $msgframe->body, $msgframe->header);
    }

    /**
     * Get a bunch of messages via Stomp.
     *
     * Subscribes to the queue, reads and acknowledges frames until either the limit is
     * reached or readFrame() returns a falsy value, then unsubscribes.
     *
     * @TODO Handle exceptions, and possibly using stomp transactions. If an exception occurs
     * while receiving multiple messages, we need to ensure that the messages successfully retrieved
     * are returned, because the server thinks these are done.
     *
     * @param String $queue           Queue name (without the "/queue/" prefix).
     * @param Map    $interfaceConfig Interface configuration, passed to init().
     * @param Map    $options         May contain "limit": the maximum number of messages
     *                                to read. A missing or falsy limit means no limit.
     * @return ArrayList List of MessageFrame objects, in the order received.
     */
    public function receive($queue, $interfaceConfig, $options)
    {
        $this->init($interfaceConfig);

        // The same destination string must be used to subscribe and to unsubscribe,
        // otherwise the subscription is left open on the shared connection.
        $destination = $this->destination($queue);
        self::$conn->subscribe($destination);

        $result = new ArrayList();
        $count = 0;
        $limit = ($options && isset($options["limit"])) ? $options["limit"] : 0;

        while ((!$limit || $count < $limit) && ($frame = self::$conn->readFrame())) {
            $result->push(new MessageFrame($frame->body, $frame->headers, $queue));
            self::$conn->ack($frame);
            $count++;
        }

        self::$conn->unsubscribe($destination);

        return $result;
    }
}