当前位置:   article > 正文

nodejs中流(stream)的理解之可读流

可读流data事件 不写缓冲区

1.流是什么?
node.js官网文档中是这么说的 流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。

Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。

流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例。 本人最近研究node,特意记下,分享一下。

言归正传

1. Node.js 中有四种基本的流类型:
· Readable - 可读的流 (例如 fs.createReadStream()).
· Writable - 可写的流 (例如 fs.createWriteStream()).
· Duplex - 可读写的流 (例如 net.Socket).
· Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

那么今天的主角就是可读流(createReadStream),后续会陆续介绍其他3种流

2.可读流(createReadStream)
实现了stream.Readable接口的对象,将对象数据读取为流数据,当监听data事件后,开始发射数据

2.1 创建可读流

  1. var rs = fs.createReadStream(path,[options]);
  2. ps:如果指定utf8编码highWaterMark要大于3个字节
  3. 复制代码

2.2 监听data事件

  1. 流切换到流动模式,数据会被尽可能快的读出
  2. rs.on('data', function (data) {
  3. console.log(data);
  4. });
  5. 复制代码

2.3 监听end事件

  1. 该事件会在读完数据后被触发
  2. rs.on('end', function () {
  3. console.log('读取完成');
  4. })
  5. 复制代码

2.4 监听error事件

  1. rs.on('error', function (err) {
  2. console.log(err);
  3. });
  4. 复制代码

2.5 监听open事件

  1. rs.on('open', function () {
  2. console.log(err);
  3. });
  4. 复制代码

2.6 监听close事件

  1. rs.on('close', function () {
  2. console.log(err);
  3. });
  4. 复制代码

2.7 设置编码

  1. 与指定{encoding:'utf8'}效果相同,设置编码
  2. rs.setEncoding('utf8');
  3. 复制代码

2.8 暂停和恢复触发data

  1. 通过pause()方法和resume()方法
  2. rs.on('data', function (data) {
  3. rs.pause();
  4. console.log(data);
  5. });
  6. setTimeout(function () {
  7. rs.resume();
  8. },2000);
  9. 复制代码

3. 流中的数据有两种模式,二进制模式和对象模式

· 二进制模式, 每个分块都是buffer或者string对象 · 对象模式, 流内部处理的是一系列普通对象.

4.可读流的两种模式

