ホームページ > バックエンド開発 > Python チュートリアル > Python を使用して Amazon RDS インスタンスの暗号化を自動化する方法

Python を使用して Amazon RDS インスタンスの暗号化を自動化する方法

Linda Hamilton
リリース: 2025-01-03 21:12:43
オリジナル
570 人が閲覧しました

Amazon RDS (リレーショナル データベース サービス) は、AWS が提供する強力でスケーラブルなデータベース サービスですが、コンプライアンスまたはセキュリティ上の理由から、暗号化されていない既存のデータベース インスタンスを暗号化する必要がある場合があります。この記事では、暗号化されていない Amazon RDS インスタンスを暗号化された Amazon RDS インスタンスに移行するプロセスを自動化する Python スクリプトについて説明します。

RDS インスタンスを暗号化する理由

RDS インスタンスの暗号化により、保存データの安全性が確保され、PCI DSS、HIPAA などのさまざまなコンプライアンス要件が満たされます。暗号化により、バックアップ、スナップショット、RDS データベースの基盤となるストレージが自動的に暗号化されます。

ただし、既存の暗号化されていない RDS インスタンスの暗号化を直接有効にすることはできません。代わりに、スナップショットを作成し、暗号化を有効にしてそのスナップショットをコピーし、暗号化されたスナップショットから新しい RDS インスタンスを復元する必要があります。

これがこのチュートリアルで自動化するものです。

前提条件

このガイドに従うには、次のものが必要です。

  • AWS アカウント: RDS および KMS (キー管理サービス) を管理する権限を持つ AWS アカウントへのアクセス。
  • Python 3.x: ローカル マシンにインストールされ、構成されています。
  • Boto3: AWS SDK for Python。pip を使用してインストールできます。
  pip install boto3
ログイン後にコピー

次の AWS 認証情報も必要です:

  1. AWS_ACCESS_KEY_ID
  2. AWS_SECRET_ACCESS_KEY
  3. AWS_DEFAULT_REGION

暗号化の移行プロセス

この Python スクリプトは次の手順を自動化します:

  1. スナップショットの作成: 既存の暗号化されていない RDS インスタンスのスナップショットを作成します。
  2. 暗号化してスナップショットをコピー: AWS KMS (キー管理サービス) を使用して、スナップショットの暗号化されたコピーを作成します。
  3. データベースの復元: 暗号化されたスナップショットから新しい RDS インスタンスを作成します。

移行を自動化するための Python スクリプト

import boto3
import time
from botocore.exceptions import WaiterError

