> 백엔드 개발 > 파이썬 튜토리얼 > Python을 사용하여 Amazon RDS 인스턴스 암호화를 자동화하는 방법

Python을 사용하여 Amazon RDS 인스턴스 암호화를 자동화하는 방법

Linda Hamilton
풀어 주다: 2025-01-03 21:12:43
원래의
570명이 탐색했습니다.

Amazon RDS(관계형 데이터베이스 서비스)는 AWS에서 제공하는 강력하고 확장 가능한 데이터베이스 서비스이지만 규정 준수 또는 보안상의 이유로 암호화되지 않은 기존 데이터베이스 인스턴스를 암호화해야 하는 경우도 있습니다. 이 기사에서는 암호화되지 않은 Amazon RDS 인스턴스를 암호화된 인스턴스로 마이그레이션하는 프로세스를 자동화하는 Python 스크립트를 살펴보겠습니다.

RDS 인스턴스를 암호화하는 이유는 무엇입니까?

RDS 인스턴스를 암호화하면 저장 데이터가 안전하게 보호되고 PCI DSS, HIPAA 등과 같은 다양한 규정 준수 요구 사항을 충족할 수 있습니다. 암호화를 사용하면 RDS 데이터베이스의 백업, 스냅샷 및 기본 스토리지가 자동으로 암호화됩니다.

그러나 암호화되지 않은 기존 RDS 인스턴스에서는 직접 암호화를 활성화할 수 없습니다. 대신 스냅샷을 생성하고 암호화가 활성화된 해당 스냅샷을 복사한 다음 암호화된 스냅샷에서 새 RDS 인스턴스를 복원해야 합니다.

이 튜토리얼에서는 이것이 자동화됩니다.

전제조건

이 가이드를 따르려면 다음이 필요합니다.

  • AWS 계정: RDS 및 KMS(키 관리 서비스)를 관리할 권한이 있는 AWS 계정에 액세스합니다.
  • Python 3.x: 로컬 컴퓨터에 설치 및 구성됩니다.
  • Boto3: pip를 사용하여 설치할 수 있는 Python용 AWS SDK:
  pip install boto3
로그인 후 복사

다음 AWS 자격 증명도 필요합니다.

  1. AWS_ACCESS_KEY_ID
  2. AWS_SECRET_ACCESS_KEY
  3. AWS_DEFAULT_REGION

암호화 마이그레이션 프로세스

이 Python 스크립트는 다음 단계를 자동화합니다.

  1. 스냅샷 생성: 암호화되지 않은 기존 RDS 인스턴스의 스냅샷을 생성합니다.
  2. 암호화로 스냅샷 복사: AWS KMS(키 관리 서비스)를 사용하여 스냅샷의 암호화된 복사본을 생성합니다.
  3. 데이터베이스 복원: 암호화된 스냅샷에서 새 RDS 인스턴스를 생성합니다.

마이그레이션을 자동화하는 Python 스크립트

import boto3
import time
from botocore.exceptions import WaiterError

