\MessageQueue
Main interface to simple message queueing system. See docs/message_queue.md for details.
- Author: Mark Stephens <mark@silverstripe.com>
Synopsis
class MessageQueue
{
- // members
- protected static array $interfaces = ;
- private static boolean $foreground_process = false;
- protected static Array $queues_to_flush_on_shutdown = NULL;
- protected static String $debugging_path = NULL;
- protected static boolean $force_immediate_delivery = false;
- protected static string $onshutdown_option = "sake";
- protected static string $onshutdown_arg = "";
- protected static boolean $force_onshutdown_when_testing = false;
- // methods
- public static void MessageQueueProcess()
- public static void clear_all_interfaces()
- public static void set_all_interfaces()
- public static void add_interface()
- public static void remove_interface()
- public static void get_interfaces()
- public static void set_debugging()
- public static void set_force_immediate_delivery()
- public static void get_force_immediate_delivery()
- public static void set_onshutdown_option()
- public static void set_force_onshutdown_when_testing()
- public static void get_force_onshutdown_when_testing()
- public static void send()
- public static void consume_on_shutdown()
- public static void consume_in_subprocess()
- public static String get_queue_interface()
- public static Array get_queue_config()
- public static void flush()
- public static Int consume()
- public static DataObjectSet get_messages()
- public static void encode_message()
- public static void decode_message()
- public static void consume_all_queues()
- public static void deliver_message()
Tasks
Line | Task |
---|---|
353+ | This is copied directly from the code in consume_on_shutdown, which should be refactored to use this method once it's behaviour is verified. |
548+ | Generalise this. Note this may not be a function purely of the message body, and may entail reading and/or updating message headers, hence passing the whole frame. |
610+ | : would be easy to generalise onerror into a list of these actions that are done in sequence. e.g. log an error and then requeue it. |
610+ | : would be good to have a failure count on the error message. Not sure how to represent this generally. |
Members
private
-
$foreground_process
Optionally defined to 'true' via YML, this will stop MessageQueue_Process processing in the background if set to true
protected
- $debugging_path
—
String
Location of debugging files, null if not debugging. -
$force_immediate_delivery
Short-circuit the MessageQueue, so it delivers immediately. -
$force_onshutdown_when_testing
By default, when unit tests are run, the onshutdown is not actually registered and the action is not executed, as it is executed outside the test environment. It is difficult but not impossible for unit tests to check that messages have been delivered. If this is set to true, the shutdown function will be registered even when running unit tests, to cater for those tests that have a way to check delivery of the message. Set via set_force_onshutdown_when_testing(). -
$interfaces
Main configuration for the module, defaulted to a simple db MQ for all queues with automatic clearing of queues on PHP shutdown. This should only be altered/replaced with calls to add_interface()/remove_interface(). - $onshutdown_arg
- $onshutdown_option
- $queues_to_flush_on_shutdown
—
Array
An array of queues that need to be consumed after PHP shutdown. If this is null, there are none to consume, and the php shutdown function won't be called. Otherwise it is a map of queue names => true.
Methods
public
- MessageQueueProcess() — Decide whether or not to run the MessageQueue_Process in foreground or background
- add_interface() — Add an interface with it's configuration. If there is already a configuration of that name, it will be replaced.
- clear_all_interfaces() — Clears all interfaces with it's configuration and sets the interfaces to an empty array.
- consume() — Consume messages from a queue and deliver them.
- consume_all_queues() — Consume messages from all queues on a specific interface.
- consume_in_subprocess() — For a given queue, create a subprocess to send/consume messages on the queue as per the configuration for the queue. This can be used in consume_on_shutdown, and can also be used by the message processor to retrigger more messages.
- consume_on_shutdown() — PHP shutdown function for processing any queues that need to be processed/consumed after the PHP process is done. For each queue that needs to be processed, it starts a new sub-process for queue.
- decode_message() — Decode the message using the encoding specified in the configuration provided.
- deliver_message() — Determine how to deliver the message. This depends on what's in the interface configuration, delivery section.
- encode_message() — Encode the message using the encoding specified in the configuration provided.
- flush() — Flush any buffered messages on the specified queue. If the queue is not buffered, or the buffer is empty, it has no effect.
- get_force_immediate_delivery()
- get_force_onshutdown_when_testing()
- get_interfaces() — Method to return the interfaces configuration, primarily for the message queue so it can restore back.
- get_messages() — Get messages from a queue. This should only be used in circumstances where the retrieved messages are being processed as part of the user interaction, or where for some reason normal message delivery is not appropriate.
- get_queue_config() — Given a queue name, find the interface that will handle this queue. Returns the configuration of the interface, and does some basic checks that it has what's needed to work with it. If checks fail, exceptions are thrown.
- get_queue_interface() — Given a queue name, return the name of the interface that will handle it. If the queue name is not provided, the first interface is returned. If there is no interface that handles the queue, returns null.
- remove_interface() — Removed a named interface.
- send() — Send a message to queue. Works out which interface to send it to, and dispatches it.
- set_all_interfaces() — Sets all interfaces with it's configuration. Existing interfaces will be overwritten.
- set_debugging() — Supports debugging, specifically for php shutdown debugging which is otherwise impossible. If not set, stderr and stdout on php shutdown processes are redirected to /dev/null. If the path is set, stdout is redirector to $path/msgq.stdout and $path/msgq.stderr.
- set_force_immediate_delivery()
- set_force_onshutdown_when_testing()
- set_onshutdown_option() — This sets the mode in which onShutdown is handled, and may need to be called if the shutdown processing doesn't work.