Source of file MessageQueueTest.php
Size: 18,491 Bytes - Last Modified: 2021-12-23T10:32:53+00:00
/var/www/docs.ssmods.com/process/src/tests/MessageQueueTest.php
<?php

/**
 * Tests for the message queue module: direct execution of
 * MethodInvocationMessage, and sending / receiving / consuming messages
 * through interfaces backed by SimpleDBMQ.
 *
 * Interface definitions are global to MessageQueue, so setUpOnce(), setUp() and
 * tearDownOnce() save and restore them around the tests.
 */
class MessageQueueTest extends SapphireTest {

    protected static $fixture_file = 'messagequeue/tests/MessageQueueTest.yml';

    protected $extraDataObjects = array(
        'MessageQueueTest_DataObject'
    );

    /**
     * @TODO
     * *** Test rule matching:
     *  - test multiple interfaces
     *  - test lists of queue names in an interface
     *  - test patterned queue names in an interface
     *  - test passing over a patterned rule for another
     * *** Test message sending:
     *  - test sending plain message over non SQL interface
     *  - test different encodings
     *  - test for send error
     * *** Test message receipt:
     *  - test drop in rule execution
     *  - test delivery via MethodInvocationMessage and non-SQL interface
     *  - test delivery of non-MethodInvocationMessage
     *  - test send and delivery of plain text message
     *  - test callback on delivery error
     */

    /** Arguments most recently received by doStaticMethod(). */
    private static $testP1 = null;
    private static $testP2 = null;

    /** Frame most recently received by messageCallback(). */
    private static $message_frame = null;

    /** Number of times doCountCalls() has run. */
    public static $countedCalls = 0;

    /** Interface definitions that were registered before this test case started. */
    public static $saved_interfaces = null;

    /**
     * Test MethodInvocationMessage class, independent from queueing. Test using
     * static method.
     */
    public function testMethodInvocationStatic() {
        // Reset the capture slots first: another test may already have stored
        // the very same values, which would let this test pass without the
        // invocation ever running.
        self::$testP1 = null;
        self::$testP2 = null;

        $inv = new MethodInvocationMessage("MessageQueueTest", "doStaticMethod", "p1", 2);
        $frame = new MessageFrame();
        $frame->body = $inv;
        $conf = array();
        $inv->execute($frame, $conf);

        $this->assertEquals("p1", self::$testP1, "Static method test executed, P1 test");
        $this->assertEquals(2, self::$testP2, "Static method test executed, P2 test");
    }

    /**
     * Test MethodInvocationMessage class, independent from queueing. Test using
     * object method.
     */
    public function testMethodInvocationObject() {
        // Reset the capture slot so a stale value cannot satisfy the assertion.
        MessageQueue_Object::$testP1 = null;

        $inv = new MethodInvocationMessage(new MessageQueue_Object("p1"), "doNonDOMethod", "_suffix");
        $frame = new MessageFrame();
        $frame->body = $inv;
        $conf = array();
        $inv->execute($frame, $conf);

        $this->assertEquals(
            "p1_suffix",
            MessageQueue_Object::$testP1,
            "Non-DO method invocation correctly set its static output"
        );
    }

    /**
     * Test MethodInvocationMessage class, independent from queueing. Test using
     * data object method.
     */
    public function testMethodInvocationDataObject() {
        $obj = new MessageQueueTest_DataObject();
        $obj->prop1 = "p1";
        $obj->prop2 = 2;
        $obj->write();
        $id = $obj->ID;

        $inv = new MethodInvocationMessage($obj, "doDataObjectMethod", "_suffix");
        $frame = new MessageFrame();
        $frame->body = $inv;
        $conf = array();
        $inv->execute($frame, $conf);

        // Re-read the record by ID so the assertion checks what was written,
        // not the in-memory instance.
        $obj2 = MessageQueueTest_DataObject::get()->byID($id);
        $this->assertEquals(
            "p12_suffix",
            $obj2->result,
            "DO method invocation correctly set its static output"
        );
    }

    /**
     * Count the SimpleDBMQ records whose QueueName matches the given queue.
     *
     * @param string $queue Queue name to count messages for.
     * @return int Number of matching records, or 0 if the lookup yields nothing.
     */
    private function getQueueSizeSimpleDB($queue) {
        $ds = SimpleDBMQ::get()->filter(array("QueueName" => $queue));
        if (!$ds) {
            return 0;
        }
        return ($ds->Count());
    }

