目錄
正確答案
首頁 後端開發 Python教學 cosmosdb 的計時器觸發器無法正常運作

cosmosdb 的計時器觸發器無法正常運作

Feb 22, 2024 pm 12:40 PM

cosmosdb 的计时器触发器无法正常工作

問題內容

我對我的函數應用「timertrigger」有疑問。

我開發了此功能來與 telegram 機器人進行通信,以便在 api 請求後發送訊息。

我在本地嘗試過該功能應用程序,效果很好。但是,當我嘗試使用 cosmosdb 儲存資訊時,遇到問題並且無法儲存資訊。

我已經設定了將我的應用程式與 telegram 和 cosmosdb 連接所需的所有變數和內容

try:
        database_obj  = client.get_database_client(database_name)
        await database_obj.read()
        return database_obj
    except exceptions.cosmosresourcenotfounderror:
        print("creating database")
        return await client.create_database(database_name)
# </create_database_if_not_exists>
    
# create a container
# using a good partition key improves the performance of database operations.
# <create_container_if_not_exists>
async def get_or_create_container(database_obj, container_name):
    try:        
        todo_items_container = database_obj.get_container_client(container_name)
        await todo_items_container.read()   
        return todo_items_container
    except exceptions.cosmosresourcenotfounderror:
        print("creating container with lastname as partition key")
        return await database_obj.create_container(
            id=container_name,
            partition_key=partitionkey(path="/lastname"),
            offer_throughput=400)
    except exceptions.cosmoshttpresponseerror:
        raise
# </create_container_if_not_exists>

async def populate_container_items(container_obj, items_to_create):
    # add items to the container
    family_items_to_create = items_to_create
    # <create_item>
    for family_item in family_items_to_create:
        inserted_item = await container_obj.create_item(body=family_item)
        print("inserted item for %s family. item id: %s" %(inserted_item['lastname'], inserted_item['id']))
    # </create_item>
# </method_populate_container_items>

async def read_items(container_obj, items_to_read):
    # read items (key value lookups by partition key and id, aka point reads)
    # <read_item>
    for family in items_to_read:
        item_response = await container_obj.read_item(item=family['id'], partition_key=family['lastname'])
        request_charge = container_obj.client_connection.last_response_headers['x-ms-request-charge']
        print('read item with id {0}. operation consumed {1} request units'.format(item_response['id'], (request_charge)))
    # </read_item>
# </method_read_items>

# <method_query_items>
async def query_items(container_obj, query_text):
    # enable_cross_partition_query should be set to true as the container is partitioned
    # in this case, we do have to await the asynchronous iterator object since logic
    # within the query_items() method makes network calls to verify the partition key
    # definition in the container
    # <query_items>
    query_items_response = container_obj.query_items(
        query=query_text,
        enable_cross_partition_query=true
    )
    request_charge = container_obj.client_connection.last_response_headers['x-ms-request-charge']
    items = [item async for item in query_items_response]
    print('query returned {0} items. operation consumed {1} request units'.format(len(items), request_charge))
    # </query_items>
# </method_query_items>

async def run_sample():
    print('aaaa')
    print('sss {0}'.format(cosmosclient(endpoint,credential=key)))
    async with cosmosclient(endpoint, credential = key) as client:
        print('connected to db')
        try:
            database_obj = await get_or_create_db(client, database_name)
            # create a container
            container_obj = await get_or_create_container(database_obj, container_name)
            family_items_to_create = ["link", "ss", "s", "s"]
            await populate_container_items(container_obj, family_items_to_create)
            await read_items(container_obj, family_items_to_create)
            # query these items using the sql query syntax. 
            # specifying the partition key value in the query allows cosmos db to retrieve data only from the relevant partitions, which improves performance
            query = "select * from c "
            await query_items(container_obj, query)   
        except exceptions.cosmoshttpresponseerror as e:
            print('\nrun_sample has caught an error. {0}'.format(e.message))
        finally:
            print("\nquickstart complete")

