最新消息: 电脑我帮您提供丰富的电脑知识,编程学习,软件下载,win7系统下载。

在Node.js中处理大型CSV上传

IT培训 admin 6浏览 0评论

在Node.js中处理大型CSV上传

按照先前的帖子:

Node async loop - how to make this code run in sequential order?

...我正在寻找有关处理大型数据上传文件的更广泛建议。

场景:

用户上传一个包含数十万到数百万行的非常大的CSV文件。它使用multer流式传输到端点:

const storage = multer.memoryStorage();
const upload = multer({ storage: storage });

router.post("/", upload.single("upload"), (req, res) => {
    //...
});

每行都转换为JSON对象。然后将该对象映射到几个较小的对象,这些对象需要插入到几个不同的表中,分散在各个微服务容器中并由其访问。

async.forEachOfSeries(data, (line, key, callback) => {
    let model = splitData(line);
    //save model.record1, model.record2, etc. sequentially
});

很明显,我会用这种方法遇到内存限制。这样做最有效的方式是什么?

回答如下:

为了避免内存问题,您需要使用streams处理文件 - 用简单的词语,逐步处理。

您可以使用CSV stream parser的组合来将二进制内容作为CSV行和through2进行流式处理,这是一个允许您控制流的流的实用程序。

过程

过程如下:

  • 您获取数据流
  • 您通过CSV解析器管道它
  • 你通过一个2管道
  • 您保存数据库中的每一行
  • 保存完成后,请致电cb()继续下一个项目。

我不熟悉multer,但这是一个使用来自File的流的示例。

const fs = require('fs')
const csv = require('csv-stream')
const through2 = require('through2')

const stream = fs.createReadStream('foo.csv')
  .pipe(csv.createStream({
      endLine : '\n',
      columns : ['Year', 'Make', 'Model'],
      escapeChar : '"',
      enclosedChar : '"'
  }))
  .pipe(through2({ objectMode: true }, (row, enc, cb) => {
    // - `row` holds the first row of the CSV,
    //   as: `{ Year: '1997', Make: 'Ford', Model: 'E350' }`
    // - The stream won't process the *next* item unless you call the callback
    //  `cb` on it.
    // - This allows us to save the row in our database/microservice and when
    //   we're done, we call `cb()` to move on to the *next* row.
    saveIntoDatabase(row).then(() => {
      cb(null, true)
    })
    .catch(err => {
      cb(err, null)
    })
  }))
  .on('data', data => {
    console.log('saved a row')
  })
  .on('end', () => {
    console.log('end')
  })
  .on('error', err => {
    console.error(err)
  })

// Mock function that emulates saving the row into a database,
// asynchronously in ~500 ms
const saveIntoDatabase = row =>
  new Promise((resolve, reject) =>
    setTimeout(() => resolve(), 500))

示例foo.csv CSV是这样的:

1997,Ford,E350
2000,Mercury,Cougar
1998,Ford,Focus
2005,Jaguar,XKR
1991,Yugo,LLS
2006,Mercedes,SLK
2009,Porsche,Boxter
2001,Dodge,Viper

笔记

  • 这种方法避免了必须将整个CSV加载到内存中。一旦处理了row,它就会超出范围/变得无法缓存,因此它有资格获得垃圾收集。这就是使这种方法具有内存效率的原因。阅读Streams Handbook以获取有关流的更多信息。
  • 您可能希望每个周期保存/处理超过1行。在这种情况下,将一些rows推入一个数组,处理/保存整个数组,然后调用cb继续下一个块 - 重复该过程。
  • Streams发出可以监听的事件。 end / error事件对于回应操作是成功还是失败特别有用。
  • Express默认使用流 - 我几乎可以肯定你根本不需要multer

在Node.js中处理大型CSV上传

按照先前的帖子:

Node async loop - how to make this code run in sequential order?

...我正在寻找有关处理大型数据上传文件的更广泛建议。

场景:

用户上传一个包含数十万到数百万行的非常大的CSV文件。它使用multer流式传输到端点:

const storage = multer.memoryStorage();
const upload = multer({ storage: storage });

router.post("/", upload.single("upload"), (req, res) => {
    //...
});

每行都转换为JSON对象。然后将该对象映射到几个较小的对象,这些对象需要插入到几个不同的表中,分散在各个微服务容器中并由其访问。

async.forEachOfSeries(data, (line, key, callback) => {
    let model = splitData(line);
    //save model.record1, model.record2, etc. sequentially
});

很明显,我会用这种方法遇到内存限制。这样做最有效的方式是什么?

回答如下:

为了避免内存问题,您需要使用streams处理文件 - 用简单的词语,逐步处理。

您可以使用CSV stream parser的组合来将二进制内容作为CSV行和through2进行流式处理,这是一个允许您控制流的流的实用程序。

过程

过程如下:

  • 您获取数据流
  • 您通过CSV解析器管道它
  • 你通过一个2管道
  • 您保存数据库中的每一行
  • 保存完成后,请致电cb()继续下一个项目。

我不熟悉multer,但这是一个使用来自File的流的示例。

const fs = require('fs')
const csv = require('csv-stream')
const through2 = require('through2')

const stream = fs.createReadStream('foo.csv')
  .pipe(csv.createStream({
      endLine : '\n',
      columns : ['Year', 'Make', 'Model'],
      escapeChar : '"',
      enclosedChar : '"'
  }))
  .pipe(through2({ objectMode: true }, (row, enc, cb) => {
    // - `row` holds the first row of the CSV,
    //   as: `{ Year: '1997', Make: 'Ford', Model: 'E350' }`
    // - The stream won't process the *next* item unless you call the callback
    //  `cb` on it.
    // - This allows us to save the row in our database/microservice and when
    //   we're done, we call `cb()` to move on to the *next* row.
    saveIntoDatabase(row).then(() => {
      cb(null, true)
    })
    .catch(err => {
      cb(err, null)
    })
  }))
  .on('data', data => {
    console.log('saved a row')
  })
  .on('end', () => {
    console.log('end')
  })
  .on('error', err => {
    console.error(err)
  })

// Mock function that emulates saving the row into a database,
// asynchronously in ~500 ms
const saveIntoDatabase = row =>
  new Promise((resolve, reject) =>
    setTimeout(() => resolve(), 500))

示例foo.csv CSV是这样的:

1997,Ford,E350
2000,Mercury,Cougar
1998,Ford,Focus
2005,Jaguar,XKR
1991,Yugo,LLS
2006,Mercedes,SLK
2009,Porsche,Boxter
2001,Dodge,Viper

笔记

  • 这种方法避免了必须将整个CSV加载到内存中。一旦处理了row,它就会超出范围/变得无法缓存,因此它有资格获得垃圾收集。这就是使这种方法具有内存效率的原因。阅读Streams Handbook以获取有关流的更多信息。
  • 您可能希望每个周期保存/处理超过1行。在这种情况下,将一些rows推入一个数组,处理/保存整个数组,然后调用cb继续下一个块 - 重复该过程。
  • Streams发出可以监听的事件。 end / error事件对于回应操作是成功还是失败特别有用。
  • Express默认使用流 - 我几乎可以肯定你根本不需要multer

与本文相关的文章

发布评论

评论列表 (0)

  1. 暂无评论