l'éditeur php Strawberry vous propose un guide sur la façon d'utiliser les derniers événements à l'aide du SDK Azure Event Hubs Go (azeventhubs). Azure Event Hubs est un service de transfert de données en temps réel hautement évolutif qui peut être utilisé pour traiter de grandes quantités de données d'événements. Dans ce guide, nous expliquerons comment utiliser le package azeventhubs pour interagir avec Azure Event Hubs et montrerons comment utiliser la dernière fonctionnalité d'événements pour obtenir des données en temps réel. Grâce aux conseils de cet article, vous pourrez facilement utiliser Azure Event Hubs dans les applications Go et profiter des dernières fonctionnalités d'événement pour obtenir et traiter des données en temps réel.
Je migre de azure-event-hubs-go/v3 vers le nouveau azeventhubs Go SDK. Dans les anciennes versions du SDK, il y avait un paramètre ReceiveOption qui me permettait de spécifier par où commencer à consommer les événements.
Dans le nouveau SDK, j'utilise le code suivant pour initialiser le processeur :
processor, err := azeventhubs.NewProcessor( e.ConsumerClient, checkpointStore, &azeventhubs.ProcessorOptions{ UpdateInterval: time.Second, Prefetch: 0, StartPositions: azeventhubs.StartPositions{ Default: azeventhubs.StartPosition{ Latest: to.Ptr(true), EnqueuedTime: to.Ptr(time.Now()), Inclusive: true } } } )
Cependant, j'ai remarqué que les événements sont consommés à partir du dernier point de contrôle au lieu de l'événement le plus récemment envoyé.
Ce que j'ai essayé : J'ai essayé les exemples ConsumingEventsUsingConsumerClient et ConsumingEventsWithCheckpoints mais ils se comportent de la même manière, consommant les événements du dernier point de contrôle au lieu du plus récent.
Mes attentes : Je souhaite que le processeur commence à utiliser les derniers événements envoyés par l'appareil qui envoie un message toutes les secondes. Comment puis-je obtenir ce comportement à l’aide du SDK azeventhubs Go ?
J'ai eu du mal à comprendre le mécanisme sous-jacent de l'AMQP au début. Cependant, je suis heureux d'annoncer que le problème a été résolu avec succès.
var wg sync.WaitGroup wg.Add(1) for _, partition := range p.PartitionIDs { go func(partition string) { defer wg.Done() partitionClient, err := consumerClient.NewPartitionClient(partition, nil) if err != nil { panic(err) } receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Second*30) defer cancel() for { events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil) if err != nil && !errors.Is(err, context.DeadlineExceeded) { panic(err) } for _, evt := range events { fmt.Printf("partition: %s\n", partition) fmt.Printf("Body: %s\n", string(evt.Body)) } } }(partition) } wg.Wait()
Je tiens à exprimer ma gratitude à l'équipe de support client Azure pour son aide précieuse.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!