Amazon RDS (リレーショナル データベース サービス) は、AWS が提供する強力でスケーラブルなデータベース サービスですが、コンプライアンスまたはセキュリティ上の理由から、暗号化されていない既存のデータベース インスタンスを暗号化する必要がある場合があります。この記事では、暗号化されていない Amazon RDS インスタンスを暗号化された Amazon RDS インスタンスに移行するプロセスを自動化する Python スクリプトについて説明します。
RDS インスタンスの暗号化により、保存データの安全性が確保され、PCI DSS、HIPAA などのさまざまなコンプライアンス要件が満たされます。暗号化により、バックアップ、スナップショット、RDS データベースの基盤となるストレージが自動的に暗号化されます。
ただし、既存の暗号化されていない RDS インスタンスの暗号化を直接有効にすることはできません。代わりに、スナップショットを作成し、暗号化を有効にしてそのスナップショットをコピーし、暗号化されたスナップショットから新しい RDS インスタンスを復元する必要があります。
これがこのチュートリアルで自動化するものです。
このガイドに従うには、次のものが必要です。
pip install boto3
次の AWS 認証情報も必要です:
この Python スクリプトは次の手順を自動化します:
import boto3 import time from botocore.exceptions import WaiterError class RDSEncryptionMigrator: def __init__(self, source_db_identifier, target_db_identifier, kms_key_alias, region='us-east-1'): self.source_db_identifier = source_db_identifier self.target_db_identifier = target_db_identifier self.kms_key_alias = kms_key_alias if kms_key_alias.startswith('alias/') else f'alias/{kms_key_alias}' self.region = region self.rds_client = boto3.client('rds', region_name=region) self.kms_client = boto3.client('kms', region_name=region) def get_kms_key_id(self): """Get the KMS key ID from the alias""" try: response = self.kms_client.describe_key( KeyId=self.kms_key_alias ) return response['KeyMetadata']['Arn'] except Exception as e: print(f"Error getting KMS key ID from alias: {e}") raise def create_snapshot(self, snapshot_identifier): print(f"Creating snapshot of source database: {self.source_db_identifier}") response = self.rds_client.create_db_snapshot( DBSnapshotIdentifier=snapshot_identifier, DBInstanceIdentifier=self.source_db_identifier ) # Wait for snapshot to be available waiter = self.rds_client.get_waiter('db_snapshot_available') try: waiter.wait( DBSnapshotIdentifier=snapshot_identifier, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for snapshot: {e}") raise return response['DBSnapshot']['DBSnapshotArn'] def create_encrypted_snapshot_copy(self, source_snapshot_id, encrypted_snapshot_id): print("Creating encrypted copy of snapshot") kms_key_id = self.get_kms_key_id() response = self.rds_client.copy_db_snapshot( SourceDBSnapshotIdentifier=source_snapshot_id, TargetDBSnapshotIdentifier=encrypted_snapshot_id, KmsKeyId=kms_key_id, CopyTags=True, SourceRegion=self.region ) # Wait for encrypted snapshot to be available waiter = self.rds_client.get_waiter('db_snapshot_available') try: waiter.wait( DBSnapshotIdentifier=encrypted_snapshot_id, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for encrypted snapshot: {e}") raise return response['DBSnapshot']['DBSnapshotArn'] def restore_from_snapshot(self, snapshot_identifier): print(f"Restoring new encrypted database from snapshot") # Get source DB instance details source_db = self.rds_client.describe_db_instances( DBInstanceIdentifier=self.source_db_identifier )['DBInstances'][0] # Restore the encrypted instance response = self.rds_client.restore_db_instance_from_db_snapshot( DBInstanceIdentifier=self.target_db_identifier, DBSnapshotIdentifier=snapshot_identifier, DBInstanceClass=source_db['DBInstanceClass'], VpcSecurityGroupIds=self._get_security_group_ids(source_db), DBSubnetGroupName=source_db['DBSubnetGroup']['DBSubnetGroupName'], PubliclyAccessible=source_db['PubliclyAccessible'], MultiAZ=source_db['MultiAZ'] ) # Wait for the new instance to be available waiter = self.rds_client.get_waiter('db_instance_available') try: waiter.wait( DBInstanceIdentifier=self.target_db_identifier, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for database restoration: {e}") raise return response['DBInstance']['DBInstanceArn'] def _get_security_group_ids(self, db_instance): return [sg['VpcSecurityGroupId'] for sg in db_instance['VpcSecurityGroups']] def perform_encryption(self): try: # Create timestamp for unique identifiers timestamp = int(time.time()) # Step 1: Create initial snapshot snapshot_id = f"{self.source_db_identifier}-snapshot-{timestamp}" self.create_snapshot(snapshot_id) # Step 2: Create encrypted copy of the snapshot encrypted_snapshot_id = f"{self.source_db_identifier}-encrypted-snapshot-{timestamp}" self.create_encrypted_snapshot_copy(snapshot_id, encrypted_snapshot_id) # Step 3: Restore from encrypted snapshot self.restore_from_snapshot(encrypted_snapshot_id) print(f""" Encryption process completed successfully! New encrypted database instance: {self.target_db_identifier} Next steps: 1. Verify the new encrypted instance 2. Update your application connection strings 3. Once verified, you can delete the old unencrypted instance """) except Exception as e: print(f"Error during encryption process: {e}") raise def main(): # These values should ideally come from environment variables or command line arguments source_db_identifier = 'database-2' target_db_identifier = 'database-2-enc' kms_key_alias = 'aws/rds' region = 'us-east-1' migrator = RDSEncryptionMigrator( source_db_identifier=source_db_identifier, target_db_identifier=target_db_identifier, kms_key_alias=kms_key_alias, region=region ) migrator.perform_encryption() if __name__ == '__main__': main()
スクリプトは、以下を処理するクラス RDSEncryptionMigrator を定義します。
提供されたスクリプトを使用すると、RDS データベースの暗号化プロセスを自動化し、データの安全性を確保できます。このアプローチにより、手動介入の必要性がなくなり、移行プロセスにおける人的エラーのリスクが軽減されます。新しい暗号化されたインスタンスを確認し、アプリケーションの接続文字列を更新し、準備ができたら古い暗号化されていないインスタンスを削除してください。
これをさらに拡張したい場合は、このスクリプトを AWS Lambda または AWS Step Functions と統合して、CI/CD パイプライン内のプロセスをさらに自動化できます。
以上がPython を使用して Amazon RDS インスタンスの暗号化を自動化する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。