大数跨境
0
0

基于 Apache Fluss(孵化中)与 Lance 搭建实时多模态 AI 分析系统

基于 Apache Fluss(孵化中)与 Lance 搭建实时多模态 AI 分析系统 Lance & LanceDB
2025-09-17
2
图片

本文翻译自LanceDB Blog

作者:Wayne Wang@腾讯


随着生成式人工智能(AI)时代的到来,几乎所有数字应用都在整合多模态 AI 创新技术,以提升自身能力。


多模态 AI 应用的成功部署在很大程度上依赖于数据,对于企业决策系统而言尤其如此 —— 高质量、时效性强的数据是这类系统的关键所在。


例如,在客户服务和风险控制系统中,无法获取实时客户输入的多模态 AI 模型,将难以做出准确决策。


Apache Fluss(孵化中)正是为满足这一需求而生,它是一款专为实时分析打造的流存储产品。


作为湖仓一体(lakehouse)架构的实时数据层,Fluss 具备智能分层服务,可确保流存储与湖存储之间的无缝集成。该服务会自动、持续地将 Fluss 中的数据转换为湖仓一体格式,从而实现 “数据共享” 功能:


1. Fluss 作为湖仓一体的实时层:Fluss 能对流数据进行高吞吐量、低延迟的输入与处理,成为湖仓一体架构的实时前端。


2. 湖仓一体作为 Fluss 的历史层:湖仓一体提供经过优化的长期存储,数据实效性可达分钟级,是实时层的历史批处理数据基础。


作为 Fluss 生态的补充,Lance 是专为机器学习和多模态 AI 应用设计的下一代 AI 湖仓平台。它能高效处理多模态数据(文本、图像、向量等),并支持高性能查询,例如速度极快的向量检索(详见 LanceDB)。



结合 Lance 与 Fluss,解锁真正的实时多模态 AI 分析检索增强生成(Retrieval-Augmented Generation,简称 RAG)是当前主流的生成式 AI 框架,通过在生成过程中融入最新信息,显著提升了大型语言模型(Large Language Models,简称 LLMs)的能力。


在 RAG 流程中,向量检索技术负责理解用户的查询上下文,并检索相关信息输入至大型语言模型,为模型生成内容提供支撑。


而借助 Fluss 与 Lance 的组合,RAG 系统如今可实现以下核心能力:


1. 依托 Lance 强大的索引功能,对其存储的历史数据进行高效向量检索。


2. 针对 Fluss 中未建立索引的实时流数据,计算即时洞察(live insights)。


3. 融合来自上述两个数据源的结果,为大型语言模型提供兼具时效性与全面性的信息,进一步提升模型性能。


在本篇博客中,我们将演示具体操作流程:先将图像数据流式传入 Fluss,再从 Lance 数据集中加载数据并转换为 Pandas DataFrame。


通过这一方式,图像数据能以更便捷的形式被访问,从而更好地服务于后续机器学习工作流中的处理与分析环节。



MinIO 安装配置


步骤 1:在本地安装 MinIO


如需详细说明,请查看官方文档。

- 链接:https://docs.min.io/enterprise/aistor-object-store/installation/macos/


步骤 2:启动 MinIO 服务器


运行以下命令,并指定用于存储 MinIO 数据的本地路径:

export MINIO_REGION_NAME=us-west-1export MINIO_ROOT_USER=minio export MINIO_ROOT_PASSWORD=minioadmin minio server /tmp/minio-tmp

步骤 3:通过 Web 界面验证 MinIO 是否正常运行


当你的 MinIO 服务器成功启动并正常运行后,你将看到端点信息和登录凭据:

API: http://192.168.3.40:9000  http://127.0.0.1:9000    RootUser: minio    RootPass: minioadmin WebUI: http://192.168.3.40:64217 http://127.0.0.1:64217               RootUser: minio    RootPass: minioadmin 

步骤 4:创建存储桶


打开 Web 界面链接,使用上述凭据登录。你可以通过 Web 界面创建一个 Lance 存储桶。



Fluss 与 Lance 安装配置


步骤 1:在本地安装 Fluss


如需详细说明,请查看官方文档。

- 链接:https://fluss.apache.org/docs/install-deploy/deploying-local-cluster/


步骤 2:配置 Lance 数据湖


编辑 

<FLUSS_HOME>/conf/server.yaml 文件,并添加以下配置:

datalake.format: lancedatalake.lance.warehouse: s3://lancedatalake.lance.endpoint: http://localhost:9000datalake.lance.allow_http: truedatalake.lance.access_key_id: miniodatalake.lance.secret_access_key: minioadmindatalake.lance.region: eu-west-1


此配置将 Lance 设为数据湖格式,同时将 MinIO 用作数据仓库。


