Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not reuse global worker pool for files #84

Merged
merged 6 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/Driver/ParallelFilesystemDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@
use Amp\File\FilesystemException;
use Amp\File\Internal;
use Amp\Future;
use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\TaskFailureThrowable;
use Amp\Parallel\Worker\Worker;
use Amp\Parallel\Worker\WorkerException;
use Amp\Parallel\Worker\WorkerPool;
use function Amp\async;
use function Amp\Parallel\Worker\workerPool;

final class ParallelFilesystemDriver implements FilesystemDriver
{
public const DEFAULT_WORKER_LIMIT = 8;

private WorkerPool $pool;
private readonly WorkerPool $pool;

/** @var int Maximum number of workers to use for open files. */
private int $workerLimit;
/** @var positive-int Maximum number of workers to use for open files. */
private readonly int $workerLimit;

/** @var \SplObjectStorage<Worker, int> Worker storage. */
private \SplObjectStorage $workerStorage;
Expand All @@ -29,11 +29,17 @@ final class ParallelFilesystemDriver implements FilesystemDriver
private Future $pendingWorker;

/**
* @param int $workerLimit Maximum number of workers to use from the pool for open files.
* @param WorkerPool|null $pool Custom worker pool to use for file workers. If null, a new pool is created.
* @param int $workerLimit [Deprecated] Maximum number of workers to use from the pool for open files. Instead of
* using this parameter, provide a pool with a limited number of workers.
*/
public function __construct(?WorkerPool $pool = null, int $workerLimit = self::DEFAULT_WORKER_LIMIT)
{
$this->pool = $pool ?? workerPool();
if ($workerLimit <= 0) {
throw new \ValueError("Worker limit must be a positive integer");
}

$this->pool = $pool ?? new ContextWorkerPool($workerLimit);
trowski marked this conversation as resolved.
Show resolved Hide resolved
$this->workerLimit = $workerLimit;
$this->workerStorage = new \SplObjectStorage();
$this->pendingWorker = Future::complete();
Expand Down
21 changes: 3 additions & 18 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace Amp\File;

use Amp\File\Driver\BlockingFilesystemDriver;
use Amp\File\Driver\EioFilesystemDriver;
use Amp\File\Driver\ParallelFilesystemDriver;
use Amp\File\Driver\StatusCachingFilesystemDriver;
Expand All @@ -27,24 +26,14 @@ function filesystem(?FilesystemDriver $driver = null): Filesystem
return $map[$loop];
}

$defaultDriver = createDefaultDriver();
$driver = createDefaultDriver();

if (!\defined("AMP_WORKER")) { // Prevent caching in workers, cache in parent instead.
$defaultDriver = new StatusCachingFilesystemDriver($defaultDriver);
$driver = new StatusCachingFilesystemDriver($driver);
}

$filesystem = new Filesystem($defaultDriver);
} else {
$filesystem = new Filesystem($driver);
}

if (\defined("AMP_WORKER") && $driver instanceof ParallelFilesystemDriver) {
throw new \Error("Cannot use the parallel driver within a worker");
}

$map[$loop] = $filesystem;

return $filesystem;
return $map[$loop] = new Filesystem($driver);
}

/**
Expand All @@ -62,10 +51,6 @@ function createDefaultDriver(): FilesystemDriver
return new EioFilesystemDriver($driver);
}

if (\defined("AMP_WORKER")) { // Prevent spawning infinite workers.
return new BlockingFilesystemDriver;
}

return new ParallelFilesystemDriver;
Comment on lines -65 to 54
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we prevent infinite ParallelFilesystemDriver instances now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't, so tasks can use async file access. @azjezz mentioned this was an issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But inside the worker we'll spawn another worker now and so on and never actually handle filesystem calls, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No no, FileTask which handles file system calls will use the blocking driver for the call. If the worker executes a user task, then another pool of workers might be spawned. I don't see infinite workers being likely (unless someone executes a Task within a Task, but that would be strange), but each worker having its own set of workers would be a possibility. Since by default we limit the number of workers to 8, the resource usage should still be within reason even on systems without eio or uv.

}

Expand Down
Loading