
我是来自企查查的技术团队工程师,和 Nebula 相识已久。本文是根据自己的使用历程分享的数据导入工具选择经验,希望对大家有所帮助。
>>>>
前言
Nebula 目前作为较为成熟的产品,已经有着很丰富的生态。数据导入的维度而言就已经提供了多种选择。有大而全的 Nebula Exchange, 小而精简的 Nebula Importer, 还有为 Spark / Flink 引擎提供的 Nebula Spark Connector 和 Nebula Flink Connector。
在众多的导入方式中,究竟哪种较为方便呢?
使用场景介绍:
Nebula Exchange
需要将来自 Kafka、Pulsar 平台的流式数据, 导入 Nebula Graph 数据库
需要从关系型数据库(如 MySQL)或者分布式文件系统(如 HDFS)中读取批式数据
需要将大批量数据生成 Nebula Graph 能识别的 SST 文件
Nebula Importer
Importer 适用于将本地 CSV 文件的内容导入至 Nebula Graph 中
Nebula Spark Connector:
在不同的 Nebula Graph 集群之间迁移数据
在同一个 Nebula Graph 集群内不同图空间之间迁移数据
Nebula Graph 与其他数据源之间迁移数据
结合 Nebula Algorithm 进行图计算
Nebula Flink Connector
在不同的 Nebula Graph 集群之间迁移数据
在同一个 Nebula Graph 集群内不同图空间之间迁移数据
Nebula Graph 与其他数据源之间迁移数据
以上摘自 Nebula 官方文档:
https://docs.nebula-graph.com.cn/2.6.2/1.introduction/1.what-is-nebula-graph/
总体来说,Nebula Exchange 大而全,可以和大部分的存储引擎结合,导入到 Nebula 中,但是需要部署 Spark 环境。

