J'ai réglé le cadre de ce système de collecte de journaux, comme indiqué ci-dessous
La logique globale du code à implémenter cette fois est :
L'adresse complète du code est : https://github.com/pythonsite/logagent
introduction etcd
Le stockage clé-valeur distribué hautement disponible peut être utilisé pour le partage de configuration et la découverte de services
Projets similaires : zookeeper et consul
Langage de développement : go
Interface : fournit une interface reposante , utilisez un simple
algorithme de mise en œuvre : forte cohérence basée sur l'algorithme radeau, répertoire de stockage de services hautement disponible
scénarios d'application etcd :
Découverte de services et enregistrement de services
.2. Centre de configuration (obligatoire pour le client de collecte de journaux que nous avons implémenté)
3. Verrouillage distribué
4. Élection du maître
Site officiel pour etcd Il existe un. introduction très concise :
etcd build :
Adresse de téléchargement : https://github.com/coreos/etcd/releases/
Selon Just download the version correspondante dans votre propre environnement et démarrez-la
Après le démarrage, vous pouvez la vérifier avec la commande suivante :
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan zhaofan [root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name zhaofan [root@localhost etcd-v3.2.18-linux-amd64]#
introduction du contexte et utilisation
En fait, cette chose se traduit par gestion du contexte. Alors quel est le rôle du contexte ? Il a principalement les deux fonctions suivantes :
1. Contrôler le délai d'attente de la goroutine
2, Enregistrer les données de contexte
Comprenez à travers l'exemple simple suivant :
package main import ( "fmt" "time" "net/http" "context" "io/ioutil" ) type Result struct{ r *http.Response err error } func process(){ ctx,cancel := context.WithTimeout(context.Background(),2*time.Second) defer cancel() tr := &http.Transport{} client := &http.Client{Transport:tr} c := make(chan Result,1) req,err := http.NewRequest("GET","http://www.google.com",nil) if err != nil{ fmt.Println("http request failed,err:",err) return } // 如果请求成功了会将数据存入到管道中 go func(){ resp,err := client.Do(req) pack := Result{resp,err} c <- pack }() select{ case <- ctx.Done(): tr.CancelRequest(req) fmt.Println("timeout!") case res := <-c: defer res.r.Body.Close() out,_:= ioutil.ReadAll(res.r.Body) fmt.Printf("server response:%s",out) } return } func main() { process() }
Écrivez un contexte pour enregistrer le contexte via le contexte, des exemples de code tels que :
package main import ( "github.com/Go-zh/net/context" "fmt" ) func add(ctx context.Context,a,b int) int { traceId := ctx.Value("trace_id").(string) fmt.Printf("trace_id:%v\n",traceId) return a+b } func calc(ctx context.Context,a, b int) int{ traceId := ctx.Value("trace_id").(string) fmt.Printf("trace_id:%v\n",traceId) //再将ctx传入到add中 return add(ctx,a,b) } func main() { //将ctx传递到calc中 ctx := context.WithValue(context.Background(),"trace_id","123456") calc(ctx,20,30) }
Combinaison d'etcd et de contexte
Un exemple simple de connexion d'etcd via go : (Il y a un petit problème qui nécessite une attention ici est la méthode de démarrage d'etcd. Le démarrage par défaut peut ne vous connectez pas, surtout si vous Il est installé virtuellement, vous devez donc le démarrer avec la commande suivante :
./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client- urls http://0.0.0.0 : 2371 --listen-peer-urls http://0.0.0.0:2381
)
package main import ( etcd_client "github.com/coreos/etcd/clientv3" "time" "fmt" ) func main() { cli, err := etcd_client.New(etcd_client.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil{ fmt.Println("connect failed,err:",err) return } fmt.Println("connect success") defer cli.Close() }
L'exemple suivant consiste à connecter etcd, à enregistrer et à obtenir la valeur
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main() { cli,err := clientv3.New(clientv3.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil{ fmt.Println("connect failed,err:",err) return } fmt.Println("connect succ") defer cli.Close() ctx,cancel := context.WithTimeout(context.Background(),time.Second) _,err = cli.Put(ctx,"logagent/conf/","sample_value") cancel() if err != nil{ fmt.Println("put failed,err",err) return } ctx, cancel = context.WithTimeout(context.Background(),time.Second) resp,err := cli.Get(ctx,"logagent/conf/") cancel() if err != nil{ fmt.Println("get failed,err:",err) return } for _,ev := range resp.Kvs{ fmt.Printf("%s:%s\n",ev.Key,ev.Value) } }
À propos du contexte Le site officiel propose également un exemple très utile, qui est utilisé pour contrôler la sortie d'une goroutine ouverte. Le code est le suivant :
package main import ( "context" "fmt" ) func main() { // gen generates integers in a separate goroutine and // sends them to the returned channel. // The callers of gen need to cancel the context once // they are done consuming generated integers not to leak // the internal goroutine started by gen. gen := func(ctx context.Context) <-chan int { dst := make(chan int) n := 1 go func() { for { select { case <-ctx.Done(): return // returning not to leak the goroutine case dst <- n: n++ } } }() return dst } ctx, cancel := context.WithCancel(context.Background()) defer cancel() // cancel when we are finished consuming integers for n := range gen(ctx) { fmt.Println(n) if n == 5 { break } } }
Concernant l'exemple de code du. Démonstration WithDeadline dans le document du site officiel :
package main import ( "context" "fmt" "time" ) func main() { d := time.Now().Add(50 * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) // Even though ctx will be expired, it is good practice to call its // cancelation function in any case. Failure to do so may keep the // context and its parent alive longer than necessary. defer cancel() select { case <-time.After(1 * time.Second): fmt.Println("overslept") case <-ctx.Done(): fmt.Println(ctx.Err()) } }
Grâce au code ci-dessus, nous avons Une utilisation de base, alors si nous utilisons etcd pour la gestion de la configuration, si la configuration change, comment notifier les changements de configuration du serveur correspondants ? L'exemple suivant démontre :
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main() { cli,err := clientv3.New(clientv3.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil { fmt.Println("connect failed,err:",err) return } defer cli.Close() // 这里会阻塞 rch := cli.Watch(context.Background(),"logagent/conf/") for wresp := range rch{ for _,ev := range wresp.Events{ fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) } } }
Implémentation simple d'un code consommateur kafka Exemple :
package main import ( "github.com/Shopify/sarama" "strings" "fmt" "time" ) func main() { consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil) if err != nil{ fmt.Println("failed to start consumer:",err) return } partitionList,err := consumer.Partitions("nginx_log") if err != nil { fmt.Println("Failed to get the list of partitions:",err) return } fmt.Println(partitionList) for partition := range partitionList{ pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err) return } defer pc.AsyncClose() go func(partitionConsumer sarama.PartitionConsumer){ for msg := range pc.Messages(){ fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value)) } }(pc) } time.Sleep(time.Hour) consumer.Close() }
Cependant, le code ci-dessus n'est pas le meilleur code car on finit par attendre l'exécution de goroutine au fil du temps .sleep. Nous pouvons le modifier pour utiliser sync.WaitGroup pour implémenter
package main import ( "github.com/Shopify/sarama" "strings" "fmt" "sync" ) var ( wg sync.WaitGroup ) func main() { consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil) if err != nil{ fmt.Println("failed to start consumer:",err) return } partitionList,err := consumer.Partitions("nginx_log") if err != nil { fmt.Println("Failed to get the list of partitions:",err) return } fmt.Println(partitionList) for partition := range partitionList{ pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err) return } defer pc.AsyncClose() go func(partitionConsumer sarama.PartitionConsumer){ wg.Add(1) for msg := range partitionConsumer.Messages(){ fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value)) } wg.Done() }(pc) } //time.Sleep(time.Hour) wg.Wait() consumer.Close() }
Mettez les informations de journal que le client doit collecter dans etcd
Le code pour le traitement etcd est :
package main import ( "github.com/coreos/etcd/clientv3" "time" "github.com/astaxie/beego/logs" "context" "fmt" ) var Client *clientv3.Client var logConfChan chan string // 初始化etcd func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){ var keys []string for _,ip := range ipArrays{ //keyfmt = /logagent/%s/log_config keys = append(keys,fmt.Sprintf(keyfmt,ip)) } logConfChan = make(chan string,10) logs.Debug("etcd watch key:%v timeout:%v", keys, timeout) Client,err = clientv3.New(clientv3.Config{ Endpoints:addr, DialTimeout: timeout, }) if err != nil{ logs.Error("connect failed,err:%v",err) return } logs.Debug("init etcd success") waitGroup.Add(1) for _, key := range keys{ ctx,cancel := context.WithTimeout(context.Background(),2*time.Second) // 从etcd中获取要收集日志的信息 resp,err := Client.Get(ctx,key) cancel() if err != nil { logs.Warn("get key %s failed,err:%v",key,err) continue } for _, ev := range resp.Kvs{ logs.Debug("%q : %q\n", ev.Key, ev.Value) logConfChan <- string(ev.Value) } } go WatchEtcd(keys) return } func WatchEtcd(keys []string){ // 这里用于检测当需要收集的日志信息更改时及时更新 var watchChans []clientv3.WatchChan for _,key := range keys{ rch := Client.Watch(context.Background(),key) watchChans = append(watchChans,rch) } for { for _,watchC := range watchChans{ select{ case wresp := <-watchC: for _,ev:= range wresp.Events{ logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) logConfChan <- string(ev.Kv.Value) } default: } } time.Sleep(time.Second) } waitGroup.Done() } func GetLogConf()chan string{ return logConfChan }
De même, ajoutez la bonne limite ici. Après tout, le programme de collecte de journaux ne peut pas affecter les performances de l'activité en cours, donc limit.go est ajouté pour limiter la vitesse :
package main import ( "time" "sync/atomic" "github.com/astaxie/beego/logs" ) type SecondLimit struct { unixSecond int64 curCount int32 limit int32 } func NewSecondLimit(limit int32) *SecondLimit { secLimit := &SecondLimit{ unixSecond:time.Now().Unix(), curCount:0, limit:limit, } return secLimit } func (s *SecondLimit) Add(count int) { sec := time.Now().Unix() if sec == s.unixSecond { atomic.AddInt32(&s.curCount,int32(count)) return } atomic.StoreInt64(&s.unixSecond,sec) atomic.StoreInt32(&s.curCount, int32(count)) } func (s *SecondLimit) Wait()bool { for { sec := time.Now().Unix() if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit { time.Sleep(time.Microsecond) logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount) continue } if sec != atomic.LoadInt64(&s.unixSecond) { atomic.StoreInt64(&s.unixSecond,sec) atomic.StoreInt32(&s.curCount,0) } logs.Debug("limit is exited") return false } }
Recommandé : tutoriel de langue go
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!