RDS备份上传至OSS

  • ~6.41K 字
  1. 1. 说明
  2. 2. 方案
  3. 3. 功能要求
  4. 4. 更新
    1. 4.1. 2022-11-12
  5. 5. 代码

说明

最近公司项目在做等保 2.0 等级 3,结果评测机构一测发现数据 i 库没有做异地备份,然后看了下阿里云 rds 的异地备份真贵,和阿里云还有评测机构的沟通下,只用把 RDS 数据备份放在其他其他机房即可以满足,本着为公司节省的态度,决定把每日的物理备份通过阿里云接口来同步到同地区的 OSS 内网中。

方案

使用一台同区域的服务器作为中转服务器,通过 RDS 备份列表接口先将每日的物理备份拉取到中转服务器上,拉取完毕后在上传到 oss,上传完删除本地物理备份,全程走内网,只占用 oss 的空间,毕竟 oss 空间还算便宜。

功能要求

python 3.6 以上
RDS 需要 AliyunRDSFullAccess 权限
OSS 需要桶的 PutObject、ListObjects、ListParts、ListMultipartUploads 权限
如需企业微信推送,需开通企业微信及创建第三方应用

更新

2022-11-12

修复阿里云 RDS 备份格式问题

代码

安装 py 环境库

pip3 install aliyun-python-sdk-core aliyun-python-sdk-rds oss2

计划任务

脚本定时任务设为 每天早上8点
阿里云上RDS备份任务设为 每天早上4点

脚本主代码

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

# here put the import lib

import re,os,oss2,requests,sys,json,datetime,urllib.request
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import AccessKeyCredential
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkrds.request.v20140815.DescribeBackupsRequest import DescribeBackupsRequest

def Get_Token(Corpid, Secret):
    url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
    values = {
        "corpid": Corpid,
        "corpsecret": Secret
    }
    req = requests.post(url, params=values)
    data = json.loads(req.text)
    Token = data["access_token"]
    return Token

def WeiXin(Token, Agentid, Content):
    url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % Token
    data = {
        "touser": UserID,                               # 员工id,与部门ID 2选1
        # "toparty": PartyID,                           # 部门id,与用户ID 2选1
        "msgtype": "text",                              # 消息类型,该字段非空
        "agentid": Agentid,
        "text": {
            "content": Content.replace('\\n', '\n')     # 消息内容,非空
        },
        "safe": "0"                                     # 表示是否是保密消息,0表示否,1表示是,默认0
    }
    res = requests.post(url, json=data)
    return res.text

def Get_BackupDownloadUrl(DBInstanceId, LocalDir):
    request = DescribeBackupsRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(DBInstanceId)
    request.set_BackupStatus("Success")
    request.set_BackupMode("Automated")
    StartTime = (datetime.datetime.utcnow() - datetime.timedelta(hours = 4)).strftime('%Y-%m-%dT%H:%MZ')
    EndTime = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%MZ')
    request.set_StartTime(StartTime)
    request.set_EndTime(EndTime)
    response = client.do_action_with_exception(request)
    jdata = json.loads(response.decode('utf-8'))
    # 内网备份地址
    BackupDownloadURL = jdata['Items']['Backup'][0]['BackupIntranetDownloadURL']
    # 备份完成时间
    BackupTime = jdata['Items']['Backup'][0]['BackupEndTime']

    # 检测RDS全量备份格式
    try:
        idx = BackupDownloadURL.index('xb.qp')
        file_extension = "1"
    except ValueError:
        idx = BackupDownloadURL.index('qp.xb')
        file_extension = "2"

    # 获取备份的名字
    FileName = BackupDownloadURL[8:idx + 5].replace('/', '_')
    SrcFilePath = "%s/%s" % (LocalDir, FileName)
    return BackupDownloadURL, SrcFilePath, BackupTime, file_extension

def DownBackupRDS(BackupDownloadURL, SrcFilePath):
    urllib.request.urlretrieve(BackupDownloadURL, SrcFilePath)
    return

def UpdateOSS(BucketInfo, DstFilePath, SrcFilePath):
    try:
        oss2.resumable_upload(BucketInfo, DstFilePath, SrcFilePath)
    except Exception as e:
        print(e)

def Message():
    # 发送信息
    if WeChatWork_Status == "yes":
        Token = Get_Token(Corpid, Secret)
        Send_Message = WeiXin(Token, Agentid, Content)
        print(Send_Message)
    else:
        print(Content)

if __name__ == '__main__':
    AccessKeyId = ''                   # 阿里云AccessKey
    AccessSecret = ''                  # 阿里云Secret
    RegionId = ''                      # OSS桶区域
    Bucket_name = ''                   # OSS桶名
    DBInstanceId = ''                  # 数据库ID

    LocalDir = ''                       # 本地路径

    # 企业微信通知
    WeChatWork_Status = 'no'           # 是否开启企业微信通知yes或no
    Corpid = ''                        # 企业微信Corpid
    Secret = ''                        # 企业微信应用Secret
    Agentid = ''                       # 企业微信应用id
    # UserID = ""                      # 用户ID列表,对应用户的'用户ID'字段,与部门ID 2选1
    PartyID = ''                       # 部门ID列表,对应部门的'部门ID'字段,与用户ID 2选1

    # Aliyun SDK登陆
    credentials = AccessKeyCredential(AccessKeyId, AccessSecret)
    client = AcsClient(region_id=RegionId, credential=credentials)
    Token = Get_Token(Corpid, Secret)

     # 获取RDS备份下载地址
    try:
        BackupDownloadURL, SrcFilePath, BackupTime, file_extension = Get_BackupDownloadUrl(DBInstanceId, datadir)
        print(BackupDownloadURL)
        print(BackupTime)
    except Exception as e:
        Content = "获取RDS备份下载失败,请上线检查"
        Message()
        exit()

    # 下载RDS备份
    DownBackupRDS(BackupDownloadURL, SrcFilePath)

    # Aliyun OSS登陆
    auth = oss2.Auth(AccessKeyId, AccessSecret)
    RegionUrl = 'https://oss-' + RegionId + '-internal.aliyuncs.com'
    BucketInfo = oss2.Bucket(auth, RegionUrl, Bucket_name)

    # 获取当时日期,并进行备份重命名
    UtcTime = datetime.datetime.strptime(BackupTime, "%Y-%m-%dT%H:%M:%SZ")
    Localtime = UtcTime + datetime.timedelta(hours = 8)
    TodayTime = Localtime.strftime("%Y-%m-%d")

    if file_extension == "1":
        DstFilePath = TodayTime + "-" + "School-RDS-Backup.xb.qp"
    elif file_extension == "2":
        DstFilePath = TodayTime + "-" + "School-RDS-Backup.qp.xb"

    # 上传OSS
    try:
        UpdateOSS(BucketInfo, DstFilePath, SrcFilePath)
        # 删除本地备份文件
        os.remove(SrcFilePath)
        Content = TodayTime + ' RDS备份上传至OSS成功'
        Message()
    except Exception as e:
        Content = "RDS备份上传oss错误:" + e
        Message()