大数跨境
0
0

云技术分享|跨境 S3 存储桶同步方案:中国区域与海外区域数据复制设计

云技术分享|跨境 S3 存储桶同步方案:中国区域与海外区域数据复制设计 博思云为
2025-11-28
0
导读:实际应用中,应根据桶的区域分布、同步粒度需求及运维成本等因素选择合适的方案:同区域全量同步优先考虑方案一,跨区域或需精细化控制的场景优先选择方案二;两种方案均需严格配置 IAM 权限及桶策略,确保跨账

一、目的

本实验旨在实现跨Amazon账号(包括国内外账号)的 S3 文件同步功能,通过两种不同方案验证 S3 跨账号数据迁移的可行性:

  1. 针对同区域 S3 桶,利用 S3 自身复制机制实现文件的自动同步,验证版本控制在跨账号复制中的作用及权限配置的有效性
  2. 针对跨区域 S3 桶,通过 S3 事件通知结合 Lambda 函数的方式,实现文件的实时同步,验证基于事件驱动的跨区域数据同步方案的可靠性
  3. 对比两种方案的适用场景、配置复杂度及同步效果,为实际业务中的跨账号 S3 数据同步提供技术参考。

二、方案一

适用于源桶与目标桶同区域(均为国内 / 均为国外)

因源桶和目标桶处于同一区域,可直接借助 S3 自带的复制规则完成资源同步。以下以 “将 wenjun-test 桶下 bill 文件夹内的资源同步至 about-test 桶” 为例,说明配置流程:

1、前置条件

源账号与目标账号的存储桶,均已开启存储桶版本控制。

2、源账号配置步骤

创建同步现有文件的 IAM 角色 A(可选操作

若无需自定义权限,可由 S3 在配置复制规则时,自动创建具备完整同步权限的新角色,无需手动创建角色 A。

  • 权限策略:

{
    "Version""2012-10-17",
    "Statement": [
        {
            "Effect""Allow",
            "Action": [
                "s3:InitiateReplication",
                "s3:GetReplicationConfiguration",
                "s3:PutInventoryConfiguration"
            ],
            "Resource": [
                "arn:aws:s3:::wenjun-test",
                "arn:aws:s3:::wenjun-test/*"
            ]
        },
        {
            "Effect""Allow",
            "Action""s3:PutObject",
            "Resource""arn:aws:s3:::wenjun-test/bill//*"
        }
    ]
}
  • 信任关系
{
    "Version""2012-10-17",
    "Statement": [
        {
            "Effect""Allow",
            "Principal": {
                "Service""batchoperations.s3.amazonaws.com"
            },
            "Action""sts:AssumeRole"
        }
    ]
}

3、创建后续同步角色B(可选,可通过S3自动创建带有足够权限的新角色)

  • 权限策略

{
    "Version""2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:ListBucket",
                "s3:GetReplicationConfiguration",
                "s3:GetObjectVersionForReplication",
                "s3:GetObjectVersionAcl",
                "s3:GetObjectVersionTagging",
                "s3:GetObjectRetention",
                "s3:GetObjectLegalHold"
            ],
            "Effect""Allow",
            "Resource": [
                "arn:aws:s3:::wenjun-test",
                "arn:aws:s3:::wenjun-test/*",
                "arn:aws:s3:::about-test",
                "arn:aws:s3:::about-test/*"
            ]
        },
        {
            "Action": [
                "s3:ReplicateObject",
                "s3:ReplicateDelete",
                "s3:ReplicateTags",
                "s3:ObjectOwnerOverrideToBucketOwner"
            ],
            "Effect""Allow",
            "Resource": [
                "arn:aws:s3:::wenjun-test/*",
                "arn:aws:s3:::about-test/*"
            ]
        }
    ]
}
         2. 信任关系

{
    "Version""2012-10-17",
    "Statement": [
        {
            "Effect""Allow",
            "Principal": {
                "Service": [
                    "s3.amazonaws.com",
                    "batchoperations.s3.amazonaws.com"
                ]
            },
            "Action""sts:AssumeRole"
        }
    ]
}

4、配置目标桶策略

添加允许源账号访问目标桶的策略

