jsernews 1.0.1

<~>

ts 180 days ago.

原文链接:https://blog.safia.rocks/post/167700208169/node-module-deep-dive-writeablestream

又来了!今天我将深入研究另一个 Node 模块。我想我会深入 WritableStream 对象。WritableStream 暴露了允许您写入流的方法。它会公开如 close、drain、pipe 等事件以及 cork、end、write 等几个方法。在深入研究 WritableStream 对象之前,快速了解流的概念会有所帮助。

好的!现在我们已经建立了这个基础,是时候深入代码了。我将对此版本的 WritableStream 进行代码演练。当我浏览该文件时,我很高兴地发现,在整个代码库中有不少注释,用以阐明代码库的不同部分。这些解释性注释使得阅读代码库时的代码解析更加容易。我做的第一件事是检查在 WritableState 对象上定义的属性。代码库中的注释在描述每个属性的功能方面做得非常好,所以我将避免在这里详细介绍它们。通过阅读代码,看起来 WritableState 对象包含有关当前 WritableStream 的信息(这很有道理!)。

WritableState 上定义了单个函数,该函数旨在将 Writable 上的当前缓冲区作为列表获取。

/**
 * Collects the pending write requests into an array.
 *
 * Starts at `this.bufferedRequest` and follows each request's `next`
 * link until a falsy link is reached.
 *
 * @returns {Array} The request objects, in list order (empty when there
 *   is no buffered request).
 */
WritableState.prototype.getBuffer = function getBuffer() {
  const requests = [];
  for (let node = this.bufferedRequest; node; node = node.next) {
    requests.push(node);
  }
  return requests;
};

Writable 流的定义概述了该函数的一些属性。也就是说,程序员可以为 Writable 指定特殊的 write、destroy、final 函数。

/**
 * Writable stream constructor.
 *
 * Works with or without `new`: when `this` is neither a Writable (as
 * reported by `realHasInstance`) nor a `Stream.Duplex`, a fresh instance
 * is constructed and returned instead.
 *
 * @param {Object} [options] Forwarded to `WritableState`. Each of `write`,
 *   `writev`, `destroy` and `final` that is a function is installed on the
 *   instance as `_write`, `_writev`, `_destroy` and `_final` respectively.
 */
function Writable(options) {
  // This constructor is applied to Duplexes, too. Plain `instanceof` cannot
  // be used for the first check because it would return false while no
  // `_writableState` property is attached, hence `realHasInstance`.
  //
  // The custom `instanceof` for Writable must not be used here either: it
  // would break the Node.js LazyTransform implementation, whose non-trivial
  // getter for `_writableState` would lead to infinite recursion.
  const isWritable = realHasInstance.call(Writable, this);
  if (!isWritable && !(this instanceof Stream.Duplex)) {
    return new Writable(options);
  }

  this._writableState = new WritableState(options, this);

  // legacy.
  this.writable = true;

  if (options) {
    // Install each user-supplied implementation as its `_`-prefixed hook.
    for (const hook of ['write', 'writev', 'destroy', 'final']) {
      if (typeof options[hook] === 'function') {
        this[`_${hook}`] = options[hook];
      }
    }
  }

  Stream.call(this);
}

Writable 原型上定义的第一个函数引入了一个相当异想天开的注释。

// Otherwise people can pipe Writable streams, which is just wrong.
//
// Calling pipe() on a Writable never pipes anything: it emits an 'error'
// event carrying an ERR_STREAM_CANNOT_PIPE error and returns undefined.
Writable.prototype.pipe = function() {
  const pipeError = new errors.Error('ERR_STREAM_CANNOT_PIPE');
  this.emit('error', pipeError);
};

你无法从一个 Writable 的流中读取数据,所以当然,你想要通过一个 WritableStream 进行管道输出是毫无意义的。

接下来是 write 函数定义。它有三个参数:一个要写入的数据 chunk,数据的 encoding 和一个写完成后要执行的 cb(回调)。

/**
 * Writes a chunk to the stream.
 *
 * @param {*} chunk Data to write. Outside object mode, a Uint8Array that is
 *   not already a Buffer is converted with `Stream._uint8ArrayToBuffer`.
 * @param {string|Function} [encoding] Encoding of `chunk`; may be the
 *   callback instead, in which case no encoding was supplied.
 * @param {Function} [cb] Completion callback; defaults to `nop`.
 * @returns {boolean} `false` when the stream has already ended or the chunk
 *   is rejected by `validChunk`; otherwise the result of `writeOrBuffer`.
 */
