大数跨境

使用Node.js构建分布式视频转码系统。

使用Node.js构建分布式视频转码系统。 索引目录
2025-12-18
0
导读:关注【索引目录】服务号,更多精彩内容等你来探索!Broker 是分布式系统的入门级组件,原因有二:易于上手运行它们强制执行蜂巢/主节点模式,这种模式可以自然地扩展。

关注【索引目录】服务号,更多精彩内容等你来探索!

Broker 是分布式系统的入门级组件,原因有二:

  • 易于上手运行
  • 它们强制执行蜂巢/主节点模式,这种模式可以自然地扩展。
node  <-->              
node  <-->  hive / broker <-->  client-facing server  <-->  client
node  <-->                                            <-->  client

于是我想:为什么不把纯 JavaScript 代理移植到 Node.js 呢?

// broker.js
import Bunny from "bunnimq";
import path from "path";
import { fileURLToPath } from "url";

Bunny({
  port: 3000,
  DEBUG: true,
  cwd: path.dirname(fileURLToPath(import.meta.url)), // path to the .auth file
  queue: {
    Durable: true,
    MessageExpiry: 60 // 1 hour
  }
});

说实话,底层 Node.js 真的非常令人印象深刻。虽然花了几天时间,但经过一些优化后,它确实可以运行:

  • 对象 → 二进制编译器
  • SharedArrayBuffers 和线程

实际上它可以做得更好,而且(顺便一提)Node 对此提供了无缝支持:

const buffer = new SharedArrayBuffer();
const worker = new Worker(); // <-

这是一个完全在Node.js中运行的FFmpeg分布式系统的示例。

但首先,请确保已安装 FFmpeg 并将其添加到 PATH 环境变量中。在终端中进行测试:

ffmpeg -i img.jpg img.png

分布式视频转码示例

启动一个Node项目:

npm init -y && npm i bunnimq bunnimq-driver

文件夹结构:

ffmpegserver/
  server.js    # <- the hive
  producer.js  # client-facing server
  consumer.js  # node servers / workers
  .auth        # credentials for producer and consumer verification (like .env)

.auth

请在此处输入您的密钥凭证:(请参阅仓库username:password:privileges
中的权限说明)

sk:mypassword:4
jane:doeeee:1
john:doees:3

server.js

简单的非TLS设置(支持TLS -请参阅GitHub仓库):

import Bunny from "bunnimq";
import path from "path";
import { fileURLToPath } from "url";

Bunny({
  port: 3000,
  DEBUG: true,
  cwd: path.dirname(fileURLToPath(import.meta.url)), // for .auth file
  queue: {
    Durable: true,
    QueueExpiry: 0,
    MessageExpiry: 3600
  }
});

producer.js

这是浏览器和其他客户端与之通信的服务器。
它接收请求并将任务推送到 Hive 中。

import BunnyMQ from "bunnimq-driver";
import fs from "node:fs/promises";

const bunny = new BunnyMQ({
  port: 3000,
  host: "localhost",
  username: "sk",
  password: "mypassword",
});

如果队列不存在,则创建队列:

bunny.queueDeclare(
  {
    name: "transcode_queue",
    config: {
      QueueExpiry: 60,
      MessageExpiry: 20,
      AckExpiry: 10,
      Durable: true,
      noAck: false,
    },
  },
  (res) => {
    console.log("Queue creation:", res);
  }
);

通常视频文件由客户提供。
为了演示,我们将直接从本地文件夹读取文件:

async function processVideos() {
  const videos = await fs.readdir(
    "C:/Users/[path to a folder with videos]/Videos/Capcut/test"
  ); // usually a storage bucket link

  for (const video of videos) {
    const job = {
      id: Date.now() + Math.random().toString(36).substring(2),
      input: `C:/Users/[path to a folder with videos]/Videos/Capcut/test/${video}`,
      outputFormat: "webm",
    };

    // put into the queue
    bunny.publish("transcode_queue", JSON.stringify(job), (res) => {
      console.log(`Job ${job.id} published:`, res ? "ok" : "400");
    });
  }
}

processVideos();

consumer.js

这些是节点,也就是负责拉取任务、转码视频并汇报结果的工作人员。

import BunnyMQ from "bunnimq-driver";
import { spawn } from "child_process";
import path from "path";

const bunny = new BunnyMQ({
  port: 3000,
  host: "localhost",
  username: "john",
  password: "doees",
});

消耗转码队列:

bunny.consume("transcode_queue", async (msg) => {
  console.log("Received message:", msg);

  try {
    const { input, outputFormat } = JSON.parse(msg);

    // normalize paths
    const absInput = path.resolve(input);
    const output = absInput.replace(/\.[^.]+$/, `.${outputFormat}`);

    console.log(
      `Spawning: ffmpeg -i "${absInput}" -f ${outputFormat} "${output}" -y`
    );

    await new Promise((resolve, reject) => {
      const ffmpeg = spawn(
        "ffmpeg",
        ["-i", absInput, "-f", outputFormat, output, "-y"],
        { shell: true } // helps Windows find ffmpeg.exe
      );

      ffmpeg.on("error", reject);

      // FFmpeg logs to stderr
      ffmpeg.stderr.on("data", (chunk) => {
        process.stderr.write(chunk);
      });

      ffmpeg.on("close", (code, signal) => {
        if (code === 0) {
          console.log(`Transcoding complete: ${output}`);
          return resolve(
            bunny.Ack((ok) => console.log("Ack sent:", ok))
          );
        }

        reject(
          new Error(
            signal ? `Signaled with ${signal}` : `Exited with code ${code}`
          )
        );
      });
    });
  } catch (error) {
    console.error("Error processing message:", error);
    if (bunny.Nack) bunny.Nack();
  }
});

打开多个终端:

node .\server.js   # terminal 1
node .\producer.js # terminal 2

节点可以是任意数量的终端,这就是并行、分布式的部分:

node .\consumer.js

这就是整个模式。
简单、强大,而且可以轻松扩展,因为蜂巢负责这一点。

你可以直接把这个模式应用到 RabbitMQ 上,它也能正常工作。
我当初写bunnimq纯粹是出于玩笑,因为我读了 RabbitMQ 的源代码,了解了它的工作原理……


关注【索引目录】服务号,更多精彩内容等你来探索!


【声明】内容源于网络
0
0
索引目录
索引目录是一家专注于医疗、技术开发、物联网应用等领域的创新型公司。我们致力于为客户提供高质量的服务和解决方案,推动技术与行业发展。
内容 444
粉丝 0
索引目录 索引目录是一家专注于医疗、技术开发、物联网应用等领域的创新型公司。我们致力于为客户提供高质量的服务和解决方案,推动技术与行业发展。
总阅读544
粉丝0
内容444