    /**
     * Test a message send through a single SimpleDBMQ interface registered by
     * the test itself. Note we need to disable force_immediate_delivery,
     * otherwise the message is not in the queue.
     */
    public function testMessageSendDefaultConfig() {
        $old = MessageQueue::get_force_immediate_delivery();
        MessageQueue::set_force_immediate_delivery(false);

        // Set the test environment for the message queue, and remember the
        // current interface definitions so they can be put back afterwards.
        $interfaces = MessageQueue::get_interfaces();

        // `finally` is avoided to stay compatible with older PHP versions;
        // catch-and-rethrow restores the global state on failure as well.
        try {
            MessageQueue::clear_all_interfaces();
            MessageQueue::add_interface("testqueue", array(
                "queues" => "testqueue",
                "implementation" => "SimpleDBMQ",
                "encoding" => "php_serialize",
                "send" => array(
                    "onShutdown" => "none"
                ),
                "delivery" => array(
                    "onerror" => array(
                        "log",
                        "requeue"
                    )
                )
            ));

            $this->assertEquals(
                0,
                $this->getQueueSizeSimpleDB("testqueue"),
                "Queue is empty before we put anything in it"
            );

            MessageQueue::send(
                "testqueue",
                new MethodInvocationMessage("MessageQueueTest", "doStaticMethod", "p1", 2)
            );

            // Check the message is queued in the database.
            $this->assertEquals(
                1,
                $this->getQueueSizeSimpleDB("testqueue"),
                "Queue has an item after we add to it"
            );
        } catch (Exception $e) {
            MessageQueue::set_all_interfaces($interfaces);
            MessageQueue::set_force_immediate_delivery($old);
            throw $e;
        }

        // Reset the interfaces and delivery mode to their original state.
        MessageQueue::set_all_interfaces($interfaces);
        MessageQueue::set_force_immediate_delivery($old);
    }

    /**
     * Test use of the SimpleDB queue, and with explicitly received message.
     * This tests the manual receive, and that the sent message comes back to us
     * just the way it was sent.
     */
    public function testMessageSimpleDBExplicitReceive() {
        // Save and disable force immediate, we are testing whether stuff gets queued.
        $old = MessageQueue::get_force_immediate_delivery();
        MessageQueue::set_force_immediate_delivery(false);

        // Catch-and-rethrow so the delivery mode is restored even when an
        // assertion fails part-way through.
        try {
            MessageQueue::add_interface("default", array(
                "queues" => "/.*/",
                "implementation" => "SimpleDBMQ",
                "encoding" => "php_serialize",
                "send" => array(
                    "onShutdown" => "none"
                ),
                "delivery" => array(
                    "onerror" => array(
                        "log",
                        "requeue"
                    )
                )
            ));

            $this->assertEquals(
                0,
                $this->getQueueSizeSimpleDB("testqueue"),
                "Queue is empty before we put anything in it"
            );

            MessageQueue::send(
                "testqueue",
                new MethodInvocationMessage("MessageQueueTest", "doStaticMethod", "p1", 2)
            );

            // Check message in queue.
            $this->assertEquals(
                1,
                $this->getQueueSizeSimpleDB("testqueue"),
                "Queue has an item after we add to it"
            );

            // Get message. The loose null check is deliberate: it guards the
            // ->Count() call below against any empty result.
            $msgs = MessageQueue::get_messages("testqueue");
            $this->assertTrue($msgs != null, "Got a set");
            $this->assertEquals(1, $msgs->Count(), "Got one message");

            $msg = $msgs->First();
            $this->assertInstanceOf("MessageFrame", $msg, "Message is a frame");
            $this->assertInstanceOf(
                "MethodInvocationMessage",
                $msg->body,
                "Got a method invocation message"
            );
            $this->assertEquals("MessageQueueTest", $msg->body->objectOrClass, "Got the original message");
            $this->assertEquals("doStaticMethod", $msg->body->method, "Got the original message");
        } catch (Exception $e) {
            MessageQueue::set_force_immediate_delivery($old);
            throw $e;
        }

        MessageQueue::set_force_immediate_delivery($old);
    }

