我们一直在致力于一门新的 AI 数据课程,向您展示如何通过将电子商务数据从 Stripe 移动到在 Supabase 上运行的 PGVector 来构建 AI 聊天机器人,通过 Airbyte PGVector 连接器创建 OpenAI 嵌入,使用 OpenAI 客户端库将自然语言支持添加到应用程序中。这是我们的许多客户正在实施的一种非常常见的“智能数据堆栈”应用程序模式。源和目的地可能会发生变化,但模式(数据源 > 移动数据并创建嵌入 > 支持矢量的数据存储 > 使用 OpenAI 的 Web 应用程序)保持不变。
由于我们正在开发旨在让人们上手的课程,因此我们希望让设置尽可能简单。其中很大一部分是在 Stripe 中创建足够的测试数据,这样就有一个合理的数据集供聊天机器人与之交互。如果您以前使用过 Stripe,您就会知道他们有一个很棒的沙盒,您可以在其中进行试验。唯一的问题是它没有预先加载示例数据。
您可以通过 CLI 设备命令加载一些示例数据集。但是,对于我们的使用来说,这些不符合需要。我们想要一个更大的数据集,并且由于这些材料将在网上和研讨会中使用,因此要求学习者在他们的本地计算机上安装一些东西,比如 CLI,会让你面临一大堆复杂的处理。您永远不知道用户正在运行什么操作系统版本,他们是否拥有正确的安装权限等等。我已经被烧伤太多次了,无法走这条路。
值得庆幸的是,Stripe 还拥有出色的 API 和出色的 Python 客户端,这意味着我们可以快速创建协作笔记本,供学习者运行和插入我们想要的数据。
通过 !pip install stripe 安装 stripe 库并使用 Google Collab 密钥传递测试密钥后,我们必须为客户和产品设置一些随机名称。目标是插入随机集合的客户、不同价格的产品和购买情况。这样,当我们向聊天机器人询问诸如“谁购买的商品最便宜?他们支付了多少钱?他们买了什么?”之类的问题时,就会这样。有足够的数据。
import stripe import random from google.colab import userdata stripe.api_key = userdata.get('STRIPE_TEST_KEY') # Sample data for generating random names first_names = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Hank", "Ivy", "Jack", "Quinton", "Akriti", "Justin", "Marcos"] last_names = ["Smith", "Johnson", "Williams", "Jones", "Brown", "Davis", "Miller", "Wilson", "Moore", "Taylor", "Wall", "Chau", "Keswani", "Marx"] # Sample clothing product names clothing_names = [ "T-Shirt", "Jeans", "Jacket", "Sweater", "Hoodie", "Shorts", "Dress", "Blouse", "Skirt", "Pants", "Shoes", "Sandals", "Sneakers", "Socks", "Hat", "Scarf", "Gloves", "Coat", "Belt", "Tie", "Tank Top", "Cardigan", "Overalls", "Tracksuit", "Polo Shirt", "Cargo Pants", "Capris", "Dungarees", "Boots", "Cufflinks", "Raincoat", "Peacoat", "Blazer", "Slippers", "Underwear", "Leggings", "Windbreaker", "Tracksuit Bottoms", "Beanie", "Bikini" ] # List of random colors colors = [ "Red", "Blue", "Green", "Yellow", "Black", "White", "Gray", "Pink", "Purple", "Orange", "Brown", "Teal", "Navy", "Maroon", "Gold", "Silver", "Beige", "Lavender", "Turquoise", "Coral" ]
接下来,是时候为我们需要的 Stripe 中的每种数据类型添加函数了。
# Function to create sample customers with random names def create_customers(count=5): customers = [] for _ in range(count): first_name = random.choice(first_names) last_name = random.choice(last_names) name = f"{first_name} {last_name}" email = f"{first_name.lower()}.{last_name.lower()}@example.com" customer = stripe.Customer.create( name=name, email=email, description="Sample customer for testing" ) customers.append(customer) print(f"Created Customer: {customer['name']} (ID: {customer['id']})") return customers # Function to create sample products with random clothing names and colors def create_products(count=3): products = [] for _ in range(count): color = random.choice(colors) product_name = random.choice(clothing_names) full_name = f"{color} {product_name}" product = stripe.Product.create( name=full_name, description=f"This is a {color.lower()} {product_name.lower()}" ) products.append(product) print(f"Created Product: {product['name']} (ID: {product['id']})") return products # Function to create prices for the products with random unit_amount def create_prices(products, min_price=500, max_price=5000): prices = [] for product in products: unit_amount = random.randint(min_price, max_price) # Random amount in cents price = stripe.Price.create( unit_amount=unit_amount, currency="usd", product=product['id'] ) prices.append(price) print(f"Created Price: ${unit_amount / 100:.2f} for Product {product['name']} (ID: {price['id']})") return prices # Function to create random purchases for each customer def create_purchases(customers, prices, max_purchases_per_customer=5): purchases = [] for customer in customers: num_purchases = random.randint(1, max_purchases_per_customer) # Random number of purchases per customer for _ in range(num_purchases): price = random.choice(prices) # Randomly select a product's price purchase = stripe.PaymentIntent.create( amount=price['unit_amount'], # Amount in cents currency=price['currency'], customer=customer['id'], payment_method_types=["card"], # Simulate card payment description=f"Purchase of {price['product']} by {customer['name']}" ) purchases.append(purchase) print(f"Created Purchase for Customer {customer['name']} (Amount: ${price['unit_amount'] / 100:.2f})") return purchases
剩下的就是运行脚本并指定我们需要多少数据。
# Main function to create sample data def main(): print("Creating sample customers with random names...") customers = create_customers(count=20) print("\nCreating sample products with random clothing names and colors...") products = create_products(count=30) print("\nCreating prices for products with random amounts...") prices = create_prices(products, min_price=500, max_price=5000) print("\nCreating random purchases for each customer...") purchases = create_purchases(customers, prices, max_purchases_per_customer=10) print("\nSample data creation complete!") print(f"Created {len(customers)} customers, {len(products)} products, and {len(purchases)} purchases.") if __name__ == "__main__": main()
将数据加载到我们的 Stripe Sandbox 中后,通过使用 Connector Builder 将 API 端点映射到每种数据类型的流并设置同步作业,将其连接到 Airbyte 只需几分钟。
问题解决了!我们的 Collab Python 脚本对于学习者来说非常容易将测试数据插入到 Stripe 中。希望对其他进行类似测试的人有所帮助。
以上是在 Python 中创建 Stripe 测试数据的详细内容。更多信息请关注PHP中文网其他相关文章!