diff --git a/composer.json b/composer.json index 6f46377..5ce3722 100644 --- a/composer.json +++ b/composer.json @@ -36,7 +36,7 @@ "amphp/amp": "^3", "amphp/byte-stream": "^2", "amphp/cache": "^2", - "amphp/parallel": "^2.1", + "amphp/parallel": "2.x-dev as 2.3", "amphp/sync": "^2", "revolt/event-loop": "^1" }, diff --git a/src/Driver/ParallelFilesystemDriver.php b/src/Driver/ParallelFilesystemDriver.php index 58ff3c4..969bdd0 100644 --- a/src/Driver/ParallelFilesystemDriver.php +++ b/src/Driver/ParallelFilesystemDriver.php @@ -7,6 +7,7 @@ use Amp\File\Internal; use Amp\Future; use Amp\Parallel\Worker\ContextWorkerPool; +use Amp\Parallel\Worker\DelegatingWorkerPool; use Amp\Parallel\Worker\TaskFailureThrowable; use Amp\Parallel\Worker\Worker; use Amp\Parallel\Worker\WorkerException; @@ -22,27 +23,43 @@ final class ParallelFilesystemDriver implements FilesystemDriver /** @var positive-int Maximum number of workers to use for open files. */ private readonly int $workerLimit; - /** @var \SplObjectStorage Worker storage. */ - private \SplObjectStorage $workerStorage; + /** @var \WeakMap */ + private \WeakMap $workerStorage; - /** @var Future Pending worker request */ - private Future $pendingWorker; + /** @var Future|null Pending worker request */ + private ?Future $pendingWorker = null; /** * @param WorkerPool|null $pool Custom worker pool to use for file workers. If null, a new pool is created. - * @param int $workerLimit [Deprecated] Maximum number of workers to use from the pool for open files. Instead of - * using this parameter, provide a pool with a limited number of workers. + * @param int|null $workerLimit [Deprecated] Maximum number of workers to use from the pool for open files. Instead + * of using this parameter, provide a pool with a limited number of workers or use {@see DelegatingWorkerPool}. */ - public function __construct(?WorkerPool $pool = null, int $workerLimit = self::DEFAULT_WORKER_LIMIT) - { + public function __construct(?WorkerPool $pool = null, ?int $workerLimit = null) + { + /** @var \WeakMap For Psalm. */ + $this->workerStorage = new \WeakMap(); + + if ($workerLimit !== null) { + \trigger_error( + 'The $workerLimit parameter is deprecated and will be removed in the next major version.' . + ' To limit the number of workers used from the given pool, use ' . DelegatingWorkerPool::class . + ' instead.', + \E_USER_DEPRECATED, + ); + } + + $workerLimit ??= match ($pool::class) { + ContextWorkerPool::class => \min(self::DEFAULT_WORKER_LIMIT, $pool->getLimit()), + DelegatingWorkerPool::class => $pool->getLimit(), + default => self::DEFAULT_WORKER_LIMIT, + }; + if ($workerLimit <= 0) { throw new \ValueError("Worker limit must be a positive integer"); } $this->pool = $pool ?? new ContextWorkerPool($workerLimit); $this->workerLimit = $workerLimit; - $this->workerStorage = new \SplObjectStorage(); - $this->pendingWorker = Future::complete(); } public function openFile(string $path, string $mode): ParallelFile @@ -51,12 +68,12 @@ public function openFile(string $path, string $mode): ParallelFile $workerStorage = $this->workerStorage; $worker = new Internal\FileWorker($worker, static function (Worker $worker) use ($workerStorage): void { - if (!$workerStorage->contains($worker)) { + if (!isset($workerStorage[$worker])) { return; } if (($workerStorage[$worker] -= 1) === 0 || !$worker->isRunning()) { - $workerStorage->detach($worker); + unset($workerStorage[$worker]); } }); @@ -73,26 +90,19 @@ public function openFile(string $path, string $mode): ParallelFile private function selectWorker(): Worker { - $this->pendingWorker->await(); // Wait for any currently pending request for a worker. + $this->pendingWorker?->await(); // Wait for any currently pending request for a worker. if ($this->workerStorage->count() < $this->workerLimit) { $this->pendingWorker = async($this->pool->getWorker(...)); $worker = $this->pendingWorker->await(); - if ($this->workerStorage->contains($worker)) { - // amphp/parallel v1.x may return an already used worker from the pool. - $this->workerStorage[$worker] += 1; - } else { - // amphp/parallel v2.x should always return an unused worker. - $this->workerStorage->attach($worker, 1); - } + $this->workerStorage[$worker] = 1; return $worker; } $max = \PHP_INT_MAX; - foreach ($this->workerStorage as $storedWorker) { - $count = $this->workerStorage[$storedWorker]; + foreach ($this->workerStorage as $storedWorker => $count) { if ($count <= $max) { $worker = $storedWorker; $max = $count; @@ -102,7 +112,7 @@ private function selectWorker(): Worker \assert(isset($worker) && $worker instanceof Worker); if (!$worker->isRunning()) { - $this->workerStorage->detach($worker); + unset($this->workerStorage[$worker]); return $this->selectWorker(); }