    /**
     * This tests a couple of error handling cases. We have 2 queues, testmainqueue
     * and testerrorqueue. We queue a message that will fail on delivery, and
     * another after it which will execute successfully. Error handling is
     * to requeue all errors on testerrorqueue. So we expect that mainqueue is
     * empty after consumption, with the first error going to the error queue,
     * and the second message delivering successfully (i.e. an error doesn't
     * block messages behind it).
     */
    public function testRequeueOnError() {
        // Configure simple db queuing, two queues.
        MessageQueue::add_interface("default", array(
            "queues" => array("testmainqueue", "testerrorqueue"),
            "implementation" => "SimpleDBMQ",
            "encoding" => "php_serialize",
            "send" => array(
                "onShutdown" => "none"
            ),
            "delivery" => array(
                "onerror" => array(
                    "log",
                    "requeue" => "testerrorqueue"
                )
            )
        ));

        $this->assertEquals(
            0,
            $this->getQueueSizeSimpleDB("testmainqueue"),
            "Main queue is empty before we put anything in it"
        );

        MessageQueue::send(
            "testmainqueue",
            new MethodInvocationMessage("MessageQueueTest", "doStaticMethodWithError", "p1", 2)
        );
        MessageQueue::send(
            "testmainqueue",
            new MethodInvocationMessage("MessageQueueTest", "doStaticMethod", "p1", 2)
        );

        $this->assertEquals(
            2,
            $this->getQueueSizeSimpleDB("testmainqueue"),
            "Main queue has two items after we add to it"
        );
        $this->assertEquals(
            0,
            $this->getQueueSizeSimpleDB("testerrorqueue"),
            "Error queue is empty before we put anything in it"
        );

        self::$testP1 = null;

        // Clear the queue, causing the first message to execute and fail.
        MessageQueue::consume("testmainqueue");

        // Check there is nothing in testmainqueue, and now something in testerrorqueue.
        $this->assertEquals(
            0,
            $this->getQueueSizeSimpleDB("testmainqueue"),
            "Main queue is cleared after consumption"
        );
        $this->assertEquals(
            1,
            $this->getQueueSizeSimpleDB("testerrorqueue"),
            "Error queue magically has one message in it"
        );
        $this->assertEquals("p1", self::$testP1, "Message with no error has been run");
    }

    /**
     * Static method that always throws, used as the target of messages that
     * are meant to fail on delivery.
     *
     * @throws Exception always.
     */
    public static function doStaticMethodWithError() {
        // We pass an empty string so it doesn't show up in the TestRunner output
        throw new Exception("");
    }

    /**
     * Static invocation target: records both arguments so tests can check that
     * the call happened and which values it received.
     *
     * @param mixed $p1 Stored in self::$testP1.
     * @param mixed $p2 Stored in self::$testP2.
     */
    public static function doStaticMethod($p1 = null, $p2 = null) {
        self::$testP1 = $p1;
        self::$testP2 = $p2;
    }

    /**
     * With "drop" as the only onerror action, a message that fails on delivery
     * must be gone from the queue after consumption.
     */
    public function testDropOnError() {
        MessageQueue::add_interface("default", array(
            "queues" => array("testmainqueue"),
            "implementation" => "SimpleDBMQ",
            "encoding" => "php_serialize",
            "send" => array(
                "onShutdown" => "none"
            ),
            "delivery" => array(
                "onerror" => array(
                    "drop"
                )
            )
        ));

        $this->assertEquals(
            0,
            $this->getQueueSizeSimpleDB("testmainqueue"),
            "Main queue is empty before we put anything in it"
        );

        MessageQueue::send(
            "testmainqueue",
            new MethodInvocationMessage("MessageQueueTest", "doStaticMethodWithError", "p1", 2)
        );

        $this->assertEquals(
            1,
            $this->getQueueSizeSimpleDB("testmainqueue"),
            "Main queue has an item after we add to it"
        );

        // Clear the queue, causing the message to execute and fail.
        MessageQueue::consume("testmainqueue");

        // Check the failed message was not left behind in testmainqueue.
        $this->assertEquals(
            0,
            $this->getQueueSizeSimpleDB("testmainqueue"),
            "Main queue is cleared after consumption"
        );
    }

