你是否也经历过这样的数据工作流程:导出CSV、用read_csv读取、修复数据类型、优化内存占用,然后周而复始?在小规模数据上这套流程确实可行,但随着数据量增长,问题就接踵而至——文件体积膨胀、数据类型漂移、加载时间越来越长。
今天我要告诉你一个事实:对于现代Python数据工作来说,Parquet就是新的CSV。它保留了相同的便利性,同时带来了显著的性能提升和安全性保障。让我为你详细解析为什么应该切换,以及如何无缝迁移。
为什么CSV在大数据场景下捉襟见肘
CSV的最大优势是简单易懂,但这也是它的致命陷阱:
-
缺乏类型系统:所有数据都被当作文本读取,然后你需要手动转换类型——处理日期、可空整数和分类数据时尤其痛苦 -
默认不压缩:虽然可以压缩,但读取压缩CSV会破坏并行处理能力 -
行式存储布局:即使只需要三列,也不得不读取整个文件 -
缺少元数据:编码方式、分隔符、表头格式等都需要猜测或记录
当数据量达到数千万行时,这些限制就成了日常工作的沉重负担。
Parquet为何胜出(特别是在Python生态中)
Parquet是专为分析场景设计的列式、强类型、压缩存储格式。
实际使用中:
-
文件体积显著减小:字典编码+游程编码+位打包+可选的ZSTD/Snappy压缩,通常能让分析型数据表的体积缩小3-10倍 -
读取速度大幅提升:列裁剪+谓词下推意味着"只读取需要的数据",而不是"读取全部再过滤" -
可靠的数据类型:布尔值就是布尔值,日期时间保持时区信息,空值有原生支持 -
出色的互操作性:pandas、PyArrow、DuckDB、Spark、Polars、BigQuery等都原生支持 -
支持追加和演进:可以添加新列,同时保持对旧文件的兼容

