关注【索引目录】服务号,更多精彩内容等你来探索!
Broker 是分布式系统的入门级组件,原因有二:
-
易于上手运行 -
它们强制执行蜂巢/主节点模式,这种模式可以自然地扩展。
node <-->
node <--> hive / broker <--> client-facing server <--> client
node <--> <--> client
于是我想:为什么不把纯 JavaScript 代理移植到 Node.js 呢?
// broker.js
import Bunny from "bunnimq";
import path from "path";
import { fileURLToPath } from "url";
Bunny({
port: 3000,
DEBUG: true,
cwd: path.dirname(fileURLToPath(import.meta.url)), // path to the .auth file
queue: {
Durable: true,
MessageExpiry: 60 // 1 hour
}
});
说实话,底层 Node.js 真的非常令人印象深刻。虽然花了几天时间,但经过一些优化后,它确实可以运行:
-
对象 → 二进制编译器 -
SharedArrayBuffers 和线程
实际上它可以做得更好,而且(顺便一提)Node 对此提供了无缝支持:
const buffer = new SharedArrayBuffer();
const worker = new Worker(); // <-
这是一个完全在Node.js中运行的FFmpeg分布式系统的示例。
但首先,请确保已安装 FFmpeg 并将其添加到 PATH 环境变量中。在终端中进行测试:
ffmpeg -i img.jpg img.png
分布式视频转码示例
启动一个Node项目:
npm init -y && npm i bunnimq bunnimq-driver
文件夹结构:
ffmpegserver/
server.js # <- the hive
producer.js # client-facing server
consumer.js # node servers / workers
.auth # credentials for producer and consumer verification (like .env)
.auth
请在此处输入您的密钥凭证:(请参阅仓库username:password:privileges
中的权限说明)
sk:mypassword:4
jane:doeeee:1
john:doees:3
server.js
简单的非TLS设置(支持TLS -请参阅GitHub仓库):
import Bunny from "bunnimq";
import path from "path";
import { fileURLToPath } from "url";
Bunny({
port: 3000,
DEBUG: true,
cwd: path.dirname(fileURLToPath(import.meta.url)), // for .auth file
queue: {
Durable: true,
QueueExpiry: 0,
MessageExpiry: 3600
}
});
producer.js
这是浏览器和其他客户端与之通信的服务器。
它接收请求并将任务推送到 Hive 中。
import BunnyMQ from "bunnimq-driver";
import fs from "node:fs/promises";
const bunny = new BunnyMQ({
port: 3000,
host: "localhost",
username: "sk",
password: "mypassword",
});
如果队列不存在,则创建队列:
bunny.queueDeclare(
{
name: "transcode_queue",
config: {
QueueExpiry: 60,
MessageExpiry: 20,
AckExpiry: 10,
Durable: true,
noAck: false,
},
},
(res) => {
console.log("Queue creation:", res);
}
);
通常视频文件由客户提供。
为了演示,我们将直接从本地文件夹读取文件:
async function processVideos() {
const videos = await fs.readdir(
"C:/Users/[path to a folder with videos]/Videos/Capcut/test"
); // usually a storage bucket link
for (const video of videos) {
const job = {
id: Date.now() + Math.random().toString(36).substring(2),
input: `C:/Users/[path to a folder with videos]/Videos/Capcut/test/${video}`,
outputFormat: "webm",
};
// put into the queue
bunny.publish("transcode_queue", JSON.stringify(job), (res) => {
console.log(`Job ${job.id} published:`, res ? "ok" : "400");
});
}
}
processVideos();
consumer.js
这些是节点,也就是负责拉取任务、转码视频并汇报结果的工作人员。
import BunnyMQ from "bunnimq-driver";
import { spawn } from "child_process";
import path from "path";
const bunny = new BunnyMQ({
port: 3000,
host: "localhost",
username: "john",
password: "doees",
});
消耗转码队列:
bunny.consume("transcode_queue", async (msg) => {
console.log("Received message:", msg);
try {
const { input, outputFormat } = JSON.parse(msg);
// normalize paths
const absInput = path.resolve(input);
const output = absInput.replace(/\.[^.]+$/, `.${outputFormat}`);
console.log(
`Spawning: ffmpeg -i "${absInput}" -f ${outputFormat} "${output}" -y`
);
await new Promise((resolve, reject) => {
const ffmpeg = spawn(
"ffmpeg",
["-i", absInput, "-f", outputFormat, output, "-y"],
{ shell: true } // helps Windows find ffmpeg.exe
);
ffmpeg.on("error", reject);
// FFmpeg logs to stderr
ffmpeg.stderr.on("data", (chunk) => {
process.stderr.write(chunk);
});
ffmpeg.on("close", (code, signal) => {
if (code === 0) {
console.log(`Transcoding complete: ${output}`);
return resolve(
bunny.Ack((ok) => console.log("Ack sent:", ok))
);
}
reject(
new Error(
signal ? `Signaled with ${signal}` : `Exited with code ${code}`
)
);
});
});
} catch (error) {
console.error("Error processing message:", error);
if (bunny.Nack) bunny.Nack();
}
});
打开多个终端:
node .\server.js # terminal 1
node .\producer.js # terminal 2
节点可以是任意数量的终端,这就是并行、分布式的部分:
node .\consumer.js
这就是整个模式。
简单、强大,而且可以轻松扩展,因为蜂巢负责这一点。
你可以直接把这个模式应用到 RabbitMQ 上,它也能正常工作。
我当初写bunnimq纯粹是出于玩笑,因为我读了 RabbitMQ 的源代码,了解了它的工作原理……
关注【索引目录】服务号,更多精彩内容等你来探索!