步骤 3:启动 Fluss


<FLUSS_HOME>/bin/local-cluster.sh start


步骤 4:安装 Flink Fluss 连接器


从官网下载 Apache Flink 1.20 二进制包。下载 Fluss 连接器的 Jar 包,并将其复制到 Flink 的类路径 <FLINK_HOME>/lib 


步骤 5:配置 Flink Fluss 连接器


Flink Fluss 连接器使用 Apache Arrow Java API,而该 API 依赖直接内存。


需通过编辑

 <FLINK_HOME>/conf/config.yaml 文件中的 taskmanager.memory.task.off-heap.size 参数,为 Flink Task Manager(任务管理器)配置足够的堆外内存,具体如下:

# Increase available task slots.numberOfTaskSlots5taskmanager.memory.task.off-heap.size: 128m


步骤 6:启动 Flink


通过以下命令在本地启动 Flink:

<FLINK_HOME>/bin/start-cluster.sh

步骤 7:验证 Flink 是否正常运行


打开浏览器,访问http://localhost:8081确认集群处于运行状态。




启动 Fluss 分层服务


步骤 1:获取分层任务 Jar 包


下载 Fluss 分层任务 Jar 包。

下载链接:https://fluss.apache.org/downloads/


步骤 2:提交任务

./bin/flink run <path_to_jar>/fluss-flink-tiering-0.8.jar \    --fluss.bootstrap.servers localhost:9123 \    --datalake.format lance \    --datalake.lance.warehouse s3://lance \    --datalake.lance.endpoint http://localhost:9000 \    --datalake.lance.allow_http true \    --datalake.lance.secret_access_key minioadmin \    --datalake.lance.access_key_id minio \    --datalake.lance.region eu-west-1 \    --table.datalake.freshness 30s

步骤 3:确认部署


在 Flink 界面中查看 Fluss 数据湖分层服务(Fluss Lake Tiering Service)任务。该任务启动后,你的本地分层流水线即可正常使用。




多模态数据处理


下面将通过一个 Python 代码示例,演示如何将图像数据集流式传输到 Fluss 中,随后将分层后的 Lance 数据集加载到 Pandas DataFrame(数据框)中,以便进行后续处理。


步骤 1:安装依赖项


安装所需的依赖项:

pip install python-for-fluss pyarrow pylance pandas


步骤 2:创建连接


使用 Fluss Python 客户端创建连接和表格:

import flussimport pyarrow as paasync def create_connection(config_spec):    # Create connection    config = fluss.Config(config_spec)    conn = await fluss.FlussConnection.connect(config)    return connasync def create_table(conn, table_path, pa_schema):    # Create a Fluss Schema    fluss_schema = fluss.Schema(pa_schema)    # Create a Fluss TableDescriptor    table_descriptor = fluss.TableDescriptor(        fluss_schema,        properties={            "table.datalake.enabled""true",            "table.datalake.freshness""30s",        },    )    # Get the admin for Fluss    admin = await conn.get_admin()    # Create a Fluss table    try:        await admin.create_table(table_path, table_descriptor, True)        print(f"Created table: {table_path}")    except Exception as e:        print(f"Table creation failed: {e}")

步骤 3:处理图像

import osdef process_images(schema: pa.Schema):        # Get the current directory path        current_dir = os.getcwd()        images_folder = os.path.join(current_dir, "image")        # Get the list of image files        image_files = [filename for filename in os.listdir(images_folder)         if filename.endswith((".png"".jpg"".jpeg"))]        # Iterate over all images in the folder with tqdm        for filename in tqdm(image_files, desc="Processing Images"):                # Construct the full path to the image                image_path = os.path.join(images_folder, filename)                # Read and convert the image to a binary format                with open(image_path, 'rb'as f:                        binary_data = f.read()                image_array = pa.array([binary_data], type=pa.binary())                # Yield RecordBatch for each image                yield pa.RecordBatch.from_arrays([image_array], schema=schema)


该 process_images 函数是我们数据转换流程的核心。它负责遍历指定目录下的图像文件,读取每个图像的数据,并将其转换为二进制格式的 PyArrow RecordBatch(记录批次)对象。


步骤 4:写入 Fluss


write_to_fluss 函数从 process_images 生成器创建一个 RecordBatchReader(记录批次读取器),并将生成的数据写入 Fluss 的 fluss.images_minio 表中。

async def write_to_fluss(conn, table_path, pa_schema):    # Get table and create writer‘    table = await conn.get_table(table_path)    append_writer = await table.new_append_writer()    try:        print("\n--- Writing image data ---")        for record_batch in process_images(pa_schema):            append_writer.write_arrow_batch(record_batch)    except Exception as e:        print(f"Failed to write image data: {e}")    finally:        append_writer.close()

