大数跨境
0
0

复杂业务隔离需求下的 NebulaGraph 集群迁移与版本升级全攻略

复杂业务隔离需求下的 NebulaGraph 集群迁移与版本升级全攻略 NebulaGraph
2025-04-25
2
导读:如何使用 NebulaGraph Spark Connector 进行跨版本集群迁移?

导读:


在数据库的运维实践中,集群迁移与版本升级是及时满足业务需求的关键环节。这篇经验贴分享了使用 NebulaGraph Spark Connector 将多个业务的集群从 NebulaGraph 2.6.2 向 3.8.0 版本迁移,并部署到各自对应新集群中的完整方案,涉及业务隔离集群搭建、跨版本数据转运、脏数据处理等核心要点。

一、背景
  • 多个业务使用同一套 NebulaGraph 集群,当其中某一个业务有大量耗时查询或者大量数据加载时,这时此集群的压力可能会急剧上升,导致所有的业务出现服务性能极度下降的情况;
  • NebulaGraph 使用古早的 2.6.2 版本,很多业务需求较难快速实现。

 二、需求
  • 为每个业务搭建一套属于自己的 NebulaGraph 集群;
  • 将旧集群中的各个业务的数据迁移到自己业务对应的新集群中;
  • 新集群需要使用 3.8.0 版本 NebulaGraph;
  • 旧的 NebulaGraph 集群下线。

 三、数据迁移工具选型
通过 NebulaGraph 官网查看有没有现成的数据迁移工具,能直接实现我们的需求,调研如下:
  • 社区版的 NebulaGraph BR:限制是版本为 3.x 才能用;只能恢复到原集群,不可跨集群等等。遂放弃。
  • NebulaGraph Importer:导入数据使用。这个之前没怎么用过,应该是使用官方提供的工具执行相应的命令将数据发送到对应的集群。考虑到失败无法重试的缘故,遂放弃。
  • NebulaGraph Exchange:这个主要考虑到配置文件的繁琐性,且我们集群上的 Spark 客户端一般不暴露出来直接使用,都是通过 dolphinscheduler 使用的。遂放弃。
  • NebulaGraph Spark Connector:连接器使用,通过官网的 demo 发现这个可以将 NebulaGraph 数据写成 DF 格式,还可以将 DF 格式数据写入 NebulaGraph;且通过 dolphinscheduler 好控制。因此,我们决定使用 NebulaGraph Spark Connector 进行数据迁移。
image

四、具体步骤
(一)NebulaGraph to HDFS
运行 NebulaGraph to HDFS 对应的工作流,将 NebulaGraph 数据备份到 HDFS。
edge 类型导入 HDFS 为例:
package nebula

import com.vesoft.nebula.connector.connector.{NebulaDataFrameReader, NebulaDataFrameWriter}

import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}

import org.apache.spark.sql.SparkSession

object Edge2HDFS {

  def main(args: Array[String]): Unit = {

    try {

      if (args.length < 4) {

        println("Usage: <space> <edge> <graph_host> <hdfs path>")

        System.exit(-1)

      }

      val space = args(0)

      val edge = args(1)

      val graph_host = args(2)

      val hdfsPath = args(3)

      val spark:SparkSession =SparkSession.builder()

        .appName(space + "_" + edge)

        .getOrCreate()

      val config = NebulaConnectionConfig

        .builder()

        .withMetaAddress(graph_host)

        .withConenctionRetry(2)

        .withExecuteRetry(2)

        .withTimeout(36000000)

        .build()

      val edge_config = ReadNebulaConfig

        .builder()

        .withSpace(space)

        .withLabel(edge)

        .withNoColumn(false)

        .withPartitionNum(400)

        .build()

      val edges = spark.read.nebula(config, edge_config).loadEdgesToDF()

      edges.repartition(400).write.

        format("json").

        mode("overwrite").save(hdfsPath)

    } catch {

      case e: Exception => {

        // 在这里捕获异常,并进行相应的处理

        // 例如,记录异常信息、进行异常恢复操作等

        e.printStackTrace() // 这里只是简单地打印异常信息,你可以根据需要进行适当的处理

      }

    }

}

}

(二)HDFS to NebulaGraph
运行 HDFS to NebulaGraph, 对应的工作流把备份在 HDFS 上的数据加载到 3.8.0 的 NebulaGraph.
edge 类型导入 NebulaGraph 为例:
package com.nebula

import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter

import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaEdgeConfig}

import org.apache.spark.sql.SparkSession

object Edge2nebula {

