Bagaimana untuk menggunakan acara terkini menggunakan Azure Event Hubs Go SDK (azeventhubs)?

WBOY
Lepaskan: 2024-02-09 08:15:08
ke hadapan
556 orang telah melayarinya

如何使用 Azure Event Hubs Go SDK (azeventhubs) 使用最新事件?

editor php Strawberry membawakan anda panduan tentang cara menggunakan acara terkini menggunakan Azure Event Hubs Go SDK (azeventhubs). Azure Event Hubs ialah perkhidmatan pemindahan data masa nyata yang sangat berskala yang boleh digunakan untuk memproses sejumlah besar data acara. Dalam panduan ini, kami akan membincangkan cara menggunakan pakej azeventhubs untuk berinteraksi dengan Azure Event Hubs dan menunjukkan cara menggunakan ciri acara terkini untuk mendapatkan data masa nyata. Melalui panduan artikel ini, anda akan dapat dengan mudah menggunakan Azure Event Hubs dalam aplikasi Go dan memanfaatkan keupayaan acara terkini untuk mendapatkan dan memproses data masa nyata.

Kandungan soalan

Saya berhijrah daripada azure-event-hubs-go/v3 kepada azeventhubs Go SDK yang lebih baharu. Dalam versi SDK yang lebih lama, terdapat parameter ReceiveOption yang membolehkan saya menentukan tempat untuk mula menggunakan acara.

Dalam SDK baharu, saya menggunakan kod berikut untuk memulakan pemproses:

processor, err := azeventhubs.NewProcessor(
    e.ConsumerClient, 
    checkpointStore, 
    &azeventhubs.ProcessorOptions{
        UpdateInterval: time.Second, 
        Prefetch: 0, 
        StartPositions: azeventhubs.StartPositions{
            Default: azeventhubs.StartPosition{
                Latest: to.Ptr(true), 
                EnqueuedTime: to.Ptr(time.Now()), 
                Inclusive: true
            }
        }
    }
)
Salin selepas log masuk

Walau bagaimanapun, saya perhatikan bahawa acara digunakan dari pusat pemeriksaan terakhir dan bukannya acara yang paling baru dihantar.

Apa yang saya cuba: Saya telah mencuba contoh ConsumingEventsUsingConsumerClient dan ConsumingEventsWithCheckpoints tetapi mereka berkelakuan dengan cara yang sama, menggunakan acara dari pusat pemeriksaan terakhir dan bukannya yang terbaru.

Jangkaan saya: Saya mahu pemproses mula menggunakan acara terkini yang dihantar daripada peranti yang menghantar mesej setiap saat. Bagaimanakah saya boleh mencapai tingkah laku ini menggunakan azeventhubs Go SDK?

Penyelesaian

Saya mengalami kesukaran untuk memahami mekanisme asas AMQP pada mulanya. Walau bagaimanapun, saya dengan sukacitanya melaporkan bahawa isu itu telah berjaya diselesaikan.

var wg sync.WaitGroup
wg.Add(1)

for _, partition := range p.PartitionIDs {
    go func(partition string) {
        defer wg.Done()

        partitionClient, err := consumerClient.NewPartitionClient(partition, nil)
        if err != nil {
            panic(err)
        }

        receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
        defer cancel()

        for {
            events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil)

            if err != nil && !errors.Is(err, context.DeadlineExceeded) {
                panic(err)
            }

            for _, evt := range events {
                fmt.Printf("partition: %s\n", partition)
                fmt.Printf("Body: %s\n", string(evt.Body))
            }
        }
    }(partition)
}

wg.Wait()
Salin selepas log masuk

Saya ingin mengucapkan terima kasih kepada pasukan Sokongan Pelanggan Azure atas bantuan berharga mereka.

Atas ialah kandungan terperinci Bagaimana untuk menggunakan acara terkini menggunakan Azure Event Hubs Go SDK (azeventhubs)?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

sumber:stackoverflow.com
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan