Symbiote\QueuedJobs\Services\QueuedJobService
A service that can be used for starting, stopping and listing queued jobs.
When a job is first added, it is initialised, its job type determined, then persisted to the database.
When the queues are scanned, a job is reloaded and processed. Ignoring the persistence and reloading, it looks
something like:
    job->getJobType();
    job->getJobData();
    data->write();
    job->setup();
    while !job->isComplete
        job->process();
        job->getJobData();
        data->write();
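The lifecycle above can be sketched as standalone PHP. This is an illustration only: `CountingJob` and `runToCompletion()` are hypothetical names, the job class is a stand-in rather than the module's own job interface, and the `$store` array stands in for writing the job descriptor to the database.

```php
<?php
// Hypothetical stand-in for a queued job; not the module's own job interface.
final class CountingJob
{
    public int $currentStep = 0;
    public int $totalSteps = 0;

    public function setup(): void
    {
        // Decide how much work there is before processing starts.
        $this->totalSteps = 3;
    }

    public function process(): void
    {
        // One unit of work per call.
        $this->currentStep++;
    }

    public function isComplete(): bool
    {
        return $this->currentStep >= $this->totalSteps;
    }

    public function getJobData(): array
    {
        return ['currentStep' => $this->currentStep, 'totalSteps' => $this->totalSteps];
    }
}

// Run a job to completion. $store stands in for the database write.
function runToCompletion(CountingJob $job, array &$store): void
{
    $store[] = $job->getJobData();      // initial persist when the job is added
    $job->setup();
    while (!$job->isComplete()) {
        $job->process();
        $store[] = $job->getJobData();  // persist progress after every step
    }
}

$snapshots = [];
$job = new CountingJob();
runToCompletion($job, $snapshots);
```

Persisting after every `process()` call is what lets a later queue scan resume a job from its last recorded step instead of starting again.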
- Author: Marcus Nyeholt <marcus@symbiote.com.au>
- License: BSD http://silverstripe.org/bsd-license/
Synopsis
class QueuedJobService
{
- // members
- private static integer $stall_threshold = 3;
- private static integer $memory_limit = 268435456;
- private static integer $time_limit = 0;
- private static bool $disable_health_check = false;
- private static integer $max_init_jobs = 0;
- private static string $worker_ttl = 'PT5M';
- private static string $initialising_state_ttl = 'PT2M';
- protected integer $startedAt = 0;
- private static boolean $use_shutdown_function = true;
- private static string $cache_dir = 'queuedjobs';
- private static bool $lock_file_enabled = false;
- private static string $lock_file_name = 'maintenance-lock.txt';
- private static string $lock_file_path = '';
- private LoggerInterface $logger;
- public DefaultQueueHandler $queueHandler;
- public TaskRunnerEngine $queueRunner;
- public array $defaultJobs = [];
- // methods
- public void __construct()
- public int queueJob()
- public void startJob()
- public bool isAtMaxJobs()
- protected void copyJobToDescriptor()
- protected void copyDescriptorToJob()
- public QueuedJobDescriptor|null getNextPendingJob()
- public array checkJobHealth()
- public void checkDefaultJobs()
- protected void restartStalledJob()
- protected QueuedJob|boolean initialiseJob()
- protected bool grabMutex()
- public boolean runJob()
- protected mixed withNestedState()
- protected void handleBrokenJobException()
- protected null|Member setRunAsUser()
- protected void unsetRunAsUser()
- protected void markStarted()
- protected bool hasPassedTimeLimit()
- protected bool isMemoryTooHigh()
- protected float getMemoryUsage()
- protected float getMemoryLimit()
- protected float getPHPMemoryLimit()
- protected float parseMemory()
- protected void humanReadable()
- public array getJobList()
- public string getJobListFilter()
- public void runQueue()
- public void processJobQueue()
- public void onShutdown()
- public LoggerInterface getLogger()
- public void setLogger()
- public void enableMaintenanceLock()
- public void disableMaintenanceLock()
- public bool isMaintenanceLockActive()
- protected string getWorkerExpiry()
- protected string getInitStateExpiry()
- private void addJobHandlersToLogger()
- protected void releaseJobLock()
- protected void markJobAsBroken()
- private string lockFilePath()
}
Hierarchy
Uses
- SilverStripe\Core\Config\Configurable
- SilverStripe\Core\Injector\Injectable
- SilverStripe\Core\Extensible
Members
private
- $cache_dir — string
  The location for immediate jobs to be stored in.
- $disable_health_check — bool
  Disable health checks that usually occur when a runner first picks up a queue. Note that completely disabling health checks could result in many jobs that are always marked as running and will never be restarted. If this option is disabled you may alternatively use the build task
- $initialising_state_ttl — string
  Duration for TTL of initialising state based on ISO 8601 duration specification.
- $lock_file_enabled — bool
  Enables or disables the maintenance lock file feature.
- $lock_file_name — string
  Maintenance lock file name.
- $lock_file_path — string
  Maintenance lock path (relative path starting at the base folder). Note that this path needs to point to a folder on a shared drive if multiple instances are used.
- $logger — Psr\Log\LoggerInterface
- $max_init_jobs — int
  Maximum number of jobs that can be initialised at any one time.
- $memory_limit — int
  How much RAM will we allow before pausing and releasing the memory?
- $stall_threshold — int
- $time_limit — int
  Optional time limit (in seconds) to run the service before restarting to release resources.
- $use_shutdown_function — boolean
  Should "immediate" jobs be managed using the shutdown function?
- $worker_ttl — string
  Duration for TTL of queue workers based on ISO 8601 duration specification.
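The two TTL settings are ISO 8601 duration strings, which PHP can parse directly with `DateInterval`. The sketch below shows how the default values `'PT5M'` and `'PT2M'` turn into expiry timestamps. The helper name `expiryFor()` is hypothetical, and adding the duration to the current time is an assumption about how the service derives an expiry, not something this page confirms.

```php
<?php
// Hypothetical helper: the moment at which something that starts at $now expires,
// given an ISO 8601 duration such as 'PT5M' (5 minutes) or 'PT2M' (2 minutes).
// Assumption: expiry = start time + TTL.
function expiryFor(string $ttl, DateTimeImmutable $now): string
{
    return $now->add(new DateInterval($ttl))->format('Y-m-d H:i:s');
}

$now = new DateTimeImmutable('2024-01-01 12:00:00', new DateTimeZone('UTC'));

$workerExpiry = expiryFor('PT5M', $now); // default worker_ttl
$initExpiry   = expiryFor('PT2M', $now); // default initialising_state_ttl
```

A malformed duration string makes the `DateInterval` constructor throw, so a misconfigured TTL fails loudly rather than silently disabling the check.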
protected
- $startedAt — int
  Timestamp (in seconds) when the queue was started.
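A start timestamp combined with the `time_limit` setting is enough to decide when a runner should stop and restart. The following is a minimal sketch under stated assumptions: `timeLimitExceeded()` is a hypothetical function, and treating a limit of 0 (the default) as "no limit" is an assumption based on the setting being described as optional.

```php
<?php
// Hypothetical check: has a run that began at $startedAt (Unix seconds)
// been going for longer than $timeLimit seconds at time $now?
// Assumption: a limit of 0 or less means "no limit".
function timeLimitExceeded(int $startedAt, int $timeLimit, int $now): bool
{
    if ($timeLimit <= 0) {
        return false;
    }
    return ($now - $startedAt) > $timeLimit;
}

$startedAt = 1700000000;
$unlimited = timeLimitExceeded($startedAt, 0, $startedAt + 86400);  // false
$within    = timeLimitExceeded($startedAt, 300, $startedAt + 120);  // false
$exceeded  = timeLimitExceeded($startedAt, 300, $startedAt + 301);  // true
```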
public
- $defaultJobs — array
  Config controlled list of default/required jobs.
- $queueHandler — Symbiote\QueuedJobs\Services\DefaultQueueHandler
- $queueRunner — Symbiote\QueuedJobs\Tasks\Engines\TaskRunnerEngine
Methods
private
- addJobHandlersToLogger() — Add job-specific logger functionality which has the ability to flush logs into the job descriptor database record. Based on the default logger set for this class, which means it'll also log to other channels (e.g. stdout/stderr).
- lockFilePath()
protected
- copyDescriptorToJob()
- copyJobToDescriptor() — Copies data from a job into a descriptor for persisting
- getInitStateExpiry() — Get expiry time for the INIT state of a queued job; this helps to identify jobs that have stalled more accurately.
- getMemoryLimit() — Determines the memory limit (in bytes) for this application. Limits to the smaller of memory_limit configured via php.ini or SilverStripe config.
- getMemoryUsage() — Get peak memory usage of this application
- getPHPMemoryLimit() — Calculate the current memory limit of the server
- getWorkerExpiry() — Get expiry time for a worker to be operating on a job, helps to identify jobs that have stalled more accurately.
- grabMutex() — Given a {@link QueuedJobDescriptor} mark the job as initialised. Works sort of like a mutex.
- handleBrokenJobException()
- hasPassedTimeLimit() — Is execution time too long?
- humanReadable()
- initialiseJob() — Prepares the given jobDescriptor for execution. Returns the job that will actually be run in a state ready for executing.
- isMemoryTooHigh() — Is memory usage too high?
- markJobAsBroken() — Mark a Job as Broken and release the lock so it can be resumed
- markStarted() — Start timer
- parseMemory() — Convert memory limit string to bytes.
- releaseJobLock() — Release job lock on the descriptor so it can run again
- restartStalledJob() — Attempt to restart a stalled job
- setRunAsUser()
- unsetRunAsUser()
- withNestedState() — Provides a wrapper when executing arbitrary code contained in a job implementation. This ensures that job-specific code doesn't alter the configuration of the queue runner execution.
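The memory helpers listed above describe two steps: converting a php.ini-style string to bytes, then taking the smaller of the PHP limit and the configured limit. The sketch below is a hypothetical re-implementation for illustration, not the module's code. `memoryStringToBytes()` and `effectiveMemoryLimit()` are invented names, and treating non-positive values (such as php.ini's `-1`) as "unlimited" is an assumption.

```php
<?php
// Convert a php.ini-style memory string ("256M", "1G", "512K", "1024") to bytes.
function memoryStringToBytes(string $value): float
{
    $value = trim($value);
    if (!preg_match('/^(-?\d+(?:\.\d+)?)\s*([kmg]?)b?$/i', $value, $m)) {
        throw new InvalidArgumentException("Unrecognised memory value: $value");
    }
    $number = (float) $m[1];
    switch (strtolower($m[2])) {
        case 'g':
            return $number * 1024 ** 3;
        case 'm':
            return $number * 1024 ** 2;
        case 'k':
            return $number * 1024;
        default:
            return $number; // plain byte count
    }
}

// Pick the smaller of the two limits, skipping non-positive values
// (assumed to mean "unlimited"). Returns 0.0 when neither limit applies.
function effectiveMemoryLimit(float $phpLimit, float $configLimit): float
{
    $limits = array_filter([$phpLimit, $configLimit], fn (float $l): bool => $l > 0);
    return $limits ? min($limits) : 0.0;
}

$configured = 268435456.0;                          // default memory_limit (256 MiB)
$fromIni    = memoryStringToBytes('128M');          // 134217728.0
$limit      = effectiveMemoryLimit($fromIni, $configured);
```

With these inputs the php.ini value wins because it is the smaller of the two. If php.ini reported `-1`, the configured value would apply instead.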
public
- __construct() — QueuedJobService constructor.
- checkDefaultJobs() — Checks through all the scheduled jobs that are expected to exist
- checkJobHealth() — Runs an explicit check on all currently running jobs to make sure their "processed" count is incrementing between each run. If it's not, then we need to flag it as paused due to an error.
- disableMaintenanceLock()
- enableMaintenanceLock()
- getJobList() — Gets a list of all the current jobs (or jobs that have recently finished)
- getJobListFilter() — Return the SQL filter used to get the job list - this is used by the UI for displaying the job list...
- getLogger() — Get a logger
- getNextPendingJob() — Check the current job queues and see if any of the jobs currently in there should be started. If so, return the next job that should be executed
- isAtMaxJobs() — Check if maximum number of jobs are currently initialised.
- isMaintenanceLockActive()
- onShutdown() — When PHP shuts down, we want to process all of the immediate queue items
- processJobQueue() — Process all jobs from a given queue
- queueJob() — Adds a job to the queue to be started
- runJob() — Start the actual execution of a job.
- runQueue() — Process the job queue with the current queue runner
- setLogger()
- startJob() — Start a job (or however the queue handler determines it should be started)
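The health check described for checkJobHealth() compares each running job's "processed" count between runs. The sketch below illustrates that idea over plain arrays. Everything in it is hypothetical: the function name, the array keys, the status strings, and in particular the use of a stall threshold as a cap on restarts before pausing, which is an assumption since this page gives no description for `$stall_threshold`.

```php
<?php
// Hypothetical health check over arrays standing in for job descriptors.
// 'steps'    = processed count seen now
// 'lastSeen' = processed count seen at the previous check
// 'restarts' = how many times the job has already been restarted
function checkHealth(array $jobs, int $stallThreshold): array
{
    foreach ($jobs as $id => $job) {
        if ($job['steps'] > $job['lastSeen']) {
            // Progress was made: remember the new count for the next check.
            $job['lastSeen'] = $job['steps'];
            $job['status'] = 'running';
        } elseif ($job['restarts'] < $stallThreshold) {
            // No progress: treat as stalled and schedule another attempt.
            $job['restarts']++;
            $job['status'] = 'restart';
        } else {
            // Stalled too many times: pause it for manual attention.
            $job['status'] = 'paused';
        }
        $jobs[$id] = $job;
    }
    return $jobs;
}

$result = checkHealth([
    'a' => ['steps' => 5, 'lastSeen' => 2, 'restarts' => 0],
    'b' => ['steps' => 2, 'lastSeen' => 2, 'restarts' => 1],
    'c' => ['steps' => 2, 'lastSeen' => 2, 'restarts' => 3],
], 3);
```

Here job `a` keeps running, job `b` is marked for a restart, and job `c` is paused because it has already used up its restarts.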