Amazon RDS(关系数据库服务)是 AWS 提供的功能强大且可扩展的数据库服务,但有时,出于合规性或安全原因,您需要加密现有的未加密数据库实例。在本文中,我们将逐步介绍一个 Python 脚本,该脚本可自动完成将未加密的 Amazon RDS 实例迁移到加密实例的过程。
RDS 实例的加密可确保静态数据的安全并满足各种合规性要求,例如 PCI DSS、HIPAA 等。加密可确保 RDS 数据库的备份、快照和底层存储自动加密。
但是,您无法直接在现有未加密的 RDS 实例上启用加密。相反,您必须创建快照,在启用加密的情况下复制该快照,然后从加密快照恢复新的 RDS 实例。
这就是我们将在本教程中实现自动化的内容。
要遵循本指南,您需要:
pip install boto3
您还需要以下 AWS 凭证:
此 Python 脚本自动执行以下步骤:
import boto3 import time from botocore.exceptions import WaiterError class RDSEncryptionMigrator: def __init__(self, source_db_identifier, target_db_identifier, kms_key_alias, region='us-east-1'): self.source_db_identifier = source_db_identifier self.target_db_identifier = target_db_identifier self.kms_key_alias = kms_key_alias if kms_key_alias.startswith('alias/') else f'alias/{kms_key_alias}' self.region = region self.rds_client = boto3.client('rds', region_name=region) self.kms_client = boto3.client('kms', region_name=region) def get_kms_key_id(self): """Get the KMS key ID from the alias""" try: response = self.kms_client.describe_key( KeyId=self.kms_key_alias ) return response['KeyMetadata']['Arn'] except Exception as e: print(f"Error getting KMS key ID from alias: {e}") raise def create_snapshot(self, snapshot_identifier): print(f"Creating snapshot of source database: {self.source_db_identifier}") response = self.rds_client.create_db_snapshot( DBSnapshotIdentifier=snapshot_identifier, DBInstanceIdentifier=self.source_db_identifier ) # Wait for snapshot to be available waiter = self.rds_client.get_waiter('db_snapshot_available') try: waiter.wait( DBSnapshotIdentifier=snapshot_identifier, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for snapshot: {e}") raise return response['DBSnapshot']['DBSnapshotArn'] def create_encrypted_snapshot_copy(self, source_snapshot_id, encrypted_snapshot_id): print("Creating encrypted copy of snapshot") kms_key_id = self.get_kms_key_id() response = self.rds_client.copy_db_snapshot( SourceDBSnapshotIdentifier=source_snapshot_id, TargetDBSnapshotIdentifier=encrypted_snapshot_id, KmsKeyId=kms_key_id, CopyTags=True, SourceRegion=self.region ) # Wait for encrypted snapshot to be available waiter = self.rds_client.get_waiter('db_snapshot_available') try: waiter.wait( DBSnapshotIdentifier=encrypted_snapshot_id, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for encrypted snapshot: {e}") raise return response['DBSnapshot']['DBSnapshotArn'] def restore_from_snapshot(self, snapshot_identifier): print(f"Restoring new encrypted database from snapshot") # Get source DB instance details source_db = self.rds_client.describe_db_instances( DBInstanceIdentifier=self.source_db_identifier )['DBInstances'][0] # Restore the encrypted instance response = self.rds_client.restore_db_instance_from_db_snapshot( DBInstanceIdentifier=self.target_db_identifier, DBSnapshotIdentifier=snapshot_identifier, DBInstanceClass=source_db['DBInstanceClass'], VpcSecurityGroupIds=self._get_security_group_ids(source_db), DBSubnetGroupName=source_db['DBSubnetGroup']['DBSubnetGroupName'], PubliclyAccessible=source_db['PubliclyAccessible'], MultiAZ=source_db['MultiAZ'] ) # Wait for the new instance to be available waiter = self.rds_client.get_waiter('db_instance_available') try: waiter.wait( DBInstanceIdentifier=self.target_db_identifier, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for database restoration: {e}") raise return response['DBInstance']['DBInstanceArn'] def _get_security_group_ids(self, db_instance): return [sg['VpcSecurityGroupId'] for sg in db_instance['VpcSecurityGroups']] def perform_encryption(self): try: # Create timestamp for unique identifiers timestamp = int(time.time()) # Step 1: Create initial snapshot snapshot_id = f"{self.source_db_identifier}-snapshot-{timestamp}" self.create_snapshot(snapshot_id) # Step 2: Create encrypted copy of the snapshot encrypted_snapshot_id = f"{self.source_db_identifier}-encrypted-snapshot-{timestamp}" self.create_encrypted_snapshot_copy(snapshot_id, encrypted_snapshot_id) # Step 3: Restore from encrypted snapshot self.restore_from_snapshot(encrypted_snapshot_id) print(f""" Encryption process completed successfully! New encrypted database instance: {self.target_db_identifier} Next steps: 1. Verify the new encrypted instance 2. Update your application connection strings 3. Once verified, you can delete the old unencrypted instance """) except Exception as e: print(f"Error during encryption process: {e}") raise def main(): # These values should ideally come from environment variables or command line arguments source_db_identifier = 'database-2' target_db_identifier = 'database-2-enc' kms_key_alias = 'aws/rds' region = 'us-east-1' migrator = RDSEncryptionMigrator( source_db_identifier=source_db_identifier, target_db_identifier=target_db_identifier, kms_key_alias=kms_key_alias, region=region ) migrator.perform_encryption() if __name__ == '__main__': main()
该脚本定义了一个 RDSEncryptionMigrator 类,它处理:
通过使用提供的脚本,您可以自动执行 RDS 数据库的加密过程并确保您的数据安全。这种方法无需人工干预,并降低了迁移过程中出现人为错误的风险。确保验证新的加密实例,更新您的应用程序连接字符串,并在准备好后删除旧的未加密实例。
如果您希望进一步扩展此规模,您可以将此脚本与 AWS Lambda 或 AWS Step Functions 集成,以在 CI/CD 管道中进一步自动化该流程。
以上是如何使用 Python 自动加密 Amazon RDS 实例的详细内容。更多信息请关注PHP中文网其他相关文章!