class RDSEncryptionMigrator:
    def __init__(self, source_db_identifier, target_db_identifier, kms_key_alias, region='us-east-1'):
        self.source_db_identifier = source_db_identifier
        self.target_db_identifier = target_db_identifier
        self.kms_key_alias = kms_key_alias if kms_key_alias.startswith('alias/') else f'alias/{kms_key_alias}'
        self.region = region

        self.rds_client = boto3.client('rds', region_name=region)
        self.kms_client = boto3.client('kms', region_name=region)

    def get_kms_key_id(self):
        """Get the KMS key ID from the alias"""
        try:
            response = self.kms_client.describe_key(
                KeyId=self.kms_key_alias
            )
            return response['KeyMetadata']['Arn']
        except Exception as e:
            print(f"Error getting KMS key ID from alias: {e}")
            raise

    def create_snapshot(self, snapshot_identifier):
        print(f"Creating snapshot of source database: {self.source_db_identifier}")
        response = self.rds_client.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_identifier,
            DBInstanceIdentifier=self.source_db_identifier
        )

        # Wait for snapshot to be available
        waiter = self.rds_client.get_waiter('db_snapshot_available')
        try:
            waiter.wait(
                DBSnapshotIdentifier=snapshot_identifier,
                WaiterConfig={'Delay': 30, 'MaxAttempts': 60}
            )
        except WaiterError as e:
            print(f"Error waiting for snapshot: {e}")
            raise

        return response['DBSnapshot']['DBSnapshotArn']

    def create_encrypted_snapshot_copy(self, source_snapshot_id, encrypted_snapshot_id):
        print("Creating encrypted copy of snapshot")
        kms_key_id = self.get_kms_key_id()
        response = self.rds_client.copy_db_snapshot(
            SourceDBSnapshotIdentifier=source_snapshot_id,
            TargetDBSnapshotIdentifier=encrypted_snapshot_id,
            KmsKeyId=kms_key_id,
            CopyTags=True,
            SourceRegion=self.region
        )

        # Wait for encrypted snapshot to be available
        waiter = self.rds_client.get_waiter('db_snapshot_available')
        try:
            waiter.wait(
                DBSnapshotIdentifier=encrypted_snapshot_id,
                WaiterConfig={'Delay': 30, 'MaxAttempts': 60}
            )
        except WaiterError as e:
            print(f"Error waiting for encrypted snapshot: {e}")
            raise

        return response['DBSnapshot']['DBSnapshotArn']

    def restore_from_snapshot(self, snapshot_identifier):
        print(f"Restoring new encrypted database from snapshot")

        # Get source DB instance details
        source_db = self.rds_client.describe_db_instances(
            DBInstanceIdentifier=self.source_db_identifier
        )['DBInstances'][0]

        # Restore the encrypted instance
        response = self.rds_client.restore_db_instance_from_db_snapshot(
            DBInstanceIdentifier=self.target_db_identifier,
            DBSnapshotIdentifier=snapshot_identifier,
            DBInstanceClass=source_db['DBInstanceClass'],
            VpcSecurityGroupIds=self._get_security_group_ids(source_db),
            DBSubnetGroupName=source_db['DBSubnetGroup']['DBSubnetGroupName'],
            PubliclyAccessible=source_db['PubliclyAccessible'],
            MultiAZ=source_db['MultiAZ']
        )

        # Wait for the new instance to be available
        waiter = self.rds_client.get_waiter('db_instance_available')
        try:
            waiter.wait(
                DBInstanceIdentifier=self.target_db_identifier,
                WaiterConfig={'Delay': 30, 'MaxAttempts': 60}
            )
        except WaiterError as e:
            print(f"Error waiting for database restoration: {e}")
            raise

        return response['DBInstance']['DBInstanceArn']

    def _get_security_group_ids(self, db_instance):
        return [sg['VpcSecurityGroupId'] for sg in db_instance['VpcSecurityGroups']]

    def perform_encryption(self):
        try:
            # Create timestamp for unique identifiers
            timestamp = int(time.time())

            # Step 1: Create initial snapshot
            snapshot_id = f"{self.source_db_identifier}-snapshot-{timestamp}"
            self.create_snapshot(snapshot_id)

            # Step 2: Create encrypted copy of the snapshot
            encrypted_snapshot_id = f"{self.source_db_identifier}-encrypted-snapshot-{timestamp}"
            self.create_encrypted_snapshot_copy(snapshot_id, encrypted_snapshot_id)

            # Step 3: Restore from encrypted snapshot
            self.restore_from_snapshot(encrypted_snapshot_id)

            print(f"""
            Encryption process completed successfully!
            New encrypted database instance: {self.target_db_identifier}

            Next steps:
            1. Verify the new encrypted instance
            2. Update your application connection strings
            3. Once verified, you can delete the old unencrypted instance
            """)

        except Exception as e:
            print(f"Error during encryption process: {e}")
            raise

def main():
    # These values should ideally come from environment variables or command line arguments
    source_db_identifier = 'database-2'
    target_db_identifier = 'database-2-enc'
    kms_key_alias = 'aws/rds'
    region = 'us-east-1'

    migrator = RDSEncryptionMigrator(
        source_db_identifier=source_db_identifier,
        target_db_identifier=target_db_identifier,
        kms_key_alias=kms_key_alias,
        region=region
    )

    migrator.perform_encryption()

if __name__ == '__main__':
    main()
로그인 후 복사

스크립트 작동 방식

스크립트는 다음을 처리하는 RDSEncryptionMigrator 클래스를 정의합니다.

  1. 스냅샷 생성: 소스 데이터베이스의 스냅샷이 생성됩니다.
  2. 암호화된 스냅샷 복사: 제공된 KMS 키 별칭을 사용하여 스냅샷이 복사되고 암호화됩니다.
  3. 데이터베이스 복원: 암호화된 스냅샷은 새 RDS 인스턴스를 복원하는 데 사용됩니다.

결론

제공된 스크립트를 사용하면 RDS 데이터베이스의 암호화 프로세스를 자동화하고 데이터를 안전하게 보호할 수 있습니다. 이 접근 방식을 사용하면 수동 개입이 필요하지 않으며 마이그레이션 프로세스에서 인적 오류가 발생할 위험이 줄어듭니다. 암호화된 새 인스턴스를 확인하고, 애플리케이션 연결 문자열을 업데이트하고, 준비가 되면 암호화되지 않은 기존 인스턴스를 제거하세요.

이를 더 확장하려는 경우 이 스크립트를 AWS Lambda 또는 AWS Step Functions와 통합하여 CI/CD 파이프라인 내에서 프로세스를 더욱 자동화할 수 있습니다.

How to Automate the Encryption of an Amazon RDS Instance with Python

위 내용은 Python을 사용하여 Amazon RDS 인스턴스 암호화를 자동화하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

원천:dev.to
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
저자별 최신 기사
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