-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature Request: Seamless stream and/or async iterable support #110
Comments
maybe duplicate of #108 |
@mcollina This would make it possible to do e.g. await pipeline(piscina.move(src), piscina.transform(workerPath), dst) |
Unfortunately we can't make streams seamlessly transferable/cloneable using move. The only objects that can be made transferable are core objects that extend from BaseObject. It would be possible to create a Transform implementation whose _transform dispatched to Piscina and continued on after each task was done. So something like.. const piscina = new Piscina({ /** ... **/ });
const workerTransform = new MyTransformer(piscina);
await pipeline(src, workerTransform, dst); I have been thinking about adding a new type of Stream in core that complements MessagePort to implement a highly efficient cross-thread stream model. Basically something like: const { MessageDuplex } = require('worker_threads')
const mc = new MessageChannel();
mc.port1.onmessage = async ({data}) => {
for await (const chunk of data)
data.write(transformIt(chunk));
};
const d = new MessageDuplex();
d.on('data', console.log);
mc.port2.postMessage(d);
d.write('hello');
d.end('there'); The idea here is that bottom line is that the limitations of cloning/transfering over MessagePort definitely impose some restrictions. |
Regarding this pattern.... I'm not sure this (an async generator as a worker) is a pattern that could work with the worker pool. Retaining the generator state would be problematic. Not necessarily impossible but extremely complicated. Will have to think about how we could do it and what the semantics would actually be. module.exports = async function * (source) {
for await (const buf of source) {
yield veryExpensiveProcessing(buf)
}
} |
I think all of that could be hidden away in a |
Possibly, but we still hit up against limitations on MessagePort. Lemme stew over it a bit |
#170 illustrates using transferable whatwg readable and writable streams to an individual worker. It doesn't quite do what this issue is suggesting but it definitely gets a bit closer! |
This issue has been marked as stale because it has been opened 30 days without activity. Remove stale label or comment or this will be closed in 5 days. |
This issue has been marked as stale because it has been opened 30 days without activity. Remove stale label or comment or this will be closed in 5 days. |
This issue has been marked as stale because it has been opened 30 days without activity. Remove stale label or comment or this will be closed in 5 days. |
This issue was closed because it has been stalled for 5 days with no activity. |
Would be nice to be able to provide a stream and/or async iterable (of buffers) as argument and have is seamlessly (using transferable) accessible in the worker.
e.g.
The text was updated successfully, but these errors were encountered: