Skip to content

Commit

Permalink
Trigger deprecation notice
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Sep 2, 2024
1 parent 4066656 commit dbb8ac1
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"amphp/amp": "^3",
"amphp/byte-stream": "^2",
"amphp/cache": "^2",
"amphp/parallel": "^2.1",
"amphp/parallel": "2.x-dev as 2.3",
"amphp/sync": "^2",
"revolt/event-loop": "^1"
},
Expand Down
56 changes: 33 additions & 23 deletions src/Driver/ParallelFilesystemDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Amp\File\Internal;
use Amp\Future;
use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\DelegatingWorkerPool;
use Amp\Parallel\Worker\TaskFailureThrowable;
use Amp\Parallel\Worker\Worker;
use Amp\Parallel\Worker\WorkerException;
Expand All @@ -22,27 +23,43 @@ final class ParallelFilesystemDriver implements FilesystemDriver
/** @var positive-int Maximum number of workers to use for open files. */
private readonly int $workerLimit;

/** @var \SplObjectStorage<Worker, int> Worker storage. */
private \SplObjectStorage $workerStorage;
/** @var \WeakMap<Worker, int> */
private \WeakMap $workerStorage;

/** @var Future Pending worker request */
private Future $pendingWorker;
/** @var Future<Worker>|null Pending worker request */
private ?Future $pendingWorker = null;

/**
* @param WorkerPool|null $pool Custom worker pool to use for file workers. If null, a new pool is created.
* @param int $workerLimit [Deprecated] Maximum number of workers to use from the pool for open files. Instead of
* using this parameter, provide a pool with a limited number of workers.
* @param int|null $workerLimit [Deprecated] Maximum number of workers to use from the pool for open files. Instead
* of using this parameter, provide a pool with a limited number of workers or use {@see DelegatingWorkerPool}.
*/
public function __construct(?WorkerPool $pool = null, int $workerLimit = self::DEFAULT_WORKER_LIMIT)
{
public function __construct(?WorkerPool $pool = null, ?int $workerLimit = null)
{
/** @var \WeakMap<Worker, int> For Psalm. */
$this->workerStorage = new \WeakMap();

if ($workerLimit !== null) {
\trigger_error(
'The $workerLimit parameter is deprecated and will be removed in the next major version.' .
' To limit the number of workers used from the given pool, use ' . DelegatingWorkerPool::class .
' instead.',
\E_USER_DEPRECATED,
);
}

$workerLimit ??= match ($pool::class) {
ContextWorkerPool::class => \min(self::DEFAULT_WORKER_LIMIT, $pool->getLimit()),
DelegatingWorkerPool::class => $pool->getLimit(),
default => self::DEFAULT_WORKER_LIMIT,
};

if ($workerLimit <= 0) {
throw new \ValueError("Worker limit must be a positive integer");
}

$this->pool = $pool ?? new ContextWorkerPool($workerLimit);
$this->workerLimit = $workerLimit;
$this->workerStorage = new \SplObjectStorage();
$this->pendingWorker = Future::complete();
}

public function openFile(string $path, string $mode): ParallelFile
Expand All @@ -51,12 +68,12 @@ public function openFile(string $path, string $mode): ParallelFile

$workerStorage = $this->workerStorage;
$worker = new Internal\FileWorker($worker, static function (Worker $worker) use ($workerStorage): void {
if (!$workerStorage->contains($worker)) {
if (!isset($workerStorage[$worker])) {
return;
}

if (($workerStorage[$worker] -= 1) === 0 || !$worker->isRunning()) {
$workerStorage->detach($worker);
unset($workerStorage[$worker]);
}
});

Expand All @@ -73,26 +90,19 @@ public function openFile(string $path, string $mode): ParallelFile

private function selectWorker(): Worker
{
$this->pendingWorker->await(); // Wait for any currently pending request for a worker.
$this->pendingWorker?->await(); // Wait for any currently pending request for a worker.

if ($this->workerStorage->count() < $this->workerLimit) {
$this->pendingWorker = async($this->pool->getWorker(...));
$worker = $this->pendingWorker->await();

if ($this->workerStorage->contains($worker)) {
// amphp/parallel v1.x may return an already used worker from the pool.
$this->workerStorage[$worker] += 1;
} else {
// amphp/parallel v2.x should always return an unused worker.
$this->workerStorage->attach($worker, 1);
}
$this->workerStorage[$worker] = 1;

return $worker;
}

$max = \PHP_INT_MAX;
foreach ($this->workerStorage as $storedWorker) {
$count = $this->workerStorage[$storedWorker];
foreach ($this->workerStorage as $storedWorker => $count) {
if ($count <= $max) {
$worker = $storedWorker;
$max = $count;
Expand All @@ -102,7 +112,7 @@ private function selectWorker(): Worker
\assert(isset($worker) && $worker instanceof Worker);

if (!$worker->isRunning()) {
$this->workerStorage->detach($worker);
unset($this->workerStorage[$worker]);
return $this->selectWorker();
}

Expand Down

0 comments on commit dbb8ac1

Please sign in to comment.