<?php namespace Illuminate\Queue; use Exception; use Throwable; use Illuminate\Contracts\Queue\Job; use Illuminate\Contracts\Events\Dispatcher; use Illuminate\Contracts\Debug\ExceptionHandler; use Illuminate\Queue\Failed\FailedJobProviderInterface; use Symfony\Component\Debug\Exception\FatalThrowableError; use Illuminate\Contracts\Cache\Repository as CacheContract; class Worker { /** * The queue manager instance. * * @var \Illuminate\Queue\QueueManager */ protected $manager; /** * The failed job provider implementation. * * @var \Illuminate\Queue\Failed\FailedJobProviderInterface */ protected $failer; /** * The event dispatcher instance. * * @var \Illuminate\Contracts\Events\Dispatcher */ protected $events; /** * The cache repository implementation. * * @var \Illuminate\Contracts\Cache\Repository */ protected $cache; /** * The exception handler instance. * * @var \Illuminate\Foundation\Exceptions\Handler */ protected $exceptions; /** * Create a new queue worker. * * @param \Illuminate\Queue\QueueManager $manager * @param \Illuminate\Queue\Failed\FailedJobProviderInterface $failer * @param \Illuminate\Contracts\Events\Dispatcher $events * @return void */ public function __construct(QueueManager $manager, FailedJobProviderInterface $failer = null, Dispatcher $events = null) { $this->failer = $failer; $this->events = $events; $this->manager = $manager; } /** * Listen to the given queue in a loop. * * @param string $connectionName * @param string $queue * @param int $delay * @param int $memory * @param int $sleep * @param int $maxTries * @return array */ public function daemon($connectionName, $queue = null, $delay = 0, $memory = 128, $sleep = 3, $maxTries = 0) { $lastRestart = $this->getTimestampOfLastQueueRestart(); while (true) { if ($this->daemonShouldRun()) { $this->runNextJobForDaemon( $connectionName, $queue, $delay, $sleep, $maxTries ); } else { $this->sleep($sleep); } if ($this->memoryExceeded($memory) || $this->queueShouldRestart($lastRestart)) { $this->stop(); } } } /** * Run the next job for the daemon worker. * * @param string $connectionName * @param string $queue * @param int $delay * @param int $sleep * @param int $maxTries * @return void */ protected function runNextJobForDaemon($connectionName, $queue, $delay, $sleep, $maxTries) { try { $this->pop($connectionName, $queue, $delay, $sleep, $maxTries); } catch (Exception $e) { if ($this->exceptions) { $this->exceptions->report($e); } } catch (Throwable $e) { if ($this->exceptions) { $this->exceptions->report(new FatalThrowableError($e)); } } } /** * Determine if the daemon should process on this iteration. * * @return bool */ protected function daemonShouldRun() { return $this->manager->isDownForMaintenance() ? false : $this->events->until('illuminate.queue.looping') !== false; } /** * Listen to the given queue. * * @param string $connectionName * @param string $queue * @param int $delay * @param int $sleep * @param int $maxTries * @return array */ public function pop($connectionName, $queue = null, $delay = 0, $sleep = 3, $maxTries = 0) { try { $connection = $this->manager->connection($connectionName); $job = $this->getNextJob($connection, $queue); // If we're able to pull a job off of the stack, we will process it and // then immediately return back out. If there is no job on the queue // we will "sleep" the worker for the specified number of seconds. if (! is_null($job)) { return $this->process( $this->manager->getName($connectionName), $job, $maxTries, $delay ); } } catch (Exception $e) { if ($this->exceptions) { $this->exceptions->report($e); } } $this->sleep($sleep); return ['job' => null, 'failed' => false]; } /** * Get the next job from the queue connection. * * @param \Illuminate\Contracts\Queue\Queue $connection * @param string $queue * @return \Illuminate\Contracts\Queue\Job|null */ protected function getNextJob($connection, $queue) { if (is_null($queue)) { return $connection->pop(); } foreach (explode(',', $queue) as $queue) { if (! is_null($job = $connection->pop($queue))) { return $job; } } } /** * Process a given job from the queue. * * @param string $connection * @param \Illuminate\Contracts\Queue\Job $job * @param int $maxTries * @param int $delay * @return array|null * * @throws \Throwable */ public function process($connection, Job $job, $maxTries = 0, $delay = 0) { if ($maxTries > 0 && $job->attempts() > $maxTries) { return $this->logFailedJob($connection, $job); } try { $this->raiseBeforeJobEvent($connection, $job); // First we will fire off the job. Once it is done we will see if it will be // automatically deleted after processing and if so we'll fire the delete // method on the job. Otherwise, we will just keep on running our jobs. $job->fire(); $this->raiseAfterJobEvent($connection, $job); return ['job' => $job, 'failed' => false]; } catch (Exception $e) { $this->handleJobException($connection, $job, $delay, $e); } catch (Throwable $e) { $this->handleJobException($connection, $job, $delay, $e); } } /** * Handle an exception that occurred while the job was running. * * @param string $connection * @param \Illuminate\Contracts\Queue\Job $job * @param int $delay * @param \Throwable $e * @return void * * @throws \Throwable */ protected function handleJobException($connection, Job $job, $delay, $e) { // If we catch an exception, we will attempt to release the job back onto // the queue so it is not lost. This will let is be retried at a later // time by another listener (or the same one). We will do that here. try { $this->raiseExceptionOccurredJobEvent( $connection, $job, $e ); } finally { if (! $job->isDeleted()) { $job->release($delay); } } throw $e; } /** * Raise the before queue job event. * * @param string $connection * @param \Illuminate\Contracts\Queue\Job $job * @return void */ protected function raiseBeforeJobEvent($connection, Job $job) { if ($this->events) { $data = json_decode($job->getRawBody(), true); $this->events->fire(new Events\JobProcessing($connection, $job, $data)); } } /** * Raise the after queue job event. * * @param string $connection * @param \Illuminate\Contracts\Queue\Job $job * @return void */ protected function raiseAfterJobEvent($connection, Job $job) { if ($this->events) { $data = json_decode($job->getRawBody(), true); $this->events->fire(new Events\JobProcessed($connection, $job, $data)); } } /** * Raise the exception occurred queue job event. * * @param string $connection * @param \Illuminate\Contracts\Queue\Job $job * @param \Throwable $exception * @return void */ protected function raiseExceptionOccurredJobEvent($connection, Job $job, $exception) { if ($this->events) { $data = json_decode($job->getRawBody(), true); $this->events->fire(new Events\JobExceptionOccurred($connection, $job, $data, $exception)); } } /** * Log a failed job into storage. * * @param string $connection * @param \Illuminate\Contracts\Queue\Job $job * @return array */ protected function logFailedJob($connection, Job $job) { if ($this->failer) { $failedId = $this->failer->log($connection, $job->getQueue(), $job->getRawBody()); $job->delete(); $job->failed(); $this->raiseFailedJobEvent($connection, $job, $failedId); } return ['job' => $job, 'failed' => true]; } /** * Raise the failed queue job event. * * @param string $connection * @param \Illuminate\Contracts\Queue\Job $job * @param int|null $failedId * @return void */ protected function raiseFailedJobEvent($connection, Job $job, $failedId) { if ($this->events) { $data = json_decode($job->getRawBody(), true); $this->events->fire(new Events\JobFailed($connection, $job, $data, $failedId)); } } /** * Determine if the memory limit has been exceeded. * * @param int $memoryLimit * @return bool */ public function memoryExceeded($memoryLimit) { return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; } /** * Stop listening and bail out of the script. * * @return void */ public function stop() { $this->events->fire(new Events\WorkerStopping); die; } /** * Sleep the script for a given number of seconds. * * @param int $seconds * @return void */ public function sleep($seconds) { sleep($seconds); } /** * Get the last queue restart timestamp, or null. * * @return int|null */ protected function getTimestampOfLastQueueRestart() { if ($this->cache) { return $this->cache->get('illuminate:queue:restart'); } } /** * Determine if the queue worker should restart. * * @param int|null $lastRestart * @return bool */ protected function queueShouldRestart($lastRestart) { return $this->getTimestampOfLastQueueRestart() != $lastRestart; } /** * Set the exception handler to use in Daemon mode. * * @param \Illuminate\Contracts\Debug\ExceptionHandler $handler * @return void */ public function setDaemonExceptionHandler(ExceptionHandler $handler) { $this->exceptions = $handler; } /** * Set the cache repository implementation. * * @param \Illuminate\Contracts\Cache\Repository $cache * @return void */ public function setCache(CacheContract $cache) { $this->cache = $cache; } /** * Get the queue manager instance. * * @return \Illuminate\Queue\QueueManager */ public function getManager() { return $this->manager; } /** * Set the queue manager instance. * * @param \Illuminate\Queue\QueueManager $manager * @return void */ public function setManager(QueueManager $manager) { $this->manager = $manager; } }