在日常开发工作中,我们经常会遇到一些重复的、耗时的任务。作为一名资深Python开发者,我发现掌握一些高效的代码片段能够显著提升工作效率。本文精选了10个具有高度实用价值的Python代码案例,涵盖数据处理、文件操作、性能优化等多个领域。这些代码经过生产环境验证,可以直接复用于实际项目中,帮助你事半功倍。
案例一:批量文件重命名与正则表达式应用
需求背景
处理大量文件时,经常需要按照特定规则进行批量重命名。手动操作不仅效率低下,还容易出错。
核心代码
import os
import re
from pathlib import Path
def batch_rename_files(directory, pattern, replacement):
"""
批量重命名文件
:param directory: 目标目录
:param pattern: 正则表达式匹配模式
:param replacement: 替换规则
"""
for file_path in Path(directory).iterdir():
if file_path.is_file():
old_name = file_path.name
new_name = re.sub(pattern, replacement, old_name)
if old_name != new_name:
new_path = file_path.parent / new_name
file_path.rename(new_path)
print(f"重命名: {old_name} → {new_name}")
# 使用示例:将所有 test_*.py 改为 pytest_*.py
batch_rename_files('./files', r'^test_', 'pytest_')
优化要点
使用 pathlib.Path 比传统 os.path 更简洁且跨平台兼容性更强。正则表达式提供了强大的灵活性,支持复杂的匹配和替换规则。
案例二:高效的DataFrame数据聚合与多层分组
需求背景
数据分析中经常需要对大规模数据进行多维度分组和聚合,传统方法效率低下。
核心代码
import pandas as pd
import numpy as np
# 创建示例数据
df = pd.DataFrame({
'date': pd.date_range('2024-01-01', periods=1000),
'department': np.random.choice(['销售', '技术', '运营'], 1000),
'region': np.random.choice(['华东', '华南', '华北'], 1000),
'sales': np.random.randint(1000, 10000, 1000)
})
# 多层分组聚合
result = df.groupby(['date', 'department', 'region'])['sales'].agg([
('total_sales', 'sum'),
('avg_sales', 'mean'),
('max_sales', 'max'),
('count', 'count')
]).reset_index()
# 透视表优化
pivot_result = df.pivot_table(
values='sales',
index='department',
columns='region',
aggfunc=['sum', 'mean']
)
性能建议
使用 groupby + agg 组合比逐行循环快100倍以上。对于超大数据集(>100MB),考虑使用 dask 库进行并行处理。
案例三:装饰器实现函数执行时间监控
需求背景
性能优化的第一步是定位性能瓶颈。需要快速检测哪些函数执行效率低下。
核心代码
import time
from functools import wraps
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def performance_monitor(func):
"""
函数性能监控装饰器
"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
try:
result = func(*args, **kwargs)
return result
finally:
end_time = time.perf_counter()
elapsed = (end_time - start_time) * 1000# 毫秒
logger.info(f"[{func.__name__}] 执行耗时: {elapsed:.2f}ms")
return wrapper
@performance_monitor
def data_processing(data):
"""模拟数据处理"""
time.sleep(0.1)
return len(data)
# 使用
data_processing([1, 2, 3, 4, 5])
进阶应用
可扩展为记录内存使用量、异常捕获、重试机制等功能。
案例四:单例模式与数据库连接管理
需求背景
数据库连接是宝贵的系统资源,避免重复创建连接是必要的。
核心代码
import threading
class DatabaseConnection:
"""
单例数据库连接管理器
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance isNone:
with cls._lock:
if cls._instance isNone:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 模拟数据库连接初始化
self.connection = self._create_connection()
self._initialized = True
def _create_connection(self):
"""创建数据库连接"""
print("创建数据库连接...")
return"DB_CONNECTION_OBJECT"
def query(self, sql):
"""执行查询"""
returnf"执行查询: {sql}"
# 使用示例
db1 = DatabaseConnection()
db2 = DatabaseConnection()
print(db1 is db2) # True - 同一个实例
print(db1.query("SELECT * FROM users"))
线程安全
使用双检查锁定(Double-Checked Locking)确保在多线程环境下的安全性。
案例五:上下文管理器实现资源自动释放
需求背景
文件、数据库连接等资源需要在使用后正确释放,手动管理容易遗漏。
核心代码
from contextlib import contextmanager
import time
class ResourceManager:
"""资源管理器"""
def __init__(self, resource_name):
self.resource_name = resource_name
def __enter__(self):
print(f"[获取资源] {self.resource_name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print(f"[释放资源] {self.resource_name}")
if exc_type:
print(f"[异常处理] {exc_type.__name__}: {exc_val}")
returnFalse
# 使用
with ResourceManager("数据库连接") as rm:
print("正在使用资源...")
time.sleep(1)
# 使用 contextmanager 装饰器简化
@contextmanager
def db_transaction():
"""事务上下文管理"""
print("开始事务")
try:
yield
print("提交事务")
except Exception as e:
print(f"回滚事务: {e}")
raise
with db_transaction():
print("执行数据库操作")
优势
自动保证资源释放,即使发生异常也能正确清理,提高代码可靠性。
案例六:使用 asyncio 实现高并发网络请求
需求背景
批量爬取数据或调用API时,串行处理效率极低。异步编程能显著提升性能。
核心代码
import asyncio
import aiohttp
import time
asyncdef fetch_url(session, url):
"""异步获取URL内容"""
try:
asyncwith session.get(url, timeout=5) as response:
returnawait response.text()
except Exception as e:
print(f"获取 {url} 失败: {e}")
returnNone
asyncdef batch_fetch_urls(urls, max_concurrent=10):
"""
批量异步请求
:param urls: URL列表
:param max_concurrent: 最大并发数
"""
connector = aiohttp.TCPConnector(limit=max_concurrent)
asyncwith aiohttp.ClientSession(connector=connector) as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 使用示例
if __name__ == '__main__':
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
] * 10
start = time.time()
# asyncio.run(batch_fetch_urls(urls))
print(f"异步请求耗时: {time.time() - start:.2f}秒")
性能对比
异步方式相比串行可提升10-100倍的并发能力,特别适合I/O密集型任务。
案例七:生成器优化内存使用
需求背景
处理大文件或大数据集时,一次性加载所有数据会导致内存溢出。
核心代码
import csv
from typing import Generator
def read_large_csv(filename: str, chunk_size: int = 1000) -> Generator:
"""
分块读取大型CSV文件
"""
with open(filename, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
chunk = []
for row in reader:
chunk.append(row)
if len(chunk) >= chunk_size:
yield chunk
chunk = []
if chunk:
yield chunk
def process_large_file(filename: str):
"""处理大文件"""
for chunk in read_large_csv(filename):
# 处理每一块数据
processed_data = [
{**row, 'processed': True}
for row in chunk
]
# 进一步处理或保存
# 内存高效的行生成器
def read_lines_lazy(filename: str):
"""惰性读取文件行"""
with open(filename, 'r') as f:
for line in f:
yield line.strip()
for line in read_lines_lazy('large_file.txt'):
if'keyword'in line:
print(line)
内存优化
生成器采用惰性计算,只在需要时生成数据,内存占用恒定。
案例八:字典与对象的动态创建与扩展
需求背景
从API返回的动态数据需要灵活处理,传统方式不够灵活。
核心代码
from types import SimpleNamespace
from dataclasses import dataclass, asdict
# 方式一:使用 SimpleNamespace 创建动态对象
def create_object_from_dict(data_dict):
"""将字典转换为对象"""
return SimpleNamespace(**data_dict)
api_response = {
'user_id': 123,
'username': 'john_doe',
'email': 'john@example.com'
}
user = create_object_from_dict(api_response)
print(user.username) # john_doe
# 方式二:使用 dataclass 创建类型安全的对象
@dataclass
class User:
user_id: int
username: str
email: str
@classmethod
def from_dict(cls, data):
"""从字典创建实例"""
return cls(**data)
def to_dict(self):
"""转换为字典"""
return asdict(self)
user = User.from_dict(api_response)
print(user.username)
# 方式三:动态字典扩展
class FlexibleDict(dict):
"""支持属性访问的灵活字典"""
def __getattr__(self, name):
return self.get(name)
def __setattr__(self, name, value):
self[name] = value
config = FlexibleDict(
database_host='localhost',
database_port=5432
)
print(config.database_host)
类型安全
dataclass 提供了类型提示和自动生成 __init__ 方法的优势。
案例九:缓存装饰器提升函数性能
需求背景
某些计算密集型函数重复调用相同参数,缓存可避免重复计算。
核心代码
from functools import lru_cache
import time
# 方式一:使用 lru_cache
@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
"""计算斐波那契数列,使用缓存优化"""
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
start = time.time()
result = fibonacci(35)
print(f"耗时: {time.time() - start:.4f}秒, 结果: {result}")
# 方式二:自定义缓存装饰器
class CacheDecorator:
def __init__(self, max_size=100):
self.cache = {}
self.max_size = max_size
def __call__(self, func):
def wrapper(*args, **kwargs):
# 生成缓存键
key = (func.__name__, args, tuple(sorted(kwargs.items())))
if key in self.cache:
print(f"[缓存命中] {func.__name__}")
return self.cache[key]
result = func(*args, **kwargs)
if len(self.cache) >= self.max_size:
# 移除最老的缓存
self.cache.pop(next(iter(self.cache)))
self.cache[key] = result
return result
return wrapper
@CacheDecorator(max_size=50)
def expensive_computation(n: int) -> int:
"""模拟耗时计算"""
time.sleep(1)
return n * n
print(expensive_computation(5))
print(expensive_computation(5)) # 从缓存返回
对于重复调用场景,缓存可将性能提升数百倍。
案例十:多进程并行处理大规模数据
需求背景
CPU密集型任务需要利用多核处理器,Python的GIL限制了线程的并行性。
核心代码
from multiprocessing import Pool, Manager
import time
import numpy as np
def compute_intensive_task(data_chunk):
"""CPU密集型任务"""
result = 0
for num in data_chunk:
result += num ** 2
return result
def parallel_processing_example():
"""多进程并行处理"""
# 生成大数据集
large_data = list(range(10_000_000))
# 分割数据
chunk_size = len(large_data) // 4
chunks = [
large_data[i:i+chunk_size]
for i in range(0, len(large_data), chunk_size)
]
# 使用进程池
start = time.time()
with Pool(processes=4) as pool:
results = pool.map(compute_intensive_task, chunks)
print(f"并行处理耗时: {time.time() - start:.2f}秒")
print(f"计算结果: {sum(results)}")
def map_reduce_pattern():
"""Map-Reduce 模式处理"""
data = range(1_000_000)
chunk_size = 100_000
chunks = [
list(data)[i:i+chunk_size]
for i in range(0, len(data), chunk_size)
]
with Pool(processes=4) as pool:
# Map 阶段
mapped_results = pool.map(compute_intensive_task, chunks)
# Reduce 阶段
final_result = sum(mapped_results)
return final_result
if __name__ == '__main__':
parallel_processing_example()
最佳实践
使用 if __name__ == '__main__': 保护进程创建代码。对于长时间运行的进程,使用 pool.imap_unordered() 获得结果即时反馈。
总结
上述10个代码案例覆盖了Python开发中的高频场景。这些模式和最佳实践不仅能显著提升工作效率,还能帮助你写出更优雅、更可维护的代码。建议在实际项目中逐个实践,结合具体业务场景灵活应用。
持续学习和优化这些技能,是成为高效Python开发者的必经之路。希望本文能为你的开发工作带来启发和帮助!