    /**
     * Test that when a message is delivered by callback, the message disappears
     * off the queue, and the message we get is as we expect.
     */
    public function testCallbackDelivery() {
        MessageQueue::add_interface("default", array(
            "queues" => array("testmainqueue"),
            "implementation" => "SimpleDBMQ",
            "encoding" => "php_serialize",
            "send" => array(
                "onShutdown" => "none"
            ),
            "delivery" => array(
                "callback" => array("MessageQueueTest", "messageCallback"),
                "onerror" => array(
                    "requeue"
                )
            )
        ));

        $this->assertEquals(
            0,
            $this->getQueueSizeSimpleDB("testmainqueue"),
            "Main queue is empty before we put anything in it"
        );

        MessageQueue::send(
            "testmainqueue",
            new MethodInvocationMessage("MessageQueueTest", "doStaticMethod", "p1", 2)
        );

        $this->assertEquals(
            1,
            $this->getQueueSizeSimpleDB("testmainqueue"),
            "Main queue has an item after we add to it"
        );

        self::$message_frame = null;

        // Clear the queue, causing the callback to be executed, which will
        // leave the message in self::$message_frame.
        MessageQueue::consume("testmainqueue");

        // Check the queue is empty and the callback captured the frame.
        $this->assertEquals(
            0,
            $this->getQueueSizeSimpleDB("testmainqueue"),
            "Main queue is cleared after consumption"
        );
        $this->assertNotNull(self::$message_frame, "Message has been captured");
        $this->assertNotNull(self::$message_frame->body, "Message has body");
        $this->assertInstanceOf(
            "MethodInvocationMessage",
            self::$message_frame->body,
            "Message is the same type of object we sent"
        );
    }

    /**
     * Our callback function simply stores the message frame it gets so
     * the test can examine it.
     *
     * @param mixed $msgframe Frame handed over on delivery.
     * @param mixed $conf     Second callback argument; not used here.
     */
    public static function messageCallback($msgframe, $conf) {
        self::$message_frame = $msgframe;
    }

    /**
     * Send a message that is already a frame. Ensure what we get back
     * is what we sent in the frame.
     */
    public function testFrameSend() {
        MessageQueue::add_interface("default", array(
            "queues" => array("testmainqueue"),
            "implementation" => "SimpleDBMQ",
            "encoding" => "raw", // no encoding
            "send" => array(
                "onShutdown" => "none"
            ),
            "delivery" => array(
                "callback" => array("MessageQueueTest", "messageCallback"),
                "onerror" => array(
                    "requeue"
                )
            )
        ));

        $testData = "Nigel mouse";

        $this->assertEquals(
            0,
            $this->getQueueSizeSimpleDB("testmainqueue"),
            "Main queue is empty before we put anything in it"
        );

        $frame = new MessageFrame($testData);
        MessageQueue::send("testmainqueue", $frame);

        $this->assertEquals(
            1,
            $this->getQueueSizeSimpleDB("testmainqueue"),
            "Main queue has an item after we add to it"
        );

        // Get messages, and make sure we get it back. The loose null check is
        // deliberate: it guards the ->Count() call below.
        $msgs = MessageQueue::get_messages("testmainqueue");
        $this->assertTrue($msgs != null, "Got a set");
        $this->assertEquals(1, $msgs->Count(), "Got one message");

        $msg = $msgs->First();
        $this->assertInstanceOf("MessageFrame", $msg, "Message is a frame");
        $this->assertEquals($testData, $msg->body, "Frame body is the data we sent");
    }

    /**
     * Test consume_all_queues using SimpleDB. We use a counter static method
     * and count the number of times it is called, for messages on multiple
     * queues.
     */
    public function testMultipleConsume() {
        MessageQueue::add_interface("default", array(
            "queues" => array("testmainqueue1", "testmainqueue2", "testmainqueue3"),
            "implementation" => "SimpleDBMQ",
            "encoding" => "php_serialize",
            "send" => array(
                "onShutdown" => "none"
            ),
            "delivery" => array(
                "onerror" => array(
                    "requeue"
                )
            )
        ));

        $this->assertEquals(
            0,
            $this->getQueueSizeSimpleDB("testmainqueue1"),
            "Queue 1 is empty before we put anything in it"
        );
        $this->assertEquals(
            0,
            $this->getQueueSizeSimpleDB("testmainqueue2"),
            "Queue 2 is empty before we put anything in it"
        );
        $this->assertEquals(
            0,
            $this->getQueueSizeSimpleDB("testmainqueue3"),
            "Queue 3 is empty before we put anything in it"
        );

        self::$countedCalls = 0;

        MessageQueue::send("testmainqueue1", new MethodInvocationMessage("MessageQueueTest", "doCountCalls"));
        MessageQueue::send("testmainqueue2", new MethodInvocationMessage("MessageQueueTest", "doCountCalls"));
        MessageQueue::send("testmainqueue3", new MethodInvocationMessage("MessageQueueTest", "doCountCalls"));

        $this->assertEquals(
            3,
            MessageQueue::consume_all_queues("default"),
            "Consumed messages off 3 queues"
        );
        $this->assertEquals(3, self::$countedCalls, "3 messages were delivered");
    }