{
    "Version""2012-10-17",
    "Statement": [
        {
            "Effect""Allow",
            "Principal": {
                "AWS""arn:aws:iam::源账号ID:root"
            },
            "Action": [
                "s3:ReplicateObject",
                "s3:ReplicateDelete",
                "s3:ReplicateTags",
                "s3:GetObjectVersionTagging",
                "s3:GetObjectVersionForReplication",
                "s3:ObjectOwnerOverrideToBucketOwner"
            ],
            "Resource""目标桶ARN/*"
        }
    ]
}

5、在源桶上创建复制管理规则

  • 同步规则创建

  • 复制当前文件

  • 在源桶上上传文件测试


三、方案二

适合S3目标桶和源桶不在同一区域

由于国内外 S3 服务分属不同云平台生态,且受跨境数据传输合规性、服务架构隔离等多重限制,Amazon S3 的原生复制功能不支持跨云平台及跨境到国内 S3 的同步。所以以下例子采用S3事件通知+lambda+国内凭证写入的方案。

下面例子为将国外S3桶wenjun-s3的bill文件夹下的资源同步到国内S3桶s3-cross-account下

1、在目标账号下创建用户

  • 控制台—IAM服务—策略—创建策略

{
    "Version""2012-10-17",
    "Statement": [
        {
            "Effect""Allow",
            "Action": [
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:PutObjectTagging"
            ],
            "Resource": [
                "arn:aws-cn:s3:::目标桶/*"
            ]
        }
    ]
}

2、控制台---IAM服务---用户---创建用户

  • 选择第一步创建的策略
  • 创建用户
3、创建安全凭证
选择刚创建的用户,创建访问密钥,并下载csv文件。

4、在源账号下创建secrets manager

  • 控制台---secrets manager服务---存储新的密钥
  • 完成创建后记录下密钥ARN

4、在源账号下创建lambda函数

  • 创建策略

{
    "Version""2012-10-17",
    "Statement": [
        {
            "Effect""Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::源桶名",
                "arn:aws:s3:::源桶名/*"
            ]
        },
        {
            "Effect""Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": [
                "目标账户aksk的ARN"
            ]
        },
        {
            "Effect""Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource""*"
        }
    ]
}

   5 、创建lambda执行角色

  • 控制台---IAM服务---角色---创建角色---可信实体类型选择lambda服务

  • 权限选择第一步创建的策略

     6、 创建lambda函数

  • 控制台---lambda服务---创建函数
import os
import json
import time
import urllib.parse
import boto3
from botocore.exceptions import ClientError

# 源 S3 client(Lambda role,用于读取源对象)
SRC_S3 = boto3.client('s3')
SM = boto3.client('secretsmanager')

# 必须的环境变量
TARGET_BUCKET = os.environ['TARGET_BUCKET']
SECRET_ARN = os.environ['TARGET_SECRET_ARN']
TARGET_REGION = os.environ.get('TARGET_REGION''us-east-1')

