Amazon RDS(關聯式資料庫服務)是 AWS 提供的強大且可擴展的資料庫服務,但有時,出於合規性或安全性原因,您需要加密現有的未加密資料庫實例。在本文中,我們將逐步介紹一個 Python 腳本,該腳本可自動完成將未加密的 Amazon RDS 執行個體移轉到加密執行個體的過程。
RDS 實例的加密可確保靜態資料的安全性並滿足各種合規性要求,例如 PCI DSS、HIPAA 等。加密可確保 RDS 資料庫的備份、快照和底層儲存自動加密。
但是,您無法直接在現有未加密的 RDS 實例上啟用加密。相反,您必須建立快照,在啟用加密的情況下複製該快照,然後從加密快照還原新的 RDS 執行個體。
這就是我們將在本教程中實現自動化的內容。
要遵循本指南,您需要:
pip install boto3
您還需要以下 AWS 憑證:
此 Python 腳本自動執行以下步驟:
import boto3 import time from botocore.exceptions import WaiterError class RDSEncryptionMigrator: def __init__(self, source_db_identifier, target_db_identifier, kms_key_alias, region='us-east-1'): self.source_db_identifier = source_db_identifier self.target_db_identifier = target_db_identifier self.kms_key_alias = kms_key_alias if kms_key_alias.startswith('alias/') else f'alias/{kms_key_alias}' self.region = region self.rds_client = boto3.client('rds', region_name=region) self.kms_client = boto3.client('kms', region_name=region) def get_kms_key_id(self): """Get the KMS key ID from the alias""" try: response = self.kms_client.describe_key( KeyId=self.kms_key_alias ) return response['KeyMetadata']['Arn'] except Exception as e: print(f"Error getting KMS key ID from alias: {e}") raise def create_snapshot(self, snapshot_identifier): print(f"Creating snapshot of source database: {self.source_db_identifier}") response = self.rds_client.create_db_snapshot( DBSnapshotIdentifier=snapshot_identifier, DBInstanceIdentifier=self.source_db_identifier ) # Wait for snapshot to be available waiter = self.rds_client.get_waiter('db_snapshot_available') try: waiter.wait( DBSnapshotIdentifier=snapshot_identifier, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for snapshot: {e}") raise return response['DBSnapshot']['DBSnapshotArn'] def create_encrypted_snapshot_copy(self, source_snapshot_id, encrypted_snapshot_id): print("Creating encrypted copy of snapshot") kms_key_id = self.get_kms_key_id() response = self.rds_client.copy_db_snapshot( SourceDBSnapshotIdentifier=source_snapshot_id, TargetDBSnapshotIdentifier=encrypted_snapshot_id, KmsKeyId=kms_key_id, CopyTags=True, SourceRegion=self.region ) # Wait for encrypted snapshot to be available waiter = self.rds_client.get_waiter('db_snapshot_available') try: waiter.wait( DBSnapshotIdentifier=encrypted_snapshot_id, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for encrypted snapshot: {e}") raise return response['DBSnapshot']['DBSnapshotArn'] def restore_from_snapshot(self, snapshot_identifier): print(f"Restoring new encrypted database from snapshot") # Get source DB instance details source_db = self.rds_client.describe_db_instances( DBInstanceIdentifier=self.source_db_identifier )['DBInstances'][0] # Restore the encrypted instance response = self.rds_client.restore_db_instance_from_db_snapshot( DBInstanceIdentifier=self.target_db_identifier, DBSnapshotIdentifier=snapshot_identifier, DBInstanceClass=source_db['DBInstanceClass'], VpcSecurityGroupIds=self._get_security_group_ids(source_db), DBSubnetGroupName=source_db['DBSubnetGroup']['DBSubnetGroupName'], PubliclyAccessible=source_db['PubliclyAccessible'], MultiAZ=source_db['MultiAZ'] ) # Wait for the new instance to be available waiter = self.rds_client.get_waiter('db_instance_available') try: waiter.wait( DBInstanceIdentifier=self.target_db_identifier, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for database restoration: {e}") raise return response['DBInstance']['DBInstanceArn'] def _get_security_group_ids(self, db_instance): return [sg['VpcSecurityGroupId'] for sg in db_instance['VpcSecurityGroups']] def perform_encryption(self): try: # Create timestamp for unique identifiers timestamp = int(time.time()) # Step 1: Create initial snapshot snapshot_id = f"{self.source_db_identifier}-snapshot-{timestamp}" self.create_snapshot(snapshot_id) # Step 2: Create encrypted copy of the snapshot encrypted_snapshot_id = f"{self.source_db_identifier}-encrypted-snapshot-{timestamp}" self.create_encrypted_snapshot_copy(snapshot_id, encrypted_snapshot_id) # Step 3: Restore from encrypted snapshot self.restore_from_snapshot(encrypted_snapshot_id) print(f""" Encryption process completed successfully! New encrypted database instance: {self.target_db_identifier} Next steps: 1. Verify the new encrypted instance 2. Update your application connection strings 3. Once verified, you can delete the old unencrypted instance """) except Exception as e: print(f"Error during encryption process: {e}") raise def main(): # These values should ideally come from environment variables or command line arguments source_db_identifier = 'database-2' target_db_identifier = 'database-2-enc' kms_key_alias = 'aws/rds' region = 'us-east-1' migrator = RDSEncryptionMigrator( source_db_identifier=source_db_identifier, target_db_identifier=target_db_identifier, kms_key_alias=kms_key_alias, region=region ) migrator.perform_encryption() if __name__ == '__main__': main()
該腳本定義了一個 RDSEncryptionMigrator 類,它處理:
透過使用提供的腳本,您可以自動執行 RDS 資料庫的加密過程並確保您的資料安全。這種方法無需人工幹預,並降低了遷移過程中出現人為錯誤的風險。確保驗證新的加密實例,更新您的應用程式連接字串,並在準備好後刪除舊的未加密實例。
如果您希望進一步擴展此規模,您可以將此腳本與 AWS Lambda 或 AWS Step Functions 集成,以在 CI/CD 管道中進一步自動化該流程。
以上是如何使用 Python 自動加密 Amazon RDS 實例的詳細內容。更多資訊請關注PHP中文網其他相關文章!