I'm using some third-party APIs, and each API has its own rate limit. Endpoint 1 has a rate limit of 10/s and Endpoint 2 has a rate limit of 20/s.
I need to process data through endpoint 1 which will return an array of objects (between 2-3000 objects). I then need to get each object and send some data to a second endpoint while respecting the second endpoint's rate limit.
I plan to send requests in batches of 10 at a time in my goroutine, ensuring that if all 10 requests complete in …
Ultimately, I would like to be able to limit the number of concurrent requests in flight to each endpoint at any one time, especially when I have to retry requests that failed with something like a 5xx response from the server.
For the purpose of the question, I used httpbin requests to simulate the following scenario:
```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"
)

type HttpBinGetRequest struct {
	url string
}

type HttpBinGetResponse struct {
	Uuid       string `json:"uuid"`
	StatusCode int
}

type HttpBinPostRequest struct {
	url  string
	uuid string // Item to post to API
}

type HttpBinPostResponse struct {
	Data       string `json:"data"`
	StatusCode int
}

func main() {
	// Prepare GET requests for 500 requests
	var requests []*HttpBinGetRequest
	for i := 0; i < 500; i++ {
		uri := "https://httpbin.org/uuid"
		request := &HttpBinGetRequest{
			url: uri,
		}
		requests = append(requests, request)
	}

	// Create semaphore and rate limit for the GET endpoint
	getSemaphore := make(chan struct{}, 10)
	getRate := make(chan struct{}, 10)
	for i := 0; i < cap(getRate); i++ {
		getRate <- struct{}{}
	}

	go func() {
		// ticker corresponding to 1/10th of a second
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			_, ok := <-getRate
			if !ok {
				return
			}
		}
	}()

	// Send our GET requests to obtain a random UUID
	var wg sync.WaitGroup
	for _, request := range requests {
		wg.Add(1)
		// Go func to make request and receive the response
		go func(r *HttpBinGetRequest) {
			defer wg.Done()

			// Check the rate limiter and block if it is full
			getRate <- struct{}{}

			// Add a token to the semaphore
			getSemaphore <- struct{}{}

			// Remove token when function is complete
			defer func() {
				<-getSemaphore
			}()
			resp, _ := get(r)
			fmt.Printf("%+v\n", resp)
		}(request)
	}
	wg.Wait()

	// I need to add code that obtains the response data from the above for loop,
	// then sends each UUID to its own goroutine for a POST request, following a similar pattern to the one above,
	// so as not to violate the rate limit of the second endpoint, which is 20 calls per second
	// postSemaphore := make(chan struct{}, 20)
	// postRate := make(chan struct{}, 20)
	// for i := 0; i < cap(postRate); i++ {
	// 	postRate <- struct{}{}
	// }
}

func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {
	httpResp := &HttpBinGetResponse{}
	client := &http.Client{}
	req, err := http.NewRequest("GET", hbgr.url, nil)
	if err != nil {
		fmt.Println("error making request")
		return httpResp, err
	}

	req.Header = http.Header{
		"accept": {"application/json"},
	}

	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(err)
		fmt.Println("error getting response")
		return httpResp, err
	}
	defer resp.Body.Close() // always release the connection

	// Read Response
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, err
	}
	json.Unmarshal(body, &httpResp)
	httpResp.StatusCode = resp.StatusCode
	return httpResp, nil
}

// Method to post data to httpbin
func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {
	httpResp := &HttpBinPostResponse{}
	client := &http.Client{}
	req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
	if err != nil {
		fmt.Println("error making request")
		return httpResp, err
	}

	req.Header = http.Header{
		"accept": {"application/json"},
	}

	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("error getting response")
		return httpResp, err
	}
	defer resp.Body.Close() // always release the connection

	if resp.StatusCode == 429 {
		fmt.Println(resp.Header.Get("Retry-After"))
	}

	// Read Response
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, err
	}
	json.Unmarshal(body, &httpResp)
	httpResp.StatusCode = resp.StatusCode
	fmt.Printf("%+v", httpResp)
	return httpResp, nil
}
```
This is a producer/consumer problem: the GET requests produce items and the POST requests consume them. You can use a channel to connect the two.
For the rate limiter, I would use the package golang.org/x/time/rate.
Since we use a channel to connect producers and consumers, it is natural to send failed tasks back into the same channel so that a consumer can retry them.
I have encapsulated this logic in the `Scheduler[T]` type; see the demo below. Note that the demo was written quickly to illustrate the idea only and has not been thoroughly tested.
```go
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"net/http/httptest"
	"sort"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

type Task[T any] struct {
	param       T
	failedCount int
}

type Scheduler[T any] struct {
	name     string
	limit    int
	maxTries int
	wg       sync.WaitGroup
	tasks    chan Task[T]
	action   func(param T) error
}

// NewScheduler creates a scheduler that runs the action with the specified rate limit.
// It will retry the action if the action returns a non-nil error.
func NewScheduler[T any](name string, limit, maxTries, chanSize int, action func(param T) error) *Scheduler[T] {
	return &Scheduler[T]{
		name:     name,
		limit:    limit,
		maxTries: maxTries,
		tasks:    make(chan Task[T], chanSize),
		action:   action,
	}
}

func (s *Scheduler[T]) AddTask(param T) {
	s.wg.Add(1)
	s.tasks <- Task[T]{param: param}
}

func (s *Scheduler[T]) retryLater(t Task[T]) {
	s.wg.Add(1)
	s.tasks <- t
}

func (s *Scheduler[T]) Run() {
	lim := rate.NewLimiter(rate.Limit(s.limit), 1)
	for t := range s.tasks {
		t := t
		if err := lim.Wait(context.Background()); err != nil {
			log.Fatalf("wait: %s", err)
			return
		}
		go func() {
			defer s.wg.Done()
			err := s.action(t.param)
			if err != nil {
				log.Printf("task %s, param %v failed: %v", s.name, t.param, err)
				t.failedCount++

				if t.failedCount == s.maxTries {
					log.Printf("task %s, param %v failed with %d tries", s.name, t.param, s.maxTries)
					return
				}

				s.retryLater(t)
			}
		}()
	}
}

func (s *Scheduler[T]) Wait() {
	s.wg.Wait()
	close(s.tasks)
}

func main() {
	s := &server{}
	ts := httptest.NewServer(s)
	defer ts.Close()

	schedulerPost := NewScheduler("post", 20, 3, 1, func(param string) error {
		return post(fmt.Sprintf("%s/%s", ts.URL, param))
	})

	go schedulerPost.Run()

	schedulerGet := NewScheduler("get", 10, 3, 1, func(param int) error {
		id, err := get(fmt.Sprintf("%s/%d", ts.URL, param))
		if err != nil {
			return err
		}

		schedulerPost.AddTask(id)
		return nil
	})

	go schedulerGet.Run()

	for i := 0; i < 100; i++ {
		schedulerGet.AddTask(i)
	}

	schedulerGet.Wait()
	schedulerPost.Wait()

	s.printStats()
}

func get(url string) (string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}

	return string(body), nil
}

func post(url string) error {
	resp, err := http.Post(url, "", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	return nil
}

type server struct {
	gMu  sync.Mutex
	gets []int64

	pMu   sync.Mutex
	posts []int64
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	log.Printf("%s: %s", r.Method, r.URL.Path)

	// collect request stats.
	if r.Method == http.MethodGet {
		s.gMu.Lock()
		s.gets = append(s.gets, time.Now().UnixMilli())
		s.gMu.Unlock()
	} else {
		s.pMu.Lock()
		s.posts = append(s.posts, time.Now().UnixMilli())
		s.pMu.Unlock()
	}

	n := rand.Intn(1000)
	// simulate latency.
	time.Sleep(time.Duration(n) * time.Millisecond)

	// simulate errors.
	if n%10 == 0 {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	if r.Method == http.MethodGet {
		fmt.Fprintf(w, "%s", r.URL.Path[1:])
		return
	}
}

func (s *server) printStats() {
	log.Printf("GETS (total: %d):\n", len(s.gets))
	printStats(s.gets)
	log.Printf("POSTS (total: %d):\n", len(s.posts))
	printStats(s.posts)
}

func printStats(ts []int64) {
	sort.Slice(ts, func(i, j int) bool {
		return ts[i] < ts[j]
	})

	count := 0
	to := ts[0] + 1000
	for i := 0; i < len(ts); i++ {
		if ts[i] < to {
			count++
		} else {
			fmt.Printf("  %d: %d\n", to, count)
			i-- // push back the current item
			count = 0
			to += 1000
		}
	}
	if count > 0 {
		fmt.Printf("  %d: %d\n", to, count)
	}
}
```
The output looks like this:
```
...
2023/03/25 21:03:30 GETS (total: 112):
  1679749398998: 10
  1679749399998: 10
  1679749400998: 10
  1679749401998: 10
  1679749402998: 10
  1679749403998: 10
  1679749404998: 10
  1679749405998: 10
  1679749406998: 10
  1679749407998: 10
  1679749408998: 10
  1679749409998: 2
2023/03/25 21:03:30 POSTS (total: 111):
  1679749399079: 8
  1679749400079: 8
  1679749401079: 12
  1679749402079: 8
  1679749403079: 10
  1679749404079: 9
  1679749405079: 9
  1679749406079: 8
  1679749407079: 14
  1679749408079: 12
  1679749409079: 9
  1679749410079: 4
```