九、原生 Node.js 流
原文:
exploringjs.com/nodejs-shell-scripting/ch_nodejs-streams.html 译者:飞龙
协议:CC BY-NC-SA 4.0
-
9.1?总结:异步迭代和异步生成器
-
9.2?流
-
9.2.1?管道
-
9.2.2?文本编码
-
9.2.3?辅助函数:
readableToString() -
9.2.4?一些初步说明
-
-
9.3?可读流
-
9.3.1?创建可读流
-
9.3.2?通过
for-await-of 从可读流中读取块 -
9.3.3?通过模块
'node:readlines' 从可读流中读取行
-
-
9.4?通过异步生成器转换可读流(ch_nodejs-streams.html#transforming-Readable-via-async-generator)
- 9.4.1?从异步可迭代对象中的块转换为编号行
-
9.5?可写流
-
9.5.1?为文件创建可写流
-
9.5.2?写入可写流
-
-
9.6?快速参考:与流相关的功能(ch_nodejs-streams.html#quick-reference-stream-related-functionality)
-
9.7?进一步阅读和本章的来源
本章是对 Node 的原生流的介绍。它们支持异步迭代,这使它们更容易使用,这也是我们在本章中主要使用的。
请注意,跨平台的web 流在§10“在 Node.js 上使用 web 流”中有所涵盖。我们在本书中主要使用这些。因此,如果您愿意,可以跳过当前章节。
9.1?总结:异步迭代和异步生成器
异步迭代是一种异步检索数据容器内容的协议(意味着当前的“任务”在检索项目之前可能会暂停)。
异步生成器有助于异步迭代。例如,这是一个异步生成器函数:
/** * @returns an asynchronous iterable */ async function* asyncGenerator(asyncIterable) { for await (const item of asyncIterable) { // input if (···) { yield '> ' + item; // output } } }
-
for-await-of 循环遍历输入的asyncIterable 。这个循环也适用于普通的异步函数。 -
yield 将值提供给此生成器返回的异步可迭代对象。
在本章的其余部分,请仔细注意函数是异步函数还是异步生成器函数:
/** @returns a Promise */ async function asyncFunction() { /*···*/ } /** @returns an async iterable */ async function* asyncGeneratorFunction() { /*···*/ }
9.2?流
流是一种模式,其核心思想是“分而治之”大量数据:如果我们将其分割成较小的部分并一次处理一部分,我们就可以处理它。
Node.js 支持几种流,例如:
-
可读流是我们可以从中读取数据的流。换句话说,它们是数据的来源。一个例子是可读文件流,它允许我们读取文件的内容。
-
可写流是我们可以写入数据的流。换句话说,它们是数据的接收端。一个例子是可写文件流,它允许我们向文件写入数据。
-
转换流既可读又可写。作为可写流,它接收数据块,转换(更改或丢弃)它们,然后将它们作为可读流输出。
9.2.1?管道
为了在多个步骤中处理流数据,我们可以管道(连接)流:
-
输入通过可读流接收。
-
每个处理步骤都是通过转换流执行的。
-
对于最后的处理步骤,我们有两个选项:
-
我们可以将最近的可读流中的数据写入可写流。也就是说,可写流是我们管道的最后一个元素。
-
我们可以以其他方式处理最近的可读流中的数据。
-
部分(2)是可选的。
9.2.2 文本编码
创建文本流时,最好始终指定编码:
-
Node.js 文档中有支持的编码及其默认拼写的列表 - 例如:
-
‘utf8’
-
‘utf16le’
-
‘base64’
-
-
也允许一些不同的拼写。您可以使用
Buffer.isEncoding() 来检查哪些是:> buffer.Buffer.isEncoding('utf8') true > buffer.Buffer.isEncoding('utf-8') true > buffer.Buffer.isEncoding('UTF-8') true > buffer.Buffer.isEncoding('UTF:8') false
编码的默认值是
9.2.3 辅助函数:readableToString()
我们偶尔会使用以下辅助函数。您不需要理解它的工作原理,只需(大致)了解它的作用。
import * as stream from 'stream'; /** * Reads all the text in a readable stream and returns it as a string, * via a Promise. * @param {stream.Readable} readable */ function readableToString(readable) { return new Promise((resolve, reject) => { let data = ''; readable.on('data', function (chunk) { data += chunk; }); readable.on('end', function () { resolve(data); }); readable.on('error', function (err) { reject(err); }); }); }
此函数是通过基于事件的 API 实现的。稍后我们将看到一个更简单的方法 - 通过异步迭代。
9.2.4 一些初步说明
-
在本章中,我们只会使用文本流。
-
在示例中,我们偶尔会遇到
await 被用于顶层。在这种情况下,我们假设我们在模块内或在异步函数的主体内。 -
每当有换行符时,我们都支持:
-
Unix:
' (LF)
' -
Windows:
' (CR LF)当前平台的换行符可以通过模块
'os 中的常量EOL 访问。
-
9.3 可读流
9.3.1 创建可读流
9.3.1.1 从文件创建可读流
我们可以使用
import * as fs from 'fs'; const readableStream = fs.createReadStream( 'tmp/test.txt', {encoding: 'utf8'}); assert.equal( await readableToString(readableStream), 'This is a test! ');
9.3.1.2 Readable.from() : 从可迭代对象创建可读流
静态方法
import * as stream from 'stream'; function* gen() { yield 'One line '; yield 'Another line '; } const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'}); assert.equal( await readableToString(readableStream), 'One line Another line ');
9.3.1.2.1 从字符串创建可读流
import {Readable} from 'stream'; const str = 'Some text!'; const readable = Readable.from(str, {encoding: 'utf8'}); assert.equal( await readableToString(readable), 'Some text!');
目前,
9.3.2 通过for-await-of 从可读流中读取块
每个可读流都是异步可迭代的,这意味着我们可以使用
import * as fs from 'fs'; async function logChunks(readable) { for await (const chunk of readable) { console.log(chunk); } } const readable = fs.createReadStream( 'tmp/test.txt', {encoding: 'utf8'}); logChunks(readable); // Output: // 'This is a test! '
9.3.2.1 在字符串中收集可读流的内容
以下函数是本章开头所见函数的简化重新实现。
import {Readable} from 'stream'; async function readableToString2(readable) { let result = ''; for await (const chunk of readable) { result += chunk; } return result; } const readable = Readable.from('Good morning!', {encoding: 'utf8'}); assert.equal(await readableToString2(readable), 'Good morning!');
请注意,在这种情况下,我们必须使用异步函数,因为我们想要返回一个 Promise。
9.3.3 通过模块'node:readlines' 从可读流中读取行
内置模块
import * as fs from 'node:fs'; import * as readline from 'node:readline/promises'; const filePath = process.argv[2]; // first command line argument const rl = readline.createInterface({ input: fs.createReadStream(filePath, {encoding: 'utf-8'}), }); for await (const line of rl) { console.log('>', line); } rl.close();
9.4 通过异步生成器转换可读流
异步迭代提供了一个优雅的替代方案,用于在多个步骤中处理流式数据的转换流:
-
输入是一个可读流。
-
第一个转换是通过一个异步生成器执行的,该生成器遍历可读流并在适当时产生。
-
可选地,我们可以通过使用更多的异步生成器来进一步转换。
-
最后,我们有几种处理最后一个生成器返回的异步可迭代对象的选项:
-
我们可以通过
Readable.from() 将其转换为可读流(稍后可以传输到可写流)。 -
我们可以使用异步函数来处理它。
-
等等。
-
总之,这些是这样的处理管道的组成部分:
可读的
→ 第一个异步生成器 [→ … → 最后一个异步生成器]
→ 可读或异步函数
9.4.1 从块到异步可迭代对象中的编号行
在下一个示例中,我们将看到一个刚刚解释过的处理管道的示例。
import {Readable} from 'stream'; /** * @param chunkIterable An asynchronous or synchronous iterable * over “chunks” (arbitrary strings) * @returns An asynchronous iterable over “lines” * (strings with at most one newline that always appears at the end) */ async function* chunksToLines(chunkIterable) { let previous = ''; for await (const chunk of chunkIterable) { let startSearch = previous.length; previous += chunk; while (true) { // Works for EOL === ' ' and EOL === ' ' const eolIndex = previous.indexOf(' ', startSearch); if (eolIndex < 0) break; // Line includes the EOL const line = previous.slice(0, eolIndex+1); yield line; previous = previous.slice(eolIndex+1); startSearch = 0; } } if (previous.length > 0) { yield previous; } } async function* numberLines(lineIterable) { let lineNumber = 1; for await (const line of lineIterable) { yield lineNumber + ' ' + line; lineNumber++; } } async function logLines(lineIterable) { for await (const line of lineIterable) { console.log(line); } } const chunks = Readable.from( 'Text with multiple lines. ', {encoding: 'utf8'}); await logLines(numberLines(chunksToLines(chunks))); // (A) // Output: // '1 Text with ' // '2 multiple ' // '3 lines. '
处理管道在 A 行设置。步骤是:
-
chunksToLines() : 从具有块的异步可迭代对象转换为具有行的异步可迭代对象。 -
numberLines() : 从具有行的异步可迭代对象转换为具有编号行的异步可迭代对象。 -
logLines() : 记录异步可迭代对象中的项目。
观察:
-
chunksToLines() 和numberLines() 的输入和输出都是异步可迭代对象。这就是为什么它们是异步生成器(由async 和* 指示)。 -
logLines() 的输入是异步可迭代对象。这就是为什么它是一个异步函数(由async 指示)。
9.5 可写流
9.5.1 创建文件的可写流
我们可以使用
const writableStream = fs.createWriteStream( 'tmp/log.txt', {encoding: 'utf8'});
9.5.2 向可写流写入数据
在本节中,我们将探讨向可写流写入数据的方法:
-
通过其方法
.write() 直接向可写流写入数据。 -
使用模块
stream 中的函数pipeline() 将可读流传输到可写流。
为了演示这些方法,我们使用它们来实现相同的函数
可读流的
9.5.2.1 writable.write(chunk)
在向流中写入数据时,有两种基于回调的机制可以帮助我们:
-
事件
'drain' 表示背压已经解除。 -
函数
finished() 在流:-
不再可读或可写
-
已经遇到错误或过早关闭事件
-
在下一个示例中,我们将这些机制转换为 Promise,以便我们可以通过异步函数使用它们:
import * as util from 'util'; import * as stream from 'stream'; import * as fs from 'fs'; import {once} from 'events'; const finished = util.promisify(stream.finished); // (A) async function writeIterableToFile(iterable, filePath) { const writable = fs.createWriteStream(filePath, {encoding: 'utf8'}); for await (const chunk of iterable) { if (!writable.write(chunk)) { // (B) // Handle backpressure await once(writable, 'drain'); } } writable.end(); // (C) // Wait until done. Throws if there are errors. await finished(writable); } await writeIterableToFile( ['One', ' line of text. '], 'tmp/log.txt'); assert.equal( fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}), 'One line of text. ');
我们使用了以下两种模式:
-
在处理背压的情况下向可写流写入数据(B 行):
if (!writable.write(chunk)) { await once(writable, 'drain'); }
-
关闭可写流并等待写入完成(C 行):
writable.end(); await finished(writable);
9.5.2.2 通过stream.pipeline() 将可读流传输到可写流
在 A 行,我们使用
import * as stream from 'stream'; import * as fs from 'fs'; const pipeline = util.promisify(stream.pipeline); async function writeIterableToFile(iterable, filePath) { const readable = stream.Readable.from( iterable, {encoding: 'utf8'}); const writable = fs.createWriteStream(filePath); await pipeline(readable, writable); // (A) } await writeIterableToFile( ['One', ' line of text. '], 'tmp/log.txt'); // ···
9.5.2.3 不推荐:readable.pipe(destination)
可读的
9.6 快速参考:与流相关的功能
模块
-
const EOL: string (自 0.7.8 起)(https://nodejs.org/api/os.html#os_os_eol)包含当前平台使用的行尾字符序列。
模块
-
Buffer.isEncoding(encoding: string): boolean (自 0.9.1 起)(https://nodejs.org/api/buffer.html#buffer_class_method_buffer_isencoding_encoding)如果
encoding 正确命名了受支持的 Node.js 文本编码之一,则返回true 。支持的编码包括:-
'utf8' -
'utf16le' -
'ascii' -
'latin1 -
'base64' -
'hex' (每个字节表示为两个十六进制字符)
-
模块
-
Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any> (自 10.0.0 起)(https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator)可读流是异步可迭代的。例如,您可以在异步函数或异步生成器中使用
for-await-of 循环来迭代它们。 -
finished(stream: ReadableStream | WritableStream | ReadWriteStream, callback: (err?: ErrnoException | null) => void): () => Promise<void> (自 10.0.0 起)当读取/写入完成或出现错误时,返回的 Promise 将被解决。
此 promisified 版本的创建方式如下:
const finished = util.promisify(stream.finished);
-
pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void> (自 10.0.0 起)流之间的管道。当管道完成或出现错误时,返回的 Promise 将被解决。
此 promisified 版本的创建方式如下:
const pipeline = util.promisify(stream.pipeline);
-
Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable (自 12.3.0 起)将可迭代对象转换为可读流。
interface ReadableOptions { highWaterMark?: number; encoding?: string; objectMode?: boolean; read?(this: Readable, size: number): void; destroy?(this: Readable, error: Error | null, callback: (error: Error | null) => void): void; autoDestroy?: boolean; }
这些选项与
Readable 构造函数的选项相同,并在此处有文档记录。
模块
-
createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream (自 2.3.0 起)创建可读流。还有更多选项可用。
-
createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream (自 2.3.0 起)使用
.flags 选项,您可以指定是要写入还是追加,以及文件存在或不存在时会发生什么。还有更多选项可用。
本节中的静态类型信息基于Definitely Typed。
9.7 进一步阅读和本章的来源
-
Node.js 文档中的“流与异步生成器和异步迭代器兼容性”章节
-
“JavaScript for impatient programmers”中的“异步函数”章节
-
“JavaScript for impatient programmers”中的“异步迭代”章节
评论
十、在 Node.js 上使用 web 流
原文:
exploringjs.com/nodejs-shell-scripting/ch_web-streams.html 译者:飞龙
协议:CC BY-NC-SA 4.0
-
10.1?什么是 web 流?
-
10.1.1?流的种类
-
10.1.2?管道链
-
10.1.3?背压
-
10.1.4?Node.js 中对 web 流的支持
-
-
10.2?从 ReadableStreams 读取
-
10.2.1?通过 Readers 消费 ReadableStreams
-
10.2.2?通过异步迭代消费 ReadableStreams
-
10.2.3?将 ReadableStreams 管道到 WritableStreams
-
-
10.3?通过包装将数据源转换为 ReadableStreams
-
10.3.1?实现底层源的第一个示例
-
10.3.2?使用 ReadableStream 包装推送源或拉取源
-
-
10.4?写入 WritableStreams
-
10.4.1?通过 Writers 写入 WritableStreams
-
10.4.2?管道到 WritableStreams
-
-
10.5?通过包装将数据汇转换为 WritableStreams
-
10.5.1?示例:跟踪 ReadableStream
-
10.5.2?示例:收集写入字符串的 WriteStream 块
-
-
10.6?使用 TransformStreams
- 10.6.1?标准 TransformStreams
-
10.7?实现自定义 TransformStreams
-
10.7.1?示例:将任意块的流转换为行流
-
10.7.2?提示:异步生成器也非常适合转换流
-
-
10.8?更深入地了解背压
-
10.8.1?信号背压
-
10.8.2?对背压的反应
-
-
10.9?字节流
-
10.9.1?可读字节流
-
10.9.2?示例:填充随机数据的无限可读字节流
-
10.9.3?示例:压缩可读字节流
-
10.9.4?示例:通过
fetch() 读取网页
-
-
10.10?Node.js 特定的辅助函数
-
10.11?进一步阅读
Web 流 是一种标准的 流,现在在所有主要的 web 平台上都得到支持:web 浏览器、Node.js 和 Deno。(流是一种从各种来源顺序读取和写入数据的抽象,例如文件、托管在服务器上的数据等。)
例如,全局函数
本章涵盖了 Node.js 上的 web 流,但我们所学的大部分内容都适用于支持它们的所有 web 平台。
10.1 什么是网络流?
让我们首先概述一下网络流的一些基本知识。之后,我们将快速转移到示例。
流是一种用于访问数据的数据结构,例如:
-
文件
-
托管在 Web 服务器上的数据
-
等等。
它们的两个好处是:
-
我们可以处理大量数据,因为流允许我们将它们分割成较小的片段(所谓的chunks),我们可以一次处理一个。
-
我们可以在处理不同数据时使用相同的数据结构,流。这样可以更容易地重用代码。
Web streams(“web”通常被省略)是一个相对较新的标准,起源于 Web 浏览器,但现在也受到 Node.js 和 Deno 的支持(如此MDN 兼容性表所示)。
在网络流中,chunks 通常是:
-
文本流:字符串
-
二进制流:Uint8Arrays(一种 TypedArray)
10.1.1 流的种类
有三种主要类型的网络流:
-
一个 ReadableStream 用于从source读取数据。执行此操作的代码称为consumer。
-
一个 WritableStream 用于向sink写入数据。执行此操作的代码称为producer。
-
TransformStream 由两个流组成:
-
它从其writable side接收输入,即 WritableStream。
-
它将输出发送到其readable side,即 ReadableStream。
这个想法是通过“管道传输”TransformStream 来转换数据。也就是说,我们将数据写入可写端,并从可读端读取转换后的数据。以下 TransformStreams 内置在大多数 JavaScript 平台中(稍后会详细介绍):
-
因为 JavaScript 字符串是 UTF-16 编码的,所以在 JavaScript 中,UTF-8 编码的数据被视为二进制数据。
TextDecoderStream 将这样的数据转换为字符串。 -
TextEncoderStream 将 JavaScript 字符串转换为 UTF-8 数据。 -
CompressionStream 将二进制数据压缩为 GZIP 和其他压缩格式。 -
DecompressionStream 从 GZIP 和其他压缩格式中解压缩二进制数据。
-
ReadableStreams,WritableStreams 和 TransformStreams 可用于传输文本或二进制数据。在本章中,我们将主要进行前者。 字节流用于二进制数据,在最后简要提到。
10.1.2 管道链路
Piping是一种操作,它让我们将一个 ReadableStream 连接到一个 WritableStream:只要 ReadableStream 产生数据,此操作就会读取该数据并将其写入 WritableStream。如果我们连接了两个流,我们就可以方便地将数据从一个位置传输到另一个位置(例如复制文件)。但是,我们也可以连接多于两个流,并获得可以以各种方式处理数据的管道链路。这是一个管道链路的例子:
-
它以一个 ReadableStream 开始。
-
接下来是一个或多个 TransformStreams。
-
链路以 WritableStream 结束。
通过将前者连接到后者的可写端,将一个 ReadableStream 连接到 TransformStream。类似地,通过将前者的可读端连接到后者的可写端,将一个 TransformStream 连接到另一个 TransformStream。并且通过将前者的可读端连接到后者的可写端,将一个 TransformStream 连接到一个 WritableStream。
10.1.3 背压
管道链路中的一个问题是,成员可能会收到比它目前能处理的更多数据。 背压是解决这个问题的一种技术:它使数据的接收者能够告诉发送者应该暂时停止发送数据,以便接收者不会被压倒。
另一种看待背压的方式是作为一个信号,通过管道链路向后传播,从被压倒的成员到链路的开始。例如,考虑以下管道链路:
ReadableStream -pipeTo-> TransformStream -pipeTo-> WriteableStream
这是背压通过这个链路传播的方式:
-
最初,WriteableStream 发出信号,表明它暂时无法处理更多数据。
-
管道停止从 TransformStream 中读取。
-
输入在 TransformStream 中积累(被缓冲)。
-
TransformStream 发出满的信号。
-
管道停止从 ReadableStream 中读取。
我们已经到达管道链的开头。因此,在 ReadableStream 中没有数据积累(也被缓冲),WritableStream 有时间恢复。一旦它恢复,它会发出信号表明它已准备好再次接收数据。该信号也会通过链返回,直到它到达 ReadableStream,数据处理恢复。
在这第一次对背压的探讨中,为了让事情更容易理解,省略了一些细节。这些将在以后进行讨论。
10.1.4?Node.js 中对 web 流的支持
在 Node.js 中,Web 流可以从两个来源获得:
-
来自模块
'node:stream/web' -
通过全局变量(就像在 Web 浏览器中)
目前,只有一个 API 在 Node.js 中直接支持 web 流 – Fetch API:
const response = await fetch('https://example.com'); const readableStream = response.body;
对于其他事情,我们需要使用模块
-
Node.js 的 Readable 可以转换为 WritableStreams,反之亦然:
-
Readable.toWeb(nodeReadable) -
Readable.fromWeb(webReadableStream, options?)
-
-
Node.js 的 Writable 可以转换为 ReadableStreams,反之亦然:
-
Writable.toWeb(nodeWritable) -
Writable.fromWeb(webWritableStream, options?)
-
-
Node.js 的 Duplex 可以转换为 TransformStreams,反之亦然:
-
Duplex.toWeb(nodeDuplex) -
Duplex.fromWeb(webTransformStream, options?)
-
还有一个 API 部分支持 web 流:FileHandles 有方法
10.2?从 ReadableStreams 中读取
ReadableStreams 让我们从各种来源读取数据块。它们具有以下类型(随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,它们将再次被解释):
interface ReadableStream<TChunk> { getReader(): ReadableStreamDefaultReader<TChunk>; readonly locked: boolean; [Symbol.asyncIterator](): AsyncIterator<TChunk>; cancel(reason?: any): Promise<void>; pipeTo( destination: WritableStream<TChunk>, options?: StreamPipeOptions ): Promise<void>; pipeThrough<TChunk2>( transform: ReadableWritablePair<TChunk2, TChunk>, options?: StreamPipeOptions ): ReadableStream<TChunk2>; // Not used in this chapter: tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>]; } interface StreamPipeOptions { signal?: AbortSignal; preventClose?: boolean; preventAbort?: boolean; preventCancel?: boolean; }
这些属性的解释:
-
.getReader() 返回一个 Reader – 通过它我们可以从 ReadableStream 中读取。ReadableStreams 返回 Readers 类似于可迭代对象返回迭代器。 -
.locked : 一次只能有一个活动的 Reader 读取 ReadableStream。当一个 Reader 正在使用时,ReadableStream 被锁定,无法调用.getReader() 。 -
[Symbol.asyncIterator](https://exploringjs.com/impatient-js/ch_async-iteration.html) : 这个方法使得 ReadableStreams 可以异步迭代。目前只在一些平台上实现。 -
.cancel(reason) 取消流,因为消费者对它不再感兴趣。reason 被传递给 ReadableStream 的底层源的.cancel() 方法(稍后会详细介绍)。返回的 Promise 在此操作完成时实现。 -
.pipeTo() 将其 ReadableStream 的内容传送到 WritableStream。返回的 Promise 在此操作完成时实现。.pipeTo() 确保背压、关闭、错误等都正确地通过管道链传播。我们可以通过它的第二个参数指定选项:-
.signal 让我们向这个方法传递一个 AbortSignal,这使我们能够通过 AbortController 中止管道传输。 -
.preventClose : 如果为true ,它会阻止在 ReadableStream 关闭时关闭 WritableStream。当我们想要将多个 ReadableStream 管道到同一个 WritableStream 时,这是有用的。 -
其余选项超出了本章的范围。它们在web 流规范中有文档记录。
-
-
.pipeThrough() 将其 ReadableStream 连接到一个 ReadableWritablePair(大致是一个 TransformStream,稍后会详细介绍)。它返回生成的 ReadableStream(即 ReadableWritablePair 的可读端)。
以下小节涵盖了三种消费 ReadableStreams 的方式:
-
通过 Readers 进行读取
-
通过异步迭代进行读取
-
将 ReadableStreams 连接到 WritableStreams
10.2.1 通过 Reader 消费 ReadableStreams
我们可以使用Readers从 ReadableStreams 中读取数据。它们具有以下类型(随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,它们将再次被解释):
interface ReadableStreamGenericReader { readonly closed: Promise<undefined>; cancel(reason?: any): Promise<void>; } interface ReadableStreamDefaultReader<TChunk> extends ReadableStreamGenericReader { releaseLock(): void; read(): Promise<ReadableStreamReadResult<TChunk>>; } interface ReadableStreamReadResult<TChunk> { done: boolean; value: TChunk | undefined; }
这些属性的解释:
-
.closed :此 Promise 在流关闭后被满足。如果流出现错误或者在流关闭之前 Reader 的锁被释放,它将被拒绝。 -
.cancel() :在活动的 Reader 中,此方法取消关联的 ReadableStream。 -
.releaseLock() 停用 Reader 并解锁其流。 -
.read() 返回一个 Promise,用于 ReadableStreamReadResult(一个包装的块),它有两个属性:-
.done 是一个布尔值,只要可以读取块,就为false ,在最后一个块之后为true 。 -
.value 是块(或在最后一个块之后是undefined )。
-
如果您了解迭代的工作原理,ReadableStreamReadResult 可能会很熟悉:ReadableStreams 类似于可迭代对象,Readers 类似于迭代器,而 ReadableStreamReadResults 类似于迭代器方法
以下代码演示了使用 Readers 的协议:
const reader = readableStream.getReader(); // (A) assert.equal(readableStream.locked, true); // (B) try { while (true) { const {done, value: chunk} = await reader.read(); // (C) if (done) break; // Use `chunk` } } finally { reader.releaseLock(); // (D) }
**获取 Reader。**我们不能直接从
读取块。
10.2.1.1 示例:通过 ReadableStream 读取文件
在下面的示例中,我们从文本文件
import * as fs from 'node:fs'; import {Readable} from 'node:stream'; const nodeReadable = fs.createReadStream( 'data.txt', {encoding: 'utf-8'}); const webReadableStream = Readable.toWeb(nodeReadable); // (A) const reader = webReadableStream.getReader(); try { while (true) { const {done, value} = await reader.read(); if (done) break; console.log(value); } } finally { reader.releaseLock(); } // Output: // 'Content of text file '
我们将 Node.js Readable 转换为 web ReadableStream(行 A)。然后我们使用先前解释的协议来读取块。
10.2.1.2 示例:使用 ReadableStream 内容组装字符串
在下一个示例中,我们将所有 ReadableStream 的块连接成一个字符串并返回它:
/** * Returns a string with the contents of `readableStream`. */ async function readableStreamToString(readableStream) { const reader = readableStream.getReader(); try { let result = ''; while (true) { const {done, value} = await reader.read(); if (done) { return result; // (A) } result += value; } } finally { reader.releaseLock(); // (B) } }
方便的是,
10.2.2 通过异步迭代消费 ReadableStreams
ReadableStreams 也可以通过异步迭代进行消费:
const iterator = readableStream[Symbol.asyncIterator](); let exhaustive = false; try { while (true) { let chunk; ({done: exhaustive, value: chunk} = await iterator.next()); if (exhaustive) break; console.log(chunk); } } finally { // If the loop was terminated before we could iterate exhaustively // (via an exception or `return`), we must call `iterator.return()`. // Check if that was the case. if (!exhaustive) { iterator.return(); } }
值得庆幸的是,
for await (const chunk of readableStream) { console.log(chunk); }
10.2.2.1 示例:使用异步迭代读取流
让我们重新尝试从文件中读取文本。这次,我们使用异步迭代而不是 Reader:
import * as fs from 'node:fs'; import {Readable} from 'node:stream'; const nodeReadable = fs.createReadStream( 'text-file.txt', {encoding: 'utf-8'}); const webReadableStream = Readable.toWeb(nodeReadable); for await (const chunk of webReadableStream) { console.log(chunk); } // Output: // 'Content of text file'
10.2.2.2 示例:使用 ReadableStream 内容组装字符串
我们以前使用 Reader 来组装一个包含 ReadableStream 内容的字符串。有了异步迭代,代码变得更简单了:
/** * Returns a string with the contents of `readableStream`. */ async function readableStreamToString2(readableStream) { let result = ''; for await (const chunk of readableStream) { result += chunk; } return result; }
10.2.2.3 注意事项:浏览器不支持对 ReadableStreams 进行异步迭代
目前,Node.js 和 Deno 支持对 ReadableStreams 进行异步迭代,但 Web 浏览器不支持:有一个 GitHub 问题链接到错误报告。
鉴于尚不完全清楚浏览器将如何支持异步迭代,包装比填充更安全。以下代码基于Chromium bug 报告中的建议:
async function* getAsyncIterableFor(readableStream) { const reader = readableStream.getReader(); try { while (true) { const {done, value} = await reader.read(); if (done) return; yield value; } } finally { reader.releaseLock(); } }
10.2.3 将可读流导入可写流
可读流有两种管道方法:
-
readableStream.pipeTo(writeableStream) 同步返回一个 Promisep 。它异步读取readableStream 的所有块,并将它们写入writableStream 。完成后,它会实现p 。当我们探索可写流时,我们将看到
.pipeTo() 的示例,因为它提供了一种方便的方式将数据传输到其中。 -
readableStream.pipeThrough(transformStream) 将readableStream 导入transformStream.writable 并返回transformStream.readable (每个 TransformStream 都有这些属性,它们指向其可写侧和可读侧)。另一种看待这个操作的方式是,我们通过连接transformStream 到readableStream 创建一个新的可读流。当我们探索 TransformStreams 时,我们将看到
.pipeThrough() 的示例,因为这是它们主要使用的方法。
10.3 将数据源通过包装转换为可读流
如果我们想通过一个可读流读取外部源,我们可以将其包装在一个适配器对象中,并将该对象传递给
new ReadableStream(underlyingSource?, queuingStrategy?)
这是底层源的类型(随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,它们将再次解释):
interface UnderlyingSource<TChunk> { start?( controller: ReadableStreamController<TChunk> ): void | Promise<void>; pull?( controller: ReadableStreamController<TChunk> ): void | Promise<void>; cancel?(reason?: any): void | Promise<void>; // Only used in byte streams and ignored in this section: type: 'bytes' | undefined; autoAllocateChunkSize: bigint; }
这是当可读流调用这些方法时:
-
在调用
ReadableStream 的构造函数后立即调用.start(controller) 。 -
每当可读流的内部队列中有空间时,都会调用
.pull(controller) 。直到队列再次满了为止,它会被重复调用。此方法只会在.start() 完成后调用。如果.pull() 没有入队任何内容,它将不会再次被调用。 -
如果可读流的消费者通过
readableStream.cancel() 或reader.cancel() 取消它,将调用.cancel(reason) 。reason 是传递给这些方法的值。
这些方法中的每一个都可以返回一个 Promise,并且在 Promise 解决之前不会采取进一步的步骤。如果我们想要做一些异步操作,这是有用的。
type ReadableStreamController<TChunk> = | ReadableStreamDefaultController<TChunk> | ReadableByteStreamController<TChunk> // ignored here ; interface ReadableStreamDefaultController<TChunk> { enqueue(chunk?: TChunk): void; readonly desiredSize: number | null; close(): void; error(err?: any): void; }
现在,块是字符串。我们稍后将介绍字节流,其中 Uint8Arrays 很常见。这些方法的作用是:
-
.enqueue(chunk) 将chunk 添加到可读流的内部队列。 -
.desiredSize 指示.enqueue() 写入的队列中有多少空间。如果队列已满,则为零,如果超过了最大大小,则为负。因此,如果期望大小为零或负,则我们必须停止入队。-
如果流关闭,其期望大小为零。
-
如果流处于错误模式,其期望大小为
null 。
-
-
.close() 关闭可读流。消费者仍然可以清空队列,但之后,流将结束。底层源调用此方法很重要-否则,读取其流将永远不会结束。 -
.error(err) 将流置于错误模式:以后与它的所有交互都将以错误值err 失败。
10.3.1 实现底层源的第一个示例
在我们实现底层源的第一个示例中,我们只提供了
const readableStream = new ReadableStream({ start(controller) { controller.enqueue('First line '); // (A) controller.enqueue('Second line '); // (B) controller.close(); // (C) }, }); for await (const chunk of readableStream) { console.log(chunk); } // Output: // 'First line ' // 'Second line '
我们使用控制器创建一个具有两个块(行 A 和行 B)的流。关闭流很重要(行 C)。否则,
请注意,这种入队的方式并不完全安全:存在超出内部队列容量的风险。我们很快将看到如何避免这种风险。
10.3.2 使用 ReadableStream 包装推送源或拉取源
一个常见的场景是将推送源或拉取源转换为 ReadableStream。源是推送还是拉取决定了我们将如何与 UnderlyingSource 连接到 ReadableStream:
-
推送源:这样的源在有新数据时通知我们。我们使用
.start() 来设置监听器和支持数据结构。如果我们收到太多数据,期望的大小不再是正数,我们必须告诉我们的源暂停。如果以后调用了.pull() ,我们可以取消暂停。对外部源在期望的大小变为非正数时暂停的反应称为应用背压。 -
拉取源:我们向这样的源请求新数据-通常是异步的。因此,我们通常在
.start() 中不做太多事情,并在调用.pull() 时检索数据。
接下来我们将看到两种来源的例子。
10.3.2.1 示例:从具有背压支持的推送源创建一个 ReadableStream
在下面的示例中,我们将一个 ReadableStream 包装在一个套接字周围-它向我们推送数据(它调用我们)。这个例子来自 web 流规范:
function makeReadableBackpressureSocketStream(host, port) { const socket = createBackpressureSocket(host, port); return new ReadableStream({ start(controller) { socket.ondata = event => { controller.enqueue(event.data); if (controller.desiredSize <= 0) { // The internal queue is full, so propagate // the backpressure signal to the underlying source. socket.readStop(); } }; socket.onend = () => controller.close(); socket.onerror = () => controller.error( new Error('The socket errored!')); }, pull() { // This is called if the internal queue has been emptied, but the // stream’s consumer still wants more data. In that case, restart // the flow of data if we have previously paused it. socket.readStart(); }, cancel() { socket.close(); }, }); }
10.3.2.2 示例:从拉取源创建一个 ReadableStream
工具函数
/** * @param iterable an iterable (asynchronous or synchronous) */ function iterableToReadableStream(iterable) { return new ReadableStream({ start() { if (typeof iterable[Symbol.asyncIterator] === 'function') { this.iterator = iterable[Symbol.asyncIterator](); } else if (typeof iterable[Symbol.iterator] === 'function') { this.iterator = iterable[Symbol.iterator](); } else { throw new Error('Not an iterable: ' + iterable); } }, async pull(controller) { if (this.iterator === null) return; // Sync iterators return non-Promise values, // but `await` doesn’t mind and simply passes them on const {value, done} = await this.iterator.next(); if (done) { this.iterator = null; controller.close(); return; } controller.enqueue(value); }, cancel() { this.iterator = null; controller.close(); }, }); }
让我们使用一个异步生成器函数来创建一个异步可迭代对象,并将该可迭代对象转换为一个 ReadableStream:
async function* genAsyncIterable() { yield 'how'; yield 'are'; yield 'you'; } const readableStream = iterableToReadableStream(genAsyncIterable()); for await (const chunk of readableStream) { console.log(chunk); } // Output: // 'how' // 'are' // 'you'
const syncIterable = ['hello', 'everyone']; const readableStream = iterableToReadableStream(syncIterable); for await (const chunk of readableStream) { console.log(chunk); } // Output: // 'hello' // 'everyone'
可能会有一个静态的辅助方法
10.4 向 WritableStreams 写入
WritableStreams 让我们向各种接收器写入数据块。它们具有以下类型(随意浏览此类型和其属性的解释;当我们在示例中遇到它们时,它们将再次解释):
interface WritableStream<TChunk> { getWriter(): WritableStreamDefaultWriter<TChunk>; readonly locked: boolean; close(): Promise<void>; abort(reason?: any): Promise<void>; }
这些属性的解释:
-
.getWriter() 返回一个 Writer-通过它我们可以向 WritableStream 写入数据的对象。 -
.locked :WritableStream 一次只能有一个活动的 Writer。当一个 Writer 正在使用时,WritableStream 被锁定,无法调用.getWriter() 。 -
.close() 关闭流:-
底层接收器(稍后会详细介绍)在关闭之前仍将接收所有排队的块。
-
从现在开始,所有的写入尝试都将无声地失败(没有错误)。
-
该方法返回一个 Promise,如果接收器成功写入所有排队的块并关闭,将实现该 Promise。如果在这些步骤中发生任何错误,它将被拒绝。
-
-
.abort() 中止流:-
它将流置于错误模式。
-
返回的 Promise 在接收器成功关闭时实现,如果发生错误则拒绝。
-
以下小节涵盖了向 WritableStreams 发送数据的两种方法:
-
通过 Writers 向 WritableStreams 写入
-
将数据传输到 WritableStreams
10.4.1 通过 Writers 向 WritableStreams 写入
我们可以使用Writers向 WritableStreams 写入。它们具有以下类型(随意浏览此类型和其属性的解释;当我们在示例中遇到它们时,它们将再次解释):
interface WritableStreamDefaultWriter<TChunk> { readonly desiredSize: number | null; readonly ready: Promise<undefined>; write(chunk?: TChunk): Promise<void>; releaseLock(): void; close(): Promise<void>; readonly closed: Promise<undefined>; abort(reason?: any): Promise<void>; }
这些属性的解释:
-
.desiredSize 指示 WriteStream 队列中有多少空间。如果队列已满,则为零,如果超过最大大小,则为负数。因此,如果期望的大小为零或负数,我们必须停止写入。-
如果流关闭,它的期望大小为零。
-
如果流处于错误模式,它的期望大小为
null 。
-
-
.ready 返回一个 Promise,在期望的大小从非正数变为正数时实现。这意味着没有背压活动,可以写入数据。如果期望的大小后来再次变为非正数,则会创建并返回一个新的待处理 Promise。 -
.write() 将一个块写入流。它返回一个 Promise,在写入成功后实现,如果有错误则拒绝。 -
.releaseLock() 释放 Writer 对其流的锁定。 -
.close() 具有与关闭 Writer 流相同的效果。 -
.closed 返回一个 Promise,在流关闭时被实现。 -
.abort() 具有与中止 Writer 流相同的效果。
以下代码显示了使用 Writers 的协议:
const writer = writableStream.getWriter(); // (A) assert.equal(writableStream.locked, true); // (B) try { // Writing the chunks (explained later) } finally { writer.releaseLock(); // (C) }
我们不能直接向
有三种写入块的方法。
10.4.1.1 写入方法 1:等待.write() (处理背压效率低下)
第一种写入方法是等待每个
await writer.write('Chunk 1'); await writer.write('Chunk 2'); await writer.close();
由
由
这种写入方法的一个缺点是等待写入成功意味着队列没有被使用。因此,数据吞吐量可能会较低。
10.4.1.2 写入方法 2:忽略.write() 拒绝(忽略背压)
在第二种写入方法中,我们忽略了
writer.write('Chunk 1').catch(() => {}); // (A) writer.write('Chunk 2').catch(() => {}); // (B) await writer.close(); // reports errors
在 A 行和 B 行调用
通过使用一个忽略 Promise 拒绝的辅助函数,可以改进先前的代码:
ignoreRejections( writer.write('Chunk 1'), writer.write('Chunk 2'), ); await writer.close(); // reports errors function ignoreRejections(...promises) { for (const promise of promises) { promise.catch(() => {}); } }
这种方法的一个缺点是忽略了背压:我们只是假设队列足够大,可以容纳我们写入的所有内容。
10.4.1.3 写入方法 3:等待.ready (高效处理背压)
在这种写入方法中,我们通过等待 Writer getter
await writer.ready; // reports errors // How much room do we have? console.log(writer.desiredSize); writer.write('Chunk 1').catch(() => {}); await writer.ready; // reports errors // How much room do we have? console.log(writer.desiredSize); writer.write('Chunk 2').catch(() => {}); await writer.close(); // reports errors
10.4.1.4 示例:通过 Writer 写入文件
在这个例子中,我们通过 WritableStream 创建一个文本文件
import * as fs from 'node:fs'; import {Writable} from 'node:stream'; const nodeWritable = fs.createWriteStream( 'new-file.txt', {encoding: 'utf-8'}); // (A) const webWritableStream = Writable.toWeb(nodeWritable); // (B) const writer = webWritableStream.getWriter(); try { await writer.write('First line '); await writer.write('Second line '); await writer.close(); } finally { writer.releaseLock() }
在 A 行,我们为文件
10.4.2 向 WritableStreams 进行管道传输
除了使用 Writers,我们还可以通过将 ReadableStreams 传输到 WritableStreams 来向 WritableStreams 写入:
await readableStream.pipeTo(writableStream);
由
10.4.2.1 管道传输是异步进行的
管道传输是在当前任务完成或暂停后执行的。以下代码演示了这一点:
const readableStream = new ReadableStream({ // (A) start(controller) { controller.enqueue('First line '); controller.enqueue('Second line '); controller.close(); }, }); const writableStream = new WritableStream({ // (B) write(chunk) { console.log('WRITE: ' + JSON.stringify(chunk)); }, close() { console.log('CLOSE WritableStream'); }, }); console.log('Before .pipeTo()'); const promise = readableStream.pipeTo(writableStream); // (C) promise.then(() => console.log('Promise fulfilled')); console.log('After .pipeTo()'); // Output: // 'Before .pipeTo()' // 'After .pipeTo()' // 'WRITE: "First line "' // 'WRITE: "Second line "' // 'CLOSE WritableStream' // 'Promise fulfilled'
在 A 行我们创建一个 ReadableStream。在 B 行我们创建一个 WritableStream。
我们可以看到
10.4.2.2?示例:将数据管道到文件的可写流
在下面的示例中,我们为一个文件创建一个 WritableStream,并将一个 ReadableStream 管道传递给它:
const webReadableStream = new ReadableStream({ // (A) async start(controller) { controller.enqueue('First line '); controller.enqueue('Second line '); controller.close(); }, }); const nodeWritable = fs.createWriteStream( // (B) 'data.txt', {encoding: 'utf-8'}); const webWritableStream = Writable.toWeb(nodeWritable); // (C) await webReadableStream.pipeTo(webWritableStream); // (D)
在 A 行,我们创建了一个 ReadableStream。在 B 行,我们为文件
10.4.2.3?示例:将两个 ReadableStreams 写入到一个 WritableStream
在下面的示例中,我们将两个 ReadableStreams 写入单个 WritableStream。
function createReadableStream(prefix) { return new ReadableStream({ async start(controller) { controller.enqueue(prefix + 'chunk 1'); controller.enqueue(prefix + 'chunk 2'); controller.close(); }, }); } const writableStream = new WritableStream({ write(chunk) { console.log('WRITE ' + JSON.stringify(chunk)); }, close() { console.log('CLOSE'); }, abort(err) { console.log('ABORT ' + err); }, }); await createReadableStream('Stream 1: ') .pipeTo(writableStream, {preventClose: true}); // (A) await createReadableStream('Stream 2: ') .pipeTo(writableStream, {preventClose: true}); // (B) await writableStream.close(); // Output // 'WRITE "Stream 1: chunk 1"' // 'WRITE "Stream 1: chunk 2"' // 'WRITE "Stream 2: chunk 1"' // 'WRITE "Stream 2: chunk 2"' // 'CLOSE'
我们告诉
10.5?将数据接收端通过包装转换为可写流
如果我们想通过 WritableStream 写入到外部接收端,我们可以将其包装在一个适配器对象中,并将该对象传递给
new WritableStream(underlyingSink?, queuingStrategy?)
这是底层接收端的类型(随意浏览此类型和其属性的解释;当我们在示例中遇到它们时,它们将再次解释):
interface UnderlyingSink<TChunk> { start?( controller: WritableStreamDefaultController ): void | Promise<void>; write?( chunk: TChunk, controller: WritableStreamDefaultController ): void | Promise<void>; close?(): void | Promise<void>;; abort?(reason?: any): void | Promise<void>; }
这些属性的解释:
-
.start(controller) 在我们调用WritableStream 的构造函数后立即调用。如果我们做一些异步操作,我们可以返回一个 Promise。在这个方法中,我们可以准备写入。 -
.write(chunk, controller) 当一个新的块准备写入外部接收端时调用。我们可以通过返回一个 Promise 来施加反压,一旦反压消失就会实现。 -
.close() 在调用writer.close() 后调用,并且所有排队的写入都成功。在这个方法中,我们可以在写入后进行清理。 -
如果调用了
writeStream.abort() 或writer.abort() ,则会调用.abort(reason) 。reason 是传递给这些方法的值。
interface WritableStreamDefaultController { readonly signal: AbortSignal; error(err?: any): void; }
-
.signal 是一个 AbortSignal,如果我们想在流被中止时中止写入或关闭操作,我们可以监听它。 -
.error(err) 错误 WritableStream:它被关闭,并且以后所有与它的交互都会失败,错误值为err 。
10.5.1?示例:跟踪一个可读流
在下一个示例中,我们将一个 ReadableStream 管道到一个 WritableStream,以便检查 ReadableStream 如何生成块:
const readableStream = new ReadableStream({ start(controller) { controller.enqueue('First chunk'); controller.enqueue('Second chunk'); controller.close(); }, }); await readableStream.pipeTo( new WritableStream({ write(chunk) { console.log('WRITE ' + JSON.stringify(chunk)); }, close() { console.log('CLOSE'); }, abort(err) { console.log('ABORT ' + err); }, }) ); // Output: // 'WRITE "First chunk"' // 'WRITE "Second chunk"' // 'CLOSE'
10.5.2?示例:收集写入到 WriteStream 的块到一个字符串中
在下一个示例中,我们创建了
class StringWritableStream extends WritableStream { #string = ''; constructor() { super({ // We need to access the `this` of `StringWritableStream`. // Hence the arrow function (and not a method). write: (chunk) => { this.#string += chunk; }, }); } getString() { return this.#string; } } const stringStream = new StringWritableStream(); const writer = stringStream.getWriter(); try { await writer.write('How are'); await writer.write(' you?'); await writer.close(); } finally { writer.releaseLock() } assert.equal( stringStream.getString(), 'How are you?' );
这种方法的一个缺点是我们混合了两个 API:
function StringcreateWritableStream() { let string = ''; return { stream: new WritableStream({ write(chunk) { string += chunk; }, }), getString() { return string; }, }; } const stringStream = StringcreateWritableStream(); const writer = stringStream.stream.getWriter(); try { await writer.write('How are'); await writer.write(' you?'); await writer.close(); } finally { writer.releaseLock() } assert.equal( stringStream.getString(), 'How are you?' );
这个功能也可以通过类来实现(而不是作为对象的工厂函数)。
10.6?使用 TransformStreams
一个 TransformStream:
-
通过其writable side接收输入,即 WritableStream。
-
然后可能会或可能不会转换这个输入。
-
结果可以通过一个 ReadableStream 来读取,它的可读端。
使用 TransformStreams 最常见的方式是“管道传递”它们:
const transformedStream = readableStream.pipeThrough(transformStream);
interface ReadableWritablePair<RChunk, WChunk> { readable: ReadableStream<RChunk>; writable: WritableStream<WChunk>; }
10.6.1?标准 TransformStreams
Node.js 支持以下标准 TransformStreams:
-
编码(WHATWG 标准) –
TextEncoderStream 和TextDecoderStream :-
这些流支持 UTF-8,但也支持许多“旧编码”。
-
一个 Unicode 代码点被编码为多达四个 UTF-8 代码单元(字节)。在字节流中,编码的代码点可能会跨越块。
TextDecoderStream 可以正确处理这些情况。 -
大多数 JavaScript 平台都可以使用(
TextEncoderStream ,TextDecoderStream )。
-
-
压缩流(W3C 草案社区组报告) –
CompressionStream ,DecompressionStream :-
当前支持的压缩格式:
deflate (ZLIB 压缩数据格式),deflate-raw (DEFLATE 算法),gzip (GZIP 文件格式)。 -
在许多 JavaScript 平台上都可以使用(
CompressionStream ,DecompressionStream )。
-
10.6.1.1 示例:解码一系列 UTF-8 编码的字节流
在下面的示例中,我们解码了一系列 UTF-8 编码的字节流:
const response = await fetch('https://example.com'); const readableByteStream = response.body; const readableStream = readableByteStream .pipeThrough(new TextDecoderStream('utf-8')); for await (const stringChunk of readableStream) { console.log(stringChunk); }
请注意,单独翻译每个字节块(例如通过
10.6.1.2 示例:创建一个用于标准输入的可读文本流
以下 Node.js 模块记录通过标准输入发送给它的所有内容:
// echo-stdin.mjs import {Readable} from 'node:stream'; const webStream = Readable.toWeb(process.stdin) .pipeThrough(new TextDecoderStream('utf-8')); for await (const chunk of webStream) { console.log('>>>', chunk); }
我们可以通过存储在
请注意,我们逐步处理标准输入:一旦另一个块可用,我们就会记录它。换句话说,我们不会等到标准输入完成。当数据要么很大要么只是间歇性发送时,这是很有用的。
10.7 实现自定义 TransformStreams
我们可以通过将 Transformer 对象传递给
interface Transformer<TInChunk, TOutChunk> { start?( controller: TransformStreamDefaultController<TOutChunk> ): void | Promise<void>; transform?( chunk: TInChunk, controller: TransformStreamDefaultController<TOutChunk> ): void | Promise<void>; flush?( controller: TransformStreamDefaultController<TOutChunk> ): void | Promise<void>; }
这些属性的解释:
-
.start(controller) 在我们调用TransformStream 的构造函数之后立即调用。在这里,我们可以在转换开始之前准备好一些东西。 -
.transform(chunk, controller) 执行实际的转换。它接收一个输入块,并可以使用其参数controller 来排队一个或多个转换后的输出块。它也可以选择不排队任何内容。 -
.flush(controller) 在所有输入块成功转换后调用。在这里,我们可以在转换完成后执行清理工作。
这些方法中的每一个都可以返回一个 Promise,并且在 Promise 解决之前不会采取进一步的步骤。如果我们想要执行一些异步操作,这是很有用的。
参数
interface TransformStreamDefaultController<TOutChunk> { enqueue(chunk?: TOutChunk): void; readonly desiredSize: number | null; terminate(): void; error(err?: any): void; }
-
.enqueue(chunk) 将chunk 添加到 TransformStream 的可读端(输出)。 -
.desiredSize 返回可读端(输出)的 TransformStream 内部队列的期望大小。 -
.terminate() 关闭可读端(输出)并错误可写端(输入)的 TransformStream。如果转换器对可写端(输入)的剩余块不感兴趣并希望跳过它们,则可以使用它。 -
.error(err) 错误 TransformStream:以后所有与它的交互都将以错误值err 失败。
TransformStream 中的背压如何?该类将背压从其可读端(输出)传播到其可写端(输入)。假设转换不会改变数据量太多。因此,Transform 可以忽略背压。但是,可以通过
10.7.1?示例:将任意块的流转换为行流
'
'
class ChunksToLinesTransformer { #previous = ''; transform(chunk, controller) { let startSearch = this.#previous.length; this.#previous += chunk; while (true) { // Works for EOL === ' ' and EOL === ' ' const eolIndex = this.#previous.indexOf(' ', startSearch); if (eolIndex < 0) break; // Line includes the EOL const line = this.#previous.slice(0, eolIndex+1); controller.enqueue(line); this.#previous = this.#previous.slice(eolIndex+1); startSearch = 0; } } flush(controller) { // Clean up and enqueue any text we’re still holding on to if (this.#previous.length > 0) { controller.enqueue(this.#previous); } } } class ChunksToLinesStream extends TransformStream { constructor() { super(new ChunksToLinesTransformer()); } } const stream = new ReadableStream({ async start(controller) { controller.enqueue('multiple lines of text'); controller.close(); }, }); const transformStream = new ChunksToLinesStream(); const transformed = stream.pipeThrough(transformStream); for await (const line of transformed) { console.log('>>>', JSON.stringify(line)); } // Output: // '>>> "multiple "' // '>>> "lines of "' // '>>> "text"'
请注意,Deno 的内置
提示:我们也可以通过异步生成器进行这种转换。它将异步迭代 ReadableStream 并返回一个包含行的异步可迭代对象。其实现在§9.4“通过异步生成器转换可读流”中显示。
10.7.2 提示:异步生成器也非常适合转换流
由于 ReadableStreams 是异步可迭代的,我们可以使用异步生成器来转换它们。这导致非常优雅的代码:
const stream = new ReadableStream({ async start(controller) { controller.enqueue('one'); controller.enqueue('two'); controller.enqueue('three'); controller.close(); }, }); async function* prefixChunks(prefix, asyncIterable) { for await (const chunk of asyncIterable) { yield '> ' + chunk; } } const transformedAsyncIterable = prefixChunks('> ', stream); for await (const transformedChunk of transformedAsyncIterable) { console.log(transformedChunk); } // Output: // '> one' // '> two' // '> three'
10.8?仔细观察背压
让我们仔细观察背压。考虑以下管道链:
rs.pipeThrough(ts).pipeTo(ws);
rs -pipeTo-> ts{writable,readable} -pipeTo-> ws
观察:
-
rs 的基础源可以被视为在rs 之前的管道链成员。 -
ws 的基础接收器可以被视为在ws 之后的管道链成员。 -
每个流都有一个内部缓冲区:ReadableStreams 在其基础源之后进行缓冲。WritableStreams 在其基础接收器之前进行缓冲。
假设
-
ws 发出满的信号。 -
pipeTo 停止从ts.readable 读取。 -
ts.readable 发出满的信号。 -
ts 停止从ts.writable 移动块到ts.readable 。 -
ts.writable 发出满的信号。 -
pipeTo 停止从rs 读取。 -
rs 向其基础源发出满的信号。 -
基础源暂停。
这个例子说明我们需要两种功能:
-
接收数据的实体需要能够发出背压信号。
-
发送数据的实体需要对信号做出反应,施加背压。
让我们探索这些功能在 web 流 API 中是如何实现的。
10.8.1?发出背压
背压由接收数据的实体发出信号。Web 流有两个这样的实体:
-
WritableStream 通过 Writer 方法
.write() 接收数据。 -
当其基础源调用 ReadableStreamDefaultController 方法
.enqueue() 时,ReadableStream 接收数据。
在这两种情况下,输入都通过队列进行缓冲。施加背压的信号是队列已满。让我们看看如何检测到这一点。
这些是队列的位置:
-
一个 WritableStream 的队列在 WritableStreamDefaultController 中内部存储(参见 web 流标准)。
-
一个 ReadableStream 的队列在 ReadableStreamDefaultController 中内部存储(参见 web 流标准)。
队列的期望大小是一个数字,表示队列中还有多少空间:
-
如果队列中仍有空间,则为正。
-
如果队列已达到其最大大小,则为零。
-
如果队列已超过其最大大小,则为负。
因此,如果期望的大小为零或更少,我们必须施加背压。它可以通过包含队列的对象的 getter
期望的大小是如何计算的?通过指定所谓的排队策略的对象。
-
方法
.size(chunk) 返回chunk 的大小。- 队列的当前大小是它包含的块的大小之和。
-
属性
.highWaterMark 指定队列的最大大小。
队列的期望大小是高水位标记减去队列的当前大小。
10.8.2?对背压的反应
发送数据的实体需要对信号背压做出反应,通过施加背压。
10.8.2.1?通过 Writer 写入 WritableStream 的代码
-
我们可以在
writer.ready 中等待 Promise。在等待期间,我们被阻塞,期望的背压得到了实现。一旦队列中有空间,Promise 就会被实现。当writer.desiredSize 的值大于零时,实现会被触发。 -
或者,我们可以等待
writer.write() 返回的 Promise。如果我们这样做,队列甚至不会被填满。
如果我们愿意,我们还可以根据
10.8.2.2?ReadableStream 的底层源
可以传递给 ReadableStream 的底层源对象包装了外部源。在某种程度上,它也是管道链的成员;在其 ReadableStream 之前的成员。
-
只有在队列中有空间时,才会要求底层拉取源提供新数据。在没有空间时,会自动施加背压,因为没有数据被拉取。
-
在入队后,底层推送源应检查
controller.desiredSize :如果为零或更少,则应通过暂停其外部源来施加背压。
10.8.2.3?WritableStream 的底层接收端
可以传递给 WritableStream 的底层接收端对象包装了外部接收端。在某种程度上,它也是管道链的成员;在其 WritableStream 之后的成员。
每个外部接收端以不同的方式(在某些情况下根本不)信号背压。底层接收端可以通过从方法
10.8.2.4?一个 transformStream(.writable → .readable )
TransformStream 通过为前者实现底层接收端和为后者实现底层源,将其可写端连接到其可读端。它具有一个内部插槽
-
可写端的底层接收器的
.write() 方法会异步等待,直到没有内部背压,然后将另一个块提供给 TransformStream 的转换器(web streams 标准:TransformStreamDefaultSinkWriteAlgorithm )。然后转换器可以通过其 TransformStreamDefaultController 加入一些内容。请注意,.write() 返回一个 Promise,在方法完成时会被满足。在此之前,WriteStream 通过其队列缓冲传入的写请求。因此,可写端的背压通过该队列及其期望的大小来表示。 -
如果通过 TransformStreamDefaultController 将一个块加入队列,并且可读端的队列变满了,TransformStream 的背压就会被激活(web streams 标准:
TransformStreamDefaultControllerEnqueue )。 -
如果从读取器中读取了一些内容,
ReadableStream 的背压可能会被取消(web streams 标准:ReadableStreamDefaultReaderRead ):-
如果队列中现在有空间,可能是时候调用底层源的
.pull() 了(web streams 标准:.[[PullSteps]] )。 -
可读端的底层源的
.pull() 会取消背压(web streams 标准:TransformStreamDefaultSourcePullAlgorithm )。
-
10.8.2.5 .pipeTo() (ReadableStream → WritableStream)
10.9 字节流
到目前为止,我们只使用过文本流,流的块是字符串。但是 web streams API 也支持字节流,用于二进制数据,其中块是 Uint8Arrays(TypedArrays):
-
ReadableStream 有一个特殊的'bytes' 模式。 -
WritableStream 本身不关心块是字符串还是 Uint8Arrays。因此,实例是文本流还是字节流取决于底层接收器可以处理什么类型的块。 -
TransformStream 可以处理什么类型的块也取决于其 Transformer。
接下来,我们将学习如何创建可读的字节流。
10.9.1 可读的字节流
-
如果
.type 被省略或没有提供底层源,则新实例是一个文本流。 -
如果
.type 是字符串'bytes' ,则新实例是一个字节流:const readableByteStream = new ReadableStream({ type: 'bytes', async start() { /*...*/ } // ... });
如果一个 ReadableStream 处于
在默认模式下,底层源可以返回任何类型的块。在字节模式下,块必须是 ArrayBufferViews,即 TypedArrays(例如 Uint8Arrays)或 DataViews。
此外,可读的字节流可以创建两种读取器:
-
.getReader() 返回一个ReadableStreamDefaultReader 的实例。 -
.getReader({mode: 'byob'}) 返回一个ReadableStreamBYOBReader 的实例。
“BYOB” 代表 “Bring Your Own Buffer”,意味着我们可以传递一个缓冲区(ArrayBufferView)给
此外,可读的字节流具有不同的控制器:它们是
10.9.2 示例:填充随机数据的无限可读的字节流
在下一个示例中,创建一个无限可读的字节流,用随机数据填充其块(灵感来自:
import {promisify} from 'node:util'; import {randomFill} from 'node:crypto'; const asyncRandomFill = promisify(randomFill); const readableByteStream = new ReadableStream({ type: 'bytes', async pull(controller) { const byobRequest = controller.byobRequest; await asyncRandomFill(byobRequest.view); byobRequest.respond(byobRequest.view.byteLength); }, }); const reader = readableByteStream.getReader({mode: 'byob'}); const buffer = new Uint8Array(10); // (A) const firstChunk = await reader.read(buffer); // (B) console.log(firstChunk);
由于
我们在 A 行创建的缓冲区在 B 行之后被传输,因此无法读取。
10.9.3 示例:压缩可读的字节流
在下面的示例中,我们创建一个可读的字节流,并将其通过一个将其压缩为 GZIP 格式的流:
const readableByteStream = new ReadableStream({ type: 'bytes', start(controller) { // 256 zeros controller.enqueue(new Uint8Array(256)); controller.close(); }, }); const transformedStream = readableByteStream.pipeThrough( new CompressionStream('gzip')); await logChunks(transformedStream); async function logChunks(readableByteStream) { const reader = readableByteStream.getReader(); try { while (true) { const {done, value} = await reader.read(); if (done) break; console.log(value); } } finally { reader.releaseLock(); } }
10.9.4 示例:通过fetch() 读取网页
const response = await fetch('https://example.com'); const readableByteStream = response.body; const readableStream = readableByteStream.pipeThrough( new TextDecoderStream('utf-8')); for await (const stringChunk of readableStream) { console.log(stringChunk); }
10.10 Node.js 特定的辅助函数
Node.js 是唯一支持以下辅助函数的 Web 平台,它称之为实用消费者:
import { arrayBuffer, blob, buffer, json, text, } from 'node:stream/consumers';
这些函数将 Web ReadableStreams、Node.js Readables 和 AsyncIterators 转换为被满足的 Promise:
-
ArrayBuffers(
arrayBuffer() ) -
Blobs(
blob() ) -
Node.js 缓冲区(
buffer() ) -
JSON 对象(
json() ) -
字符串(
text() )
假定二进制数据为 UTF-8 编码:
import * as streamConsumers from 'node:stream/consumers'; const readableByteStream = new ReadableStream({ type: 'bytes', start(controller) { // TextEncoder converts strings to UTF-8 encoded Uint8Arrays const encoder = new TextEncoder(); const view = encoder.encode('"??"'); assert.deepEqual( view, Uint8Array.of(34, 240, 159, 152, 128, 34) ); controller.enqueue(view); controller.close(); }, }); const jsonData = await streamConsumers.json(readableByteStream); assert.equal(jsonData, '??');
字符串流按预期工作:
import * as streamConsumers from 'node:stream/consumers'; const readableByteStream = new ReadableStream({ start(controller) { controller.enqueue('"??"'); controller.close(); }, }); const jsonData = await streamConsumers.json(readableByteStream); assert.equal(jsonData, '??');
10.11 进一步阅读
本节提到的所有材料都是本章的来源。
本章不涵盖 Web 流 API 的每个方面。您可以在此处找到更多信息:
-
“WHATWG 流标准” by Adam Rice, Domenic Denicola, Mattias Buelens, and 吉野剛史 (Takeshi Yoshino)
-
“Web Streams API” in Node.js 文档
更多材料:
-
Web 流 API:
-
“在 Node.js 中实现 Web 流 API” by James M. Snell
-
“流 API” 在 MDN 上
-
“流-权威指南” by Thomas Steiner
-
-
背压:
-
“Node.js 流中的背压” by Vladimir Topolev
-
“流中的背压” in Node.js 文档
-
-
Unicode(代码点,UTF-8,UTF-16 等):“Unicode 简介”章节 in “JavaScript for impatient programmers”
-
“异步迭代”章节 in “JavaScript for impatient programmers”
-
“Typed Arrays:处理二进制数据”章节 in “JavaScript for impatient programmers”
评论
十一、流配方
exploringjs.com/nodejs-shell-scripting/ch_stream-recipes.html
-
11.1 写入标准输出(stdout)
-
11.1.1 通过
console.log() 写入 stdout -
11.1.2 通过 Node.js 流写入 stdout
-
11.1.3 通过 Web 流写入 stdout
-
-
11.2 写入标准错误(stderr)
-
11.3 从标准输入(stdin)读取
-
11.3.1 通过 Node.js 流从 stdin 读取
-
11.3.2 通过 Web 流从 stdin 读取
-
11.3.3 通过模块
'node:readline' 从 stdin 读取
-
-
11.4 Node.js 流配方
-
11.5 Web 流配方
11.1 写入标准输出(stdout)
这是写入 stdout 的三个选项:
-
我们可以通过
console.log() 写入它。 -
我们可以通过 Node.js 流写入它。
-
我们可以通过 Web 流写入它。
11.1.1 通过console.log() 写入 stdout
'
console.log('String: %s Number: %d Percent: %%', 'abc', 123); const obj = {one: 1, two: 2}; console.log('JSON: %j Object: %o', obj, obj); // Output: // 'String: abc Number: 123 Percent: %' // 'JSON: {"one":1,"two":2} Object: { one: 1, two: 2 }'
第一个参数之后的所有参数始终显示在输出中,即使没有足够的占位符。
11.1.2 通过 Node.js 流写入 stdout
process.stdout.write('two'); process.stdout.write(' words'); process.stdout.write(' ');
前面的代码等同于:
console.log('two words');
请注意,这种情况下末尾没有换行符,因为
如果我们使用
以下配方适用于
11.1.3 通过 Web 流写入 stdout
我们可以将
import {Writable} from 'node:stream'; const webOut = Writable.toWeb(process.stdout); const writer = webOut.getWriter(); try { await writer.write('First line '); await writer.write('Second line '); await writer.close(); } finally { writer.releaseLock() }
以下配方适用于
11.2 写入标准错误(stderr)
写入 stderr 的工作方式与写入 stdout 类似:
-
我们可以通过
console.error() 写入它。 -
我们可以通过 Node.js 流写入它。
-
我们可以通过 Web 流写入它。
有关更多信息,请参阅前一节。
11.3 从标准输入(stdin)读取
这些是从 stdin 读取的选项:
-
我们可以通过 Node.js 流从中读取。
-
我们可以通过 Web 流从中读取。
-
我们可以使用模块
'node:readline' 。
11.3.1 通过 Node.js 流从 stdin 读取
// Switch to text mode (otherwise we get chunks of binary data) process.stdin.setEncoding('utf-8'); for await (const chunk of process.stdin) { console.log('>', chunk); }
以下配方适用于
11.3.2 通过 Web 流从 stdin 读取
我们首先必须将
import {Readable} from 'node:stream'; // Switch to text mode (otherwise we get chunks of binary data) process.stdin.setEncoding('utf-8'); const webIn = Readable.toWeb(process.stdin); for await (const chunk of webIn) { console.log('>', chunk); }
以下配方适用于
11.3.3 通过模块'node:readline' 从 stdin 读取
内置模块
import * as fs from 'node:fs'; import * as readline from 'node:readline/promises'; const rl = readline.createInterface({ input: process.stdin, output: process.stdout, }); const filePath = await rl.question('Please enter a file path: '); fs.writeFileSync(filePath, 'Hi!', {encoding: 'utf-8'}) rl.close();
有关模块
-
§9.3.3“通过模块’node:readlines’从可读流中读取行”
-
官方文档。
11.4 Node.js 流配方
可读流:
-
§9.3.1.2“
Readable.from() : 从可迭代对象创建可读流” -
§9.3.2“通过
for-await-of 从可读流中读取块”- §9.3.2.1“在字符串中收集可读流的内容”
-
§9.3.3“通过模块’node:readlines’从可读流中读取行”
-
§9.4“通过异步生成器转换可读流”
- §9.4.1“在异步可迭代对象中从块转换为编号行”
可写流:
-
§9.5.2“写入可写流”
-
§9.5.2.2“通过
stream.pipeline() 将可读流传输到可写流”
11.5 网络流配方
从中创建一个 ReadableStream:
-
字符串:§10.3.1“实现基础源的第一个示例”
-
可迭代对象:§10.3.2.2“示例:从拉取源创建一个 ReadableStream”
从 ReadableStream 中读取:
-
§10.2.1“通过读取器消耗 ReadableStreams”
-
§10.2.2“通过异步迭代消耗 ReadableStreams”
- §10.2.2.2“示例:组装包含 ReadableStream 内容的字符串”
-
§10.2.3“将 ReadableStreams 传输到 WritableStreams”
转换 ReadableStreams:
-
§10.6“使用 TransformStreams”
-
§10.7.2“提示:异步生成器也非常适合转换流”
-
§10.7.1“示例:将任意块的流转换为行流”
使用 WritableStreams:
-
§10.4“写入可写流”
-
§10.5.2“示例:在字符串中收集写入到 WriteStream 的块”
评论
十二、在子进程中运行 shell 命令
原文:
exploringjs.com/nodejs-shell-scripting/ch_nodejs-child-process.html 译者:飞龙
协议:CC BY-NC-SA 4.0
-
12.1?本章概述](ch_nodejs-child-process.html#overview-of-this-chapter)
-
12.1.1?Windows vs. Unix
-
12.1.2?我们在示例中经常使用的功能
-
-
12.2?异步生成进程:
spawn() -
12.2.1?
spawn() 的工作原理 -
12.2.2?何时执行 shell 命令?](ch_nodejs-child-process.html#when-is-the-shell-command-executed)
-
12.2.3?仅命令模式 vs. 参数模式](ch_nodejs-child-process.html#spawn-argument-modes)
-
12.2.4?向子进程的 stdin 发送数据
-
12.2.5?手动进行管道传输](ch_nodejs-child-process.html#piping-manually)
-
12.2.6?处理不成功的退出(包括错误)
-
12.2.7?等待子进程退出](ch_nodejs-child-process.html#waiting-for-the-exit-of-a-child-process)
-
12.2.8?终止子进程](ch_nodejs-child-process.html#terminating-child-processes)
-
-
12.3?同步生成进程:
spawnSync() -
12.3.1?何时执行 shell 命令?](ch_nodejs-child-process.html#when-is-the-shell-command-executed-1)
-
12.3.2?从 stdout 读取](ch_nodejs-child-process.html#reading-from-stdout)
-
12.3.3?向子进程的 stdin 发送数据
-
12.3.4?处理不成功的退出(包括错误)
-
-
12.4?基于
spawn() 的异步辅助函数-
12.4.1?
exec() -
12.4.2?
execFile()
-
-
12.5?基于
spawnAsync() 的同步辅助函数-
12.5.1?
execSync() -
12.5.2?
execFileSync()
-
-
12.6?有用的库](ch_nodejs-child-process.html#useful-libraries)
-
12.6.1?tinysh:用于生成 shell 命令的辅助程序
-
12.6.2?node-powershell:通过 Node.js 执行 Windows PowerShell 命令
-
-
12.7?在模块
'node:child_process' 的功能之间进行选择
在本章中,我们将探讨如何通过模块
12.1?本章概述
模块
-
一个异步版本的
spawn() 。 -
一个同步版本的
spawnSync() 。
我们将首先探讨
-
基于
spawn() :-
exec() -
execFile()
-
-
基于
spawnSync() :-
execSync() -
execFileSync()
-
12.1.1?Windows vs. Unix
本章中显示的代码在 Unix 上运行,但我也在 Windows 上进行了测试-其中大部分代码需要进行轻微更改(例如以
'
'
12.1.2?我们在示例中经常使用的功能
以下功能在示例中经常出现。这就是为什么在这里解释一次:
-
断言:对于原始值使用
assert.equal() ,对于对象使用assert.deepEqual() 。示例中从未显示必要的导入:import * as assert from 'node:assert/strict';
-
函数
Readable.toWeb() 将 Node 的原生stream.Readable 转换为 web 流(ReadableStream 的实例)。这在§10“在 Node.js 上使用 web 流”中有解释。示例中始终导入Readable 。 -
异步函数
readableStreamToString() 会消耗可读的 web 流并返回一个字符串(包装在 Promise 中)。这在 web 流章节中有解释。假定这个函数在示例中是可用的。
12.2?异步生成进程:spawn()
12.2.1?spawn() 的工作原理
spawn( command: string, args?: Array<string>, options?: Object ): ChildProcess
接下来,有关
12.2.1.1 参数:command
-
仅命令模式:省略
args ,command 包含整个 shell 命令。我们甚至可以使用 shell 功能,如在多个可执行文件之间进行管道传输,将 I/O 重定向到文件,变量和通配符。options.shell 必须为true ,因为我们需要一个 shell 来处理 shell 功能。
-
参数模式:
command 仅包含命令的名称,args 包含其参数。-
如果
options.shell 为true ,则参数中的许多元字符会被解释,并且通配符和变量名称等功能会起作用。 -
如果
options.shell 为false ,则字符串会直接使用,我们不必转义元字符。
-
这两种模式在本章后面进行了演示。
12.2.1.2 参数:options
以下
-
.shell: boolean|string (默认值:false )是否应使用 shell 来执行命令?
-
在 Windows 上,此选项几乎总是应为
true 。例如,否则无法执行.bat 和.cmd 文件。 -
在 Unix 上,只有核心 shell 功能(例如管道,I/O 重定向,文件名通配符和变量)在
.shell 为false 时不可用。 -
如果
.shell 为true ,我们必须小心处理用户输入并对其进行清理,因为很容易执行任意代码。如果我们想将其用作非元字符,则还必须转义元字符。 -
我们还可以将
.shell 设置为 shell 可执行文件的路径。然后 Node.js 将使用该可执行文件来执行命令。如果我们将.shell 设置为true ,Node.js 将使用:-
Unix:
'/bin/sh' -
Windows:
process.env.ComSpec
-
-
-
.cwd: string | URL 指定在执行命令时要使用的当前工作目录(CWD)。
-
.stdio: Array<string|Stream>|string 配置标准 I/O 的设置方式。下面会有解释。
-
.env: Object (默认值:process.env )让我们为子进程指定 shell 变量。提示:
-
查看
process.env (例如在 Node.js REPL 中)以查看存在哪些变量。 -
我们可以使用扩展运算符来非破坏性地覆盖现有变量 - 或者如果尚不存在,则创建它:
{env: {...process.env, MY_VAR: 'Hi!'}}
-
-
.signal: AbortSignal 如果我们创建了一个 AbortController
ac ,我们可以将ac.signal 传递给spawn() ,并通过ac.abort() 中止子进程。这在本章后面有演示。 -
.timeout: number 如果子进程的执行时间超过
.timeout 毫秒,则会被终止。
12.2.1.3 options.stdio
子进程的每个标准 I/O 流都有一个数字 ID,称为文件描述符:
-
标准输入(stdin)的文件描述符为 0。
-
标准输出(stdout)的文件描述符为 1。
-
标准错误(stderr)的文件描述符为 2。
可能会有更多的文件描述符,但这很少见。
-
'pipe' :-
索引 0:将
childProcess.stdin 管道连接到子进程的 stdin。请注意,尽管其名称如此,但前者是属于父进程的流。 -
索引 1:将子进程的 stdout 管道连接到
childProcess.stdout 。 -
索引 2:将子进程的 stderr 管道连接到
childProcess.stderr 。
-
-
'ignore' :忽略子进程的流。 -
'inherit' :将子进程的流管道连接到父进程的相应流。- 例如,如果我们希望子进程的 stderr 被记录到控制台,我们可以在索引 2 处使用
'inherit' 。
- 例如,如果我们希望子进程的 stderr 被记录到控制台,我们可以在索引 2 处使用
-
原生 Node.js 流:管道到该流或从该流。
-
还支持其他值,但这超出了本章的范围。
除了通过数组指定
-
'pipe' 等同于['pipe', 'pipe', 'pipe'] (options.stdio 的默认值)。 -
'ignore' 等同于['ignore', 'ignore', 'ignore'] 。 -
'inherit' 等同于['inherit', 'inherit', 'inherit'] 。
12.2.1.4?结果:ChildProcess 的实例
有趣的数据属性:
-
.exitCode: number | null 包含子进程退出时的代码:
-
0(零)表示正常退出。
-
大于零的数字表示发生了错误。
-
null 表示进程尚未退出。
-
-
.signalCode: string | null 子进程被杀死的 POSIX 信号,或者如果没有被杀死则为
null 。有关更多信息,请参阅下面的.kill() 方法的描述。 -
流:根据标准 I/O 的配置方式(请参阅前面的小节),以下流变得可用:
-
.stdin -
.stdout -
.stderr
-
-
.pid: number | undefined 子进程的进程标识符(PID)。如果生成失败,
.pid 为undefined 。在调用spawn() 后立即可用此值。
有趣的方法:
-
.kill(signalCode?: number | string = 'SIGTERM'): boolean 向子进程发送 POSIX 信号(通常导致进程终止):
-
signal 的 man 页面包含值的列表。 -
Windows 不支持信号,但 Node.js 模拟了其中一些 - 例如:
SIGINT ,SIGTERM 和SIGKILL 。有关更多信息,请参阅Node.js 文档。
此方法在本章后面进行了演示。
-
有趣的事件:
-
.on('exit', (exitCode: number|null, signalCode: string|null) => {}) 此事件在子进程结束后发出:
-
回调参数为我们提供了退出代码或信号代码:其中一个始终为非空。
-
由于多个进程可能共享相同的流,因此其标准 I/O 流可能仍然打开。事件
'close' 在子进程退出后通知我们所有 stdio 流都已关闭。
-
-
.on('error', (err: Error) => {}) 如果进程无法被生成(请参阅示例后面)或子进程无法被杀死,则最常见地发出此事件。在此事件之后可能会或可能不会发出
'exit' 事件。
我们稍后将看到如何将事件转换为可以等待的 Promise。
12.2.2?shell 命令何时执行?
在使用异步
import {spawn} from 'node:child_process'; spawn( 'echo', ['Command starts'], { stdio: 'inherit', shell: true, } ); console.log('After spawn()');
这是输出:
After spawn() Command starts
12.2.3?仅命令模式 vs. 参数模式
在本节中,我们以两种方式指定相同的命令调用:
-
仅命令模式:我们通过第一个参数
command 提供整个调用。 -
参数模式:我们通过第一个参数
command 提供命令,通过第二个参数args 提供参数。
12.2.3.1 仅命令模式
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; const childProcess = spawn( 'echo "Hello, how are you?"', { shell: true, // (A) stdio: ['ignore', 'pipe', 'inherit'], // (B) } ); const stdout = Readable.toWeb( childProcess.stdout.setEncoding('utf-8')); // Result on Unix assert.equal( await readableStreamToString(stdout), 'Hello, how are you? ' // (C) ); // Result on Windows: '"Hello, how are you?" '
每个带参数的仅命令生成都需要
在 B 行,我们告诉
-
忽略标准输入。
-
将子进程的标准输出管道到
childProcess.stdout (属于父进程的流)。 -
将子进程的标准错误输出管道到父进程的标准错误输出。
在这种情况下,我们只对子进程的输出感兴趣。因此,一旦我们处理了输出,我们就完成了。在其他情况下,我们可能需要等到子进程退出。如何做到这一点,稍后会有演示。
在仅命令模式下,我们看到 shell 的更多特殊之处 - 例如,Windows 命令 shell 输出包括双引号(最后一行)。
12.2.3.2 参数模式
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; const childProcess = spawn( 'echo', ['Hello, how are you?'], { shell: true, stdio: ['ignore', 'pipe', 'inherit'], } ); const stdout = Readable.toWeb( childProcess.stdout.setEncoding('utf-8')); // Result on Unix assert.equal( await readableStreamToString(stdout), 'Hello, how are you? ' ); // Result on Windows: 'Hello, how are you? '
12.2.3.3 args 中的元字符
让我们探讨一下如果
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; async function echoUser({shell, args}) { const childProcess = spawn( `echo`, args, { stdio: ['ignore', 'pipe', 'inherit'], shell, } ); const stdout = Readable.toWeb( childProcess.stdout.setEncoding('utf-8')); return readableStreamToString(stdout); } // Results on Unix assert.equal( await echoUser({shell: false, args: ['$USER']}), // (A) '$USER ' ); assert.equal( await echoUser({shell: true, args: ['$USER']}), // (B) 'rauschma ' ); assert.equal( await echoUser({shell: true, args: [String.raw`$USER`]}), // (C) '$USER ' );
-
如果我们不使用 shell,例如美元符号(
$ )等元字符没有效果(A 行)。 -
在 shell 中,
$USER 被解释为一个变量(B 行)。 -
如果我们不想要这个,我们必须通过反斜杠转义美元符号(C 行)。
其他元字符(如星号(
这是 Unix shell 元字符的两个例子。Windows shell 有它们自己的元字符和它们自己的转义方式。
12.2.3.4 一个更复杂的 shell 命令
让我们使用更多的 shell 特性(这需要仅命令模式):
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; import {EOL} from 'node:os'; const childProcess = spawn( `(echo cherry && echo apple && echo banana) | sort`, { stdio: ['ignore', 'pipe', 'inherit'], shell: true, } ); const stdout = Readable.toWeb( childProcess.stdout.setEncoding('utf-8')); assert.equal( await readableStreamToString(stdout), 'apple banana cherry ' );
12.2.4 将数据发送到子进程的标准输入
到目前为止,我们只读取了子进程的标准输出。但是我们也可以将数据发送到标准输入:
import {Readable, Writable} from 'node:stream'; import {spawn} from 'node:child_process'; const childProcess = spawn( `sort`, // (A) { stdio: ['pipe', 'pipe', 'inherit'], } ); const stdin = Writable.toWeb(childProcess.stdin); // (B) const writer = stdin.getWriter(); // (C) try { await writer.write('Cherry '); await writer.write('Apple '); await writer.write('Banana '); } finally { writer.close(); } const stdout = Readable.toWeb( childProcess.stdout.setEncoding('utf-8')); assert.equal( await readableStreamToString(stdout), 'Apple Banana Cherry ' );
我们使用 shell 命令
在 B 行,我们使用
如何通过写入器(C 行)向 WritableStream 写入也在网络流章节中有解释。
12.2.5 手动进行管道传输
我们之前让 shell 执行以下命令:
(echo cherry && echo apple && echo banana) | sort
在下面的例子中,我们手动进行管道传输,从 echo(A 行)到 sorting(B 行):
import {Readable, Writable} from 'node:stream'; import {spawn} from 'node:child_process'; const echo = spawn( // (A) `echo cherry && echo apple && echo banana`, { stdio: ['ignore', 'pipe', 'inherit'], shell: true, } ); const sort = spawn( // (B) `sort`, { stdio: ['pipe', 'pipe', 'inherit'], shell: true, } ); //==== Transferring chunks from echo.stdout to sort.stdin ==== const echoOut = Readable.toWeb( echo.stdout.setEncoding('utf-8')); const sortIn = Writable.toWeb(sort.stdin); const sortInWriter = sortIn.getWriter(); try { for await (const chunk of echoOut) { // (C) await sortInWriter.write(chunk); } } finally { sortInWriter.close(); } //==== Reading sort.stdout ==== const sortOut = Readable.toWeb( sort.stdout.setEncoding('utf-8')); assert.equal( await readableStreamToString(sortOut), 'apple banana cherry ' );
例如
12.2.6 处理不成功的退出(包括错误)
有三种主要的不成功的退出方式:
-
子进程无法生成。
-
Shell 中发生了错误。
-
一个进程被终止。
12.2.6.1 子进程无法生成
以下代码演示了如果子进程无法生成会发生什么。在这种情况下,原因是 shell 的路径没有指向可执行文件(A 行)。
import {spawn} from 'node:child_process'; const childProcess = spawn( 'echo hello', { stdio: ['inherit', 'inherit', 'pipe'], shell: '/bin/does-not-exist', // (A) } ); childProcess.on('error', (err) => { // (B) assert.equal( err.toString(), 'Error: spawn /bin/does-not-exist ENOENT' ); });
这是我们第一次使用事件来处理子进程。在 B 行,我们为
12.2.6.2 Shell 中发生了错误
如果 shell 代码包含错误,我们不会收到
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; const childProcess = spawn( 'does-not-exist', { stdio: ['inherit', 'inherit', 'pipe'], shell: true, } ); childProcess.on('exit', async (exitCode, signalCode) => { // (A) assert.equal(exitCode, 127); assert.equal(signalCode, null); const stderr = Readable.toWeb( childProcess.stderr.setEncoding('utf-8')); assert.equal( await readableStreamToString(stderr), '/bin/sh: does-not-exist: command not found ' ); } ); childProcess.on('error', (err) => { // (B) console.error('We never get here!'); });
12.2.6.3 进程被终止
如果在 Unix 上终止进程,退出代码是
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; const childProcess = spawn( 'kill $$', // (A) { stdio: ['inherit', 'inherit', 'pipe'], shell: true, } ); console.log(childProcess.pid); // (B) childProcess.on('exit', async (exitCode, signalCode) => { assert.equal(exitCode, null); // (C) assert.equal(signalCode, 'SIGTERM'); // (D) const stderr = Readable.toWeb( childProcess.stderr.setEncoding('utf-8')); assert.equal( await readableStreamToString(stderr), '' // (E) ); });
请注意,没有错误输出(E 行)。
子进程不是自己终止(A 行),我们也可以暂停它更长时间,然后通过我们在 B 行记录的进程 ID 手动终止它。
如果我们在 Windows 上杀死一个子进程会发生什么?
-
exitCode 是1 。 -
signalCode 是null 。
12.2.7 等待子进程退出
有时我们只想等到命令执行完毕。这可以通过事件和 Promise 来实现。
12.2.7.1 通过事件等待
import * as fs from 'node:fs'; import {spawn} from 'node:child_process'; const childProcess = spawn( `(echo first && echo second) > tmp-file.txt`, { shell: true, stdio: 'inherit', } ); childProcess.on('exit', (exitCode, signalCode) => { // (A) assert.equal(exitCode, 0); assert.equal(signalCode, null); assert.equal( fs.readFileSync('tmp-file.txt', {encoding: 'utf-8'}), 'first second ' ); });
我们使用标准的 Node.js 事件模式,并为
12.2.7.2 通过 Promises 等待
import * as fs from 'node:fs'; import {spawn} from 'node:child_process'; const childProcess = spawn( `(echo first && echo second) > tmp-file.txt`, { shell: true, stdio: 'inherit', } ); const {exitCode, signalCode} = await onExit(childProcess); // (A) assert.equal(exitCode, 0); assert.equal(signalCode, null); assert.equal( fs.readFileSync('tmp-file.txt', {encoding: 'utf-8'}), 'first second ' );
我们在 A 行使用的辅助函数
export function onExit(eventEmitter) { return new Promise((resolve, reject) => { eventEmitter.once('exit', (exitCode, signalCode) => { if (exitCode === 0) { // (B) resolve({exitCode, signalCode}); } else { reject(new Error( `Non-zero exit: code ${exitCode}, signal ${signalCode}`)); } }); eventEmitter.once('error', (err) => { // (C) reject(err); }); }); }
如果
-
exitCode 不是零(B 行)。发生了这种情况:-
如果有 shell 错误。那么
exitCode 大于零。 -
如果在 Unix 上杀死子进程。那么
exitCode 是null ,signalCode 是非空的。- 在 Windows 上杀死子进程会产生一个 shell 错误。
-
-
一个
'error' 事件被触发(C 行)。如果孩子进程无法被生成,就会发生这种情况。
12.2.8 终止子进程
12.2.8.1 通过 AbortController 终止子进程
在这个例子中,我们使用 AbortController 来终止一个 shell 命令:
import {spawn} from 'node:child_process'; const abortController = new AbortController(); // (A) const childProcess = spawn( `echo Hello`, { stdio: 'inherit', shell: true, signal: abortController.signal, // (B) } ); childProcess.on('error', (err) => { assert.equal( err.toString(), 'AbortError: The operation was aborted' ); }); abortController.abort(); // (C)
我们创建一个 AbortController(A 行),将其信号传递给
子进程是异步启动的(在当前代码片段执行后)。这就是为什么我们可以在进程甚至开始之前中止,以及为什么在这种情况下我们看不到任何输出。
12.2.8.2 通过 .kill() 终止子进程
在下一个例子中,我们通过方法
import {spawn} from 'node:child_process'; const childProcess = spawn( `echo Hello`, { stdio: 'inherit', shell: true, } ); childProcess.on('exit', (exitCode, signalCode) => { assert.equal(exitCode, null); assert.equal(signalCode, 'SIGTERM'); }); childProcess.kill(); // default argument value: 'SIGTERM'
再次,在孩子进程开始之前我们就杀死了它(异步!),并且没有输出。
12.3 同步生成进程:spawnSync()
spawnSync( command: string, args?: Array<string>, options?: Object ): Object
参数大多与
-
.input: string | TypedArray | DataView 如果这个属性存在,它的值将被发送到子进程的标准输入。
-
.encoding: string (默认:'buffer' )指定用于所有标准 I/O 流的编码。
该函数返回一个对象。它最有趣的属性是:
-
.stdout: Buffer | string 包含写入子进程标准输出流的内容。
-
.stderr: Buffer | string 包含写入子进程标准错误流的内容。
-
.status: number | null 包含子进程的退出代码或
null 。退出代码或信号代码中的一个是非空的。 -
.signal: string | null 包含孩子进程的信号代码或
null 。退出代码或信号代码中的一个是非空的。 -
.error?: Error 只有在生成失败时才会创建这个属性,然后包含一个错误对象。
使用异步的
12.3.1 shell 命令何时执行?
使用同步的
import {spawnSync} from 'node:child_process'; spawnSync( 'echo', ['Command starts'], { stdio: 'inherit', shell: true, } ); console.log('After spawnSync()');
这是输出:
Command starts After spawnSync()
12.3.2 从标准输出读取
以下代码演示了如何读取标准输出:
import {spawnSync} from 'node:child_process'; const result = spawnSync( `echo rock && echo paper && echo scissors`, { stdio: ['ignore', 'pipe', 'inherit'], // (A) encoding: 'utf-8', // (B) shell: true, } ); console.log(result); assert.equal( result.stdout, // (C) 'rock paper scissors ' ); assert.equal(result.stderr, null); // (D)
在 A 行,我们使用
因此,我们只能得到标准输出的结果属性(C 行),标准错误的属性是
由于我们无法访问
12.3.3?向子进程的 stdin 发送数据
我们可以通过选项属性
import {spawnSync} from 'node:child_process'; const result = spawnSync( `sort`, { stdio: ['pipe', 'pipe', 'inherit'], encoding: 'utf-8', input: 'Cherry Apple Banana ', // (A) } ); assert.equal( result.stdout, 'Apple Banana Cherry ' );
12.3.4?处理不成功的退出(包括错误)
有三种主要的不成功的退出情况(当退出代码不为零时):
-
子进程无法被生成。
-
shell 中发生错误。
-
进程被终止。
12.3.4.1?子进程无法生成
如果生成失败,
import {spawnSync} from 'node:child_process'; const result = spawnSync( 'echo hello', { stdio: ['ignore', 'inherit', 'pipe'], encoding: 'utf-8', shell: '/bin/does-not-exist', } ); assert.equal( result.error.toString(), 'Error: spawnSync /bin/does-not-exist ENOENT' );
12.3.4.2?shell 中发生错误
如果在 shell 中发生错误,退出代码
import {spawnSync} from 'node:child_process'; const result = spawnSync( 'does-not-exist', { stdio: ['ignore', 'inherit', 'pipe'], encoding: 'utf-8', shell: true, } ); assert.equal(result.status, 127); assert.equal(result.signal, null); assert.equal( result.stderr, '/bin/sh: does-not-exist: command not found ' );
12.3.4.3?进程被终止
如果在 Unix 上终止子进程,
import {spawnSync} from 'node:child_process'; const result = spawnSync( 'kill $$', { stdio: ['ignore', 'inherit', 'pipe'], encoding: 'utf-8', shell: true, } ); assert.equal(result.status, null); assert.equal(result.signal, 'SIGTERM'); assert.equal(result.stderr, ''); // (A)
请注意,没有输出发送到标准错误流(A 行)。
如果我们在 Windows 上终止一个子进程:
-
result.status 为 1 -
result.signal 为null -
result.stderr 为''
12.4?基于spawn() 的异步辅助函数
在本节中,我们将看到基于
-
exec() -
execFile()
在本章中,我们忽略了
fork() 生成一个新的 Node.js 进程,并调用一个指定的模块,建立了一个 IPC 通信通道,允许在父进程和子进程之间发送消息。
12.4.1?exec()
exec( command: string, options?: Object, callback?: (error, stdout, stderr) => void ): ChildProcess
-
除了返回一个 ChildProcess,
exec() 还通过回调函数传递结果:错误对象或 stdout 和 stderr 的内容。 -
错误原因:子进程无法生成,shell 错误,子进程被终止。
- 相比之下,
spawn() 只在子进程无法被生成时发出'error' 事件。另外两种失败是通过退出代码和(在 Unix 上)信号代码来处理的。
- 相比之下,
-
没有参数
args 。 -
options.shell 的默认值为true 。
import {exec} from 'node:child_process'; const childProcess = exec( 'echo Hello', (error, stdout, stderr) => { if (error) { console.error('error: ' + error.toString()); return; } console.log('stdout: ' + stdout); // 'stdout: Hello ' console.error('stderr: ' + stderr); // 'stderr: ' } );
-
ChildProcess 成为返回的 Promise 的属性。
-
Promise 的解决方式如下:
-
完成值:
{stdout, stderr} -
拒绝值:与回调函数的参数
error 相同,但有两个额外的属性:.stdout 和.stderr 。
-
import * as util from 'node:util'; import * as child_process from 'node:child_process'; const execAsync = util.promisify(child_process.exec); try { const resultPromise = execAsync('echo Hello'); const {childProcess} = resultPromise; const obj = await resultPromise; console.log(obj); // { stdout: 'Hello ', stderr: '' } } catch (err) { console.error(err); }
12.4.2?execFile()
与
-
支持参数
args 。 -
options.shell 的默认值为false 。
与
12.5?基于spawnAsync() 的同步辅助函数
12.5.1?execSync()
execSync( command: string, options?: Object ): Buffer | string
-
只返回 stdout 的内容。
-
三种失败通过异常报告:子进程无法生成,shell 错误,子进程被终止。
- 相比之下,
spawnSync() 的结果只有一个.error 属性,如果子进程无法被生成。另外两种失败是通过退出代码和(在 Unix 上)信号代码来处理的。
- 相比之下,
-
没有参数
args 。 -
options.shell 的默认值为true 。
import {execSync} from 'node:child_process'; try { const stdout = execSync('echo Hello'); console.log('stdout: ' + stdout); // 'stdout: Hello ' } catch (err) { console.error('Error: ' + err.toString()); }
12.5.2?execFileSync()
与
-
支持参数
args 。 -
options.shell 的默认值是false 。
12.6?有用的库
12.6.1?tinysh:生成 shell 命令的辅助程序
tinysh由 Anton Medvedev 是一个帮助生成 shell 命令的小型库-例如:
import sh from 'tinysh'; console.log(sh.ls('-l')); console.log(sh.cat('README.md'));
我们可以通过使用
sh.tee.call({input: 'Hello, world!'}, 'file.txt');
我们可以使用任何属性名称,tinysh 会使用该名称执行 shell 命令。它通过代理实现了这一壮举。这是实际库的略微修改版本:
import {execFileSync} from 'node:child_process'; const sh = new Proxy({}, { get: (_, bin) => function (...args) { // (A) return execFileSync(bin, args, { encoding: 'utf-8', shell: true, ...this // (B) } ); }, });
在 A 行中,我们可以看到如果从
在 B 行中传播
12.6.2?node-powershell:通过 Node.js 执行 Windows PowerShell 命令
在 Windows 上使用node-powershell 库的示例如下:
import { PowerShell } from 'node-powershell'; PowerShell.$`echo "hello from PowerShell"`;
12.7?在模块'node:child_process' 的函数之间进行选择
一般约束:
-
在执行命令时,其他异步任务是否应该运行?
- 使用任何异步函数。
-
您是否只执行一个命令(没有后台异步任务)?
- 使用任何同步函数。
-
您想通过流访问子进程的 stdin 或 stdout 吗?
- 只有异步函数才能让您访问流:在这种情况下,
spawn() 更简单,因为它没有提供传递错误和标准 I/O 内容的回调。
- 只有异步函数才能让您访问流:在这种情况下,
-
您想在字符串中捕获 stdout 或 stderr 吗?
-
异步选项:
exec() 和execFile() -
同步选项:
spawnSync() ,execSync() ,execFileSync()
-
异步函数-在
-
exec() 和execFile() 有两个好处:-
由于它们都通过第一个回调参数报告,因此更容易处理失败。
-
获取 stdout 和 stderr 作为字符串更容易-由于回调。
-
-
如果这些好处对您不重要,您可以选择
spawn() 。它的签名更简单,没有(可选的)回调。
同步函数-在
-
execSync() 和execFileSync() 有两个特点:-
它们返回一个包含 stdout 内容的字符串。
-
由于它们都通过异常报告,因此更容易处理失败。
-
-
如果您需要比
execSync() 和execFileSync() 通过它们的返回值和异常提供的更多信息,则选择spawnSync() 。
在
-
options.shell 在exec() 中的默认值为true ,但在execFile() 中为false 。 -
execFile() 支持args ,exec() 不支持。
评论
第四部分:处理包
原文:
exploringjs.com/nodejs-shell-scripting/pt_packages.html 译者:飞龙
协议:CC BY-NC-SA 4.0
接下来:13 安装 npm 包并运行 bin 脚本
十三、安装 npm 包并运行 bin 脚本
原文:
exploringjs.com/nodejs-shell-scripting/ch_installing-packages.html 译者:飞龙
协议:CC BY-NC-SA 4.0
-
13.1?全局安装 npm 注册表包
-
13.1.1?哪些包是全局安装的?
npm ls -g (ch_installing-packages.html#which-packages-are-installed-globally-npm-ls–g) -
13.1.2?全局安装的包在哪里?
npm root -g (ch_installing-packages.html#where-are-packages-installed-globally-npm-root–g) -
13.1.3?全局安装的 shell 脚本在哪里?
npm bin -g (ch_installing-packages.html#where-are-shell-scripts-installed-globally-npm-bin–g) -
13.1.4?全局安装的包在哪里?npm 安装前缀
-
13.1.5?更改全局安装包的位置
-
-
13.2?在本地安装 npm 注册表包
- 13.2.1?在本地安装 bin 脚本
-
13.3?安装未发布的包
-
13.3.1?
npm link : 全局安装未发布的包 -
13.3.2?
npm link : 在本地安装全局链接的包 -
13.3.3?
npm link : 撤消链接 -
13.3.4?通过本地路径安装未发布的包(ch_installing-packages.html#installing-unpublished-packages-via-local-paths)
-
13.3.5?安装未发布包的其他方法
-
-
13.4?
npx : 在不安装的情况下运行 npm 包中的 bin 脚本- 13.4.1?npx 缓存
-
在本地安装带有 bin 脚本的包意味着将其安装为包内的依赖项。这些脚本只能在该包内访问。
-
全局安装带有 bin 脚本的包意味着将其安装在“全局位置”,以便脚本可以在任何地方访问-无论是当前用户还是系统的所有用户(取决于 npm 的设置方式)。
我们探讨了所有这些的含义以及我们如何在安装后运行 bin 脚本。
13.1?全局安装 npm 注册表包
包
"bin": { "cowsay": "./cli.js", "cowthink": "./cli.js" },
要全局安装此包,我们使用
npm install -g cowsay
注意:在 Unix 上,我们可能需要使用
sudo npm install -g cowsay
之后,我们可以在命令行中使用
请注意,只有 bin 脚本在全局可用。当 Node.js 在
13.1.1?哪些包是全局安装的? npm ls -g
我们可以检查全局安装的包以及它们的位置:
% npm ls -g /usr/local/lib ├── [email protected] ├── [email protected] └── [email protected]
在 Windows 上,安装路径是
pm
>echo %AppData% pm C:UsersjaneAppDataRoaming pm
13.1.2?全局安装的包在哪里? npm root -g
macOS 上的结果:
% npm root -g /usr/local/lib/node_modules
Windows 上的结果:
>npm root -g C:UsersjaneAppDataRoaming pm ode_modules
13.1.3?全局安装的 shell 脚本在哪里? npm bin -g
macOS 上的结果:
% npm bin -g /usr/local/bin % which cowsay /usr/local/bin/cowsay
在 Windows 命令 shell 上的结果:
>npm bin -g C:UsersjaneAppDataRoaming pm >where cowsay C:UsersjaneAppDataRoaming pmcowsay C:UsersjaneAppDataRoaming pmcowsay.cmd
没有文件名扩展名的可执行文件
Windows PowerShell 返回
C:UsersjaneAppDataRoaming pmcowsay.ps1
13.1.4 全局安装的包在哪里?npm 安装前缀
npm 的安装前缀决定了全局安装包和 bin 脚本的安装位置。
这是 macOS 上的安装前缀:
% npm config get prefix /usr/local
因此:
-
包安装在
/usr/local/lib/node_modules 中 -
Bin 脚本安装在
/usr/local/bin 中
这是 Windows 上的安装前缀:
>npm config get prefix C:UsersjaneAppDataRoaming pm
因此:
-
包安装在
C:UsersjaneAppDataRoaming 中
pm
ode_modules -
Bin 脚本安装在
C:UsersjaneAppDataRoaming 中
pm
13.1.5 改变全局安装包位置
在这一部分,我们将研究两种改变全局安装包位置的方法:
-
更改 npm 安装前缀
-
使用 Node.js 版本管理器
13.1.5.1 改变 npm 安装前缀
改变全局安装包位置的一种方法是改变 npm 的安装前缀。
Unix:
mkdir ~/npm-global npm config set prefix '~/npm-global'
Windows 命令 shell:
mkdir "%UserProfile% pm-global" npm config set prefix "%UserProfile% pm-global"
Windows PowerShell:
mkdir "$env:UserProfile pm-global" npm config set prefix "$env:UserProfile pm-global"
配置数据保存在主目录中的
从现在开始,全局安装将被添加到我们刚刚指定的目录中。
之后,我们仍然需要将
**更改 npm 前缀的一个缺点:**如果我们告诉 npm 升级自己,它现在也会安装到新位置。
13.1.5.2 使用 Node.js 版本管理器
Node.js 版本管理器可以让我们同时安装多个 Node.js 版本并在它们之间切换。流行的版本管理器包括:
-
Unix:nvm
-
跨平台:Volta
13.2 安装 npm 注册包到本地
要本地安装 npm 注册包(如
cd my-package/ npm install cowsay
这将向
"dependencies": { "cowsay": "1.5.0", ··· }
此外,该包被下载到以下目录:
my-package/node_modules/cowsay/
在 Unix 上,npm 为 bin 脚本添加了这些符号链接:
my-package/node_modules/.bin/cowsay -> ../cowsay/cli.js my-package/node_modules/.bin/cowthink -> ../cowsay/cli.js
在 Windows 上,npm 将这些文件添加到
ode_modules.bin
cowsay cowsay.cmd cowsay.ps1 cowthink cowthink.cmd cowthink.ps1
没有扩展名的文件是针对基于 Unix 的 Windows 环境(如 Cygwin、MinGW 和 MSYS)的脚本。
% npm bin /Users/john/my-package/node_modules/.bin
注意:本地安装的包始终安装在
% cd $HOME % npm root /Users/john/node_modules
John 的主目录中没有
13.2.1 运行本地安装的 bin 脚本
(本小节中的所有命令都在
13.2.1.1 直接运行 bin 脚本
我们可以从 shell 中如下运行
./node_modules/.bin/cowsay Hello
在 Unix 上,我们可以设置一个辅助程序:
alias npm-exec='PATH=$(npm bin):$PATH'
然后以下命令有效:
npm-exec cowsay Hello
13.2.1.2 通过包脚本运行 bin 脚本
我们还可以在
{ ··· "scripts": { "cowsay": "cowsay" }, ··· }
现在我们可以在 shell 中执行这个命令:
npm run cowsay Hello
这是因为 npm 在 Unix 上临时将以下条目添加到
/Users/john/my-package/node_modules/.bin /Users/john/node_modules/.bin /Users/node_modules/.bin /node_modules/.bin
在 Windows 上,类似的条目被添加到
C:Usersjanemy-package ode_modules.bin C:Usersjane ode_modules.bin C:Users ode_modules.bin C: ode_modules.bin
以下命令列出了包脚本运行时存在的环境变量及其值:
npm run env
13.2.1.3 通过 npx 运行 bin 脚本
在一个包内,可以使用 npx 来访问 bin 脚本:
npx cowsay Hello npx cowthink Hello
稍后再详细介绍 npx。
13.3?安装未发布的包
有时,我们有一个包,要么我们还没有发布,要么永远不会发布,并且想要安装它。
13.3.1?npm link :全局安装未发布的包
假设我们有一个未发布的包,其名称是
cd /tmp/unpublished-package/ npm link
如果我们这样做:
-
npm 将一个符号链接添加到全局的
node_modules (由npm root -g 返回)- 例如:/usr/local/lib/node_modules/@my-scope/unpublished-package -> ../../../../../tmp/unpublished-package
-
在 Unix 上,npm 还会从全局 bin 目录(由
npm bin -g 返回)到每个 bin 脚本添加一个符号链接。该链接不是直接的,而是通过全局node_modules 目录:/usr/local/bin/my-command -> ../lib/node_modules/@my-scope/unpublished-package/src/my-command.js
-
在 Windows 上,它添加了通常的 3 个脚本(通过相对路径引用全局
node_modules 中的链接包):C:UsersjaneAppDataRoaming pmmy-command C:UsersjaneAppDataRoaming pmmy-command.cmd C:UsersjaneAppDataRoaming pmmy-command.ps1
由于链接包的引用方式,其中的任何更改都会立即生效。当它发生变化时,无需重新链接它。
要检查全局安装是否成功,我们可以使用
13.3.2?npm link :在本地安装全局链接的包
在我们全局安装了未发布的包之后(参见前一小节),我们可以选择在我们的一个包中(可以是已发布的或未发布的)中将其安装为本地包:
cd /tmp/other-package/ npm link @my-scope/unpublished-package
这创建了以下链接:
/tmp/other-package/node_modules/@my-scope/unpublished-package -> ../../../unpublished-package
默认情况下,未发布的包不会被添加为
13.3.3?npm link :取消链接
取消本地链接:
cd /tmp/other-package/ npm uninstall @my-scope/unpublished-package
取消全局链接:
cd /tmp/unpublished-package/ npm uninstall -g
13.3.4?通过本地路径安装未发布的包
另一种在本地安装未发布的包的方法是使用
cd /tmp/other-package/ npm install ../unpublished-package
这有两个效果。
首先,创建以下符号链接:
/tmp/other-package/node_modules/@my-scope/unpublished-package -> ../../../unpublished-package
其次,将依赖项添加到
"dependencies": { "@my-scope/unpublished-package": "file:../unpublished-package", ··· }
这种安装未发布的包的方法也适用于全局:
cd /tmp/unpublished-package/ npm install -g .
13.3.5?安装未发布的包的其他方法
-
Yalc 让我们将包发布到本地的“Yalc 仓库”(类似本地注册表)。从该仓库中,我们可以将包安装为依赖项,例如,一个名为
my-package/ 的包。它们被复制到目录my-package/.yalc 中,并且file: 或link: 依赖项被添加到package.json 中。 -
relative-deps 支持package.json 中的"relativeDependencies" ,如果存在的话,会覆盖正常的依赖关系。与npm link 和本地路径安装相比:-
正常的依赖关系不需要更改。
-
相对依赖项被安装为来自 npm 注册表的依赖项(而不是通过符号链接)。
relative-deps 还有助于保持本地安装的相对依赖项及其原始依赖项同步。 -
-
npx link 是npm link 的一个更安全的版本,它不需要全局安装,还有其他好处。
13.4?npx :在不安装它们的情况下运行 npm 包中的 bin 脚本
npx 是一个与 npm 捆绑在一起的用于运行 bin 脚本的 shell 命令。
它最常见的用法是:
npx <package-name> arg1 arg2 ...
这个命令将名称为
npx cowsay Hello
这意味着我们可以在不先安装它们的情况下运行 bin 脚本。npx 最适用于一次性调用 bin 脚本- 例如,许多框架提供用于设置新项目的 bin 脚本,这些通常通过 npx 运行。
npx 第一次使用包后,它将在其缓存中可用,并且后续调用速度更快。但是,我们无法确定包在缓存中停留的时间有多长。因此,npx 不能替代全局或本地安装 bin 脚本。
如果一个包带有与其包名称不同的 bin 脚本,我们可以像这样访问它们:
npx --package=<package-name> <bin-script> arg1 arg2 ...
例如:
npx --package=cowsay cowthink Hello
13.4.1 npx 缓存
npx 的缓存位于哪里?
在 Unix 上,我们可以通过以下命令找到:
npx --package=cowsay node -p "process.env.PATH.split(':').find(p => p.includes('_npx'))"
返回类似于这样的路径:
/Users/john/.npm/_npx/8f497369b2d6166e/node_modules/.bin
在 Windows 上,我们可以使用(一行分成两行):
npx --package=cowsay node -p "process.env.Path.split(';').find(p => p.includes('_npx'))"
返回类似于这样的路径(单个路径分成两行):
C:UsersjaneAppDataLocal pm-cache\_npx 8f497369b2d6166e ode_modules.bin
请注意,npx 的缓存与 npm 用于安装模块的缓存不同:
-
Unix:
-
npm 缓存:
$HOME/.npm/_cacache/ -
npx 缓存:
$HOME/.npm/_npx/
-
-
Windows(PowerShell):
-
npm 缓存:
$env:UserProfileAppDataLocal
pm-cache\_npx -
npx 缓存:
$env:UserProfileAppDataLocal
pm-cache\_cacache
-
两个缓存的父目录可以通过以下方式确定:
npm config get cache
有关 npm 缓存的更多信息,请参阅npm 文档。
与 npx 缓存相比,npm 缓存中的数据永远不会被删除,只会被添加。我们可以在 Unix 上通过以下方式检查其大小:
du -sh $(npm config get cache)/_cacache/
在 Windows PowerShell 上:
DiskUsage /d:0 "$(npm config get cache)\_cacache"
评论