Node.js learning record.
Asynchronous context tracking
AsyncLocalStorage
Use Context
When you are performing asynchronous operations during execution and need to pass some context along implicitly.
Examples
import http from "node:http";
import { AsyncLocalStorage } from "node:async_hooks";
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
/**
* will always get the id implicitly stored by `asyncLocalStorage.run`
*/
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : "-"}:`, msg);
}
let idSeq = 0;
http
.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId("start");
// Imagine any chain of async operations here
setTimeout(() => {
logWithId("finish");
res.end();
}, (2 - idSeq) * 1000);
});
})
.listen(8000);
http.get("http://localhost:8000");
http.get("http://localhost:8000");
// Prints:
// 0: start
// 1: start
// 1: finish
// 0: finish
Real World Case(Koa)
Implementation
class Application {
listen(...args) {
debug("listen");
const server = http.createServer(this.callback());
return server.listen(...args);
}
callback() {
const fn = this.compose(this.middleware);
if (!this.listenerCount("error")) this.on("error", this.onerror);
return (req, res) => {
const ctx = this.createContext(req, res);
return this.ctxStorage.run(ctx, async () => {
return this.handleRequest(ctx, fn);
});
};
}
/* expose ctx via the app.currentContext property */
get currentContext() {
if (this.ctxStorage) return this.ctxStorage.getStore();
}
}
Usage
const app = new Koa({ asyncLocalStorage: true });
app.use(async (ctx) => {
assert(ctx === app.currentContext);
await new Promise((resolve) => {
setTimeout(() => {
assert(ctx === app.currentContext);
resolve();
}, 1);
});
await new Promise((resolve) => {
assert(ctx === app.currentContext);
setImmediate(() => {
assert(ctx === app.currentContext);
resolve();
});
});
assert(ctx === app.currentContext);
app.currentContext.body = "ok";
});
AsyncResource
Use Context
Managing the lifecycle of asynchronous resources, and propagating async context across custom callback scheduling (queues, pools, etc.).
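A minimal sketch of why this matters (the queue below is hypothetical): AsyncResource.bind() captures the async context active at call time, so a callback scheduled through a custom queue still sees the right AsyncLocalStorage store when it eventually runs.
import { AsyncLocalStorage, AsyncResource } from "node:async_hooks";
const als = new AsyncLocalStorage();
const queue = [];
als.run("request-1", () => {
  // bind() wraps the callback in an AsyncResource that remembers this context
  queue.push(AsyncResource.bind(() => console.log(als.getStore())));
});
// Later, outside the original context:
queue.forEach((task) => task()); // prints "request-1"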
Buffer
Use Context
A tool for handling binary data.
Examples
const http = require("http");
const fs = require("fs");
const server = http.createServer((req, res) => {
if (req.method === "POST" && req.url === "/upload") {
let data = [];
req.on("data", (chunk) => {
data.push(chunk);
});
req.on("end", () => {
const buffer = Buffer.concat(data);
fs.writeFile("uploaded-image.jpg", buffer, (err) => {
if (err) {
res.writeHead(500);
res.end("Server error");
return;
}
const base64Image = buffer.toString("base64");
res.writeHead(200, { "Content-Type": "text/plain" });
res.end(base64Image);
});
});
} else {
res.writeHead(404);
res.end("Not Found");
}
});
server.listen(3000, () => {
console.log("Server is running at http://localhost:3000");
});
C++ Addon
Set up
Install the packages node-addon-api and bindings (so we can use #include <napi.h>), plus node-gyp to compile.
pnpm i node-addon-api bindings @types/bindings node-gyp
Clone nodejs source code
git clone --depth 1 https://github.com/nodejs/node.git
Set up header include paths in .vscode/c_cpp_properties.json
(for the VS Code C++ extension)
{
"configurations": [
{
"name": "Mac",
"includePath": [
"${workspaceFolder}/**",
"/Users/lianwenwu/work/github/node/deps/v8/include",
"/Users/lianwenwu/work/github/node/src",
"node_modules/node-addon-api/"
],
"defines": [],
"macFrameworkPath": [
"/Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/System/Library/Frameworks"
],
"compilerPath": "/usr/bin/clang",
"cStandard": "c17",
"cppStandard": "c++17",
"intelliSenseMode": "macos-clang-arm64"
}
],
"version": 4
}
Create binding.gyp
{
"targets": [
{
"target_name": "hello",
"sources": ["hello_world.cc"],
'include_dirs': ["<!(node -p \"require('node-addon-api').include_dir\")"],
'cflags!': ['-fno-exceptions'],
'cflags_cc!': ['-fno-exceptions'],
'conditions': [
["OS=='win'", {
"defines": [
"_HAS_EXCEPTIONS=1"
],
"msvs_settings": {
"VCCLCompilerTool": {
"ExceptionHandling": 1
},
},
}],
["OS=='mac'", {
'xcode_settings': {
'GCC_ENABLE_CPP_EXCEPTIONS': 'YES',
'CLANG_CXX_LIBRARY': 'libc++',
'MACOSX_DEPLOYMENT_TARGET': '10.7',
},
}],
],
}
],
}
Create hello_world.cc
#include <napi.h>
using namespace Napi;
class HelloAddon : public Addon<HelloAddon> {
public:
HelloAddon(Env env, Object exports) {
DefineAddon(exports, {
InstanceMethod("hello", &HelloAddon::Hello, napi_enumerable)
});
}
private:
Value Hello(const CallbackInfo& info) {
return String::New(info.Env(), "world");
}
};
NODE_API_ADDON(HelloAddon)
Create index.mjs
import bindings from "bindings";
const addon = bindings({
bindings: "hello.node",
module_root: process.cwd(),
});
console.log(addon.hello());
Compile and run
npx node-gyp configure build
node index.mjs
Child process
Use Context
Primarily used to execute shell commands and run external programs.
Exec
import { exec } from "child_process";
exec("ls", (err, stdout, stderr) => {
if (err) {
console.error(`exec error: ${err}`);
return;
}
console.log(`stdout: ${stdout}`);
console.error(`stderr: ${stderr}`);
});
Spawn
import { spawn } from "child_process";
/**
* spawn a new process
*/
spawn("ls", {
stdio: "inherit",
});
Fork
// child.ts
process.on("message", (msg) => {
console.log("Message from parent:", msg);
});
let counter = 0;
setInterval(() => {
process.send?.({ counter: counter++ });
}, 1000);
// parent.ts
import { fork } from "child_process";
const child = fork("./child");
child.send({ hello: "world" });
child.on("message", (msg) => {
console.log("Message from child", msg);
});
The difference between spawn and fork is that the latter also creates an IPC channel for communicating with the parent process.
Note
In the real world, we generally use the cross-spawn package, because the spawn function in Node.js does not behave well on Windows (argument escaping, PATHEXT lookup, and shebang handling).
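A minimal usage sketch (assuming cross-spawn is installed; the npm script name is hypothetical):
import spawn from "cross-spawn";
// Same signature as child_process.spawn, but with Windows fixes applied
const child = spawn("npm", ["run", "build"], { stdio: "inherit" });
child.on("close", (code) => {
  console.log(`exited with code ${code}`);
});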
Cluster
Use Context
When deploying Node.js to a server, we generally use PM2's cluster mode to fully utilize the server's CPU cores.
If you want to monitor it, you can use easy-monitor.
Usage
import cluster from "cluster";
import http from "http";
import os from "os";
const numCPUs = os.cpus().length;
console.log("cpus", numCPUs);
if (cluster.isPrimary) {
console.log(`Master ${process.pid} is running`);
// Fork workers.
for (let i = 0; i < numCPUs / 2; i++) {
cluster.fork();
}
cluster.on("exit", (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
});
} else {
// Workers can share any TCP connection
// In this case, it is an HTTP server
http
.createServer((req, res) => {
res.writeHead(200);
res.end("hello world\n");
})
.listen(8000);
console.log(`Worker ${process.pid} started`);
}
Crypto(ˈkrɪptoʊ)
Use Context
Encryption and decryption, and scenarios related to confidentiality. crypto has built-in DH and ECDH algorithm modules.
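For example, a minimal ECDH key-agreement sketch:
import crypto from "crypto";
// Both parties generate a key pair on the same curve, exchange public
// keys, and independently derive the identical shared secret.
const alice = crypto.createECDH("prime256v1");
const bob = crypto.createECDH("prime256v1");
const alicePublicKey = alice.generateKeys();
const bobPublicKey = bob.generateKeys();
const aliceSecret = alice.computeSecret(bobPublicKey);
const bobSecret = bob.computeSecret(alicePublicKey);
console.log(aliceSecret.equals(bobSecret)); // true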
Password gen
import crypto from "crypto";
const password = "myPassword";
const salt = crypto.randomBytes(16).toString("hex");
crypto.pbkdf2(password, salt, 10000, 64, "sha512", (err, derivedKey) => {
if (err) throw err;
console.log(derivedKey.toString("hex")); // Prints the hashed password
});
Cipheriv and decipheriv
Cipher has two useful methods, update() and final(). When using streams, the pipe() method calls these two methods internally.
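Outside of streams, the same two methods can be called directly; a minimal sketch:
import crypto from "crypto";
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv("aes-256-cbc", key, iv);
// update() may be called repeatedly with new data; final() flushes the padding
const encrypted = Buffer.concat([cipher.update("secret data", "utf8"), cipher.final()]);
const decipher = crypto.createDecipheriv("aes-256-cbc", key, iv);
const decrypted = Buffer.concat([decipher.update(encrypted), decipher.final()]);
console.log(decrypted.toString("utf8")); // "secret data"
The streaming version below relies on pipe() calling update()/final() internally.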
import crypto from "crypto";
import fs from "fs";
const algorithm = "aes-256-cbc";
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);
const data = "rqwreqwerqw41234124121";
fs.writeFileSync("source.txt", data, {
encoding: "utf-8",
});
const cipher = crypto.createCipheriv(algorithm, key, iv);
const input = fs.createReadStream("source.txt", "utf-8");
const output = fs.createWriteStream("encrypted.txt");
input.pipe(cipher).pipe(output);
output.on("finish", () => {
const decipher = crypto.createDecipheriv(algorithm, key, iv);
const input1 = fs.createReadStream("encrypted.txt");
const output1 = fs.createWriteStream("decrypted.txt", "utf-8");
input1.pipe(decipher).pipe(output1);
});
Create and verify signatures
import crypto from "crypto";
const { privateKey, publicKey } = crypto.generateKeyPairSync("rsa", {
modulusLength: 2048,
});
// Sign
const sign = crypto.createSign("SHA256");
sign.update("some data to sign");
const signature = sign.sign(privateKey, "hex");
console.log(signature);
// Verify
const verify = crypto.createVerify("SHA256");
verify.update("some data to sign");
console.log(verify.verify(publicKey, signature, "hex")); // Prints: true
Diagnostics(ˌdaɪəɡˈnɒstɪks) channel
Use Context
Logging and diagnostics instrumentation: publishers emit messages on named channels, and consumers subscribe without tight coupling.
import { channel, subscribe } from "diagnostics_channel";
// the channel name would typically be exported so subscribers can import it
const channelName = Symbol("channel");
const c = channel(channelName);
subscribe(channelName, (message) => {
console.log("Received message:", message);
});
c.publish({
data: "Hello, world!",
});
Error
Use Context
Error tracing.
CaptureStackTrace
Creates a .stack property on targetObject, which when accessed returns a string representing the location in the code at which Error.captureStackTrace() was called.
const fun1 = () => {
fun2();
};
const fun2 = () => {
fun3();
};
const fun3 = () => {
log_stack();
};
function log_stack() {
let err = {};
// the trace omits fun2 and every frame above it (fun3, log_stack)
Error.captureStackTrace(err, fun2);
console.log(err.stack);
}
fun1();
StackTraceLimit
The Error.stackTraceLimit property specifies the number of stack frames collected by a stack trace (whether generated by new Error().stack or Error.captureStackTrace(obj)).
const fun1 = () => {
fun2();
};
const fun2 = () => {
fun3();
};
const fun3 = () => {
log_stack();
};
function log_stack() {
let err = {};
Error.stackTraceLimit = 2;
Error.captureStackTrace(err);
console.log(err.stack);
}
fun1();
Inspector
Use Context
Analyzing and detecting the runtime behavior of an application.
Cpu Profile
import { Session } from "node:inspector";
import fs from "node:fs";
import { promisify } from "node:util";
const session = new Session();
session.connect();
const post = promisify(session.post).bind(session);
(async () => {
await post("Profiler.enable");
await post("Profiler.start");
const { profile } = await post("Profiler.stop");
fs.writeFileSync("./profile.cpuprofile", JSON.stringify(profile));
})();
HeapSnapShot
import { Session } from "node:inspector";
import fs from "node:fs";
import { promisify } from "node:util";
const session = new Session();
session.connect();
const fd = fs.openSync("profile.heapsnapshot", "w");
session.on("HeapProfiler.addHeapSnapshotChunk", (m) => {
fs.writeSync(fd, m.params.chunk);
});
const post = promisify(session.post).bind(session);
(async () => {
await post("HeapProfiler.takeHeapSnapshot");
session.disconnect();
fs.closeSync(fd);
})();
How do we analyze a .heapsnapshot file? We can load it in the Chrome DevTools Memory tab.
HeapSnapShot Format References
- https://zhengrenzhe.dev/posts/v8-snapshot/
- https://learn.microsoft.com/zh-cn/microsoft-edge/devtools-guide-chromium/memory-problems/heap-snapshot-schema
- https://juejin.cn/post/6940439962722500616
- https://github.com/childrentime/heapquery-js
- https://chromedevtools.github.io/devtools-protocol/v8/Profiler/
Modules
CommonJS
Interoperability
In CommonJS modules, it is not possible to import ES modules except via dynamic import().
// Dynamic import() works inside CJS; the loaded module (even a CJS one)
// is always exposed as an ES module namespace object. Top-level await is
// not available in CJS, so consume the promise with .then():
import("cjs").then((ns) => console.log(ns));
Caching
Modules are cached after the first time they are loaded, so calling require('./foo') multiple times only executes the module once. We can delete require.cache[filePath] and then call require(filePath) again to hot-reload a module; the npm package clear-module does exactly this.
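A minimal sketch of this cache-busting pattern (hotRequire and ./config.js are hypothetical names):
// CommonJS
function hotRequire(modulePath) {
  const resolved = require.resolve(modulePath);
  // drop the cached Module instance so the next require re-executes the file
  delete require.cache[resolved];
  return require(resolved);
}
const freshConfig = hotRequire("./config.js"); // re-evaluated on every call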
The module wrapper
Before a module's code is executed, Node.js will wrap it with a function wrapper that looks like the following:
(function (exports, require, module, __filename, __dirname) {
// Module code actually lives in here
});
exports
A shortcut equivalent to module.exports.
require
- require.main: The Module object representing the entry script loaded when the Node.js process launched.
- require.resolve: Uses the internal require() machinery to look up the location of a module, but rather than loading the module, just returns the resolved filename. When writing CLI tools, we usually use require.resolve to ensure that we only import packages from our own project.
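A quick sketch (assuming the typescript package is installed in the project):
// resolve without executing the module
const tsEntry = require.resolve("typescript"); // absolute path to its main file
// then load exactly that file
const ts = require(tsEntry);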
The module object
In each module, the module free variable is a reference to the object representing the current module. For convenience, module.exports is also accessible via the exports module-global. module is not actually a global but rather local to each module.
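A small illustration inside a CommonJS file:
// `module` is local to each file, not a true global
console.log(module.filename); // absolute path of this file
console.log(module.exports === exports); // true, until either is reassigned
module.exports = { answer: 42 }; // now `exports` no longer points at module.exports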
ECMAScript modules
ECMAScript modules are imported asynchronously and support top-level await. When using import to load a module, it is necessary to include the file extension. We usually write TypeScript, so such extension-carrying statements are rarely seen in source code.
Interoperability
import { default as cjs } from "cjs"; // equal to module.exports object
import cjsSugar from "cjs"; // syntactic sugar for the line above
import * as m from "cjs"; // equal to module object
console.log(m);
console.log(m === (await import("cjs"))); // dynamic import equal to module object
import { name } from "cjs"; // equal to module.exports.name
Caching
ES modules have their own independent module cache. Node.js currently does not expose this cache, so hot module replacement is not possible for ESM.
Scope Equivalent
- import.meta.url: The absolute file: URL of the module.
- import.meta.resolve: A module-relative resolution function scoped to each module, returning a URL string.
import { fileURLToPath } from "url";
import { dirname } from "path";
import { createRequire } from "node:module";
// similar to require.resolve, but returns a file: URL string
const dependencyAsset = import.meta.resolve("component-lib");
const require = createRequire(import.meta.url);
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
Custom Import
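Node.js lets us customize how specifiers are resolved and loaded through module customization hooks. A minimal sketch (hooks.mjs is a hypothetical file name; register requires Node.js >= 20.6):
// hooks.mjs
export async function resolve(specifier, context, nextResolve) {
  // customize specifier resolution here (aliases, extra extensions, ...)
  return nextResolve(specifier, context);
}
export async function load(url, context, nextLoad) {
  // could transform the source here before it is evaluated
  return nextLoad(url, context);
}
// main.mjs
import { register } from "node:module";
register("./hooks.mjs", import.meta.url);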
Net
Use Context
The node:net module provides an asynchronous network API for creating stream-based TCP or IPC servers (net.createServer()) and clients (net.createConnection()).
Prerequisite(priːˈrekwəzɪt) knowledge
IPC
We have several ways to perform interprocess communication:
Method | OS |
---|---|
Signal | Most |
Unix Domain Socket | Unix Like |
Message Queue | Most |
Anonymous Pipe | Most |
Named Pipe | Most |
Shared Memory | Most |
In the abstraction layer implementation of the net module, Unix systems have chosen Unix domain sockets, while Windows systems have chosen named pipes as the communication mechanism.
Unix
In Unix, everything is treated as a file, and Unix domain sockets likewise communicate through a socket file.
// server-ipc.ts
import net from "net";
const server = net.createServer((socket) => {
console.log("Client connected.");
socket.on("data", (data) => {
console.log("Received:", data.toString());
socket.write("Server says: Hello!");
});
socket.on("end", () => {
console.log("Client disconnected.");
});
});
server.listen("/tmp/my-ipc.sock", () => {
console.log("Server listening on UNIX domain socket:", server.address());
});
// client-ipc.ts
import net from "net";
const client = net.createConnection("/tmp/my-ipc.sock", () => {
console.log("Connected to server.");
client.write("Hello, server!");
});
client.on("data", (data) => {
console.log("Received:", data.toString());
client.end();
});
client.on("end", () => {
console.log("Disconnected from server.");
});
Windows
On Windows, the path must refer to an entry in \\?\pipe\ or \\.\pipe\. Any characters are permitted, but the latter may do some processing of pipe names, such as resolving .. sequences. Despite how it might look, the pipe namespace is flat.
net.createServer().listen(path.join("\\\\?\\pipe", process.cwd(), "myctl"));
TCP Examples
// server-tcp.ts
import net from 'net';
const server = net.createServer((socket) => {
console.log('Client connected');
socket.on('data', (data) => {
console.log('Received:', data.toString());
socket.write('Server says: Hello!');
});
socket.on('end', () => {
console.log('Client disconnected.');
});
});
server.listen(3000, '127.0.0.1', () => {
console.log('Server started');
});
// client-tcp.ts
import net from 'net';
const client = net.connect(3000, '127.0.0.1', () => {
console.log('Connected to server.');
client.write('Hello, server!');
});
client.on('data', (data) => {
console.log('Received:', data.toString());
client.end();
});
client.on('end', () => {
console.log('Disconnected from server.');
});
Performance Hooks
Prerequisite(priːˈrekwəzɪt) knowledge
Resource Timing
Web Applications mostly consist of a set of downloadable resources. Here resources usually refer to HTML documents, XHR objects, links (such as a stylesheet) or SVG elements.
Performance Timeline
The Performance Timeline API uses PerformanceEntry.entryType to describe the type of the interface represented by this PerformanceEntry object, which represents performance measurements.
High-Resolution Time
The performance.now() method returns a high-resolution timestamp in milliseconds, measured relative to the time origin (page load in browsers, process start in Node.js). It provides a monotonic clock for measuring time intervals and performance analysis. A key characteristic of a monotonic clock is that it is not affected by system time adjustments (such as daylight saving time) or manual changes made by the user. This makes it more reliable and accurate for performance measurement and timing.
Negative example: using Date.now() to measure intervals may be inaccurate, because the wall clock can jump.
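A small sketch contrasting the two clocks:
import { performance } from "node:perf_hooks";
const t0 = performance.now(); // monotonic, relative to the time origin
const d0 = Date.now(); // wall clock, can jump if the system time changes
setTimeout(() => {
  console.log(`elapsed (monotonic): ${performance.now() - t0} ms`);
  console.log(`elapsed (wall clock): ${Date.now() - d0} ms`);
}, 100);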
User Timing
We can use performance.mark and performance.measure to measure user timing. The measure method records the time between two marks.
import async_hooks from "node:async_hooks";
import { performance, PerformanceObserver } from "node:perf_hooks";
const set = new Set();
const hook = async_hooks.createHook({
init(id, type) {
if (type === "Timeout") {
performance.mark(`Timeout-${id}-Init`);
set.add(id);
}
},
destroy(id) {
if (set.has(id)) {
set.delete(id);
performance.mark(`Timeout-${id}-Destroy`);
performance.measure(
`Timeout-${id}`,
`Timeout-${id}-Init`,
`Timeout-${id}-Destroy`
);
}
},
});
hook.enable();
const obs = new PerformanceObserver((list, observer) => {
console.log(list.getEntries()[0]);
performance.clearMarks();
performance.clearMeasures();
observer.disconnect();
});
obs.observe({ entryTypes: ["measure"], buffered: true });
setTimeout(() => {}, 1000);
PerformanceEntry
In Node.js, there are some environment-specific entry types.
- 'node' (Node.js only)
- 'mark' (available on the Web)
- 'measure' (available on the Web)
- 'gc' (Node.js only)
- 'function' (Node.js only)
- 'http2' (Node.js only)
- 'http' (Node.js only)
- 'dns' (Node.js only)
- 'net' (Node.js only)
import { PerformanceObserver } from "node:perf_hooks";
import http from "node:http";
const obs = new PerformanceObserver((items) => {
items.getEntries().forEach((item) => {
console.log(item);
});
});
obs.observe({ entryTypes: ["http"] });
const PORT = 8080;
http
.createServer((req, res) => {
res.end("ok");
})
.listen(PORT, () => {
http.get(`http://127.0.0.1:${PORT}`);
});
Process
unhandledRejection and rejectionHandled
unhandledRejection and rejectionHandled are a pair of events used to monitor promise rejections in a program.
The unhandledRejection event is emitted when a promise is rejected and no .catch() handler has been attached to it within a turn of the event loop.
The rejectionHandled event is emitted when a .catch() handler is later attached to a previously unhandled rejected promise.
By listening to these two events, we can track and handle uncaught, and later-caught, promise rejections in a program.
The uncaughtException event by default captures ordinary JavaScript exceptions. However, when running Node.js with the --unhandled-rejections=strict flag, unhandled promise rejections are raised as uncaught exceptions and captured there as well.
// try node --unhandled-rejections=strict example.js and node example.js
// example.js
process.on("uncaughtException", (e) => {
console.error("uncaughtException", e);
});
process.on("unhandledRejection", (e) => {
console.info("unhandledRejection:", e);
});
process.on("rejectionHandled", (e) => {
console.info("rejectionHandled", e);
});
// emit unhandledRejection
var p = new Promise(function (resolve, reject) {
reject("rejected");
});
setImmediate(function () {
// emit rejectionHandled in next macrotask
p.catch(function (reason) {
console.info("promise catch:", reason);
});
});
argv
The process.argv property returns an array containing the command-line arguments passed when the Node.js process was launched.
// node example.mjs one two=three four
import { argv } from "node:process";
argv.forEach((val, index) => {
console.log(`${index}: ${val}`);
});
argv0
The first element of argv will be process.execPath. See process.argv0 if access to the original value of argv[0] is needed.
$ bash -c 'exec -a customArgv0 node'
> process.argv[0]
'/usr/local/bin/node'
> process.argv0
'customArgv0'
nextTick
The queueMicrotask()
API is an alternative to process.nextTick() that also defers execution of a function using the same microtask queue used to execute the then, catch, and finally handlers of resolved promises. Within Node.js, every time the "next tick queue" is drained, the microtask queue is drained immediately after.
Difference between CJS and ESM
import { nextTick } from "node:process";
nextTick(() => console.log(1));
Promise.resolve().then(() => console.log(2));
queueMicrotask(() => console.log(3));
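// With CommonJS semantics (e.g. using require instead of import) this
// prints 1 2 3: the nextTick queue drains before the microtask queue.
// As an ES module it prints 2 3 1, because ESM evaluation itself runs
// inside the microtask queue, so the pending microtasks execute before
// the nextTick queue is drained.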
Stream
Object mode
All streams created by Node.js APIs operate exclusively on strings and Buffer (or Uint8Array) objects. It is possible, however, for stream implementations to work with other types of JavaScript values (with the exception of null, which serves a special purpose within streams). Such streams are considered to operate in "object mode".
Stream instances are switched into object mode using the objectMode option when the stream is created. Attempting to switch an existing stream into object mode is not safe.
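A minimal sketch of an object-mode stream:
import { Readable } from "node:stream";
// Readable.from() produces an object-mode stream for non-byte sources
const objects = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);
objects.on("data", (obj) => {
  console.log(obj); // whole JavaScript objects, not Buffers
});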
Buffering
For a Writable Stream, the value of highWaterMark determines when back-pressure is signaled: once the amount of buffered data reaches highWaterMark, write() returns false, and the stream emits 'drain' once the internal buffer has been flushed and writing can safely resume.
For a Readable Stream, the value of highWaterMark determines when the stream stops generating read operations: when the amount of data in the internal buffer reaches highWaterMark, the readable stream stops reading from the underlying resource (such as a file or network) until the consumer drains the buffer back below that threshold.
In object mode, the meaning of highWaterMark is slightly different: it is no longer measured in bytes but represents the number of objects the stream can hold, because in object mode the stream transmits JavaScript objects instead of raw byte data.
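A small sketch showing highWaterMark counted in objects:
import { Writable } from "node:stream";
const sink = new Writable({
  objectMode: true,
  highWaterMark: 2, // counted in objects, not bytes
  write(obj, _encoding, callback) {
    setImmediate(callback); // pretend the underlying write is asynchronous
  },
});
console.log(sink.write({ n: 1 })); // true: buffer still below highWaterMark
console.log(sink.write({ n: 2 })); // false: 2 buffered objects reached the mark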
Writable Stream
Drain
If a call to stream.write(chunk) returns false (which happens when the buffered size reaches highWaterMark), the 'drain' event will be emitted when it is appropriate to resume writing data to the stream.
import fs from "node:fs";
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time!
writer.write(data, encoding, callback);
} else {
// See if we should continue, or wait.
// Don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// Had to stop early!
// Write some more once it drains.
writer.once("drain", write);
}
}
}
const writable = fs.createWriteStream("./output.txt");
writeOneMillionTimes(writable, Buffer.from("hello worlddd"));
Cork & UnCork
The writable.cork() method forces all written data to be buffered in memory. When using writable.cork() and writable.uncork() to manage the buffering of writes to a stream, defer calls to writable.uncork() using process.nextTick(). Doing so allows batching of all writable.write() calls that occur within a given Node.js event loop phase.
import fs from "node:fs";
const writableStream = fs.createWriteStream("output.txt");
writableStream.cork();
// write some data to the writable stream
writableStream.write("Data 1\n");
writableStream.write("Data 2\n");
writableStream.write("Data 3\n");
process.nextTick(() => {
writableStream.uncork();
});
Readable streams
Reading Mode
Readable streams effectively operate in one of two modes: flowing and paused.
- In flowing mode, data is read from the underlying system automatically and provided to an application as quickly as possible using events via the EventEmitter interface.
- In paused mode, the stream.read() method must be called explicitly to read chunks of data from the stream.
Adding a data event handler or calling the stream.pipe() method will switch the stream into flowing mode. stream.resume() and stream.pause() are a pair of methods for toggling between the modes.
Adding a readable event handler automatically makes the stream stop flowing, and the data has to be consumed via readable.read(). If the readable event handler is removed, then the stream will start flowing again if there is a data event handler.
Reading Data Using flowing mode
import fs from "node:fs";
const readStream = fs.createReadStream("./output.txt");
readStream.on("data", (data) => {
console.log("data", data);
});
readStream.on("end", () => {
console.log("stream close");
});
Reading Data Using paused mode
import fs from "node:fs";
const readStream = fs.createReadStream("./output.txt");
readStream.on("readable", () => {
console.log("read", readStream.read());
});
readStream.on("end", () => {
console.log("stream close");
});
Duplex
Duplex streams are streams that implement both the Readable and Writable interfaces.
import { Duplex } from "stream";
const myDuplexStream = new Duplex({
write(chunk, encoding, callback) {
console.log(`Writing data: ${chunk}`);
callback();
},
read(size) {
const data = String.fromCharCode(this.currentCharCode++);
if (this.currentCharCode > 90) {
this.push(null); // no more data: pushing null ends the stream
} else {
console.log(`Reading data: ${data}`);
this.push(data);
}
},
});
// ASCII 'A'
myDuplexStream.currentCharCode = 65;
myDuplexStream.on("data", (chunk) => {
console.log(`Received data: ${chunk}`);
});
myDuplexStream.write("Data 1");
myDuplexStream.write("Data 2");
myDuplexStream.end();
allowHalfOpen
If false then the stream will automatically end the writable side when the readable side ends. Set initially by the allowHalfOpen constructor option, which defaults to true.
Transform
Transform streams are Duplex streams where the output is in some way related to the input.
_transform
All Transform stream implementations must provide a _transform() method to accept input and produce output.
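A real-world example: a Transform that injects critical CSS and serialized store data into the HTML stream produced by React's renderToPipeableStream.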
const { pipe, abort } = renderToPipeableStream(jsx, {
bootstrapScripts: [...jsArr],
onShellReady() {
const injectableTransform = new Transform({
// transform react html stream to inject style and store script
transform(_chunk, _encoding, callback) {
try {
let chunk = _chunk;
if (criticalStyles.size !== 0) {
if (resultArr.length !== 0) {
const result = resultArr[0];
const applyScript = `<script async>
const event = new CustomEvent('${STREAMING_DESERIALIZATION_EVENT}', {
detail: ${serialize(result, { isJSON: true })}
});document.dispatchEvent(event);</script>`;
chunk = joinChunk(applyScript, chunk);
}
const styles = [...criticalStyles.keys()]
.map((key) => {
const style = criticalStyles.get(key);
return `<style type="text/css" id=${key}>${style}</style>`;
})
.join("");
const applyStyle = `<script async>document.head.insertAdjacentHTML("beforeend", ${JSON.stringify(
styles
)});</script>`;
chunk = joinChunk(applyStyle, chunk);
}
this.push(chunk);
callback();
} catch (e) {
if (e instanceof Error) {
callback(e);
} else {
callback(new Error("Received unkown error when streaming"));
}
}
},
});
const styles = [...criticalStyles.keys()]
.map((key) => {
const style = criticalStyles.get(key);
return `<style type="text/css" id=${key}>${style}</style>`;
})
.join("");
res.statusCode = 200;
res.setHeader("content-type", "text/html");
res.write(`<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>React App</title>
${styles}
<script>window.data=${serialize(store, { isJSON: true })}</script>
</head>
<body>
<div id="main">`);
criticalStyles.clear();
pipe(injectableTransform).pipe(res);
},
onShellError(error) {
console.log(error);
res.statusCode = 500;
res.setHeader("content-type", "text/html");
res.end("<h1>Something went wrong</h1>");
},
onError(error) {
console.error(error);
},
});
function joinChunk<Chunk extends { toString: () => string }>(
before = "",
chunk: Chunk,
after = ""
) {
return `${before}${chunk.toString()}${after}`;
}
Flush
This will be called when there is no more written data to be consumed, but before the 'end' event is emitted, signaling the end of the Readable side.
import { Transform } from 'stream';
const myTransformStream = new Transform({
transform(chunk, encoding, callback) {
const uppercasedChunk = chunk.toString().toUpperCase();
this.push(uppercasedChunk);
callback();
},
flush(callback) {
this.push('Extra Data');
callback();
}
});
myTransformStream.on('data', (chunk) => {
console.log(`Received data: ${chunk}`);
});
myTransformStream.write('Data 1');
myTransformStream.write('Data 2');
myTransformStream.end();
TLS
Prerequisite(priːˈrekwəzɪt) knowledge
- Introduction (ClientHello): Your browser sends a “ClientHello” message to the server when you request a secure website. This message contains essential information, including a random number, the SSL/TLS versions it supports, and the cipher suites it can use.
- Server’s Response (ServerHello): The server replies with a “ServerHello” message, including the highest SSL/TLS version and cipher suite both parties support.
- Server’s Credentials: The server presents its digital certificate, verified by a Certificate Authority (CA) such as Let's Encrypt, like an ID card proving its authenticity.
- Client’s Verification and Key Generation: Your browser validates the server’s certificate. Once verified, it uses the server’s public key to encrypt a premaster secret, a unique session key, and sends it back to the server.
- Establishing a Secure Connection: The server decrypts the premaster secret with its private key. The server and client then compute the session key, which will be used for symmetric encryption of all communication.
Examples
# generate a self-signed certificate
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes -subj '/CN=localhost'
// server.ts
import tls from 'tls';
import fs from 'fs';
const options = {
key: fs.readFileSync('key.pem'),
cert: fs.readFileSync('cert.pem'),
};
const server = tls.createServer(options, (socket) => {
console.log('server connected',
socket.authorized ? 'authorized' : 'unauthorized');
socket.write('welcome!\n');
socket.setEncoding('utf8');
socket.pipe(socket);
});
server.listen(8000, () => {
console.log('server bound');
});
// client.ts
import tls from 'tls';
import fs from 'fs';
const options = {
// identify server
ca: [ fs.readFileSync('cert.pem') ],
// client can also pass key and cert, if the server need to identify it.
};
const socket = tls.connect(8000, options, () => {
console.log('client connected',
socket.authorized ? 'authorized' : 'unauthorized');
socket.write('hello I am client!\n');
});
socket.setEncoding('utf8');
socket.on('data', (data) => {
console.log(data);
});
UDP
import dgram from 'node:dgram';
const server = dgram.createSocket('udp4');
server.on('message', (msg, rinfo) => {
console.log(`server receive: ${msg} from ${rinfo.address}:${rinfo.port}`);
});
server.on('listening', () => {
const address = server.address();
console.log(`server is listening at ${address.address}:${address.port}`);
});
server.bind(41234);
// listening 0.0.0.0:41234
const client = dgram.createSocket('udp4');
const message = Buffer.from('this is a message');
client.send(message, 41234, 'localhost', (err) => {
client.close();
});
Util
import util from 'node:util';
import fs from 'node:fs';
// inspect
const obj = { name: 'John', age: 30, city: 'New York' };
console.log(util.inspect(obj));
// promisify
const readFile = util.promisify(fs.readFile);
async function main() {
const data = await readFile('./example.txt', 'utf8');
console.log(data);
}
main();
//callbackify
function myAsyncFunction() {
return new Promise((resolve, reject) => {
resolve('Hello World!');
});
}
const callbackFunction = util.callbackify(myAsyncFunction);
callbackFunction((err, ret) => {
if (err) throw err;
console.log(ret);
});
Workers
import { fileURLToPath } from "node:url";
import { Worker, isMainThread, workerData } from "worker_threads";
const __filename = fileURLToPath(import.meta.url);
if (isMainThread) {
const sharedBuffer = new SharedArrayBuffer(1 * Int32Array.BYTES_PER_ELEMENT);
const sharedArray = new Int32Array(sharedBuffer);
const worker1 = new Worker(__filename, { workerData: sharedBuffer });
const worker2 = new Worker(__filename, { workerData: sharedBuffer });
worker1.on("exit", () => {
console.log(sharedArray);
});
worker2.on("exit", () => {
console.log(sharedArray);
});
} else {
const sharedArray = new Int32Array(workerData);
for (let i = 0; i < 10000; i++) {
// thread race condition
sharedArray[0]++;
// solution
// Atomics.add(sharedArray, 0, 1);
}
}