目录
正确答案
首页 后端开发 Python教程 cosmosdb 的计时器触发器无法正常工作

cosmosdb 的计时器触发器无法正常工作

Feb 22, 2024 pm 12:40 PM

cosmosdb 的计时器触发器无法正常工作

问题内容

我对我的函数应用“timertrigger”有疑问。

我开发了此功能来与 telegram 机器人进行通信,以便在 api 请求后发送消息。

我在本地尝试过该功能应用程序,效果很好。但是,当我尝试使用 cosmosdb 存储信息时,遇到问题并且无法保存信息。

我已经设置了将我的应用程序与 telegram 和 cosmosdb 连接所需的所有变量和内容

try:
        database_obj  = client.get_database_client(database_name)
        await database_obj.read()
        return database_obj
    except exceptions.cosmosresourcenotfounderror:
        print("creating database")
        return await client.create_database(database_name)
# </create_database_if_not_exists>
    
# create a container
# using a good partition key improves the performance of database operations.
# <create_container_if_not_exists>
async def get_or_create_container(database_obj, container_name):
    try:        
        todo_items_container = database_obj.get_container_client(container_name)
        await todo_items_container.read()   
        return todo_items_container
    except exceptions.cosmosresourcenotfounderror:
        print("creating container with lastname as partition key")
        return await database_obj.create_container(
            id=container_name,
            partition_key=partitionkey(path="/lastname"),
            offer_throughput=400)
    except exceptions.cosmoshttpresponseerror:
        raise
# </create_container_if_not_exists>

async def populate_container_items(container_obj, items_to_create):
    # add items to the container
    family_items_to_create = items_to_create
    # <create_item>
    for family_item in family_items_to_create:
        inserted_item = await container_obj.create_item(body=family_item)
        print("inserted item for %s family. item id: %s" %(inserted_item['lastname'], inserted_item['id']))
    # </create_item>
# </method_populate_container_items>

async def read_items(container_obj, items_to_read):
    # read items (key value lookups by partition key and id, aka point reads)
    # <read_item>
    for family in items_to_read:
        item_response = await container_obj.read_item(item=family['id'], partition_key=family['lastname'])
        request_charge = container_obj.client_connection.last_response_headers['x-ms-request-charge']
        print('read item with id {0}. operation consumed {1} request units'.format(item_response['id'], (request_charge)))
    # </read_item>
# </method_read_items>

# <method_query_items>
async def query_items(container_obj, query_text):
    # enable_cross_partition_query should be set to true as the container is partitioned
    # in this case, we do have to await the asynchronous iterator object since logic
    # within the query_items() method makes network calls to verify the partition key
    # definition in the container
    # <query_items>
    query_items_response = container_obj.query_items(
        query=query_text,
        enable_cross_partition_query=true
    )
    request_charge = container_obj.client_connection.last_response_headers['x-ms-request-charge']
    items = [item async for item in query_items_response]
    print('query returned {0} items. operation consumed {1} request units'.format(len(items), request_charge))
    # </query_items>
# </method_query_items>

async def run_sample():
    print('aaaa')
    print('sss {0}'.format(cosmosclient(endpoint,credential=key)))
    async with cosmosclient(endpoint, credential = key) as client:
        print('connected to db')
        try:
            database_obj = await get_or_create_db(client, database_name)
            # create a container
            container_obj = await get_or_create_container(database_obj, container_name)
            family_items_to_create = ["link", "ss", "s", "s"]
            await populate_container_items(container_obj, family_items_to_create)
            await read_items(container_obj, family_items_to_create)
            # query these items using the sql query syntax. 
            # specifying the partition key value in the query allows cosmos db to retrieve data only from the relevant partitions, which improves performance
            query = "select * from c "
            await query_items(container_obj, query)   
        except exceptions.cosmoshttpresponseerror as e:
            print('\nrun_sample has caught an error. {0}'.format(e.message))
        finally:
            print("\nquickstart complete")