    /**
     * Smoke test: encode a frame with SimpleInterSSMQ. There are no assertions,
     * so this only checks that the call completes.
     *
     * NOTE(review): the encoded result is never inspected - consider asserting
     * on it once the expected output is known.
     */
    public function testInterSS() {
        $simq = new SimpleInterSSMQ();
        $frame = new MessageFrame();
        $frame->body = "hello";
        $conf = array(
            "implementation_options" => array(
                "remoteServer" => "http://localhost/test_receiver"
            )
        );
        $simq->encode("test", $frame, $conf);
    }

    /**
     * Static invocation target that counts how many times it has been called.
     */
    public static function doCountCalls() {
        self::$countedCalls++;
    }

    /**
     * Save the existing interface configuration and remove it, so each test
     * starts from the definitions it registers itself.
     *
     * @return void
     */
    public function setUpOnce() {
        self::$saved_interfaces = MessageQueue::get_interfaces();

        // Clear all interface definitions. Individual tests will provide their own.
        foreach (self::$saved_interfaces as $name => $def) {
            MessageQueue::remove_interface($name);
        }

        parent::setUpOnce();
    }

    /**
     * At the start of each test, ensure the default configuration is in place.
     * Many of the tests expect this.
     */
    public function setUp() {
        // Clear whatever is there.
        foreach (MessageQueue::get_interfaces() as $name => $def) {
            MessageQueue::remove_interface($name);
        }

        // Add the default ones.
        foreach (self::$saved_interfaces as $name => $def) {
            MessageQueue::add_interface($name, $def);
        }

        parent::setUp();
    }

    /**
     * After executing the message queue tests, restore the original queue interfaces.
     *
     * @return void
     */
    public function tearDownOnce() {
        // Remove every definition the tests left behind, in the same way
        // setUp() does, rather than assuming "default" is the only one.
        foreach (MessageQueue::get_interfaces() as $name => $def) {
            MessageQueue::remove_interface($name);
        }

        // Restore each interface.
        foreach (self::$saved_interfaces as $name => $def) {
            MessageQueue::add_interface($name, $def);
        }

        parent::tearDownOnce();
    }
}

/**
 * Test-only data object used as the target of a MethodInvocationMessage.
 */
class MessageQueueTest_DataObject extends DataObject implements TestOnly {

    private static $db = array(
        "prop1" => "Varchar",
        "prop2" => "Int",
        "result" => "Varchar"
    );

    /**
     * Concatenate prop1, prop2 and the given suffix into `result`, then write
     * the record.
     *
     * @param string|null $p1 Suffix appended after prop1 and prop2.
     */
    public function doDataObjectMethod($p1 = null) {
        $this->result = $this->prop1 . $this->prop2 . $p1;
        $this->write();
    }
}

/**
 * Test-only plain object (not a DataObject) used as the target of a
 * MethodInvocationMessage.
 */
class MessageQueue_Object extends Object implements TestOnly {

    public $prop1 = null;

    /** Output of the most recent doNonDOMethod() call. */
    public static $testP1 = null;

    /**
     * NOTE(review): parent::__construct() is not called here - confirm that
     * Object's own initialisation is not needed for this test double.
     *
     * @param mixed $p1 Value stored in $prop1.
     */
    public function __construct($p1 = null) {
        $this->prop1 = $p1;
    }

    /**
     * Store prop1 followed by the given suffix in the static $testP1.
     *
     * @param string|null $p1 Suffix appended to prop1.
     */
    public function doNonDOMethod($p1 = null) {
        self::$testP1 = $this->prop1 . $p1;
    }
}