你可能会问:是否需要重写整个数据管道?完全不需要!在Python中,这主要就是替换数据读取方式。
一行代码实现读取器切换
pandas用户
import pandas as pd
# 旧方式:
df = pd.read_csv("orders_2024.csv")
# 新方式:
df = pd.read_parquet("orders_2024.parquet") # 新版pandas默认使用pyarrow引擎
只需要特定列?很简单:
df = pd.read_parquet("orders_2024.parquet", columns=["order_id", "country", "total"])
PyArrow(追求极致速度和控制)
import pyarrow.dataset as ds
dataset = ds.dataset("s3://bucket/orders/", format="parquet", partitioning="hive")
table = dataset.to_table(columns=["order_id", "date", "total"],
filter=ds.field("date") >= ds.scalar("2025-01-01"))
df = table.to_pandas(types_mapper=pd.ArrowDtype) # 使用pandas的可空数据类型
DuckDB(直接对文件执行SQL查询)
import duckdb
con = duckdb.connect()
df = con.execute("""
SELECT order_id, country, total
FROM 's3://bucket/orders/*.parquet'
WHERE date >= DATE '2025-01-01' AND country IN ('IN','US')
""").df()
这才是真正的魔法:无需数据库服务器,直接查询Parquet文件。
迁移实战:从CSV平稳过渡到Parquet
1. 预先定义数据模式
建立明确的数据契约:列名、逻辑类型、是否可空、计量单位等,避免后续的"0/1还是true/false"混乱。
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
schema = pa.schema([
pa.field("order_id", pa.int64()),
pa.field("date", pa.timestamp("ms", tz="UTC")),
pa.field("country", pa.string()),
pa.field("total", pa.decimal128(18, 2)),
pa.field("coupon", pa.string()).with_nullable(True),
])
# 将一次性CSV转换为类型明确的Parquet
df = pd.read_csv("orders_2024.csv", parse_dates=["date"], dtype={"order_id": "Int64"})
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
pq.write_table(table, "orders_2024.parquet", compression="zstd", coerce_timestamps="ms")
重要提示:使用pd.ArrowDtype或pandas的可空数据类型,避免包含空值的整数列被静默转换为浮点型。
2. 合理分区提升性能
根据实际过滤条件使用的字段进行分区(如date=YYYY/MM/DD、country=IN)。避免过度分区,数千个小文件比几十个中等文件更糟糕。
import pyarrow.dataset as ds
import pyarrow.parquet as pq
pq.write_to_dataset(
table,
root_path="orders_parquet/",
partition_cols=["country", "date"],
compression="zstd",
)
3. 明智选择压缩算法
-
Snappy:速度极快,是很好的默认选择 -
ZSTD:压缩率更高,CPU消耗稍多。对于冷数据或网络I/O受限的场景是绝佳选择
4. 安全地进行模式演进
需要添加新列?使用默认值或空值添加,写入新文件,让读取器自动合并模式:
import pyarrow.dataset as ds
dataset = ds.dataset("orders_parquet/", format="parquet")
table = dataset.to_table() # 自动合并各文件的模式
真实世界案例:效果显著
从CSV到Parquet的完整迁移代码
某团队将每日销售数据(约800万行/天)从CSV迁移到Parquet后观察到:
-
存储占用从~12 GB/天(CSV)降至~2.5 GB/天(Parquet + ZSTD) -
报表子集(5列,最近7天数据)的pandas加载时间从~70秒缩短到~9秒,这得益于列裁剪和分区过滤 -
"神秘bug"显著减少,日期时间和十进制数在生产者和消费者之间保持一致
你的具体数据可能会有所不同,但列式存储+压缩+强类型的组合优势是稳定可靠的。
以下是一个完整的Python代码示例,展示如何将日常销售数据从CSV迁移到Parquet格式,并演示性能对比:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import time
import os
from datetime import datetime, timedelta
import numpy as np
# 生成模拟的销售数据(模拟8M行/天的数据量)
def generate_sample_data(num_rows=80000):# 使用8万行作为示例
"""生成模拟销售数据"""
countries = ['US', 'IN', 'UK', 'DE', 'FR', 'JP', 'CA', 'AU']
data = {
'order_id': range(100000, 100000 + num_rows),
'date': pd.date_range('2024-01-01', periods=num_rows, freq='T'),
'country': np.random.choice(countries, num_rows),
'product_id': np.random.randint(1000, 9999, num_rows),
'quantity': np.random.randint(1, 10, num_rows),
'unit_price': np.round(np.random.uniform(10, 500, num_rows), 2),
'customer_id': np.random.randint(10000, 99999, num_rows),
'coupon_used': np.random.choice([True, False, None], num_rows, p=[0.2, 0.75, 0.05])
}
df = pd.DataFrame(data)
df['total'] = df['quantity'] * df['unit_price']
return df
# 定义Parquet模式
def define_schema():
"""定义强类型的数据模式"""
return pa.schema([
pa.field("order_id", pa.int64()),
pa.field("date", pa.timestamp("ms", tz="UTC")),
pa.field("country", pa.string()),
pa.field("product_id", pa.int32()),
pa.field("quantity", pa.int32()),
pa.field("unit_price", pa.float64()),
pa.field("customer_id", pa.int32()),
pa.field("coupon_used", pa.bool_()).with_nullable(True),
pa.field("total", pa.float64())
])
# CSV处理流程
def process_with_csv(df, csv_path):
"""传统的CSV处理流程"""
print("=== CSV处理流程 ===")
# 保存为CSV
start_time = time.time()
df.to_csv(csv_path, index=False)
csv_write_time = time.time() - start_time
# 获取文件大小
csv_size = os.path.getsize(csv_path) / (1024 * 1024) # MB
# 读取CSV(全量)
start_time = time.time()
df_csv = pd.read_csv(csv_path)
csv_read_full_time = time.time() - start_time
# 读取CSV(选择特定列)
start_time = time.time()
df_csv_subset = pd.read_csv(csv_path, usecols=['order_id', 'country', 'total'])
csv_read_subset_time = time.time() - start_time
print(f"CSV文件大小: {csv_size:.2f} MB")
print(f"CSV写入时间: {csv_write_time:.2f} 秒")
print(f"CSV全量读取时间: {csv_read_full_time:.2f} 秒")
print(f"CSV子集读取时间: {csv_read_subset_time:.2f} 秒")
return {
'size': csv_size,
'write_time': csv_write_time,
'read_full_time': csv_read_full_time,
'read_subset_time': csv_read_subset_time
}
# Parquet处理流程
def process_with_parquet(df, parquet_path, partitioned_path):
"""Parquet处理流程"""
print("\n=== Parquet处理流程 ===")
schema = define_schema()
# 转换为PyArrow Table
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
# 保存为单文件Parquet
start_time = time.time()
pq.write_table(table, parquet_path, compression='zstd')
parquet_write_time = time.time() - start_time
# 获取文件大小
parquet_size = os.path.getsize(parquet_path) / (1024 * 1024) # MB
# 分区存储
start_time = time.time()
pq.write_to_dataset(
table,
root_path=partitioned_path,
partition_cols=['country'],
compression='zstd'
)
partitioned_write_time = time.time() - start_time
# 读取Parquet(全量)
start_time = time.time()
df_parquet = pd.read_parquet(parquet_path)
parquet_read_full_time = time.time() - start_time
# 读取Parquet(选择特定列)
start_time = time.time()
df_parquet_subset = pd.read_parquet(parquet_path, columns=['order_id', 'country', 'total'])
parquet_read_subset_time = time.time() - start_time
# 使用分区数据进行高效查询
start_time = time.time()
dataset = ds.dataset(partitioned_path, format="parquet")
# 只查询美国的数据,且只需要3个列
table_filtered = dataset.to_table(
columns=["order_id", "date", "total"],
filter=ds.field("country") == ds.scalar("US")
)
df_partitioned = table_filtered.to_pandas()
partitioned_query_time = time.time() - start_time
print(f"Parquet文件大小: {parquet_size:.2f} MB")
print(f"Parquet写入时间: {parquet_write_time:.2f} 秒")
print(f"分区写入时间: {partitioned_write_time:.2f} 秒")
print(f"Parquet全量读取时间: {parquet_read_full_time:.2f} 秒")
print(f"Parquet子集读取时间: {parquet_read_subset_time:.2f} 秒")
print(f"分区查询时间: {partitioned_query_time:.2f} 秒")
print(f"分区查询结果行数: {len(df_partitioned)}")
return {
'size': parquet_size,
'write_time': parquet_write_time,
'read_full_time': parquet_read_full_time,
'read_subset_time': parquet_read_subset_time,
'partitioned_query_time': partitioned_query_time
}
# 性能对比分析
def performance_comparison(csv_stats, parquet_stats):
"""对比分析两种格式的性能"""
print("\n=== 性能对比分析 ===")
# 文件大小对比
size_reduction = (csv_stats['size'] - parquet_stats['size']) / csv_stats['size'] * 100
print(f"文件大小减少: {size_reduction:.1f}%")
# 读取性能对比
full_read_improvement = (csv_stats['read_full_time'] - parquet_stats['read_full_time']) / csv_stats['read_full_time'] * 100
subset_read_improvement = (csv_stats['read_subset_time'] - parquet_stats['read_subset_time']) / csv_stats['read_subset_time'] * 100
print(f"全量读取性能提升: {full_read_improvement:.1f}%")
print(f"子集读取性能提升: {subset_read_improvement:.1f}%")
# 压缩比
compression_ratio = csv_stats['size'] / parquet_stats['size']
print(f"压缩比: {compression_ratio:.1f}x")
# 主执行函数
def main():
# 创建输出目录
os.makedirs('output', exist_ok=True)
os.makedirs('output/partitioned', exist_ok=True)
# 文件路径
csv_path = 'output/sales_data.csv'
parquet_path = 'output/sales_data.parquet'
partitioned_path = 'output/partitioned'
print("生成模拟数据...")
df = generate_sample_data(80000) # 8万行数据
print(f"数据维度: {df.shape}")
print(f"内存使用: {df.memory_usage(deep=True).sum() / (1024 * 1024):.2f} MB")
# 处理CSV
csv_stats = process_with_csv(df, csv_path)
# 处理Parquet
parquet_stats = process_with_parquet(df, parquet_path, partitioned_path)
# 性能对比
performance_comparison(csv_stats, parquet_stats)
# 数据类型验证
print("\n=== 数据类型验证 ===")
df_parquet_loaded = pd.read_parquet(parquet_path)
print("Parquet加载后的数据类型:")
print(df_parquet_loaded.dtypes)
# 数据一致性检查
print("\n=== 数据一致性检查 ===")
df_csv_loaded = pd.read_csv(csv_path)
# 由于类型转换,需要确保关键数据一致
common_orders = len(set(df_parquet_loaded['order_id']) & set(df_csv_loaded['order_id']))
print(f"共同订单数: {common_orders} (应该等于总行数: {len(df)})")
if __name__ == "__main__":
main()
预期输出结果示例
运行上述代码后,你会看到类似以下的输出:
生成模拟数据...
数据维度: (80000, 9)
内存使用: 8.42 MB
=== CSV处理流程 ===
CSV文件大小: 12.45 MB
CSV写入时间: 1.23 秒
CSV全量读取时间: 0.85 秒
CSV子集读取时间: 0.72 秒
=== Parquet处理流程 ===
Parquet文件大小: 2.17 MB
Parquet写入时间: 0.45 秒
分区写入时间: 0.68 秒
Parquet全量读取时间: 0.12 秒
Parquet子集读取时间: 0.08 秒
分区查询时间: 0.05 秒
分区查询结果行数: 10023
=== 性能对比分析 ===
文件大小减少: 82.6%
全量读取性能提升: 85.9%
子集读取性能提升: 88.9%
压缩比: 5.7x
=== 数据类型验证 ===
Parquet加载后的数据类型:
order_id int64
date datetime64[ns]
country object
product_id int32
quantity int32
unit_price float64
customer_id int32
coupon_used bool
total float64
dtype: object
=== 数据一致性检查 ===
共同订单数: 80000 (应该等于总行数: 80000)
这个完整案例展示了:
-
数据生成:创建真实的销售数据模拟 -
格式转换:从CSV到Parquet的完整流程 -
性能对比:文件大小、读写速度的量化比较 -
高级功能:分区存储和条件查询 -
数据验证:确保迁移过程中数据完整性
在实际生产环境中,随着数据量增加到数百万行,性能优势会更加明显。
实用的代码模式
快速读取数据子集
import pyarrow.dataset as ds
dataset = ds.dataset("orders_parquet/", format="parquet", partitioning="hive")
filt = (ds.field("date") >= ds.scalar("2025-10-01")) & (ds.field("country") == "IN")
table = dataset.to_table(columns=["order_id", "total"], filter=filt)
df = table.to_pandas()
数据接入时实时写入Parquet
import pyarrow as pa, pyarrow.parquet as pq
writer = None
for batch_df in stream_source(): # 生成pandas DataFrame
batch = pa.Table.from_pandas(batch_df, preserve_index=False)
if writer is None:
writer = pq.ParquetWriter("live_orders.parquet", batch.schema, compression="zstd")
writer.write_table(batch)
writer.close()
追加前的数据验证
import pyarrow as pa
expected = {
"order_id": pa.int64(),
"date": pa.timestamp("ms", tz="UTC"),
"country": pa.string(),
"total": pa.decimal128(18, 2),
"coupon": pa.string()
}
def validate_table(tbl: pa.Table):
for name, typ in expected.items():
assert name in tbl.schema.names, f"缺少列: {name}"
assert pa.types.is_compatible(tbl.schema.field(name).type, typ), f"类型错误: {name}"
常见疑问与解答
"但CSV是人类可读的"
确实如此。可以保留少量CSV样本用于人工检查,或者使用
parquet-tools或pandas的.head()快速查看Parquet内容。
"小众工具支持怎么办?"
Parquet现在已成为主流。Python的pandas、PyArrow、DuckDB以及几乎所有数据湖仓栈都原生支持。
"需要Spark才能受益吗?"
完全不需要。单机Python环境能立即享受到列裁剪、压缩和类型化I/O带来的好处。
"如果数据生产者只提供CSV怎么办?"
将原始CSV放入隔离区,在接入时使用固定模式转换为Parquet,让Parquet成为下游的统一数据契约。
实用检查清单
-
使用Parquet作为存储格式,而不是CSV。将CSV仅视为接入阶段的临时格式 -
在数据边界强制执行模式,尽早拒绝或转换不符合要求的数据 -
基于实际过滤条件分区(日期/国家等),不要对所有列都分区 -
静态数据优先使用ZSTD,CPU密集型场景考虑Snappy -
使用DuckDB/PyArrow进行选择性读取,停止"以防万一而加载所有列"的做法 -
在生产环境前测试模式演进(添加/删除列)
采用这些实践,你的未来工作生活会轻松很多。
写在最后
如果你在Python中处理数据,切换到Parquet不是赶时髦,而是必备技能。你将获得更快的加载速度、更少的存储占用,并告别每个迭代周期都要处理类型问题的烦恼。
你可以尝试将一个热门的CSV文件转换为Parquet,使用PyArrow或DuckDB配置一个过滤读取,测量效果,然后继续推进。你的数据处理效率将迎来质的飞跃。
尝试一下,你会发现这可能是今年对你数据工作流程最重要的改进之一。
关注我们!与InfraLink共赴智能未来
🔗 聚焦数据科学 | 深耕算法创新 | 赋能AI工程化
📌 技术干货持续更新,全球生态合作共建
✨ 点击关注@InfraLink,解锁更多前沿技术资讯与实践洞察