async def main(mytimer: func.timerrequest) -> none:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()
    
    asyncio.create_task(run_sample())
    logging.info(' sono partito')
    sendnews()
    if mytimer.past_due:
        logging.info('the timer is past due!')

    logging.info('python timer trigger function ran at %s', utc_timestamp)
登入後複製

我已經開始我的功能

func host start --port 7072
登入後複製

但我認為與資料庫的連接出了問題,因為 console.log('connected to db') 沒有被列印。

似乎所有與cosmosdb相關的操作都沒有執行,如果有錯誤不知道如何解決。

我的終端中沒有任何錯誤,但正如我所說,cosmosdb 似乎不起作用。

我不確定是否向您提供了所有必要的資訊。感謝您的協助。


正確答案


我在使用非同步函數時也遇到了相同的問題。當我使用非非同步函數時,它對我有用。

參考請查看此 document

我的程式碼: timetrigger1/__init__.py

import datetime
import logging
import asyncio
import azure.functions as func
from azure.cosmos import cosmos_client
import azure.cosmos.exceptions as exceptions
from azure.cosmos.partition_key import partitionkey

endpoint = "https://timercosmosdb.documents.azure.com/"
key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
database_name = "todolist"
container_name = "test"

def get_or_create_db(client,database_name):
    try:
        database_obj  = client.get_database_client(database_name)
        database_obj.read()
        return database_obj
    except exceptions.cosmosresourcenotfounderror:
        logging.info("creating database")
        return client.create_database_if_not_exists(database_name)
    

def get_or_create_container(database_obj, container_name):
    try:        
        todo_items_container = database_obj.get_container_client(container_name)
        todo_items_container.read()   
        return todo_items_container
    except exceptions.cosmosresourcenotfounderror:
        logging.info("creating container with lastname as partition key")
        return database_obj.create_container_if_not_exists(
            id=container_name,
            partition_key=partitionkey(path="/id"),
            offer_throughput=400)
    except exceptions.cosmoshttpresponseerror:
        raise


def populate_container_items(container_obj,items):
    inserted_item = container_obj.create_item(body=items)
    logging.info("inserted item for %s family. item id: %s" %(inserted_item['lastname'], inserted_item['id']))

def read_items(container_obj,id):
        item_response = container_obj.read_item(item=id, partition_key=id)
        request_charge = container_obj.client_connection.last_response_headers['x-ms-request-charge']
        logging.info('read item with id {0}. operation consumed {1} request units'.format(item_response['id'], (request_charge)))

def query_items(container_obj, query_text):
    query_items_response = container_obj.query_items(
        query=query_text,
        enable_cross_partition_query=true
    )
    request_charge = container_obj.client_connection.last_response_headers['x-ms-request-charge']
    items = [item for item in query_items_response]
    logging.info('query returned {0} items. operation consumed {1} request units'.format(len(items), request_charge))

def run_sample():
    logging.info('aaaa')
    client = cosmos_client.cosmosclient(endpoint, key)
    logging.info('connected to db')
    try:
        id= "test"
        database_obj = get_or_create_db(client,database_name)

        container_obj = get_or_create_container(database_obj,container_name)
        item_dict = {
                "id": id,
                "lastname": "shandilya",
                "firstname": "vivek",
                "gender": "male",
                "age": 35
            }
        populate_container_items(container_obj,item_dict)
        read_items(container_obj,id)

        query = "select * from c "
        query_items(container_obj, query)   
    except exceptions.cosmoshttpresponseerror as e:
        logging.info('\nrun_sample has caught an error. {0}'.format(e.message))
    finally:
        logging.info("\nquickstart complete")

def main(mytimer: func.timerrequest) -> none:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()
    
    run_sample()
    logging.info(' sono partito')
    logging.info('python timer trigger function ran at %s', utc_timestamp)
登入後複製

output

functions:

        timertrigger1: timertrigger