Writable.prototype.write = function(chunk, encoding, cb) {
  const state = this._writableState;
  const isBuf = !state.objectMode && Stream._isUint8Array(chunk);

  // Normalize plain Uint8Arrays so downstream code only sees Buffers.
  if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
    chunk = Stream._uint8ArrayToBuffer(chunk);
  }

  // write(chunk, cb): the callback was passed in the encoding slot.
  if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }

  if (isBuf) {
    encoding = 'buffer';
  } else if (!encoding) {
    encoding = state.defaultEncoding;
  }

  if (typeof cb !== 'function') {
    cb = nop;
  }

  if (state.ended) {
    writeAfterEnd(this, cb);
    return false;
  }

  // Binary chunks skip validation; anything else must pass validChunk.
  if (!isBuf && !validChunk(this, state, chunk, cb)) {
    return false;
  }

  state.pendingcb++;
  return writeOrBuffer(this, state, isBuf, chunk, encoding, cb);
};

该函数获取 WritableStream 的当前状态,并检查正在写入流的数据是否由 Buffer 或对象组成,并将此区别存储在 isBuf 中。如果要写入流的数据预期为一个 Buffer,但传递的 chunk 不是一个 Buffer,则该函数假定它是一个整数数组并将其转换为一个 Buffer。之后,有一些逻辑确保参数映射正确。换句话说,用户可以不将 encoding 参数传递给该函数。当这种情况发生时,传递的第二个参数实际上是要调用的回调。如果流已结束,该函数将调用 writeAfterEnd 函数,该函数将向用户发出错误,因为您无法写入已关闭的流。

/**
 * Reports a write attempted after the stream has ended.
 *
 * Emits an 'error' event on `stream` synchronously, then schedules `cb`
 * with the same error via `process.nextTick`.
 *
 * @param {Object} stream The stream on which the 'error' event is emitted.
 * @param {Function} cb Callback scheduled with the error as its argument.
 */
function writeAfterEnd(stream, cb) {
  const writeError = new errors.Error('ERR_STREAM_WRITE_AFTER_END');
  // TODO: defer error events consistently everywhere, not just the cb
  stream.emit('error', writeError);
  process.nextTick(cb, writeError);
}

除此之外,如果数据是缓冲区,该函数将调用 writeOrBuffer 函数。

// If a write is already in flight (or the stream is corked), queue this
// chunk and wait our turn; otherwise hand it to doWrite right away.
// A false return value means the caller needs a drain event, so the
// needDrain flag is set in that case.
/**
 * @param {Object} stream The stream being written to.
 * @param {Object} state The stream's writable state; its `length`,
 *   `needDrain` and buffered-request fields are updated here.
 * @param {boolean} isBuf Whether `chunk` is already binary data.
 * @param {*} chunk The data to write.
 * @param {string} encoding Encoding of `chunk`.
 * @param {Function} cb Callback stored with, or passed along for, the write.
 * @returns {boolean} `true` while `state.length` stays below
 *   `state.highWaterMark`, `false` otherwise.
 */
function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
  if (!isBuf) {
    const decoded = decodeChunk(state, chunk, encoding);
    if (decoded !== chunk) {
      // decodeChunk produced a new value, so treat it as binary from now on.
      isBuf = true;
      encoding = 'buffer';
      chunk = decoded;
    }
  }

  // In object mode every chunk counts as one unit.
  const len = state.objectMode ? 1 : chunk.length;
  state.length += len;

  const belowHighWaterMark = state.length < state.highWaterMark;
  // Only ever set the flag here, so an earlier needDrain is never reset
  // to false.
  if (!belowHighWaterMark) {
    state.needDrain = true;
  }

  if (!state.writing && !state.corked) {
    doWrite(stream, state, false, len, chunk, encoding, cb);
    return belowHighWaterMark;
  }

  // Append the request to the tail of the linked list of buffered writes.
  const request = {
    chunk,
    encoding,
    isBuf,
    callback: cb,
    next: null
  };
  const previous = state.lastBufferedRequest;
  state.lastBufferedRequest = request;
  if (previous) {
    previous.next = request;
  } else {
    state.bufferedRequest = request;
  }
  state.bufferedRequestCount += 1;

  return belowHighWaterMark;
}

这里有很多事情要做,所以让我们一点一点地介绍一下。函数中的前几行检查传入的 chunk 是否不是缓冲区。如果不是,则使用 decodeChunk 解码 chunk,使用 Buffer.from 函数从字符串中创建 chunk。