步骤 5:检查任务完成情况


稍等片刻,待分层任务结束后,进入 MinIO 界面,你将在 lance 存储桶中看到 Lance 数据集。你可以借助 Lance 的 to_pandas API,通过 Pandas 加载该数据集。



步骤 6:加载到 Pandas 中


当 Fluss 实时将数据写入 Lance 时,你可以从 MinIO 中的 Lance 数据集进行数据加载。


此时,你可借助任何与 Lance 集成的工具,在各类机器学习 / 人工智能(ML/AI)应用中使用这些数据,包括用于模型训练的 PyTorch(详见 Lance + PyTorch 集成文档)、用于检索增强生成(RAG)集成的 LangChain,以及用于混合搜索的 LanceDB。


你也可以使用 Lance 的 Blob API,以高效、底层的方式访问多模态图像数据。


Lance + PyTorch 集成文档:https://lancedb.github.io/lance/integrations/pytorch/


下面提供一个简单的 loading_into_pandas 示例,说明如何使用 Lance 内置的 to_pandas API,将 Lance 数据集中的图像数据加载到 Pandas DataFrame(数据框)中,从而让图像数据可通过 Pandas 用于后续的下游 AI分析。

import lanceimport pandas as pddef loading_into_pandas(table_name):        dataset = lance.dataset(        "s3://lance/fluss/" + table_name + ".lance"        storage_options={            "access_key_id""minio"            "secret_access_key""minioadmin"            "endpoint""http://localhost:9000"            "allow_http""true",        },    )
    df = dataset.to_table().to_pandas()        print("Pandas DataFrame is ready")        print("Total Rows: ", df.shape[0])

整合所有步骤


若你希望完整运行整个示例(从开始到结束),可在 Python 脚本中使用以下主函数(main function):

import asyncioasync def main():    config_spec = {        "bootstrap.servers""127.0.0.1:9123",    }    conn = await create_connection(config_spec)    table_path = fluss.TablePath("fluss", table_name)    pa_schema = pa.schema([("image", pa.binary())])    await create_table(conn, table_path, pa_schema)    await write_to_fluss(conn, table_path, pa_schema)    # sleep a little while to wait for Fluss tiering    sleep(60)    df = loading_into_pandas()    print(df.head())    conn.close()if __name__ == "__main__":    asyncio.run(main())



总结


Apache Fluss(孵化中)与 Lance 的集成,为实时多模态人工智能(AI)分析构建了强大的基础。


通过将 Fluss 的流存储能力与 Lance 针对 AI 优化的数据湖仓特性相结合,企业能够构建多模态 AI 应用,无缝利用实时数据与历史数据。


此次集成的核心要点如下:


1. 实时处理:以亚秒级延迟对流式传输和处理多模态数据,为对时间敏感的 AI 应用提供即时洞察与响应能力。


2. 统一架构:Fluss 作为实时层,Lance 提供高效的历史数据存储,二者共同构成完整的数据生命周期管理解决方案。


3. 多模态支持:能够处理图像、文本、向量等多种数据类型,使该技术栈成为现代多模态 AI 应用的理想选择。


4. 检索增强生成(RAG)优化:实时数据摄入确保 RAG 系统始终能获取最新信息,提升多模态 AI 生成响应的准确性与相关性。


5. 简易集成:如示例所示,搭建该数据流水线所需配置极少,且能轻松集成到现有机器学习(ML)工作流中。


随着多模态 AI 应用对数据新鲜度和洞察速度的要求不断提高,Fluss 与 Lance 的组合提供了稳健、可扩展的解决方案,填补了实时流数据与 AI 就绪型数据存储之间的空白。


无论你是构建推荐系统、欺诈检测系统,还是客户服务聊天机器人,该集成都能确保多模态 AI 模型既可以获取流数据的时效性,又能利用历史数据的深度上下文信息。



推荐阅读

图片

图片
图片
图片  点击阅读原文,跳转LanceDB GitHub 

【声明】内容源于网络
0
0
Lance & LanceDB
欢迎关注 Lance & LanceDB 技术公众号!Lance 是开源多模态数据湖格式,支持快速访问与高效存储。基于其构建的 LanceDB 是无服务器向量数据库。我们聚焦技术解读、实战案例,助你掌握 AI 数据湖前沿技术。
内容 19
粉丝 0
Lance & LanceDB 欢迎关注 Lance & LanceDB 技术公众号!Lance 是开源多模态数据湖格式,支持快速访问与高效存储。基于其构建的 LanceDB 是无服务器向量数据库。我们聚焦技术解读、实战案例,助你掌握 AI 数据湖前沿技术。
总阅读9
粉丝0
内容19