一、目的
本实验旨在实现跨Amazon账号(包括国内外账号)的 S3 文件同步功能,通过两种不同方案验证 S3 跨账号数据迁移的可行性:
-
针对同区域 S3 桶,利用 S3 自身复制机制实现文件的自动同步,验证版本控制在跨账号复制中的作用及权限配置的有效性 -
针对跨区域 S3 桶,通过 S3 事件通知结合 Lambda 函数的方式,实现文件的实时同步,验证基于事件驱动的跨区域数据同步方案的可靠性 -
对比两种方案的适用场景、配置复杂度及同步效果,为实际业务中的跨账号 S3 数据同步提供技术参考。
二、方案一
适用于源桶与目标桶同区域(均为国内 / 均为国外)
因源桶和目标桶处于同一区域,可直接借助 S3 自带的复制规则完成资源同步。以下以 “将 wenjun-test 桶下 bill 文件夹内的资源同步至 about-test 桶” 为例,说明配置流程:
1、前置条件
2、源账号配置步骤
创建同步现有文件的 IAM 角色 A(可选操作)
若无需自定义权限,可由 S3 在配置复制规则时,自动创建具备完整同步权限的新角色,无需手动创建角色 A。
权限策略:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:InitiateReplication",
"s3:GetReplicationConfiguration",
"s3:PutInventoryConfiguration"
],
"Resource": [
"arn:aws:s3:::wenjun-test",
"arn:aws:s3:::wenjun-test/*"
]
},
{
"Effect": "Allow",
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::wenjun-test/bill//*"
}
]
}
-
信任关系
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "batchoperations.s3.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
3、创建后续同步角色B(可选,可通过S3自动创建带有足够权限的新角色)
权限策略
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"s3:ListBucket",
"s3:GetReplicationConfiguration",
"s3:GetObjectVersionForReplication",
"s3:GetObjectVersionAcl",
"s3:GetObjectVersionTagging",
"s3:GetObjectRetention",
"s3:GetObjectLegalHold"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::wenjun-test",
"arn:aws:s3:::wenjun-test/*",
"arn:aws:s3:::about-test",
"arn:aws:s3:::about-test/*"
]
},
{
"Action": [
"s3:ReplicateObject",
"s3:ReplicateDelete",
"s3:ReplicateTags",
"s3:ObjectOwnerOverrideToBucketOwner"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::wenjun-test/*",
"arn:aws:s3:::about-test/*"
]
}
]
}
2. 信任关系
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"s3.amazonaws.com",
"batchoperations.s3.amazonaws.com"
]
},
"Action": "sts:AssumeRole"
}
]
}
4、配置目标桶策略
添加允许源账号访问目标桶的策略
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::源账号ID:root"
},
"Action": [
"s3:ReplicateObject",
"s3:ReplicateDelete",
"s3:ReplicateTags",
"s3:GetObjectVersionTagging",
"s3:GetObjectVersionForReplication",
"s3:ObjectOwnerOverrideToBucketOwner"
],
"Resource": "目标桶ARN/*"
}
]
}
5、在源桶上创建复制管理规则
同步规则创建
复制当前文件
在源桶上上传文件测试
三、方案二
由于国内外 S3 服务分属不同云平台生态,且受跨境数据传输合规性、服务架构隔离等多重限制,Amazon S3 的原生复制功能不支持跨云平台及跨境到国内 S3 的同步。所以以下例子采用S3事件通知+lambda+国内凭证写入的方案。
下面例子为将国外S3桶wenjun-s3的bill文件夹下的资源同步到国内S3桶s3-cross-account下。
1、在目标账号下创建用户
控制台—IAM服务—策略—创建策略
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:PutObjectTagging"
],
"Resource": [
"arn:aws-cn:s3:::目标桶/*"
]
}
]
}
2、控制台---IAM服务---用户---创建用户
- 选择第一步创建的策略
-
创建用户
4、在源账号下创建secrets manager
- 控制台---secrets manager服务---存储新的密钥
- 完成创建后记录下密钥ARN
4、在源账号下创建lambda函数
创建策略
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::源桶名",
"arn:aws:s3:::源桶名/*"
]
},
{
"Effect": "Allow",
"Action": [
"secretsmanager:GetSecretValue"
],
"Resource": [
"目标账户aksk的ARN"
]
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
5 、创建lambda执行角色
- 控制台---IAM服务---角色---创建角色---可信实体类型选择lambda服务
-
权限选择第一步创建的策略
6、 创建lambda函数
-
控制台---lambda服务---创建函数
import os
import json
import time
import urllib.parse
import boto3
from botocore.exceptions import ClientError
# 源 S3 client(Lambda role,用于读取源对象)
SRC_S3 = boto3.client('s3')
SM = boto3.client('secretsmanager')
# 必须的环境变量
TARGET_BUCKET = os.environ['TARGET_BUCKET']
SECRET_ARN = os.environ['TARGET_SECRET_ARN']
TARGET_REGION = os.environ.get('TARGET_REGION', 'us-east-1')
# 可选的前缀/后缀过滤(逗号分隔,多值均支持)
# 如果不设置,表示不过滤(处理所有对象)
SRC_PREFIXES = [p for p in os.environ.get('SRC_PREFIXES', '').split(',') if p != '']
SRC_SUFFIXES = [s for s in os.environ.get('SRC_SUFFIXES', '').split(',') if s != '']
# Secrets cache:避免每次都调用 SecretsManager(TTL 300s)
_TARGET_CREDS_CACHE = {"ak": None, "sk": None, "ts": 0, "ttl": int(os.environ.get("SECRETS_CACHE_TTL", "300"))}
def _log(*args):
print("[s3-sync]", *args)
def get_target_creds():
"""
从 Secrets Manager 获取目标账户 access/secret。
使用简单的进程内缓存,ttl 可通过 SECRETS_CACHE_TTL 环境变量调整(秒)。
SecretString 应包含 JSON: {"access_key_id":"...","secret_access_key":"..."}
"""
now = time.time()
if _TARGET_CREDS_CACHE["ak"] and (now - _TARGET_CREDS_CACHE["ts"] < _TARGET_CREDS_CACHE["ttl"]):
return _TARGET_CREDS_CACHE["ak"], _TARGET_CREDS_CACHE["sk"]
try:
resp = SM.get_secret_value(SecretId=SECRET_ARN)
sec = json.loads(resp['SecretString'])
ak = sec.get('access_key_id') or sec.get('aws_access_key_id') or sec.get('AccessKeyId')
sk = sec.get('secret_access_key') or sec.get('aws_secret_access_key') or sec.get('SecretAccessKey')
ifnot ak ornot sk:
raise ValueError("secret JSON missing access_key_id/secret_access_key")
except Exception as e:
_log("ERROR getting target creds from SecretsManager:", e)
raise
_TARGET_CREDS_CACHE.update({"ak": ak, "sk": sk, "ts": now})
return ak, sk
def upload_stream_to_target(key, stream):
"""
使用指定的目标 AK/SK 上传对象流到目标 bucket(upload_fileobj 能处理 StreamingBody)。
"""
ak, sk = get_target_creds()
target_s3 = boto3.client(
's3',
region_name=TARGET_REGION,
aws_access_key_id=ak,
aws_secret_access_key=sk
)
try:
# 如果目标需要分块/大文件优化可在此扩展(现在使用 upload_fileobj)
target_s3.upload_fileobj(stream, TARGET_BUCKET, key)
_log("Uploaded to target:", TARGET_BUCKET, key)
except ClientError as e:
_log("ERROR uploading to target S3:", e)
raise
def matches_filters(key):
"""
检查 key 是否匹配环境变量配置的前缀/后缀过滤。
只要配置了任一前缀或后缀,则必须满足对应条件。
多前缀/后缀用逗号分隔(在环境变量中)。
"""
if SRC_PREFIXES:
ifnot any(key.startswith(p) for p in SRC_PREFIXES):
returnFalse
if SRC_SUFFIXES:
ifnot any(key.endswith(s) for s in SRC_SUFFIXES):
returnFalse
returnTrue
def process_single_object(bucket, key):
"""
拉流并上传到目标 bucket。
注意:key 预期为已解码的字符串。
"""
ifnot matches_filters(key):
_log(f"Skipped (filter): {bucket}/{key}")
return
try:
# 从源桶流式读取(需要 Lambda role 有 s3:GetObject)
_log("Getting object from source:", bucket, key)
resp = SRC_S3.get_object(Bucket=bucket, Key=key)
stream = resp['Body'] # StreamingBody
except ClientError as e:
_log("ERROR getting object from source S3:", e)
return
try:
upload_stream_to_target(key, stream)
finally:
# StreamingBody 支持 close()
try:
stream.close()
except Exception:
pass
def extract_bucket_key_from_record(record):
"""
解析单条 S3 Record -> (bucket, key)
支持两种常见格式:
- S3 Event Notification (record['s3']['bucket']['name'], record['s3']['object']['key'])
- EventBridge detail (detail.bucket.name / detail.object.key) 已在 lambda_handler 处理
返回解码后的 key(使用 unquote_plus 与 AWS 建议一致)
"""
s3 = record.get('s3') or {}
bucket = s3.get('bucket', {}).get('name')
key = s3.get('object', {}).get('key')
if key:
# 通常 S3 event 中 key 是 URL encoded
key = urllib.parse.unquote_plus(key)
return bucket, key
def lambda_handler(event, context):
"""
支持:
- 原生 S3 Notification(event['Records'])
- EventBridge 的 S3 事件(event['detail'])
- 批量 Records
"""
_log("Received event type keys:", list(event.keys()))
# 1) 处理 Records (S3 Notification)
records = event.get('Records')
if records:
_log(f"Processing {len(records)} Records")
for rec in records:
# 仅处理来自 S3 的记录
if rec.get('eventSource') != 'aws:s3'and rec.get('eventSource') != 's3':
_log("Skipping non-s3 record")
continue
bucket, key = extract_bucket_key_from_record(rec)
ifnot bucket ornot key:
_log("record missing bucket/key:", rec)
continue
process_single_object(bucket, key)
return {"status": "ok", "processed": len(records)}
# 2) 处理 EventBridge 风格(你的原始实现)
detail = event.get('detail')
if detail:
# 支持 detail.bucket.name / detail.object.key 或 detail['bucket']['name'] 的多种嵌套情况
bucket = detail.get('bucket', {}).get('name') or (detail.get('bucket') if isinstance(detail.get('bucket'), str) elseNone)
key = detail.get('object', {}).get('key') if detail.get('object') else detail.get('object')
ifnot bucket ornot key:
_log("no bucket/key in detail:", event)
return {"status": "ignored", "reason": "no bucket/key in detail"}
key = urllib.parse.unquote_plus(key)
process_single_object(bucket, key)
return {"status": "ok", "processed": 1}
_log("No Records or detail found in event:", json.dumps(event)[:1000])
return {"status": "ignored", "reason": "no_records_or_detail"}
7、设置函数配置
- 常规配置
TARGET_REGION-----------cn-northwest-1
TARGET_BUCKET------------s3-cross-account
TARGET_SECRET_ARN-------arn:aws:secretsmanager:us-east-1:123456789101:secret:global-key-n5d8LT
8、源桶开通事件通知
- 控制台---S3服务---选择源桶---属性---创建事件通知
9、测试
- 往源桶bill文件夹内上传文件 test4-new.csv.zip 和 test4-new.csv - 副本.zip,测试后缀条件是否匹配
- 往源桶bill文件夹外上传文件 test5-new.csv.zip 和 test5-new.csv - 副本.zip,测试前缀条件是否匹配
-
查看目标桶---只有符合前缀条件 bill/和后缀条件-new.csv.zip才触发lambda同步
四、总结
本次实验通过两种方案实现了跨账号 S3 文件的同步功能,各方案特点如下:
1、方案一(S3 自身同步机制)
-
优势:依托 S3 原生复制功能,配置完成后无需额外代码维护,适合同区域桶间的全量同步,支持版本控制和删除同步 -
限制:仅适用于同一区域的 S3 桶,且要求源桶和目标桶均开启版本控制,权限配置涉及多角色交互,复杂度较高
-
优势:支持跨区域同步,可通过前缀 / 后缀过滤实现精细化同步控制,配置流程相对清晰,扩展性强(可通过修改 Lambda 代码增加业务逻辑) -
限制:需要维护 Lambda 函数及相关权限,对大文件同步需额外优化,依赖事件触发可能存在轻微延迟
实际应用中,应根据桶的区域分布、同步粒度需求及运维成本等因素选择合适的方案:同区域全量同步优先考虑方案一,跨区域或需精细化控制的场景优先选择方案二;两种方案均需严格配置 IAM 权限及桶策略,确保跨账号访问的安全性和合法性。
扫码加入博思云为技术交流群
与一线大厂技术人员实时交流云技术~
关于博思云为
博思云为是一家专业的云管理服务提供商(云MSP),致力于为客户提供亚马逊云科技上的运营服务,包括架构咨询、项目迁移、运维托管、混合云管理、大数据、DevOps、CDN、云安全、培训等服务以及多种行业解决方案。
作为亚马逊云科技的核心级咨询合作伙伴,服务全球企业超过数百家,在互联网、高新制造、游戏、电商、智能制造等行业拥有丰富的实践经验。
博思云为还拥有自研的核心业务管理平台,致力于用工具和平台为客户提供可信赖的云上IT服务与解决方案。
欢迎关注「博思云为」官方账号
第一时间获取关于博思云为的最新资讯


