From 17fbe5c299d5a85abe1bb862233fcc515b597d26 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 5 Jan 2021 16:01:05 +0000 Subject: [PATCH] Delete IPC entries --- src/Core/Worker.php | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index f84dd2947..619cb5bfa 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -98,8 +98,9 @@ class Worker // We fetch the next queue entry that is about to be executed while ($r = self::workerProcess()) { if (self::IPCJobsExists(getmypid())) { - self::IPCSetJobState(false, getmypid()); + self::IPCDeleteJobState(getmypid()); } + // Don't refetch when a worker fetches tasks for multiple workers $refetched = DI::config()->get('system', 'worker_multiple_fetch'); foreach ($r as $entry) { @@ -1217,8 +1218,9 @@ class Worker } elseif ($pid) { // The parent process continues here DBA::connect(); - Logger::info('Spawned new worker', ['pid' => $pid]); + self::IPCSetJobState(true, $pid); + Logger::info('Spawned new worker', ['pid' => $pid]); $cycles = 0; while (self::IPCJobsExists($pid) && (++$cycles < 100)) { @@ -1231,15 +1233,11 @@ class Worker // We now are in the new worker DBA::connect(); + /// @todo Reinitialize the logger to set a new process_id and uid + + self::IPCSetJobState(true, getmypid()); Logger::info('Worker spawned', ['pid' => getmypid()]); - $cycles = 0; - while (!self::IPCJobsExists($pid) && (++$cycles < 100)) { - usleep(10000); - } - - Logger::info('Parent is ready', ['pid' => getmypid(), 'wait_cycles' => $cycles]); - self::processQueue($do_cron); self::unclaimProcess(); @@ -1477,6 +1475,7 @@ class Worker * Set the flag if some job is waiting * * @param boolean $jobs Is there a waiting job? + * @param int $key Key number * @throws \Exception */ public static function IPCSetJobState(bool $jobs, int $key = 0) @@ -1487,9 +1486,24 @@ class Worker self::$db_duration_write += (microtime(true) - $stamp); } + /** + * Delete a key entry + * + * @param int $key Key number + * @throws \Exception + */ + public static function IPCDeleteJobState(int $key) + { + $stamp = (float)microtime(true); + DBA::delete('worker-ipc', ['key' => $key]); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); + } + /** * Checks if some worker job waits to be executed * + * @param int $key Key number * @return bool * @throws \Exception */