  def main(args: Array[String]): Unit = {

    try {

      if (args.length < 5) {

        println("Usage: <space> <edge> <graph_host> <hdfsPath> <meta_host>")

        System.exit(-1)

      }

      val space = args(0)

      val edge = args(1)

      val graph_host = args(2)

      val hdfsPath = args(3)

      val meta_host = args(4)

      val _SPARK_SESSION: SparkSession = SparkSession.builder()

        .appName(edge)

        .getOrCreate()

      val config = NebulaConnectionConfig

        .builder()

        .withMetaAddress(meta_host)

        .withGraphAddress(graph_host)

        .withConenctionRetry(2)

        .withTimeout(36000000)

        .build()

      // HDFS 目录路径

      val edgeHdfsPath = s"hdfs://HACluster" + hdfsPath + "/*.json"

      var edgeDF = _SPARK_SESSION.read

        .format("json")

        .load(edgeHdfsPath)

      edgeDF = edgeDF.withColumnRenamed("_srcId""_vertexId")

      edgeDF = edgeDF.withColumnRenamed("_dstId""dstId")

      edgeDF = edgeDF.withColumnRenamed("_rank""rank")

      edgeDF.show(10)

      val edgeCount = edgeDF.count()

      // 打印记录总数

      println(s"Total number of rows: $edgeCount")

      val nebulaWriteContainConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig

        .builder()

        .withSpace(space)

        .withEdge(edge)

        .withSrcIdField("_vertexId")

        .withSrcPolicy(null)

        .withDstIdField("dstId")

        .withDstPolicy(null)

        .withRankField("rank")

        .withUser("root")

        .withPasswd("nebula")

        .withBatch(50)

        .build()

      edgeDF.write.nebula(config, nebulaWriteContainConfig).writeEdges()

    } catch {

      case e: Exception => {

        // 在这里捕获异常,并进行相应的处理

        // 例如,记录异常信息、进行异常恢复操作等

        e.printStackTrace() // 这里只是简单地打印异常信息,你可以根据需要进行适当的处理

      }

   }

}

}
将代码写好编译成 jar 包上传到 dolphinscheduler 并通过 dolphinscheduler 创建对应的工作流就可以用了。
贴一下 Spark 参数:
(三)数据量与准确性检验
数据量检验可通过执行SHOW STATS 命令,对比新旧集群的统计信息,确保数据完整无缺失。准确性验证则通过在新旧 NebulaGraph 集群上执行相同的 nGQL 查询语句,对比返回结果是否一致。通过这两种方式,全面验证数据迁移的完整性和准确性,确保业务在新集群上可正常运行。

五、遇到的问题
1. 读写时可能会遇到
com.vesoft.nebula.client.graph.exception.IOErrorException: java.net.SocketTimeoutException: Read timed out
这是我们设置的超时时间太短了。我们把withTimeout(36000000)置得大一些就好了。
2. 导入数据时可能会遇到
RaftPart buffer is full
这是我们并发设置得太大了,调整 Spark 参数,降低 Executor 数量,batch 数量等,减小并发。
image
3. 如果遇到 NebulaGraph 中创建 tag 和 edge 使用的属性类型和数据类型不一致的脏数据的话,就无法正常读取数据到 HDFS.
这时候需要下载官网的 nebula-spark-connector 源码调整相应的部分,编译打包上传到自己的 maven 库引用,然后将脏数据找到,进行手动处理,再重新跑一遍 NebulaGraph to HDFS 对应的工作流。

六、总结
通过以上步骤,可使用 NebulaGraph Spark Connector 顺利实现复杂业务隔离需求下的 NebulaGraph 集群迁移与版本升级。NebulaGraph Spark Connector 是一个强大的生态工具,能够将 NebulaGraph 中的数据读取为 DF 格式,并支持将 DF 数据写入 NebulaGraph, 具有良好的扩展性和易用性,非常适合大规模数据迁移任务。希望本文能为其他有类似需求的社区用户提供参考与帮助。

可参考官方文档,进一步了解 NebulaGraph Spark Connector:
https://docs.nebula-graph.com.cn/3.6.0/connector/nebula-spark-connector/



以上数据迁移实操经验,首发于 NebulaGraph 论坛,可点击「阅读原文」直达该帖,期待大家在论坛与作者进行更多交流~

开源之夏 x NebulaGraph 项目已发布,等你解锁图数据库的新世界~
🫱开源之夏|从 NebulaGraph 开启你的图数据库开源之旅!


如果你觉得 NebulaGraph能帮到你,或者你只是单纯支持开源精神,可以在 GitHub 上为 NebulaGraph 点个 Star!每一个 Star 都是对我们的支持和鼓励✨

https://github.com/vesoft-inc/nebula



扫码添加

 可爱星云 

技术交流

资料分享

NebulaGraph 用户案例

风控场携程Airwallex众安保险中国移动Akulaku邦盛科技360数科BOSS直聘金蝶征信快手青藤云安全

平台建设:博睿数据携程众安科技微信OPPOvivo美团百度爱番番携程金融普适智能BIGO

知识图谱:中医药大学企查查腾讯音乐中科大脑泰康在线苏宁微澜同花顺携程酒店

数据血缘:波克城市微众银行携程金融

智能运维:58同城中亦安图

✨ NebulaGraph 推荐阅读

【声明】内容源于网络
0
0
NebulaGraph
一个开源的分布式图数据库
内容 731
粉丝 0
NebulaGraph 一个开源的分布式图数据库
总阅读333
粉丝0
内容731