Amazon RDS (Perkhidmatan Pangkalan Data Perhubungan) ialah perkhidmatan pangkalan data yang berkuasa dan berskala yang ditawarkan oleh AWS, tetapi kadangkala, atas sebab pematuhan atau keselamatan, anda perlu menyulitkan contoh pangkalan data yang tidak disulitkan sedia ada. Dalam artikel ini, kami akan menelusuri skrip Python yang mengautomasikan proses pemindahan tika Amazon RDS yang tidak disulitkan kepada yang disulitkan.
Penyulitan tika RDS memastikan data dalam keadaan tenang adalah selamat dan memenuhi pelbagai keperluan pematuhan, seperti PCI DSS, HIPAA dan banyak lagi. Penyulitan memastikan sandaran, syot kilat dan storan asas pangkalan data RDS anda disulitkan secara automatik.
Walau bagaimanapun, anda tidak boleh mendayakan penyulitan pada tika RDS yang tidak disulitkan secara langsung. Sebaliknya, anda mesti membuat syot kilat, salin syot kilat itu dengan penyulitan didayakan, dan kemudian pulihkan tika RDS baharu daripada syot kilat yang disulitkan.
Inilah yang akan kami automatikkan dalam tutorial ini.
Untuk mengikuti panduan ini, anda memerlukan:
pip install boto3
Anda juga memerlukan bukti kelayakan AWS berikut:
Skrip Python ini mengautomasikan langkah berikut:
import boto3 import time from botocore.exceptions import WaiterError class RDSEncryptionMigrator: def __init__(self, source_db_identifier, target_db_identifier, kms_key_alias, region='us-east-1'): self.source_db_identifier = source_db_identifier self.target_db_identifier = target_db_identifier self.kms_key_alias = kms_key_alias if kms_key_alias.startswith('alias/') else f'alias/{kms_key_alias}' self.region = region self.rds_client = boto3.client('rds', region_name=region) self.kms_client = boto3.client('kms', region_name=region) def get_kms_key_id(self): """Get the KMS key ID from the alias""" try: response = self.kms_client.describe_key( KeyId=self.kms_key_alias ) return response['KeyMetadata']['Arn'] except Exception as e: print(f"Error getting KMS key ID from alias: {e}") raise def create_snapshot(self, snapshot_identifier): print(f"Creating snapshot of source database: {self.source_db_identifier}") response = self.rds_client.create_db_snapshot( DBSnapshotIdentifier=snapshot_identifier, DBInstanceIdentifier=self.source_db_identifier ) # Wait for snapshot to be available waiter = self.rds_client.get_waiter('db_snapshot_available') try: waiter.wait( DBSnapshotIdentifier=snapshot_identifier, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for snapshot: {e}") raise return response['DBSnapshot']['DBSnapshotArn'] def create_encrypted_snapshot_copy(self, source_snapshot_id, encrypted_snapshot_id): print("Creating encrypted copy of snapshot") kms_key_id = self.get_kms_key_id() response = self.rds_client.copy_db_snapshot( SourceDBSnapshotIdentifier=source_snapshot_id, TargetDBSnapshotIdentifier=encrypted_snapshot_id, KmsKeyId=kms_key_id, CopyTags=True, SourceRegion=self.region ) # Wait for encrypted snapshot to be available waiter = self.rds_client.get_waiter('db_snapshot_available') try: waiter.wait( DBSnapshotIdentifier=encrypted_snapshot_id, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for encrypted snapshot: {e}") raise return response['DBSnapshot']['DBSnapshotArn'] def restore_from_snapshot(self, snapshot_identifier): print(f"Restoring new encrypted database from snapshot") # Get source DB instance details source_db = self.rds_client.describe_db_instances( DBInstanceIdentifier=self.source_db_identifier )['DBInstances'][0] # Restore the encrypted instance response = self.rds_client.restore_db_instance_from_db_snapshot( DBInstanceIdentifier=self.target_db_identifier, DBSnapshotIdentifier=snapshot_identifier, DBInstanceClass=source_db['DBInstanceClass'], VpcSecurityGroupIds=self._get_security_group_ids(source_db), DBSubnetGroupName=source_db['DBSubnetGroup']['DBSubnetGroupName'], PubliclyAccessible=source_db['PubliclyAccessible'], MultiAZ=source_db['MultiAZ'] ) # Wait for the new instance to be available waiter = self.rds_client.get_waiter('db_instance_available') try: waiter.wait( DBInstanceIdentifier=self.target_db_identifier, WaiterConfig={'Delay': 30, 'MaxAttempts': 60} ) except WaiterError as e: print(f"Error waiting for database restoration: {e}") raise return response['DBInstance']['DBInstanceArn'] def _get_security_group_ids(self, db_instance): return [sg['VpcSecurityGroupId'] for sg in db_instance['VpcSecurityGroups']] def perform_encryption(self): try: # Create timestamp for unique identifiers timestamp = int(time.time()) # Step 1: Create initial snapshot snapshot_id = f"{self.source_db_identifier}-snapshot-{timestamp}" self.create_snapshot(snapshot_id) # Step 2: Create encrypted copy of the snapshot encrypted_snapshot_id = f"{self.source_db_identifier}-encrypted-snapshot-{timestamp}" self.create_encrypted_snapshot_copy(snapshot_id, encrypted_snapshot_id) # Step 3: Restore from encrypted snapshot self.restore_from_snapshot(encrypted_snapshot_id) print(f""" Encryption process completed successfully! New encrypted database instance: {self.target_db_identifier} Next steps: 1. Verify the new encrypted instance 2. Update your application connection strings 3. Once verified, you can delete the old unencrypted instance """) except Exception as e: print(f"Error during encryption process: {e}") raise def main(): # These values should ideally come from environment variables or command line arguments source_db_identifier = 'database-2' target_db_identifier = 'database-2-enc' kms_key_alias = 'aws/rds' region = 'us-east-1' migrator = RDSEncryptionMigrator( source_db_identifier=source_db_identifier, target_db_identifier=target_db_identifier, kms_key_alias=kms_key_alias, region=region ) migrator.perform_encryption() if __name__ == '__main__': main()
Skrip mentakrifkan kelas RDSEncryptionMigrator yang mengendalikan:
Dengan menggunakan skrip yang disediakan, anda boleh mengautomasikan proses penyulitan untuk pangkalan data RDS anda dan memastikan data anda selamat. Pendekatan ini menghapuskan keperluan untuk campur tangan manual dan mengurangkan risiko kesilapan manusia dalam proses migrasi. Pastikan anda mengesahkan tika yang disulitkan baharu, mengemas kini rentetan sambungan aplikasi anda dan mengalih keluar tika yang tidak disulitkan lama setelah anda bersedia.
Jika anda ingin menambah skala ini, anda boleh menyepadukan skrip ini dengan AWS Lambda atau AWS Step Functions untuk mengautomasikan proses lebih jauh dalam saluran paip CI/CD anda.
Atas ialah kandungan terperinci Cara Mengautomasikan Penyulitan Instance Amazon RDS dengan Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!