01
- Master cluster: metadata management, cache scheduling, and consistency guarantees
- Worker nodes: data caching, I/O handling, and task execution
- Client SDK: multi-language clients with support for Rust, FUSE, Java, and Python (see the sketch after this list)
- Job Manager: distributed job scheduling and management
- Metrics system: real-time monitoring and performance analysis
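Because the Client SDK includes a FUSE layer, the simplest way to see these components cooperate is plain file I/O against a mount point: the kernel forwards reads through FUSE to the client, which fetches blocks from Worker caches under the Master's coordination. A minimal sketch, assuming a hypothetical FUSE mount at `/curvine-fuse/datasets` (the mount point and file name are illustrative, not fixed paths):

```python
import os

MOUNT_ROOT = "/curvine-fuse/datasets"  # hypothetical FUSE mount point

# Listing the directory is an ordinary syscall; metadata is served by the Master
for name in os.listdir(MOUNT_ROOT):
    path = os.path.join(MOUNT_ROOT, name)
    kind = "file" if os.path.isfile(path) else "dir"
    print(f"{kind}\t{name}")

# Reads are served from Worker caches when the blocks are already cached
with open(os.path.join(MOUNT_ROOT, "sample.bin"), "rb") as f:  # illustrative file
    header = f.read(4096)
print(f"read {len(header)} bytes through the FUSE mount")
```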
02
```bash
# Keep source and cache paths consistent for easier management and maintenance
bin/cv mount s3://bucket/data /bucket/data --mnt-type cst
```
- Data lake scenarios with a clear path structure
- Production environments that need intuitive path mapping
- Data platforms shared by multiple teams
```bash
# Flexible path mapping that supports complex path transformations
bin/cv mount s3://complex-bucket/deep/nested/path /simple/data --mnt-type arch
```
- Complex storage hierarchies
- Scenarios that require path abstraction
- Unified access across multi-cloud storage
```bash
bin/cv mount \
  s3://bucket/warehouse/tpch_500g.db/orders \
  /bucket/warehouse/tpch_500g.db/orders \
  --ttl-ms 24h \
  --ttl-action delete \
  --replicas 3 \
  --block-size 128MB \
  --consistency-strategy always \
  --storage-type ssd \
  -c s3.endpoint_url=https://s3.ap-southeast-1.amazonaws.com \
  -c s3.credentials.access=access_key \
  -c s3.credentials.secret=secret_key \
  -c s3.region_name=ap-southeast-1
```
```bash
# List current mount points
bin/cv mount

# Remove a mount point
bin/cv unmount /bucket/warehouse/tpch_500g.db/orders
```
03
```bash
# Preload the dataset into the cache
bin/cv load s3://bucket/warehouse/critical-dataset

# Preload and wait for the load to complete (-w)
bin/cv load s3://bucket/warehouse/critical-dataset -w
```
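In a data pipeline, the same preload can be scripted ahead of a batch job. A minimal sketch that wraps the CLI shown above (the path is the sample one from this section):

```python
import subprocess

# Warm the cache before the job starts; -w blocks until the load finishes
subprocess.run(
    ["bin/cv", "load", "s3://bucket/warehouse/critical-dataset", "-w"],
    check=True,
)
```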
04
```bash
bin/cv mount s3://bucket/path /bucket/path --consistency-strategy=none
```
- Use cases: static data, archived data, read-only datasets
- Performance: ⭐⭐⭐⭐⭐ (fastest)
- Consistency: ⭐⭐ (relies on TTL)
```bash
bin/cv mount s3://bucket/path /bucket/path --consistency-strategy=always
```
- Use cases: frequently updated business data, critical business systems
- Performance: ⭐⭐⭐ (some overhead)
- Consistency: ⭐⭐⭐⭐⭐ (strong consistency)
```bash
bin/cv mount s3://bucket/path /bucket/path \
  --consistency-strategy=period \
  --check-interval=5m
```
- Use cases: data with a predictable update cadence
- Performance: ⭐⭐⭐⭐ (good)
- Consistency: ⭐⭐⭐⭐ (periodically verified)
```bash
curl -s http://master:9001/metrics | grep -E "(cache_hits|cache_misses)"
```

```
client_mount_cache_hits{id="3108497238"} 823307
client_mount_cache_misses{id="3108497238"} 4380
```
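From these two counters you can compute the cache hit ratio directly; a quick check with the sample values above:

```python
# Hit ratio from the scraped counters (sample values from the output above)
cache_hits = 823307
cache_misses = 4380
hit_ratio = cache_hits / (cache_hits + cache_misses)
print(f"cache hit ratio: {hit_ratio:.2%}")  # -> 99.47%
```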
05
```bash
bin/cv mount s3://datasets/imagenet /datasets/imagenet \
  --storage-type=mem \
  --block-size=1GB \
  --replicas=2
```
```bash
bin/cv mount s3://model/bert-large /models/bert-large \
  --ttl-ms=none \
  --consistency-strategy=always

bin/cv mount s3://inference/input /inference/input \
  --storage-type=ssd \
  --ttl-ms=1h \
  --consistency-strategy=none
```
##### 1. PyTorch Training Pipeline

```python
# PyTorch training script
import os

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image

class CurvineImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Access data in Curvine directly through the FUSE mount point.
        root_dir: FUSE mount path, e.g. /curvine-fuse/datasets/imagenet
        """
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        # Walk the FUSE-mounted directory directly
        for class_dir in os.listdir(root_dir):
            class_path = os.path.join(root_dir, class_dir)
            if os.path.isdir(class_path):
                for img_file in os.listdir(class_path):
                    if img_file.lower().endswith(('.png', '.jpg', '.jpeg')):
                        self.image_paths.append(os.path.join(class_path, img_file))

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Standard file operations, accelerated by the Curvine cache
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        # Derive the label from the directory name in the path
        label = os.path.basename(os.path.dirname(img_path))
        return image, label

# Usage example
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Point the dataset directly at the FUSE mount path
dataset = CurvineImageDataset(
    root_dir='/curvine-fuse/datasets/imagenet/train',
    transform=transform
)

dataloader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=True,
    num_workers=8,
    pin_memory=True
)

# Training loop (model, criterion, and num_epochs defined elsewhere)
for epoch in range(num_epochs):
    for batch_idx, (data, targets) in enumerate(dataloader):
        # Batches are loaded from the Curvine cache via FUSE,
        # with near-memory access speed
        outputs = model(data.cuda())
        loss = criterion(outputs, targets.cuda())
        # ... training logic
```
##### 2. TensorFlow Data Pipeline

```python
import os

import tensorflow as tf

def create_curvine_dataset(data_dir, batch_size=32):
    """
    Build a TensorFlow input pipeline on top of the FUSE mount point.
    data_dir: the FUSE-mounted data directory
    """
    # Standard file APIs read the FUSE-mounted data directly
    def load_and_preprocess_image(path):
        # TensorFlow reads through FUSE and hits the Curvine cache transparently
        image = tf.io.read_file(path)
        image = tf.image.decode_jpeg(image, channels=3)
        image = tf.image.resize(image, [224, 224])
        image = tf.cast(image, tf.float32) / 255.0
        return image

    # Scan the files under the FUSE-mounted directory
    image_paths = []
    labels = []
    for class_name in os.listdir(data_dir):
        class_dir = os.path.join(data_dir, class_name)
        if os.path.isdir(class_dir):
            for img_file in os.listdir(class_dir):
                if img_file.lower().endswith(('.png', '.jpg', '.jpeg')):
                    image_paths.append(os.path.join(class_dir, img_file))
                    labels.append(class_name)

    # Build the datasets
    path_ds = tf.data.Dataset.from_tensor_slices(image_paths)
    label_ds = tf.data.Dataset.from_tensor_slices(labels)

    # Apply preprocessing
    image_ds = path_ds.map(load_and_preprocess_image,
                           num_parallel_calls=tf.data.AUTOTUNE)

    # Zip images and labels together
    dataset = tf.data.Dataset.zip((image_ds, label_ds))
    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Usage example
train_dataset = create_curvine_dataset('/curvine-fuse/datasets/imagenet/train')
val_dataset = create_curvine_dataset('/curvine-fuse/datasets/imagenet/val')

# Model training; checkpoints and logs are written back through the FUSE mount
model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=50,
    callbacks=[
        tf.keras.callbacks.ModelCheckpoint('/curvine-fuse/models/checkpoints/'),
        tf.keras.callbacks.TensorBoard(log_dir='/curvine-fuse/logs/')
    ]
)
```

##### 3. Data Science Workflows in Jupyter Notebook

```python
import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

# Read data files directly from the FUSE mount;
# Curvine handles caching and consistency automatically
data_path = '/curvine-fuse/datasets/tabular/customer_data.csv'
df = pd.read_csv(data_path)

# Data exploration
print(f"Dataset shape: {df.shape}")
print(f"Data types:\n{df.dtypes}")

# Preprocessing (results can be written back to the FUSE mount)
processed_data_path = '/curvine-fuse/datasets/processed/customer_data_clean.csv'
df_clean = df.dropna()
df_clean.to_csv(processed_data_path, index=False)

# Model training and persistence
from sklearn.ensemble import RandomForestClassifier
import joblib

# Split features and target (a 'label' column is assumed here for illustration)
X = df_clean.drop(columns=['label'])
y = df_clean['label']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

# Train the model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Save the model to the FUSE mount; it is synced to the Curvine cluster automatically
model_path = '/curvine-fuse/models/customer_classification_v1.joblib'
joblib.dump(model, model_path)

# Record experiment results
results = {
    'accuracy': model.score(X_test, y_test),
    'timestamp': pd.Timestamp.now(),
    'model_path': model_path
}
results_df = pd.DataFrame([results])
results_df.to_csv('/curvine-fuse/experiments/results.csv',
                  mode='a', header=False, index=False)
```
06
```xml
<!-- Curvine FileSystem implementation -->
<property>
  <name>fs.cv.impl</name>
  <value>io.curvine.CurvineFileSystem</value>
</property>

<!-- Single-cluster configuration -->
<property>
  <name>fs.cv.master_addrs</name>
  <value>master1:8995,master2:8995,master3:8995</value>
</property>
```
```xml
<!-- Cluster 1: production -->
<property>
  <name>fs.cv.production.master_addrs</name>
  <value>prod-master1:8995,prod-master2:8995,prod-master3:8995</value>
</property>

<!-- Cluster 2: development -->
<property>
  <name>fs.cv.development.master_addrs</name>
  <value>dev-master1:8995,dev-master2:8995</value>
</property>

<!-- Cluster 3: dedicated machine-learning cluster -->
<property>
  <name>fs.cv.ml-cluster.master_addrs</name>
  <value>ml-master1:8995,ml-master2:8995,ml-master3:8995</value>
</property>
```
🔄 UFS Transparent Proxy
To help existing Java applications tap into the Curvine cache seamlessly, we provide a UFS transparent proxy. Its core advantage is zero code changes: existing applications immediately benefit from Curvine's cache acceleration.
✨ Core Features of the Transparent Proxy
- 🚫 Zero code changes: all existing interfaces are preserved, and business code needs no modification
- 🔍 Smart path detection: only at file-open time does the proxy check whether a path is mounted in Curvine
- ⚡ Automatic cache acceleration: mounted paths are served from the cache automatically, while unmounted paths fall back to native S3 access
- 🔄 Smooth switching: caching can be toggled dynamically at runtime without restarting the application
🛠️ Configuration
Just replace the S3 FileSystem implementation class in the Hadoop configuration:
```xml
<!-- Legacy S3 access configuration -->
<!--
<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
-->

<!-- Replace it with the Curvine transparent proxy -->
<property>
  <name>fs.s3a.impl</name>
  <value>io.curvine.S3AProxyFileSystem</value>
</property>
<property>
  <name>fs.cv.impl</name>
  <value>io.curvine.CurvineFileSystem</value>
</property>

<!-- Curvine cluster configuration -->
<property>
  <name>fs.curvine.master_addrs</name>
  <value>master1:8995,master2:8995,master3:8995</value>
</property>
```
🔧 How It Works
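The routing rule is simple: when a file is opened, the proxy checks whether the path falls under a Curvine mount and dispatches accordingly. A minimal sketch of that decision in Python (illustrative only, not the actual S3AProxyFileSystem internals):

```python
# Hypothetical mount table; in practice this comes from the Curvine Master
MOUNTED_PREFIXES = {"s3a://my-bucket/warehouse/"}

def route(path: str) -> str:
    """Decide, once per file open, whether a read goes through the cache."""
    if any(path.startswith(prefix) for prefix in MOUNTED_PREFIXES):
        return "curvine-cache"  # mounted path: cache-accelerated read
    return "native-s3"          # unmounted path: plain S3 access

print(route("s3a://my-bucket/warehouse/data.parquet"))    # curvine-cache
print(route("s3a://my-bucket/archive/old-data.parquet"))  # native-s3
```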
🚀 Usage Examples
No business code needs to be modified; existing code is accelerated as-is:
```java
// Business code stays exactly the same!
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("s3a://my-bucket/"), conf);

// If this path is mounted in Curvine, reads are cache-accelerated automatically
FSDataInputStream input = fs.open(new Path("s3a://my-bucket/warehouse/data.parquet"));

// If this path is not mounted, it falls back to native S3 access
FSDataInputStream input2 = fs.open(new Path("s3a://my-bucket/archive/old-data.parquet"));
```
Spark/MapReduce example:
```java
// Spark code needs no modification
Dataset<Row> df = spark.read()
    .option("header", "true")
    // If the /warehouse/ path is mounted, the cache is used automatically
    .csv("s3a://data-lake/warehouse/customer_data/");

df.groupBy("region")
  .agg(sum("revenue").alias("total_revenue"))
  .orderBy(desc("total_revenue"))
  .show(20);
```
PySpark example:
```python
# Python code needs no changes either
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, desc

spark = SparkSession.builder.appName("TransparentCache").getOrCreate()

# The proxy decides automatically whether to use the cache
df = spark.read \
    .option("header", "true") \
    .csv("s3a://data-lake/analytics/events/")

result = df.groupBy("event_type") \
    .agg(sum("count").alias("total_events")) \
    .orderBy(desc("total_events"))

result.show()
```
```bash
spark-submit \
  --class com.example.SparkApp \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.hadoop.fs.cv.impl=io.curvine.CurvineFileSystem \
  --conf spark.hadoop.fs.cv.master_addrs=master1:8995,master2:8995,master3:8995 \
  --conf spark.sql.adaptive.enabled=true \
  --jars curvine-hadoop-client.jar \
  app.jar
```
```scala
// Scala example
val spark = SparkSession.builder()
  .appName("Curvine Demo")
  .config("spark.hadoop.fs.cv.impl", "io.curvine.CurvineFileSystem")
  .getOrCreate()

// Access cached data directly via the cv:// scheme
val df = spark.read
  .option("multiline", "true")
  .json("cv://production/warehouse/events/2024/01/01/")

df.groupBy("event_type").count().show()

// Multi-cluster access
val prodData = spark.read.parquet("cv://production/warehouse/sales/")
val mlData = spark.read.parquet("cv://ml-cluster/features/user_profiles/")
```
```python
# Python example
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Curvine Python Demo") \
    .config("spark.hadoop.fs.cv.impl", "io.curvine.CurvineFileSystem") \
    .config("spark.hadoop.fs.cv.master_addrs", "master1:8995,master2:8995") \
    .getOrCreate()

df = spark.read.option("header", "true") \
    .csv("cv://warehouse/customer_data/")

result = df.groupBy("region") \
    .agg({"revenue": "sum", "orders": "count"}) \
    .orderBy("sum(revenue)", ascending=False)

result.show(20)
```
```bash
--class main.scala.Tpch \
--name tpch_demo \
--conf spark.hadoop.fs.cv.default.master_addrs=master1:8995,master2:8995 \
--conf spark.sql.extensions=io.curvine.spark.CurvineSparkExtension \
```
```java
// Flink Table API integration example
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Configure the Curvine FileSystem
Configuration config = new Configuration();
config.setString("fs.cv.impl", "io.curvine.CurvineFileSystem");
config.setString("fs.cv.master_addrs", "master1:8995,master2:8995");

// Create a table backed by Curvine
tableEnv.executeSql(
    "CREATE TABLE user_events (" +
    "  user_id BIGINT," +
    "  event_type STRING," +
    "  timestamp_ms BIGINT," +
    "  properties MAP<STRING, STRING>" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'cv://streaming/events/'," +
    "  'format' = 'json'" +
    ")");

// Real-time queries benefit from the cache
Table result = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) AS event_count " +
    "FROM user_events " +
    "WHERE timestamp_ms > UNIX_TIMESTAMP() * 1000 - 3600000 " +
    "GROUP BY user_id");
```
07
```bash
# Hot data: high-frequency access, cached in memory
bin/cv mount s3://bucket/hot /bucket/hot \
  --storage-type=mem \
  --replicas=3 \
  --ttl-ms=1d \
  --ttl-action=delete

# Warm data: periodic access, cached on SSD
bin/cv mount s3://bucket/warm /bucket/warm \
  --storage-type=ssd \
  --replicas=2 \
  --ttl-ms=7d \
  --ttl-action=delete

# Cold data: low-frequency access, cached on disk
bin/cv mount s3://bucket/cold /bucket/cold \
  --storage-type=disk \
  --replicas=1 \
  --ttl-ms=30d \
  --ttl-action=delete
```
```bash
# Small-file-heavy workloads (e.g. logs, configs)
bin/cv mount s3://bucket/logs /bucket/logs \
  --block-size=4MB \
  --consistency-strategy=none

# Large files (e.g. videos, models)
bin/cv mount s3://bucket/models /bucket/models \
  --block-size=1GB \
  --consistency-strategy=always

# Analytical data (e.g. Parquet)
bin/cv mount s3://bucket/analytics /bucket/analytics \
  --block-size=128MB \
  --consistency-strategy=none
```
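To see why these block sizes differ, a quick back-of-the-envelope on how block size drives the block count per object (and thus metadata and scheduling overhead); the 10 GB file size is just an example:

```python
def block_count(file_size: int, block_size: int) -> int:
    """Number of cache blocks needed for one object (ceiling division)."""
    return -(-file_size // block_size)

MB, GB = 1 << 20, 1 << 30
print(block_count(10 * GB, 4 * MB))    # 2560 blocks: fine-grained, suits many small reads
print(block_count(10 * GB, 128 * MB))  # 80 blocks: balanced for analytical scans
print(block_count(10 * GB, 1 * GB))    # 10 blocks: fewer, larger I/Os for big model files
```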
08
- 🚀 Performance: 10-100x access acceleration, significantly lowering data access latency
- 💰 Cost optimization: reduced cloud storage access fees and better compute resource utilization
- 🛡️ Data safety: multiple consistency guarantees ensure data accuracy and integrity
- 🌐 Ecosystem friendly: seamless integration with mainstream big data and AI frameworks