# 可选的前缀/后缀过滤(逗号分隔,多值均支持)
# 如果不设置,表示不过滤(处理所有对象)
SRC_PREFIXES = [p for p in os.environ.get('SRC_PREFIXES''').split(','if p != '']
SRC_SUFFIXES = [s for s in os.environ.get('SRC_SUFFIXES''').split(','if s != '']

# Secrets cache:避免每次都调用 SecretsManager(TTL 300s)
_TARGET_CREDS_CACHE = {"ak"None"sk"None"ts"0"ttl": int(os.environ.get("SECRETS_CACHE_TTL""300"))}

def _log(*args):
    print("[s3-sync]", *args)

def get_target_creds():
    """
    从 Secrets Manager 获取目标账户 access/secret。
    使用简单的进程内缓存,ttl 可通过 SECRETS_CACHE_TTL 环境变量调整(秒)。
    SecretString 应包含 JSON: {"access_key_id":"...","secret_access_key":"..."}
    """

    now = time.time()
    if _TARGET_CREDS_CACHE["ak"and (now - _TARGET_CREDS_CACHE["ts"] < _TARGET_CREDS_CACHE["ttl"]):
        return _TARGET_CREDS_CACHE["ak"], _TARGET_CREDS_CACHE["sk"]

    try:
        resp = SM.get_secret_value(SecretId=SECRET_ARN)
        sec = json.loads(resp['SecretString'])
        ak = sec.get('access_key_id'or sec.get('aws_access_key_id'or sec.get('AccessKeyId')
        sk = sec.get('secret_access_key'or sec.get('aws_secret_access_key'or sec.get('SecretAccessKey')
        ifnot ak ornot sk:
            raise ValueError("secret JSON missing access_key_id/secret_access_key")
    except Exception as e:
        _log("ERROR getting target creds from SecretsManager:", e)
        raise

    _TARGET_CREDS_CACHE.update({"ak": ak, "sk": sk, "ts": now})
    return ak, sk

def upload_stream_to_target(key, stream):
    """
    使用指定的目标 AK/SK 上传对象流到目标 bucket(upload_fileobj 能处理 StreamingBody)。
    """

    ak, sk = get_target_creds()
    target_s3 = boto3.client(
        's3',
        region_name=TARGET_REGION,
        aws_access_key_id=ak,
        aws_secret_access_key=sk
    )

    try:
        # 如果目标需要分块/大文件优化可在此扩展(现在使用 upload_fileobj)
        target_s3.upload_fileobj(stream, TARGET_BUCKET, key)
        _log("Uploaded to target:", TARGET_BUCKET, key)
    except ClientError as e:
        _log("ERROR uploading to target S3:", e)
        raise

def matches_filters(key):
    """
    检查 key 是否匹配环境变量配置的前缀/后缀过滤。
    只要配置了任一前缀或后缀,则必须满足对应条件。
    多前缀/后缀用逗号分隔(在环境变量中)。
    """

    if SRC_PREFIXES:
        ifnot any(key.startswith(p) for p in SRC_PREFIXES):
            returnFalse
    if SRC_SUFFIXES:
        ifnot any(key.endswith(s) for s in SRC_SUFFIXES):
            returnFalse
    returnTrue

def process_single_object(bucket, key):
    """
    拉流并上传到目标 bucket。
    注意:key 预期为已解码的字符串。
    """

    ifnot matches_filters(key):
        _log(f"Skipped (filter): {bucket}/{key}")
        return

    try:
        # 从源桶流式读取(需要 Lambda role 有 s3:GetObject)
        _log("Getting object from source:", bucket, key)
        resp = SRC_S3.get_object(Bucket=bucket, Key=key)
        stream = resp['Body'# StreamingBody
    except ClientError as e:
        _log("ERROR getting object from source S3:", e)
        return

    try:
        upload_stream_to_target(key, stream)
    finally:
        # StreamingBody 支持 close()
        try:
            stream.close()
        except Exception:
            pass

def extract_bucket_key_from_record(record):
    """
    解析单条 S3 Record -> (bucket, key)
    支持两种常见格式:
      - S3 Event Notification (record['s3']['bucket']['name'], record['s3']['object']['key'])
      - EventBridge detail (detail.bucket.name / detail.object.key) 已在 lambda_handler 处理
    返回解码后的 key(使用 unquote_plus 与 AWS 建议一致)
    """

    s3 = record.get('s3'or {}
    bucket = s3.get('bucket', {}).get('name')
    key = s3.get('object', {}).get('key')
    if key:
        # 通常 S3 event 中 key 是 URL encoded
        key = urllib.parse.unquote_plus(key)
    return bucket, key

def lambda_handler(event, context):
    """
    支持:
    - 原生 S3 Notification(event['Records'])
    - EventBridge 的 S3 事件(event['detail'])
    - 批量 Records
    """

    _log("Received event type keys:", list(event.keys()))

    # 1) 处理 Records (S3 Notification)
    records = event.get('Records')
    if records:
        _log(f"Processing {len(records)} Records")
        for rec in records:
            # 仅处理来自 S3 的记录
            if rec.get('eventSource') != 'aws:s3'and rec.get('eventSource') != 's3':
                _log("Skipping non-s3 record")
                continue
            bucket, key = extract_bucket_key_from_record(rec)
            ifnot bucket ornot key:
                _log("record missing bucket/key:", rec)
                continue
            process_single_object(bucket, key)
        return {"status""ok""processed": len(records)}

    # 2) 处理 EventBridge 风格(你的原始实现)
    detail = event.get('detail')
    if detail:
        # 支持 detail.bucket.name / detail.object.key 或 detail['bucket']['name'] 的多种嵌套情况
        bucket = detail.get('bucket', {}).get('name'or (detail.get('bucket'if isinstance(detail.get('bucket'), str) elseNone)
        key = detail.get('object', {}).get('key'if detail.get('object'else detail.get('object')
        ifnot bucket ornot key:
            _log("no bucket/key in detail:", event)
            return {"status""ignored""reason""no bucket/key in detail"}
        key = urllib.parse.unquote_plus(key)
        process_single_object(bucket, key)
        return {"status""ok""processed"1}

    _log("No Records or detail found in event:", json.dumps(event)[:1000])
    return {"status""ignored""reason""no_records_or_detail"}

7、设置函数配置

  • 常规配置
TARGET_REGION-----------cn-northwest-1
TARGET_BUCKET------------s3-cross-account
TARGET_SECRET_ARN-------arn:aws:secretsmanager:us-east-1:123456789101:secret:global-key-n5d8LT

8、源桶开通事件通知

  • 控制台---S3服务---选择源桶---属性---创建事件通知

9、测试

  • 往源桶bill文件夹内上传文件 test4-new.csv.zip 和 test4-new.csv - 副本.zip,测试后缀条件是否匹配
  • 往源桶bill文件夹外上传文件 test5-new.csv.zip 和 test5-new.csv - 副本.zip,测试前缀条件是否匹配
  • 查看目标桶---只有符合前缀条件bill/和后缀条件-new.csv.zip才触发lambda同步

四、总结

本次实验通过两种方案实现了跨账号 S3 文件的同步功能,各方案特点如下:

1、方案一(S3 自身同步机制)

    • 优势:依托 S3 原生复制功能,配置完成后无需额外代码维护,适合同区域桶间的全量同步,支持版本控制和删除同步
    • 限制:仅适用于同一区域的 S3 桶,且要求源桶和目标桶均开启版本控制,权限配置涉及多角色交互,复杂度较高
2、方案二(S3 事件通知 + Lambda)
    • 优势:支持跨区域同步,可通过前缀 / 后缀过滤实现精细化同步控制,配置流程相对清晰,扩展性强(可通过修改 Lambda 代码增加业务逻辑)
    • 限制:需要维护 Lambda 函数及相关权限,对大文件同步需额外优化,依赖事件触发可能存在轻微延迟

实际应用中,应根据桶的区域分布、同步粒度需求及运维成本等因素选择合适的方案:同区域全量同步优先考虑方案一,跨区域或需精细化控制的场景优先选择方案二;两种方案均需严格配置 IAM 权限及桶策略,确保跨账号访问的安全性和合法性。




点击蓝色关注我们

扫码加入博思云为技术交流群

与一线大厂技术人员实时交流云技术~

图片


图片

关于博思云为

博思云为是一家专业的云管理服务提供商(云MSP),致力于为客户提供亚马逊云科技上的运营服务,包括架构咨询、项目迁移、运维托管、混合云管理、大数据、DevOps、CDN、云安全、培训等服务以及多种行业解决方案。


作为亚马逊云科技的核心级咨询合作伙伴,服务全球企业超过数百家,在互联网、高新制造、游戏、电商、智能制造等行业拥有丰富的实践经验。


博思云为还拥有自研的核心业务管理平台,致力于用工具和平台为客户提供可信赖的云上IT服务与解决方案。



欢迎关注「博思云为」官方账号

第一时间获取关于博思云为的最新资讯


图片

【声明】内容源于网络
0
0
博思云为
深圳市博思云为科技有限公司是一家提供云计算服务的公司,其主要运营中心位于深圳,并在苏州、广州、杭州和北京等城市拥有分支机构。公司向客户提供一系列云服务,包括云计算架构的咨询服务、专业培训、数据迁移、运维管理、混合云环境的协调、大数据分析以及DevOps实践等。
内容 222
粉丝 0
认证用户
博思云为 深圳市博思云为科技有限公司 深圳市博思云为科技有限公司是一家提供云计算服务的公司,其主要运营中心位于深圳,并在苏州、广州、杭州和北京等城市拥有分支机构。公司向客户提供一系列云服务,包括云计算架构的咨询服务、专业培训、数据迁移、运维管理、混合云环境的协调、大数据分析以及DevOps实践等。
总阅读45
粉丝0
内容222