可读流事实上工作在下面两种模式之一:flowing 和 paused 在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。 在 paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。 所有初始工作模式为 paused 的 Readable 流 如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况 下面通过代码简单模拟下实现的逻辑: 如图:

  1. let fs = require('fs');
  2. let EventEmitter = require('events');
  3. class ReadStream extends EventEmitter {//这里用到的是es6的原型继承
  4. constructor(path, options) {
  5. super(path, options);
  6. this.path = path;//读取文件的路径
  7. this.highWaterMark = options.highWaterMark || 64 * 1024;//读取的最高水位线
  8. this.buffer = Buffer.alloc(this.highWaterMark);//临时存放数据的容器
  9. this.flags = options.flags || 'r';//打开文件要做的操作,默认为'r'
  10. this.encoding = options.encoding;//编码
  11. this.mode = options.mode || 0o666;//读取文件的权限
  12. this.start = options.start || 0;//开始读取的索引位置
  13. this.end = options.end;//结束读取的索引位置(包括结束位置)
  14. this.pos = this.start;
  15. this.autoClose = options.autoClose || true;当流读完之后自动关闭文件
  16. this.bytesRead = 0;//实际读到的字节数
  17. this.closed = false;
  18. this.flowing;初始状态为暂停模式 为true时为流动模式, 为false时为暂停模式
  19. this.needReadable = false;//是否需要发送readable事件
  20. this.length = 0;//缓存区的总长度
  21. this.buffers = [];//这才是真正的缓存区
  22. this.on('end', function () {
  23. if (this.autoClose) {
  24. this.destroy();
  25. }
  26. });
  27. //当给这个实例添加了任意的监听函数时会触发newListener
  28. this.on('newListener', (type) => {
  29. //如果监听了data事件,流会自动切换的流动模式
  30. //流动模式不缓存,直接发射,然后读取下次的数据。如果你用流动模式而且没有消费,数据就白白丢失了
  31. if (type == 'data') {
  32. this.flowing = true;
  33. this.read();
  34. }
  35. if (type == 'readable') {
  36. this.read(0);
  37. }
  38. });
  39. this.open();
  40. }
  41. //打开文件
  42. open() {
  43. fs.open(this.path, this.flags, this.mode, (err, fd) => {
  44. if (err) {
  45. if (this.autoClose) {
  46. this.destroy();
  47. return this.emit('error', err);
  48. }
  49. }
  50. this.fd = fd;
  51. this.emit('open');
  52. });
  53. }
  54. read(n) {
  55. if (typeof this.fd != 'number') {
  56. return this.once('open', () => this.read());
  57. }
  58. n = parseInt(n, 10);
  59. if (n != n) {
  60. n = this.length;
  61. }
  62. if (this.length == 0)//重新填充
  63. this.needReadable = true;
  64. let ret;
  65. if (0 < n < this.length) {//缓存区的数据足够用,并且要读取的字节大于0
  66. ret = Buffer.alloc(n);
  67. let b;
  68. let index = 0;
  69. while (null != (b = this.buffers.shift())) {
  70. for (let i = 0; i < b.length; i++) {
  71. ret[index++] = b[i];
  72. if (index == ret.length) {//填充完毕
  73. this.length -= n;
  74. b = b.slice(i + 1);
  75. this.buffers.unshift(b);
  76. break;
  77. }
  78. }
  79. }
  80. if (this.encoding) ret = ret.toString(this.encoding);
  81. }
  82. //底层读数据的方法
  83. let _read = () => {
  84. let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
  85. fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
  86. if (err) {
  87. return
  88. }
  89. let data;
  90. if (bytesRead > 0) {//实际读到的字节数
  91. data = this.buffer.slice(0, bytesRead);
  92. this.pos += bytesRead;
  93. //让缓存区的数量加实际读到的字节数
  94. this.length += bytesRead;
  95. if (this.end && this.pos > this.end) {
  96. if (this.needReadable) {
  97. this.emit('readable');
  98. }
  99. this.emit('end');
  100. } else {
  101. this.buffers.push(data);
  102. if (this.needReadable) {
  103. this.emit('readable');
  104. this.needReadable = false;
  105. }
  106. }
  107. } else {
  108. if (this.needReadable) {
  109. this.emit('readable');
  110. }
  111. return this.emit('end');
  112. }
  113. })
  114. }
  115. if (this.length == 0 || (this.length < this.highWaterMark)) {
  116. _read(0);
  117. }
  118. return ret;
  119. }
  120. //销毁流
  121. destroy() {
  122. fs.close(this.fd, (err) => {
  123. this.emit('close');
  124. });
  125. }
  126. //暂停
  127. pause() {
  128. this.flowing = false;
  129. }
  130. //恢复
  131. resume() {
  132. this.flowing = true;
  133. this.read();
  134. }
  135. pipe(dest) {
  136. this.on('data', (data) => {
  137. let flag = dest.write(data);
  138. if (!flag) this.pause();
  139. });
  140. dest.on('drain', () => {
  141. this.resume();
  142. });
  143. this.on('end', () => {
  144. dest.end();
  145. });
  146. }
  147. }
  148. module.exports = ReadStream;
  149. 复制代码

readable时其实会立刻填充缓存区,当你消费掉一个字节后,一旦发现缓冲区的字节数小于最高水位线了,则会立马读到最高水位线highWaterMark(n)个字节填充到缓存区 read(n)是可读流的核心方法,那么方法中传参数跟不传,传几个是有讲究的: n = undefined :即n不传参数,如果处于流动模式,并且缓存区大小不为空,则返回缓存区第一个buffer的长度,否则读取整个缓存 如果读到了数据没有返回值,但是会发射data事件,数据也能取到,也就是用来清空缓存区。也不需要向底层_read()请求数据 n=0:只是填充缓存区,并不真正读取 0 < n < highWaterMark(n):可读流会从缓存区取出对应大小的数据,并且作为 read 方法返回值返回。同时判断此时缓存区大小是否小于highWaterMark(n),如果小于,那么将执行底层的 _read 方法,从数据源中读取highWaterMark(n)大小的数据填充到缓存区内。 n>highWaterMark(n):如果缓存中数据不够,便会调用_read方法去底层取数据。

尾声:

碍于篇幅和时间限制,就不再继续介绍下去。本文仅介绍了可读流的基本使用方法,如果觉得有一些地方觉得不太深入,可以去看一下官方文档,官方文档上面有更加全面和详细的介绍。

参考资料:

stream | Node.js API 文档nodejs.cn

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/811341?site
推荐阅读
相关标签
  

闽ICP备14008679号