当前位置:   NODEJS > 正文

等待最后一个stream.on('data')事件中的异步函数回调

stream,redis,node.js,json,DevBox,在线流程图,编程,编程问答,程序员,开发者工具,开发工具,json解析,二维码生成,unix时间戳,在线开发工具,前端开发工具,开发人员工具,站长工具

我正在使用fast-csv使用a来迭代CSV文件stream.对于CSV文件的每一行,我想在redis中创建一个作业,我使用kue.解析一行是同步函数.整件事看起来像这样:

var csvStream = fastCsv(_config.csvOptions)
  .on('data', function(data) {
    var stream = this;
    stream.pause();

    var payload = parseRow(data, _config);
    console.log(payload); // the payload is always printed to the console

    var job = kue.create('csv-row', {
        payload: payload
      })
      .save(function(err) {
        if (!err) console.log('Enqueued at ' + job.id);
        else console.log('Redis error ' + JSON.stringify(err));

        stream.resume();
      });
  })
  .on('end', function() {
    callback(); // this ends my parsing
  });

console.log(payload);我的CSV文件的每一行都有简单的显示,但是没有创建作业.即,回调中的输出save都没有被打印,并且作业不在我的redis中.

我假设,因为它是CSV文件的最后一行,流已经发出end,所以kue.create()在进程终止之前无法执行最后一行?

有没有办法停止流,end直到kue完成?



1> 小智..:

您可以使用异步库解决此问题.您可以将以下模式用于任何流.

var AsyncLib = require('async');

var worker = function (payload, cb) {
    //do something with payload and call callback
    return cb();
};

var concurrency = 5;
var streamQueue = AsyncLib.queue(worker, concurrency);

var stream = //some readable stream;

stream.on('data', function(data) {
    //no need to pause and resume
    var payload = '//some payload';
    streamQueue.push(payload);
})
.on('end', function() {
    //register drain event on end and callback
    streamQueue.drain = function () {
        callback();
    };
});

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/blog/NODEJS/detail/11100
推荐阅读
相关标签
  

闽ICP备14008679号