class RDSEncryptionMigrator:
    def __init__(self, source_db_identifier, target_db_identifier, kms_key_alias, region='us-east-1'):
        self.source_db_identifier = source_db_identifier
        self.target_db_identifier = target_db_identifier
        self.kms_key_alias = kms_key_alias if kms_key_alias.startswith('alias/') else f'alias/{kms_key_alias}'
        self.region = region

        self.rds_client = boto3.client('rds', region_name=region)
        self.kms_client = boto3.client('kms', region_name=region)

    def get_kms_key_id(self):
        """Get the KMS key ID from the alias"""
        try:
            response = self.kms_client.describe_key(
                KeyId=self.kms_key_alias
            )
            return response['KeyMetadata']['Arn']
        except Exception as e:
            print(f"Error getting KMS key ID from alias: {e}")
            raise

    def create_snapshot(self, snapshot_identifier):
        print(f"Creating snapshot of source database: {self.source_db_identifier}")
        response = self.rds_client.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_identifier,
            DBInstanceIdentifier=self.source_db_identifier
        )

        # Wait for snapshot to be available
        waiter = self.rds_client.get_waiter('db_snapshot_available')
        try:
            waiter.wait(
                DBSnapshotIdentifier=snapshot_identifier,
                WaiterConfig={'Delay': 30, 'MaxAttempts': 60}
            )
        except WaiterError as e:
            print(f"Error waiting for snapshot: {e}")
            raise

        return response['DBSnapshot']['DBSnapshotArn']

    def create_encrypted_snapshot_copy(self, source_snapshot_id, encrypted_snapshot_id):
        print("Creating encrypted copy of snapshot")
        kms_key_id = self.get_kms_key_id()
        response = self.rds_client.copy_db_snapshot(
            SourceDBSnapshotIdentifier=source_snapshot_id,
            TargetDBSnapshotIdentifier=encrypted_snapshot_id,
            KmsKeyId=kms_key_id,
            CopyTags=True,
            SourceRegion=self.region
        )

        # Wait for encrypted snapshot to be available
        waiter = self.rds_client.get_waiter('db_snapshot_available')
        try:
            waiter.wait(
                DBSnapshotIdentifier=encrypted_snapshot_id,
                WaiterConfig={'Delay': 30, 'MaxAttempts': 60}
            )
        except WaiterError as e:
            print(f"Error waiting for encrypted snapshot: {e}")
            raise

        return response['DBSnapshot']['DBSnapshotArn']

    def restore_from_snapshot(self, snapshot_identifier):
        print(f"Restoring new encrypted database from snapshot")

        # Get source DB instance details
        source_db = self.rds_client.describe_db_instances(
            DBInstanceIdentifier=self.source_db_identifier
        )['DBInstances'][0]

        # Restore the encrypted instance
        response = self.rds_client.restore_db_instance_from_db_snapshot(
            DBInstanceIdentifier=self.target_db_identifier,
            DBSnapshotIdentifier=snapshot_identifier,
            DBInstanceClass=source_db['DBInstanceClass'],
            VpcSecurityGroupIds=self._get_security_group_ids(source_db),
            DBSubnetGroupName=source_db['DBSubnetGroup']['DBSubnetGroupName'],
            PubliclyAccessible=source_db['PubliclyAccessible'],
            MultiAZ=source_db['MultiAZ']
        )

        # Wait for the new instance to be available
        waiter = self.rds_client.get_waiter('db_instance_available')
        try:
            waiter.wait(
                DBInstanceIdentifier=self.target_db_identifier,
                WaiterConfig={'Delay': 30, 'MaxAttempts': 60}
            )
        except WaiterError as e:
            print(f"Error waiting for database restoration: {e}")
            raise

        return response['DBInstance']['DBInstanceArn']

    def _get_security_group_ids(self, db_instance):
        return [sg['VpcSecurityGroupId'] for sg in db_instance['VpcSecurityGroups']]

    def perform_encryption(self):
        try:
            # Create timestamp for unique identifiers
            timestamp = int(time.time())

            # Step 1: Create initial snapshot
            snapshot_id = f"{self.source_db_identifier}-snapshot-{timestamp}"
            self.create_snapshot(snapshot_id)

            # Step 2: Create encrypted copy of the snapshot
            encrypted_snapshot_id = f"{self.source_db_identifier}-encrypted-snapshot-{timestamp}"
            self.create_encrypted_snapshot_copy(snapshot_id, encrypted_snapshot_id)

            # Step 3: Restore from encrypted snapshot
            self.restore_from_snapshot(encrypted_snapshot_id)

            print(f"""
            Encryption process completed successfully!
            New encrypted database instance: {self.target_db_identifier}

            Next steps:
            1. Verify the new encrypted instance
            2. Update your application connection strings
            3. Once verified, you can delete the old unencrypted instance
            """)

        except Exception as e:
            print(f"Error during encryption process: {e}")
            raise

def main():
    # These values should ideally come from environment variables or command line arguments
    source_db_identifier = 'database-2'
    target_db_identifier = 'database-2-enc'
    kms_key_alias = 'aws/rds'
    region = 'us-east-1'

    migrator = RDSEncryptionMigrator(
        source_db_identifier=source_db_identifier,
        target_db_identifier=target_db_identifier,
        kms_key_alias=kms_key_alias,
        region=region
    )

    migrator.perform_encryption()

if __name__ == '__main__':
    main()
ログイン後にコピー

スクリプトの仕組み

スクリプトは、以下を処理するクラス RDSEncryptionMigrator を定義します。

  1. スナップショットの作成: ソース データベースのスナップショットが作成されます。
  2. 暗号化されたスナップショット コピー: スナップショットは、提供された KMS キー エイリアスを使用してコピーされ、暗号化されます。
  3. データベースの復元: 暗号化されたスナップショットは、新しい RDS インスタンスを復元するために使用されます。

結論

提供されたスクリプトを使用すると、RDS データベースの暗号化プロセスを自動化し、データの安全性を確保できます。このアプローチにより、手動介入の必要性がなくなり、移行プロセスにおける人的エラーのリスクが軽減されます。新しい暗号化されたインスタンスを確認し、アプリケーションの接続文字列を更新し、準備ができたら古い暗号化されていないインスタンスを削除してください。

これをさらに拡張したい場合は、このスクリプトを AWS Lambda または AWS Step Functions と統合して、CI/CD パイプライン内のプロセスをさらに自動化できます。

How to Automate the Encryption of an Amazon RDS Instance with Python

以上がPython を使用して Amazon RDS インスタンスの暗号化を自動化する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
著者別の最新記事
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート