最新消息: 电脑我帮您提供丰富的电脑知识,编程学习,软件下载,win7系统下载。

NodeJS流不等待异步

IT培训 admin 8浏览 0评论

NodeJS流不等待异步

我在测试NodeJS流时遇到了一个问题。在运行stream.pipeline后,我似乎无法让我的项目等待Duplex和Transform流的输出,即使它返回了一个promise。也许我错过了一些东西,但我相信脚本应该在继续之前等待函数返回。我正在努力工作的项目中最重要的部分是:

// Message system is a duplex (read/write) stream
export class MessageSystem extends Duplex {
    constructor() {
        super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
    }
    public _read(size: number): void {
        var chunk = this.read();
        console.log(`Recieved ${chunk}`);
        this.push(chunk);
    }
    public _write(chunk: Message, encoding: string, 
        callback: (error?: Error | null | undefined, chunk?: Message) => any): void {
        if (chunk.data === null) {
            callback(new Error("Message.Data is null"));
        } else {
            callback();
        }
    }
}

export class SystemStream extends Transform {
    public type: MessageType = MessageType.Global;
    public data: Array<Message> = new Array<Message>();
    constructor() {
        super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
    }
    public _transform(chunk: Message, encoding: string, 
        callback: TransformCallback): void {
        if (chunk.single && (chunk.type === this.type || chunk.type === MessageType.Global)) {
            console.log(`Adding ${chunk}`);
            this.data.push(chunk);
            chunk = new Message(chunk.data, MessageType.Removed, true);
            callback(undefined, chunk); // TODO: Is this correct?
        } else if (chunk.type === this.type || chunk.type === MessageType.Global) { // Ours and global
            this.data.push(chunk);
            callback(undefined, chunk);
        } else { // Not ours
            callback(undefined, chunk);
        }
    }
}

export class EngineStream extends SystemStream {
    public type: MessageType = MessageType.Engine;
}

export class IOStream extends SystemStream {
    public type: MessageType = MessageType.IO;
}

let ms = new MessageSystem();
let es = new EngineStream();
let io = new IOStream();

let pipeline = promisify(Stream.pipeline);

async function start() {
    console.log("Running Message System");
    console.log("Writing new messages");
    ms.write(new Message("Hello"));
    ms.write(new Message("world!"));
    ms.write(new Message("Engine data", MessageType.Engine));
    ms.write(new Message("IO data", MessageType.IO));
    ms.write(new Message("Order matters in the pipe, even if Global", MessageType.Global, true));
    ms.end(new Message("Final message in the stream"));
    console.log("Piping data");
    await pipeline(
        ms,
        es,
        io
    );
}

Promise.all([start()]).then(() => {
    console.log(`Engine Messages to parse: ${es.data.toString()}`);
    console.log(`IO Messages to parse: ${io.data.toString()}`);
});

输出应该类似于:

Running message system
Writing new messages
Hello
world!
Engine Data
IO Data
Order Matters in the pipe, even if Global
Engine messages to parse: Engine Data
IO messages to parse: IO Data

任何帮助将不胜感激。谢谢!

注意:我在我的其他帐户中发布了这个,而不是我的实际帐户。为复制道歉。

编辑:我最初私有的回购,但已公开,以帮助澄清答案。更多用法可以在feature/inital_system branch上找到。签出时可以使用npm start运行。

编辑:我把我的自定义流放在这里是为了冗长。我认为我的轨道比以前更好,但现在获得一个“空”对象接收到了管道。

回答如下:

正如the documentation所述,stream.pipeline以回调为基础并未返回承诺。

它有自定义的promisified版本,可以使用util.promisify访问:

const pipeline = util.promisify(stream.pipeline);

...

await pipeline(...);

NodeJS流不等待异步

我在测试NodeJS流时遇到了一个问题。在运行stream.pipeline后,我似乎无法让我的项目等待Duplex和Transform流的输出,即使它返回了一个promise。也许我错过了一些东西,但我相信脚本应该在继续之前等待函数返回。我正在努力工作的项目中最重要的部分是:

// Message system is a duplex (read/write) stream
export class MessageSystem extends Duplex {
    constructor() {
        super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
    }
    public _read(size: number): void {
        var chunk = this.read();
        console.log(`Recieved ${chunk}`);
        this.push(chunk);
    }
    public _write(chunk: Message, encoding: string, 
        callback: (error?: Error | null | undefined, chunk?: Message) => any): void {
        if (chunk.data === null) {
            callback(new Error("Message.Data is null"));
        } else {
            callback();
        }
    }
}

export class SystemStream extends Transform {
    public type: MessageType = MessageType.Global;
    public data: Array<Message> = new Array<Message>();
    constructor() {
        super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
    }
    public _transform(chunk: Message, encoding: string, 
        callback: TransformCallback): void {
        if (chunk.single && (chunk.type === this.type || chunk.type === MessageType.Global)) {
            console.log(`Adding ${chunk}`);
            this.data.push(chunk);
            chunk = new Message(chunk.data, MessageType.Removed, true);
            callback(undefined, chunk); // TODO: Is this correct?
        } else if (chunk.type === this.type || chunk.type === MessageType.Global) { // Ours and global
            this.data.push(chunk);
            callback(undefined, chunk);
        } else { // Not ours
            callback(undefined, chunk);
        }
    }
}

export class EngineStream extends SystemStream {
    public type: MessageType = MessageType.Engine;
}

export class IOStream extends SystemStream {
    public type: MessageType = MessageType.IO;
}

let ms = new MessageSystem();
let es = new EngineStream();
let io = new IOStream();

let pipeline = promisify(Stream.pipeline);

async function start() {
    console.log("Running Message System");
    console.log("Writing new messages");
    ms.write(new Message("Hello"));
    ms.write(new Message("world!"));
    ms.write(new Message("Engine data", MessageType.Engine));
    ms.write(new Message("IO data", MessageType.IO));
    ms.write(new Message("Order matters in the pipe, even if Global", MessageType.Global, true));
    ms.end(new Message("Final message in the stream"));
    console.log("Piping data");
    await pipeline(
        ms,
        es,
        io
    );
}

Promise.all([start()]).then(() => {
    console.log(`Engine Messages to parse: ${es.data.toString()}`);
    console.log(`IO Messages to parse: ${io.data.toString()}`);
});

输出应该类似于:

Running message system
Writing new messages
Hello
world!
Engine Data
IO Data
Order Matters in the pipe, even if Global
Engine messages to parse: Engine Data
IO messages to parse: IO Data

任何帮助将不胜感激。谢谢!

注意:我在我的其他帐户中发布了这个,而不是我的实际帐户。为复制道歉。

编辑:我最初私有的回购,但已公开,以帮助澄清答案。更多用法可以在feature/inital_system branch上找到。签出时可以使用npm start运行。

编辑:我把我的自定义流放在这里是为了冗长。我认为我的轨道比以前更好,但现在获得一个“空”对象接收到了管道。

回答如下:

正如the documentation所述,stream.pipeline以回调为基础并未返回承诺。

它有自定义的promisified版本,可以使用util.promisify访问:

const pipeline = util.promisify(stream.pipeline);

...

await pipeline(...);

与本文相关的文章

发布评论

评论列表 (0)

  1. 暂无评论