Amazon RDS(관계형 데이터베이스 서비스)는 AWS에서 제공하는 강력하고 확장 가능한 데이터베이스 서비스이지만 규정 준수 또는 보안상의 이유로 암호화되지 않은 기존 데이터베이스 인스턴스를 암호화해야 하는 경우도 있습니다. 이 기사에서는 암호화되지 않은 Amazon RDS 인스턴스를 암호화된 인스턴스로 마이그레이션하는 프로세스를 자동화하는 Python 스크립트를 살펴보겠습니다.
RDS 인스턴스를 암호화하면 저장 데이터가 안전하게 보호되고 PCI DSS, HIPAA 등과 같은 다양한 규정 준수 요구 사항을 충족할 수 있습니다. 암호화를 사용하면 RDS 데이터베이스의 백업, 스냅샷 및 기본 스토리지가 자동으로 암호화됩니다.
그러나 암호화되지 않은 기존 RDS 인스턴스에서는 직접 암호화를 활성화할 수 없습니다. 대신 스냅샷을 생성하고 암호화가 활성화된 해당 스냅샷을 복사한 다음 암호화된 스냅샷에서 새 RDS 인스턴스를 복원해야 합니다.
이 튜토리얼에서는 이것이 자동화됩니다.
이 가이드를 따르려면 다음이 필요합니다.
pip install boto3
다음 AWS 자격 증명도 필요합니다.
이 Python 스크립트는 다음 단계를 자동화합니다.
import boto3 import time from botocore.exceptions import WaiterError class RDSEncryptionMigrator: def __init__(self, source_db_identifier, target_db_identifier, kms_key_alias, region='us-east-1'): self.source_db_identifier = source_db_identifier self.target_db_identifier = target_db_identifier self.kms_key_alias = kms_key_alias if kms_key_alias.startswith('alias/') else f'alias/{kms_key_alias}' self.region = region self.rds_client = boto3.client('rds', region_name=region) self.kms_client = boto3.client('kms', region_name=region) def get_kms_key_id(self): """Get the KMS key ID from the alias""" try: response = self.kms_client.describe_key( KeyId=self.kms_key_alias ) return response['KeyMetadata']['Arn'] except Exception as e: print(f"Error getting KMS key ID from alias: {e}") raise def create_snapshot(self, snapshot_identifier): print(f"Creating snapshot of source database: {self.source_db_identifier}") response = self.rds_client.create_db_snapshot( DBSnapshotIdentifier=snapshot_identifier, DBInstanceIdentifier=self.source_db_identifier ) # Wait for snapshot to be available waiter = self.rds_client.get_waiter('db_snapshot_available') try: waiter.wait( DBSnapshotIdentifier=snapshot_identifier, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for snapshot: {e}") raise return response['DBSnapshot']['DBSnapshotArn'] def create_encrypted_snapshot_copy(self, source_snapshot_id, encrypted_snapshot_id): print("Creating encrypted copy of snapshot") kms_key_id = self.get_kms_key_id() response = self.rds_client.copy_db_snapshot( SourceDBSnapshotIdentifier=source_snapshot_id, TargetDBSnapshotIdentifier=encrypted_snapshot_id, KmsKeyId=kms_key_id, CopyTags=True, SourceRegion=self.region ) # Wait for encrypted snapshot to be available waiter = self.rds_client.get_waiter('db_snapshot_available') try: waiter.wait( DBSnapshotIdentifier=encrypted_snapshot_id, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for encrypted snapshot: {e}") raise return response['DBSnapshot']['DBSnapshotArn'] def restore_from_snapshot(self, snapshot_identifier): print(f"Restoring new encrypted database from snapshot") # Get source DB instance details source_db = self.rds_client.describe_db_instances( DBInstanceIdentifier=self.source_db_identifier )['DBInstances'][0] # Restore the encrypted instance response = self.rds_client.restore_db_instance_from_db_snapshot( DBInstanceIdentifier=self.target_db_identifier, DBSnapshotIdentifier=snapshot_identifier, DBInstanceClass=source_db['DBInstanceClass'], VpcSecurityGroupIds=self._get_security_group_ids(source_db), DBSubnetGroupName=source_db['DBSubnetGroup']['DBSubnetGroupName'], PubliclyAccessible=source_db['PubliclyAccessible'], MultiAZ=source_db['MultiAZ'] ) # Wait for the new instance to be available waiter = self.rds_client.get_waiter('db_instance_available') try: waiter.wait( DBInstanceIdentifier=self.target_db_identifier, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for database restoration: {e}") raise return response['DBInstance']['DBInstanceArn'] def _get_security_group_ids(self, db_instance): return [sg['VpcSecurityGroupId'] for sg in db_instance['VpcSecurityGroups']] def perform_encryption(self): try: # Create timestamp for unique identifiers timestamp = int(time.time()) # Step 1: Create initial snapshot snapshot_id = f"{self.source_db_identifier}-snapshot-{timestamp}" self.create_snapshot(snapshot_id) # Step 2: Create encrypted copy of the snapshot encrypted_snapshot_id = f"{self.source_db_identifier}-encrypted-snapshot-{timestamp}" self.create_encrypted_snapshot_copy(snapshot_id, encrypted_snapshot_id) # Step 3: Restore from encrypted snapshot self.restore_from_snapshot(encrypted_snapshot_id) print(f""" Encryption process completed successfully! New encrypted database instance: {self.target_db_identifier} Next steps: 1. Verify the new encrypted instance 2. Update your application connection strings 3. Once verified, you can delete the old unencrypted instance """) except Exception as e: print(f"Error during encryption process: {e}") raise def main(): # These values should ideally come from environment variables or command line arguments source_db_identifier = 'database-2' target_db_identifier = 'database-2-enc' kms_key_alias = 'aws/rds' region = 'us-east-1' migrator = RDSEncryptionMigrator( source_db_identifier=source_db_identifier, target_db_identifier=target_db_identifier, kms_key_alias=kms_key_alias, region=region ) migrator.perform_encryption() if __name__ == '__main__': main()
스크립트는 다음을 처리하는 RDSEncryptionMigrator 클래스를 정의합니다.
제공된 스크립트를 사용하면 RDS 데이터베이스의 암호화 프로세스를 자동화하고 데이터를 안전하게 보호할 수 있습니다. 이 접근 방식을 사용하면 수동 개입이 필요하지 않으며 마이그레이션 프로세스에서 인적 오류가 발생할 위험이 줄어듭니다. 암호화된 새 인스턴스를 확인하고, 애플리케이션 연결 문자열을 업데이트하고, 준비가 되면 암호화되지 않은 기존 인스턴스를 제거하세요.
이를 더 확장하려는 경우 이 스크립트를 AWS Lambda 또는 AWS Step Functions와 통합하여 CI/CD 파이프라인 내에서 프로세스를 더욱 자동화할 수 있습니다.
위 내용은 Python을 사용하여 Amazon RDS 인스턴스 암호화를 자동화하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!