for detailed output, run func with --verbose flag.
[2024-01-30t09:00:24.818z] executing 'functions.timertrigger1' (reason='timer fired at 2024-01-30t14:30:24.7842979+05:30', id=5499e180-4964-4d7e-b9f2-b024860945dd)
[2024-01-30t09:00:24.822z] trigger details: unscheduledinvocationreason: ispastdue, originalschedule: 2024-01-30t14:30:00.0000000+05:30
[2024-01-30t09:00:25.022z] aaaa
[2024-01-30t09:00:26.387z] connected to db
[2024-01-30t09:00:28.212z] inserted item for shandilya family. item id: test
[2024-01-30t09:00:28.373z] read item with id test. operation consumed 1 request units
[2024-01-30t09:00:28.546z]
quickstart complete
[2024-01-30t09:00:28.548z] python timer trigger function ran at 2024-01-30t09:00:25.008468+00:00
[2024-01-30t09:00:28.547z]  sono partito
[2024-01-30t09:00:28.546z] query returned 1 items. operation consumed 1 request units
[2024-01-30t09:00:28.592z] executed 'functions.timertrigger1' (succeeded, id=5499e180-4964-4d7e-b9f2-b024860945dd, duration=3793ms)
[2024-01-30t09:00:29.296z] host lock lease acquired by instance id '000000000000000000000000aae5f384'.
登入後複製

{
    "id": "test",
    "lastName": "Shandilya",
    "firstName": "Vivek",
    "gender": "male",
    "age": 35,
    "_rid": "ey58AO9yWqwCAAAAAAAAAA==",
    "_self": "dbs/ey58AA==/colls/ey58AO9yWqw=/docs/ey58AO9yWqwCAAAAAAAAAA==/",
    "_etag": "\"01001327-0000-1a00-0000-65b8baac0000\"",
    "_attachments": "attachments/",
    "_ts": 1706605228
}
登入後複製

以上是cosmosdb 的計時器觸發器無法正常運作的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

如何解決Linux終端中查看Python版本時遇到的權限問題? 如何解決Linux終端中查看Python版本時遇到的權限問題? Apr 01, 2025 pm 05:09 PM

Linux終端中查看Python版本時遇到權限問題的解決方法當你在Linux終端中嘗試查看Python的版本時,輸入python...

如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到? 如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到? Apr 02, 2025 am 07:15 AM

使用FiddlerEverywhere進行中間人讀取時如何避免被檢測到當你使用FiddlerEverywhere...

在Python中如何高效地將一個DataFrame的整列複製到另一個結構不同的DataFrame中? 在Python中如何高效地將一個DataFrame的整列複製到另一個結構不同的DataFrame中? Apr 01, 2025 pm 11:15 PM

在使用Python的pandas庫時,如何在兩個結構不同的DataFrame之間進行整列複製是一個常見的問題。假設我們有兩個Dat...

如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎? 如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎? Apr 02, 2025 am 07:18 AM

如何在10小時內教計算機小白編程基礎?如果你只有10個小時來教計算機小白一些編程知識,你會選擇教些什麼�...

Uvicorn是如何在沒有serve_forever()的情況下持續監聽HTTP請求的? Uvicorn是如何在沒有serve_forever()的情況下持續監聽HTTP請求的? Apr 01, 2025 pm 10:51 PM

Uvicorn是如何持續監聽HTTP請求的? Uvicorn是一個基於ASGI的輕量級Web服務器,其核心功能之一便是監聽HTTP請求並進�...

在Linux終端中使用python --version命令時如何解決權限問題? 在Linux終端中使用python --version命令時如何解決權限問題? Apr 02, 2025 am 06:36 AM

Linux終端中使用python...

如何繞過Investing.com的反爬蟲機制獲取新聞數據? 如何繞過Investing.com的反爬蟲機制獲取新聞數據? Apr 02, 2025 am 07:03 AM

攻克Investing.com的反爬蟲策略許多人嘗試爬取Investing.com(https://cn.investing.com/news/latest-news)的新聞數據時,常常�...

See all articles