Nebula Importer 使用简单,所需依赖较少,但需要自己提前生成数据文件,配置好 schema 一劳永逸,但是不支持断点续传,适合中等数据量的情况。
Spark / Flink Connector 则需要和流批数据结合。
不同的场景选择不同的工具。新人使用 Nebula 在导入数据时,建议使用 Nebula Importer 这款数据导入工具,因为它操作简单,能够快速上手。
Nebula Importer 的使用
在我们刚接触 Nebula Graph 时,因为生态不够完善, 加上只有部分业务迁移到 Nebula 上,导致我们对 Nebula Graph 数据的导入不管全量还是增量都是采用 Hive 表推到 Kafka,消费 Kafka 批量写入 Nebula Graph 的方式。后来随着越来越多的数据和业务切换到 Nebula Graph,导入的数据效率问题愈发严峻。导入时长的增加,使得业务高峰期时仍然在全量的数据导入,这是不可接受的。
针对以上问题,在尝试 Nebula Spark Connector 和 Nebula Importer 之后,从便于维护和迁移等多方面考虑,决定采用 Hive table → CSV → Nebula Server → Nebula Importer 的方式进行全量的导入,整体耗时时长也明显缩短。
系统环境
[root@nebula-server-prod-05 importer]# lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 2
Core(s) per socket: 8
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 85
Model name: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
Stepping: 7
CPU MHz: 2499.998
BogoMIPS: 4999.99
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 1024K
L3 cache: 36608K
NUMA node0 CPU(s): 0-15
Disk:SSD
Memory: 128G
集群环境
Nebula Version: v2.6.1
部署方式: RPM
集群规模: 三副本,六节点
数据规模
+---------+--------------------------+-----------+
| "Space" | "vertices" | 559191827 |
+---------+--------------------------+-----------+
| "Space" | "edges" | 722490436 |
+---------+--------------------------+-----------+
Nebula Importer 配置
# Graph版本,连接2.x时设置为v2。
version: v2
description: Relation Space import data
# 是否删除临时生成的日志和错误数据文件。
removeTempFiles: false
clientSettings:
# nGQL语句执行失败的重试次数。
retry: 3
# Nebula Graph客户端并发数。
concurrency: 5
# 每个Nebula Graph客户端的缓存队列大小。
channelBufferSize: 1024
# 指定数据要导入的Nebula Graph图空间。
space: Relation
# 连接信息。
connection:
user: root
password: ******
address: 10.0.XXX.XXX:9669,10.0.XXX.XXX:9669
postStart:
# 配置连接Nebula Graph服务器之后,在插入数据之前执行的一些操作。
commands: |
# 执行上述命令后到执行插入数据命令之间的间隔。
afterPeriod: 1s
preStop:
# 配置断开Nebula Graph服务器连接之前执行的一些操作。
commands: |
# 错误等日志信息输出的文件路径。
logPath: /mnt/csv_file/prod_relation/err/test.log
....
由于篇幅只展示些全局相关的配置,点边相关的配置较多,不再展开,详情可以参考 https://github.com/vesoft-inc/nebula-importer。
设置 Crontab,Hive 生成表之后传输到 Nebula Server, 在夜间流量较低的时候跑起 Nebula Importer 任务:
50 03 15 * * /mnt/csv_file/importer/nebula-importer -config /mnt/csv_file/importer/rel.yaml >> /root/rel.log
总共耗时 2h, 在 6 点左右完成全量数据的导入。
部分 log 如下,导入速度最高维持在 200000/s 左右
2022/05/15 03:50:11 [INFO] statsmgr.go:62: Tick: Time(10.00s), Finished(1952500), Failed(0), Read Failed(0), Latency AVG(4232us), Batches Req AVG(4582us), Rows AVG(195248.59/s)
2022/05/15 03:50:16 [INFO] statsmgr.go:62: Tick: Time(15.00s), Finished(2925600), Failed(0), Read Failed(0), Latency AVG(4421us), Batches Req AVG(4761us), Rows AVG(195039.12/s)
2022/05/15 03:50:21 [INFO] statsmgr.go:62: Tick: Time(20.00s), Finished(3927400), Failed(0), Read Failed(0), Latency AVG(4486us), Batches Req AVG(4818us), Rows AVG(196367.10/s)
2022/05/15 03:50:26 [INFO] statsmgr.go:62: Tick: Time(25.00s), Finished(5140500), Failed(0), Read Failed(0), Latency AVG(4327us), Batches Req AVG(4653us), Rows AVG(205619.44/s)
2022/05/15 03:50:31 [INFO] statsmgr.go:62: Tick: Time(30.00s), Finished(6080800), Failed(0), Read Failed(0), Latency AVG(4431us), Batches Req AVG(4755us), Rows AVG(202693.39/s)
2022/05/15 03:50:36 [INFO] statsmgr.go:62: Tick: Time(35.00s), Finished(7087200), Failed(0), Read Failed(0), Latency AVG(4461us), Batches Req AVG(4784us), Rows AVG(202489.00/s)
然后在 7 点,根据时间戳,重新消费 Kafka 导入当天凌晨到 7 点的增量数据, 防止 t+1 的全量数据覆盖当天的增量数据。
50 07 15 * * python3 /mnt/code/consumer_by_time/relation_consumer_by_timestamp.py
增量的消费大概耗时 10-15 min。
根据 MD5 对比之后得到的增量数据,导入 Kafka 中,实时消费 Kafka 的数据,确保数据的延迟不超过 1 分钟。
另外长时间的实时可能会有非预期的数据问题出现而未发现,所以每 30 天会导入一次全量数据,也就是上面介绍的 Nebula Importer 导入。然后给 Space 的点边添加 TTL=35天 确保未及时更新的数据会被 Filter 和后续回收。
Nebula CSV 数据导入(nebula-importer) 你该知道的事项 https://discuss.nebula-graph.com.cn/t/topic/361 这里提到了关于 CSV 导入常遇到的问题,大家可以参考下。另外根据自身的经验,我这边也有几点建议:
关于并发度,问题中有提到,这个 Concurrency 指定为你的 CPU Cores 就可以, 表示起多少个 client 连接 Nebula Server。 实际操作中,要去 trade off 导入速度和服务器压力的影响。在我们这边测试,如果并发过高,会导致磁盘 IO 过高,引发设置的一些告警,所以不建议一下把并发拉太高。可以根据实际业务进行测试,然后再做权衡。
Nebula Importer 并不能断点续传,如果出现错误,需要手动处理。在实际操作中,我们会程序分析 Nebula Importer 的 log,然后根据情况进行处理。如果哪部分数据出现了非预期的错误,会告警通知,人工介入,防止出现意外。
Hive 生成表之后传输到 Nebula Server, 这部分任务实际耗时是和 Hadoop 资源情况密切相关的。有可能会出现资源不够而导致 Hive 和 CSV 表生成时间滞缓,而 Nebula Importer 正常在跑的情况下,这部分需要提前做好预判。我们这边是根据 Hive 任务结束时间和 Nebula Importer 任务开始时间做对比,判断是否需要 Nebula Importer 的进程正常运行。
🔔 由于微信对外链的跳转限制,上文链接无法直接点击,请大家点击下文【阅读原文】查看原文~
🧐 想要来交流图数据库技术吗?关注公众号后发送“加群”,Nebula 迷人小姐姐拉你进群~

🙋♂️ 喜欢本文的话,来个分享、👍 赞、在看
谢谢!