async def main(mytimer: func.timerrequest) -> none:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()
    
    asyncio.create_task(run_sample())
    logging.info(' sono partito')
    sendnews()
    if mytimer.past_due:
        logging.info('the timer is past due!')

    logging.info('python timer trigger function ran at %s', utc_timestamp)
登录后复制

我已经开始我的功能

func host start --port 7072
登录后复制

但我认为与数据库的连接出了问题,因为 console.log('connected to db') 没有被打印。

似乎所有与cosmosdb相关的操作都没有执行,如果有错误不知道如何解决。

我的终端中没有任何错误,但正如我所说,cosmosdb 似乎不起作用。

我不确定是否向您提供了所有必要的信息。感谢您的帮助。


正确答案


我在使用异步函数时也遇到了同样的问题。当我使用非异步函数时,它对我有用。

参考请查看此 document

我的代码: timetrigger1/__init__.py

import datetime
import logging
import asyncio
import azure.functions as func
from azure.cosmos import cosmos_client
import azure.cosmos.exceptions as exceptions
from azure.cosmos.partition_key import partitionkey

endpoint = "https://timercosmosdb.documents.azure.com/"
key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
database_name = "todolist"
container_name = "test"

def get_or_create_db(client,database_name):
    try:
        database_obj  = client.get_database_client(database_name)
        database_obj.read()
        return database_obj
    except exceptions.cosmosresourcenotfounderror:
        logging.info("creating database")
        return client.create_database_if_not_exists(database_name)
    

def get_or_create_container(database_obj, container_name):
    try:        
        todo_items_container = database_obj.get_container_client(container_name)
        todo_items_container.read()   
        return todo_items_container
    except exceptions.cosmosresourcenotfounderror:
        logging.info("creating container with lastname as partition key")
        return database_obj.create_container_if_not_exists(
            id=container_name,
            partition_key=partitionkey(path="/id"),
            offer_throughput=400)
    except exceptions.cosmoshttpresponseerror:
        raise


def populate_container_items(container_obj,items):
    inserted_item = container_obj.create_item(body=items)
    logging.info("inserted item for %s family. item id: %s" %(inserted_item['lastname'], inserted_item['id']))

def read_items(container_obj,id):
        item_response = container_obj.read_item(item=id, partition_key=id)
        request_charge = container_obj.client_connection.last_response_headers['x-ms-request-charge']
        logging.info('read item with id {0}. operation consumed {1} request units'.format(item_response['id'], (request_charge)))

def query_items(container_obj, query_text):
    query_items_response = container_obj.query_items(
        query=query_text,
        enable_cross_partition_query=true
    )
    request_charge = container_obj.client_connection.last_response_headers['x-ms-request-charge']
    items = [item for item in query_items_response]
    logging.info('query returned {0} items. operation consumed {1} request units'.format(len(items), request_charge))

def run_sample():
    logging.info('aaaa')
    client = cosmos_client.cosmosclient(endpoint, key)
    logging.info('connected to db')
    try:
        id= "test"
        database_obj = get_or_create_db(client,database_name)

        container_obj = get_or_create_container(database_obj,container_name)
        item_dict = {
                "id": id,
                "lastname": "shandilya",
                "firstname": "vivek",
                "gender": "male",
                "age": 35
            }
        populate_container_items(container_obj,item_dict)
        read_items(container_obj,id)

        query = "select * from c "
        query_items(container_obj, query)   
    except exceptions.cosmoshttpresponseerror as e:
        logging.info('\nrun_sample has caught an error. {0}'.format(e.message))
    finally:
        logging.info("\nquickstart complete")

def main(mytimer: func.timerrequest) -> none:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()
    
    run_sample()
    logging.info(' sono partito')
    logging.info('python timer trigger function ran at %s', utc_timestamp)
登录后复制

output

functions:

        timertrigger1: timertrigger

