本文翻译自LanceDB Blog
作者:Wayne Wang@腾讯
随着生成式人工智能(AI)时代的到来,几乎所有数字应用都在整合多模态 AI 创新技术,以提升自身能力。
多模态 AI 应用的成功部署在很大程度上依赖于数据,对于企业决策系统而言尤其如此 —— 高质量、时效性强的数据是这类系统的关键所在。
例如,在客户服务和风险控制系统中,无法获取实时客户输入的多模态 AI 模型,将难以做出准确决策。
Apache Fluss(孵化中)正是为满足这一需求而生,它是一款专为实时分析打造的流存储产品。
作为湖仓一体(lakehouse)架构的实时数据层,Fluss 具备智能分层服务,可确保流存储与湖存储之间的无缝集成。该服务会自动、持续地将 Fluss 中的数据转换为湖仓一体格式,从而实现 “数据共享” 功能:
1. Fluss 作为湖仓一体的实时层:Fluss 能对流数据进行高吞吐量、低延迟的输入与处理,成为湖仓一体架构的实时前端。
2. 湖仓一体作为 Fluss 的历史层:湖仓一体提供经过优化的长期存储,数据实效性可达分钟级,是实时层的历史批处理数据基础。
作为 Fluss 生态的补充,Lance 是专为机器学习和多模态 AI 应用设计的下一代 AI 湖仓平台。它能高效处理多模态数据(文本、图像、向量等),并支持高性能查询,例如速度极快的向量检索(详见 LanceDB)。
结合 Lance 与 Fluss,解锁真正的实时多模态 AI 分析检索增强生成(Retrieval-Augmented Generation,简称 RAG)是当前主流的生成式 AI 框架,通过在生成过程中融入最新信息,显著提升了大型语言模型(Large Language Models,简称 LLMs)的能力。
在 RAG 流程中,向量检索技术负责理解用户的查询上下文,并检索相关信息输入至大型语言模型,为模型生成内容提供支撑。
而借助 Fluss 与 Lance 的组合,RAG 系统如今可实现以下核心能力:
1. 依托 Lance 强大的索引功能,对其存储的历史数据进行高效向量检索。
2. 针对 Fluss 中未建立索引的实时流数据,计算即时洞察(live insights)。
3. 融合来自上述两个数据源的结果,为大型语言模型提供兼具时效性与全面性的信息,进一步提升模型性能。
在本篇博客中,我们将演示具体操作流程:先将图像数据流式传入 Fluss,再从 Lance 数据集中加载数据并转换为 Pandas DataFrame。
通过这一方式,图像数据能以更便捷的形式被访问,从而更好地服务于后续机器学习工作流中的处理与分析环节。
MinIO 安装配置
步骤 1:在本地安装 MinIO
如需详细说明,请查看官方文档。
- 链接:https://docs.min.io/enterprise/aistor-object-store/installation/macos/
步骤 2:启动 MinIO 服务器
运行以下命令,并指定用于存储 MinIO 数据的本地路径:
export MINIO_REGION_NAME=us-west-1export MINIO_ROOT_USER=minioexport MINIO_ROOT_PASSWORD=minioadminminio server /tmp/minio-tmp
步骤 3:通过 Web 界面验证 MinIO 是否正常运行
当你的 MinIO 服务器成功启动并正常运行后,你将看到端点信息和登录凭据:
API: http://192.168.3.40:9000 http://127.0.0.1:9000RootUser: minioRootPass: minioadminWebUI: http://192.168.3.40:64217 http://127.0.0.1:64217RootUser: minioRootPass: minioadmin
步骤 4:创建存储桶
打开 Web 界面链接,使用上述凭据登录。你可以通过 Web 界面创建一个 Lance 存储桶。
Fluss 与 Lance 安装配置
步骤 1:在本地安装 Fluss
如需详细说明,请查看官方文档。
- 链接:https://fluss.apache.org/docs/install-deploy/deploying-local-cluster/
步骤 2:配置 Lance 数据湖
编辑
<FLUSS_HOME>/conf/server.yaml 文件,并添加以下配置:
datalake.format: lancedatalake.lance.warehouse: s3://lancedatalake.lance.endpoint: http://localhost:9000datalake.lance.allow_http: truedatalake.lance.access_key_id: miniodatalake.lance.secret_access_key: minioadmindatalake.lance.region: eu-west-1
此配置将 Lance 设为数据湖格式,同时将 MinIO 用作数据仓库。
步骤 3:启动 Fluss
<FLUSS_HOME>/bin/local-cluster.sh start
步骤 4:安装 Flink Fluss 连接器
从官网下载 Apache Flink 1.20 二进制包。下载 Fluss 连接器的 Jar 包,并将其复制到 Flink 的类路径 <FLINK_HOME>/lib 中。
步骤 5:配置 Flink Fluss 连接器
Flink Fluss 连接器使用 Apache Arrow Java API,而该 API 依赖直接内存。
需通过编辑
<FLINK_HOME>/conf/config.yaml 文件中的 taskmanager.memory.task.off-heap.size 参数,为 Flink Task Manager(任务管理器)配置足够的堆外内存,具体如下:
# Increase available task slots.numberOfTaskSlots: 5taskmanager.memory.task.off-heap.size: 128m
步骤 6:启动 Flink
通过以下命令在本地启动 Flink:
<FLINK_HOME>/bin/start-cluster.sh
步骤 7:验证 Flink 是否正常运行
打开浏览器,访问http://localhost:8081,确认集群处于运行状态。
启动 Fluss 分层服务
步骤 1:获取分层任务 Jar 包
下载 Fluss 分层任务 Jar 包。
下载链接:https://fluss.apache.org/downloads/
步骤 2:提交任务
./bin/flink run <path_to_jar>/fluss-flink-tiering-0.8.jar \--fluss.bootstrap.servers localhost:9123 \--datalake.format lance \--datalake.lance.warehouse s3://lance \--datalake.lance.endpoint http://localhost:9000 \--datalake.lance.allow_http true \--datalake.lance.secret_access_key minioadmin \--datalake.lance.access_key_id minio \--datalake.lance.region eu-west-1 \--table.datalake.freshness 30s
步骤 3:确认部署
在 Flink 界面中查看 Fluss 数据湖分层服务(Fluss Lake Tiering Service)任务。该任务启动后,你的本地分层流水线即可正常使用。
多模态数据处理
下面将通过一个 Python 代码示例,演示如何将图像数据集流式传输到 Fluss 中,随后将分层后的 Lance 数据集加载到 Pandas DataFrame(数据框)中,以便进行后续处理。
步骤 1:安装依赖项
安装所需的依赖项:
pip install python-for-fluss pyarrow pylance pandas
步骤 2:创建连接
使用 Fluss Python 客户端创建连接和表格:
import flussimport pyarrow as paasync def create_connection(config_spec):# Create connectionconfig = fluss.Config(config_spec)conn = await fluss.FlussConnection.connect(config)return connasync def create_table(conn, table_path, pa_schema):# Create a Fluss Schemafluss_schema = fluss.Schema(pa_schema)# Create a Fluss TableDescriptortable_descriptor = fluss.TableDescriptor(fluss_schema,properties={"table.datalake.enabled": "true","table.datalake.freshness": "30s",},)# Get the admin for Flussadmin = await conn.get_admin()# Create a Fluss tabletry:await admin.create_table(table_path, table_descriptor, True)print(f"Created table: {table_path}")except Exception as e:print(f"Table creation failed: {e}")
步骤 3:处理图像
import osdef process_images(schema: pa.Schema):# Get the current directory pathcurrent_dir = os.getcwd()images_folder = os.path.join(current_dir, "image")# Get the list of image filesimage_files = [filename for filename in os.listdir(images_folder)if filename.endswith((".png", ".jpg", ".jpeg"))]# Iterate over all images in the folder with tqdmfor filename in tqdm(image_files, desc="Processing Images"):# Construct the full path to the imageimage_path = os.path.join(images_folder, filename)# Read and convert the image to a binary formatwith open(image_path, 'rb') as f:binary_data = f.read()image_array = pa.array([binary_data], type=pa.binary())# Yield RecordBatch for each imageyield pa.RecordBatch.from_arrays([image_array], schema=schema)
该 process_images 函数是我们数据转换流程的核心。它负责遍历指定目录下的图像文件,读取每个图像的数据,并将其转换为二进制格式的 PyArrow RecordBatch(记录批次)对象。
步骤 4:写入 Fluss
write_to_fluss 函数从 process_images 生成器创建一个 RecordBatchReader(记录批次读取器),并将生成的数据写入 Fluss 的 fluss.images_minio 表中。
async def write_to_fluss(conn, table_path, pa_schema):# Get table and create writer‘table = await conn.get_table(table_path)append_writer = await table.new_append_writer()try:print("\n--- Writing image data ---")for record_batch in process_images(pa_schema):append_writer.write_arrow_batch(record_batch)except Exception as e:print(f"Failed to write image data: {e}")finally:append_writer.close()
步骤 5:检查任务完成情况
稍等片刻,待分层任务结束后,进入 MinIO 界面,你将在 lance 存储桶中看到 Lance 数据集。你可以借助 Lance 的 to_pandas API,通过 Pandas 加载该数据集。
步骤 6:加载到 Pandas 中
当 Fluss 实时将数据写入 Lance 时,你可以从 MinIO 中的 Lance 数据集进行数据加载。
此时,你可借助任何与 Lance 集成的工具,在各类机器学习 / 人工智能(ML/AI)应用中使用这些数据,包括用于模型训练的 PyTorch(详见 Lance + PyTorch 集成文档)、用于检索增强生成(RAG)集成的 LangChain,以及用于混合搜索的 LanceDB。
你也可以使用 Lance 的 Blob API,以高效、底层的方式访问多模态图像数据。
Lance + PyTorch 集成文档:https://lancedb.github.io/lance/integrations/pytorch/
下面提供一个简单的 loading_into_pandas 示例,说明如何使用 Lance 内置的 to_pandas API,将 Lance 数据集中的图像数据加载到 Pandas DataFrame(数据框)中,从而让图像数据可通过 Pandas 用于后续的下游 AI分析。
import lanceimport pandas as pddef loading_into_pandas(table_name):dataset = lance.dataset("s3://lance/fluss/" + table_name + ".lance",storage_options={"access_key_id": "minio","secret_access_key": "minioadmin","endpoint": "http://localhost:9000","allow_http": "true",},)df = dataset.to_table().to_pandas()print("Pandas DataFrame is ready")print("Total Rows: ", df.shape[0])
整合所有步骤
若你希望完整运行整个示例(从开始到结束),可在 Python 脚本中使用以下主函数(main function):
import asyncioasync def main():config_spec = {"bootstrap.servers": "127.0.0.1:9123",}conn = await create_connection(config_spec)table_path = fluss.TablePath("fluss", table_name)pa_schema = pa.schema([("image", pa.binary())])await create_table(conn, table_path, pa_schema)await write_to_fluss(conn, table_path, pa_schema)# sleep a little while to wait for Fluss tieringsleep(60)df = loading_into_pandas()print(df.head())conn.close()if __name__ == "__main__":asyncio.run(main())
总结
Apache Fluss(孵化中)与 Lance 的集成,为实时多模态人工智能(AI)分析构建了强大的基础。
通过将 Fluss 的流存储能力与 Lance 针对 AI 优化的数据湖仓特性相结合,企业能够构建多模态 AI 应用,无缝利用实时数据与历史数据。
此次集成的核心要点如下:
1. 实时处理:以亚秒级延迟对流式传输和处理多模态数据,为对时间敏感的 AI 应用提供即时洞察与响应能力。
2. 统一架构:Fluss 作为实时层,Lance 提供高效的历史数据存储,二者共同构成完整的数据生命周期管理解决方案。
3. 多模态支持:能够处理图像、文本、向量等多种数据类型,使该技术栈成为现代多模态 AI 应用的理想选择。
4. 检索增强生成(RAG)优化:实时数据摄入确保 RAG 系统始终能获取最新信息,提升多模态 AI 生成响应的准确性与相关性。
5. 简易集成:如示例所示,搭建该数据流水线所需配置极少,且能轻松集成到现有机器学习(ML)工作流中。
随着多模态 AI 应用对数据新鲜度和洞察速度的要求不断提高,Fluss 与 Lance 的组合提供了稳健、可扩展的解决方案,填补了实时流数据与 AI 就绪型数据存储之间的空白。
无论你是构建推荐系统、欺诈检测系统,还是客户服务聊天机器人,该集成都能确保多模态 AI 模型既可以获取流数据的时效性,又能利用历史数据的深度上下文信息。
推荐阅读
点击阅读原文,跳转LanceDB GitHub