/**
 * Converts a string chunk into a Buffer when the stream state allows it.
 *
 * @param {Object} state Writable state; `objectMode` and `decodeStrings`
 *   are consulted.
 * @param {*} chunk The chunk to (possibly) decode.
 * @param {string} encoding Encoding passed to `Buffer.from` for strings.
 * @returns {*} A new Buffer when `chunk` is a string, the state is not in
 *   object mode and `decodeStrings` is not `false`; otherwise `chunk` itself.
 */
function decodeChunk(state, chunk, encoding) {
  if (state.objectMode) {
    return chunk;
  }
  if (state.decodeStrings === false) {
    return chunk;
  }
  if (typeof chunk !== 'string') {
    return chunk;
  }
  return Buffer.from(chunk, encoding);
}

然后通过评估流的长度是否超过 highWaterMark 来检查是否已达到流的容量,并适当设置 needDrain 参数。之后,它将存储在状态中的 lastBufferedRequest 的值更新为作为参数传递的 Buffer,并调用将块写入流的 doWrite 函数。

接下来是 cork 和 uncork 函数,定义如下。cork 函数增加了 corked 计数器。corked 计数器实际上是一个布尔值,当它具有非零值时,意味着有需要缓冲的写入。uncork 函数递减 corked 参数并清除缓冲区。

 /**
  * Increments the `corked` counter on the stream's writable state.
  */
 Writable.prototype.cork = function() {
  this._writableState.corked++;
};

/**
 * Decrements the `corked` counter and, once it reaches zero, flushes the
 * buffered requests through `clearBuffer`.
 *
 * Does nothing when the stream is not corked. The flush only happens when
 * no write is in progress, the stream is not finished, the buffer is not
 * already being processed, and there is at least one buffered request.
 */
Writable.prototype.uncork = function() {
  const state = this._writableState;
  if (!state.corked) {
    return;
  }

  state.corked--;

  const canFlush = !state.writing &&
                   !state.corked &&
                   !state.finished &&
                   !state.bufferProcessing &&
                   state.bufferedRequest;
  if (canFlush) {
    clearBuffer(this, state);
  }
};

接下来是一个简短的函数,允许用户在 WritableStream 上设置默认编码,或者如果用户提供了无效编码,则会引发错误。

/**
 * Sets the default encoding stored on the stream's writable state.
 *
 * @param {string} encoding Encoding name; string values are lower-cased
 *   before being validated and stored.
 * @returns {Writable} `this`, so calls can be chained.
 * @throws {errors.TypeError} `ERR_UNKNOWN_ENCODING` when
 *   `Buffer.isEncoding` rejects the (lower-cased) value.
 */
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  // node::ParseEncoding() requires lower case.
  const normalized =
    typeof encoding === 'string' ? encoding.toLowerCase() : encoding;

  if (!Buffer.isEncoding(normalized)) {
    throw new errors.TypeError('ERR_UNKNOWN_ENCODING', normalized);
  }

  this._writableState.defaultEncoding = normalized;
  return this;
};

最后一个 chunk 需要写入流时调用 end 函数。它通过调用我们上面探讨的 write 函数来写入块,完全解除其 cork 状态(uncork),并通过调用 endWritable 清除 WritableState。

/**
 * Signals that no more data will be written, optionally writing one final
 * chunk first.
 *
 * Accepted call shapes: `end(cb)`, `end(chunk, cb)` and
 * `end(chunk, encoding, cb)`.
 *
 * @param {*} [chunk] Final chunk; written via `this.write` unless it is
 *   `null` or `undefined`.
 * @param {string|Function} [encoding] Encoding of `chunk`, or the callback.
 * @param {Function} [cb] Callback handed to `endWritable`.
 */
Writable.prototype.end = function(chunk, encoding, cb) {
  const state = this._writableState;

  // Shift arguments when the callback was passed in an earlier position.
  if (typeof chunk === 'function') {
    cb = chunk;
    chunk = null;
    encoding = null;
  } else if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }

  const hasChunk = chunk !== null && chunk !== undefined;
  if (hasChunk) {
    this.write(chunk, encoding);
  }

  // .end() fully uncorks: collapse the counter to 1 so that a single
  // uncork() call brings it to zero.
  if (state.corked) {
    state.corked = 1;
    this.uncork();
  }

  // ignore unnecessary end() calls.
  const alreadyEnding = state.ending || state.finished;
  if (!alreadyEnding) {
    endWritable(this, state, cb);
  }
};

就是这样!我仔细阅读了 WritableStream 对象的主要部分。我会承认,在阅读代码之前,我对隐藏起来的细节有点不知所措。逐个仔细阅读代码函数,为我解决了很多问题。