for detailed output, run func with --verbose flag.
[2024-01-30t09:00:24.818z] executing 'functions.timertrigger1' (reason='timer fired at 2024-01-30t14:30:24.7842979+05:30', id=5499e180-4964-4d7e-b9f2-b024860945dd)
[2024-01-30t09:00:24.822z] trigger details: unscheduledinvocationreason: ispastdue, originalschedule: 2024-01-30t14:30:00.0000000+05:30
[2024-01-30t09:00:25.022z] aaaa
[2024-01-30t09:00:26.387z] connected to db
[2024-01-30t09:00:28.212z] inserted item for shandilya family. item id: test
[2024-01-30t09:00:28.373z] read item with id test. operation consumed 1 request units
[2024-01-30t09:00:28.546z]
quickstart complete
[2024-01-30t09:00:28.548z] python timer trigger function ran at 2024-01-30t09:00:25.008468+00:00
[2024-01-30t09:00:28.547z]  sono partito
[2024-01-30t09:00:28.546z] query returned 1 items. operation consumed 1 request units
[2024-01-30t09:00:28.592z] executed 'functions.timertrigger1' (succeeded, id=5499e180-4964-4d7e-b9f2-b024860945dd, duration=3793ms)
[2024-01-30t09:00:29.296z] host lock lease acquired by instance id '000000000000000000000000aae5f384'.
登录后复制

{
    "id": "test",
    "lastName": "Shandilya",
    "firstName": "Vivek",
    "gender": "male",
    "age": 35,
    "_rid": "ey58AO9yWqwCAAAAAAAAAA==",
    "_self": "dbs/ey58AA==/colls/ey58AO9yWqw=/docs/ey58AO9yWqwCAAAAAAAAAA==/",
    "_etag": "\"01001327-0000-1a00-0000-65b8baac0000\"",
    "_attachments": "attachments/",
    "_ts": 1706605228
}
登录后复制

以上是cosmosdb 的计时器触发器无法正常工作的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

如何解决Linux终端中查看Python版本时遇到的权限问题? 如何解决Linux终端中查看Python版本时遇到的权限问题? Apr 01, 2025 pm 05:09 PM

Linux终端中查看Python版本时遇到权限问题的解决方法当你在Linux终端中尝试查看Python的版本时,输入python...

如何在10小时内通过项目和问题驱动的方式教计算机小白编程基础? 如何在10小时内通过项目和问题驱动的方式教计算机小白编程基础? Apr 02, 2025 am 07:18 AM

如何在10小时内教计算机小白编程基础?如果你只有10个小时来教计算机小白一些编程知识,你会选择教些什么�...

如何在使用 Fiddler Everywhere 进行中间人读取时避免被浏览器检测到? 如何在使用 Fiddler Everywhere 进行中间人读取时避免被浏览器检测到? Apr 02, 2025 am 07:15 AM

使用FiddlerEverywhere进行中间人读取时如何避免被检测到当你使用FiddlerEverywhere...

在Python中如何高效地将一个DataFrame的整列复制到另一个结构不同的DataFrame中? 在Python中如何高效地将一个DataFrame的整列复制到另一个结构不同的DataFrame中? Apr 01, 2025 pm 11:15 PM

在使用Python的pandas库时,如何在两个结构不同的DataFrame之间进行整列复制是一个常见的问题。假设我们有两个Dat...

Uvicorn是如何在没有serve_forever()的情况下持续监听HTTP请求的? Uvicorn是如何在没有serve_forever()的情况下持续监听HTTP请求的? Apr 01, 2025 pm 10:51 PM

Uvicorn是如何持续监听HTTP请求的?Uvicorn是一个基于ASGI的轻量级Web服务器,其核心功能之一便是监听HTTP请求并进�...

Python中如何通过字符串动态创建对象并调用其方法? Python中如何通过字符串动态创建对象并调用其方法? Apr 01, 2025 pm 11:18 PM

在Python中,如何通过字符串动态创建对象并调用其方法?这是一个常见的编程需求,尤其在需要根据配置或运行...

在Linux终端中使用python --version命令时如何解决权限问题? 在Linux终端中使用python --version命令时如何解决权限问题? Apr 02, 2025 am 06:36 AM

Linux终端中使用python...

See all articles