Resilience in communication between microservices using the failsafe-go lib

PHPz
Published: 2024-08-27 06:04:02

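Let's start at the beginning. What is resilience? I like the definition in this post:

The intrinsic ability of a system to adjust its functioning prior to, during, or following changes and disturbances, so that it can sustain required operations under both expected and unexpected conditions

Because it is a broad term, I will focus on communication between microservices in this post. For that, I created two services in Go: serviceA and serviceB (my creativity was not high when writing this post).

The initial code for both is as follows: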

package main

// serviceA
import (
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"

    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        resp, err := http.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        // Close the body on every return path, including a failed read
        defer resp.Body.Close()
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

package main

//serviceB
import (
    "net/http"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

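As you can see in the code, a problem in serviceB will affect serviceA, because serviceA does not handle any communication failure. We will improve this by using the failsafe-go lib.

According to the documentation on the official website:

Failsafe-go is a library for building resilient, fault tolerant Go applications. It works by wrapping functions with one or more resilience policies, which can be combined and composed as needed.

Let's start by applying some of the available policies and testing their composition.

Timeout

The first policy we will test is the simplest: a timeout, which ensures that the connection is interrupted if serviceB takes too long to respond and that the client knows why.

The first step is to change serviceB so that it includes a delay, which makes the scenario easier to demonstrate: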

package main
//serviceB
import (
    "net/http"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        time.Sleep(5 * time.Second) //add a delay to simulate a slow service
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

Then install failsafe-go with the command:

❯ cd serviceA
❯ go get github.com/failsafe-go/failsafe-go

The code of serviceA/main.go is:

package main

import (
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/timeout"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a Timeout for 1 second
        timeout := newTimeout(logger)

        // Use the Timeout with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, timeout)
        client := &http.Client{Transport: roundTripper}
        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        // Close the body on every return path, including a failed read
        defer resp.Body.Close()
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newTimeout(logger *slog.Logger) timeout.Timeout[*http.Response] {
    return timeout.Builder[*http.Response](1 * time.Second).
        OnTimeoutExceeded(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Connection timed out")
        }).Build()
}

To test how it works, I used curl to access serviceA:

❯ curl -v http://localhost:3000
* Host localhost:3000 was resolved.
* IPv6: ::1
* IPv4: 127.0.0.1
*   Trying [::1]:3000...
* Connected to localhost (::1) port 3000
> GET / HTTP/1.1
> Host: localhost:3000
> User-Agent: curl/8.7.1
> Accept: */*
>
* Request completely sent off
< HTTP/1.1 500 Internal Server Error
< Date: Fri, 23 Aug 2024 19:43:23 GMT
< Content-Length: 45
< Content-Type: text/plain; charset=utf-8
<
* Connection #0 to host localhost left intact
Get "http://localhost:3001": timeout exceeded⏎

And the following output is generated by serviceA:

go run main.go
{"time":"2024-08-20T08:37:36.852886-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-20T08:37:36.856079-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-20T08:37:35.851262-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63409","referer":"","length":0},"response":{"time":"2024-08-20T08:37:36.856046-03:00","latency":1004819000,"status":500,"length":45},"id":""}

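This way, we can see that the client (curl, in this case) received a meaningful response and that serviceA was not significantly affected: it answered after about one second (see the latency field in the log) instead of waiting the full five seconds for serviceB.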
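To make the idea concrete without the library, here is a minimal sketch of a timeout built only on the standard library's context package. It is not how failsafe-go implements its Timeout policy. The names slowCall, callWithTimeout and timedOut are hypothetical, introduced only for illustration, and slowCall merely stands in for the HTTP request to serviceB.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowCall is a hypothetical stand-in for the request to serviceB.
// It returns after d, or earlier with ctx.Err() if the context ends first.
func slowCall(ctx context.Context, d time.Duration) (string, error) {
	select {
	case <-time.After(d):
		return "hello from service B", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// callWithTimeout gives the call at most limit to finish.
func callWithTimeout(limit, callDuration time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel() // release the timer as soon as we return
	return slowCall(ctx, callDuration)
}

// timedOut reports whether err was caused by the deadline expiring.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	// The call needs 500ms but only gets 50ms.
	_, err := callWithTimeout(50*time.Millisecond, 500*time.Millisecond)
	fmt.Println("slow call timed out:", timedOut(err))

	// The call needs 10ms and gets 500ms.
	msg, err := callWithTimeout(500*time.Millisecond, 10*time.Millisecond)
	fmt.Println(msg, err)
}
```

The caller can tell a timeout apart from other failures through timedOut, which is the same benefit the client gets from the "timeout exceeded" message above.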

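Let's improve the resilience of our application by looking at another useful policy: retry.

Retry

Once again, serviceB has to be changed, this time to add random errors: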

package main

import (
    "math/rand"
    "net/http"
    "strconv"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        retryAfterDelay := 1 * time.Second
        if fail() {
            w.Header().Add("Retry-After", strconv.Itoa(int(retryAfterDelay.Seconds())))
            w.WriteHeader(http.StatusServiceUnavailable)
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

func fail() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

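To make it easier to understand, I am showing one policy at a time, which is why serviceA was changed back to the original version rather than the version with the timeout. Later, we will look at how to compose multiple policies to make the application more resilient.

The code of serviceA/main.go now looks like this: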

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/retrypolicy"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a RetryPolicy that only handles 503 responses, with backoff delays between retries
        retryPolicy := newRetryPolicy(logger)

        // Use the RetryPolicy with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, retryPolicy)
        client := &http.Client{Transport: roundTripper}

        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        // Close the body on every return path, including a failed read
        defer resp.Body.Close()
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newRetryPolicy(logger *slog.Logger) retrypolicy.RetryPolicy[*http.Response] {
    return retrypolicy.Builder[*http.Response]().
        HandleIf(func(response *http.Response, _ error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithBackoff(time.Second, 10*time.Second).
        OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[*http.Response]) {
            logger.Info(fmt.Sprintf("Retry %d after delay of %d", e.Attempts(), e.Delay))
        }).Build()
}

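This way, if serviceB returns the status StatusServiceUnavailable (code 503), the connection is attempted again at progressively longer intervals, thanks to the WithBackoff configuration. The output of serviceA, when accessed via curl, should look similar to: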

go run main.go
{"time":"2024-08-20T08:43:38.297621-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:38.283715-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63542","referer":"","length":0},"response":{"time":"2024-08-20T08:43:38.297556-03:00","latency":13840708,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:43:39.946562-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:39.943394-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63544","referer":"","length":0},"response":{"time":"2024-08-20T08:43:39.946545-03:00","latency":3151000,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:43:40.845862-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-20T08:43:41.85287-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-20T08:43:43.860694-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:40.841468-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63545","referer":"","length":0},"response":{"time":"2024-08-20T08:43:43.860651-03:00","latency":3019287458,"status":200,"length":71},"id":""}

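In this example, we can see that errors occurred when accessing serviceB and the lib retried the connection until it succeeded. If the connection keeps failing, the client receives an error message such as Get "http://localhost:3001": retries exceeded.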
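The progressive intervals can be illustrated with a small standalone sketch. It assumes that each delay doubles, starting from the first argument of WithBackoff and capped by the second, which is consistent with the 1 s and 2 s delays in the log above (printed there in nanoseconds). backoffDelay is a hypothetical helper written for this illustration, not part of failsafe-go.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the wait before retry number attempt (1-based).
// It doubles from base on every retry and never exceeds max.
// The doubling factor is an assumption made for this sketch.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	// Same arguments as WithBackoff(time.Second, 10*time.Second) above.
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Printf("retry %d after %v\n", attempt, backoffDelay(attempt, time.Second, 10*time.Second))
	}
	// prints delays of 1s, 2s, 4s, 8s and 10s
}
```

Capping the delay keeps a long outage from pushing the wait between attempts beyond what the caller can tolerate.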

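Let's go deeper into resilience by adding a circuit breaker to our project.

Circuit Breaker

The circuit breaker is a more advanced policy that gives finer control over access to a service. The circuit breaker pattern works with three states: closed (no errors, requests flow normally), open (errors were detected, so requests are interrupted), and half-open (a limited number of requests is sent to the struggling service to test whether it has recovered).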
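The three states can be modeled as a very small state machine. The sketch below is a deliberately simplified illustration, not the failsafe-go implementation: it assumes that a single failure opens the breaker and a single successful trial call closes it, and the type and method names (Breaker, Allow, Record, DelayElapsed) are hypothetical.

```go
package main

import "fmt"

// State is one of the three circuit breaker states.
type State int

const (
	Closed State = iota
	Open
	HalfOpen
)

func (s State) String() string {
	return [...]string{"closed", "open", "half-open"}[s]
}

// Breaker is a toy model: one failure opens it, one successful trial closes it.
type Breaker struct {
	state State
}

// State returns the current state.
func (b *Breaker) State() State { return b.state }

// Allow reports whether a call may go through in the current state.
func (b *Breaker) Allow() bool { return b.state != Open }

// Record updates the state with the outcome of a call that was allowed.
func (b *Breaker) Record(success bool) {
	if success {
		b.state = Closed
	} else {
		b.state = Open
	}
}

// DelayElapsed models the end of the open-state wait: trial calls are allowed again.
func (b *Breaker) DelayElapsed() {
	if b.state == Open {
		b.state = HalfOpen
	}
}

func main() {
	b := &Breaker{}
	step := func(event string, apply func()) {
		before := b.State()
		apply()
		fmt.Printf("%-14s %s -> %s\n", event, before, b.State())
	}
	step("call fails", func() { b.Record(false) })
	step("delay elapsed", func() { b.DelayElapsed() })
	step("trial fails", func() { b.Record(false) })
	step("delay elapsed", func() { b.DelayElapsed() })
	step("trial succeeds", func() { b.Record(true) })
}
```

A real breaker usually counts failures over a window and limits how many trial calls run in the half-open state; those details are left out here to keep the transitions visible.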

To use this policy, I made a new version of serviceB that produces more error scenarios and delays:

package main

import (
    "math/rand"
    "net/http"
    "strconv"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        retryAfterDelay := 1 * time.Second
        if fail() {
            w.Header().Add("Retry-After", strconv.Itoa(int(retryAfterDelay.Seconds())))
            w.WriteHeader(http.StatusServiceUnavailable)
            return
        }
        if sleep() {
            time.Sleep(1 * time.Second)
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

func fail() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

func sleep() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

And the code of serviceA:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go/circuitbreaker"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a CircuitBreaker that handles 503 responses and uses a half-open delay based on the Retry-After header
        circuitBreaker := newCircuitBreaker(logger)

        // Use the CircuitBreaker with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, circuitBreaker)
        client := &http.Client{Transport: roundTripper}

        sendGet := func() (*http.Response, error) {
            resp, err := client.Get("http://localhost:3001")
            return resp, err
        }
        maxRetries := 3
        resp, err := sendGet()
        for i := 0; i < maxRetries; i++ {
            if err == nil && resp != nil && resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusTooManyRequests {
                break
            }
            time.Sleep(circuitBreaker.RemainingDelay()) // Wait for circuit breaker's delay, provided by the Retry-After header
            resp, err = sendGet()
        }
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        // Close the body on every return path, including a failed read
        defer resp.Body.Close()
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newCircuitBreaker(logger *slog.Logger) circuitbreaker.CircuitBreaker[*http.Response] {
    return circuitbreaker.Builder[*http.Response]().
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithDelayFunc(failsafehttp.DelayFunc).
        OnStateChanged(func(event circuitbreaker.StateChangedEvent) {
            logger.Info(fmt.Sprintf("circuit breaker state changed from %s to %s", event.OldState.String(), event.NewState.String()))
        }).
        Build()
}

As we can see in the output of serviceA, the circuit breaker is working:

❯ go run main.go
{"time":"2024-08-20T08:51:37.770611-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-20T08:51:38.771682-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:38.776743-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-20T08:51:39.777821-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:39.784897-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-20T08:51:40.786209-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:40.792457-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to closed"}
{"time":"2024-08-20T08:51:40.792733-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:51:37.756947-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63699","referer":"","length":0},"response":{"time":"2024-08-20T08:51:40.792709-03:00","latency":3036065875,"status":200,"length":71},"id":""}

This policy gives greater control over errors and gives serviceB room to recover when it is having problems.
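In the code above, serviceB sends a Retry-After header with its 503 responses, and the circuit breaker is configured with failsafehttp.DelayFunc so that its delay follows that header. As a rough illustration of what reading the header involves, here is a standard-library sketch. It assumes the header carries whole seconds, as serviceB sends it, and ignores the HTTP-date form; parseRetryAfter and retryAfter are hypothetical helpers, not the library's code.

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// parseRetryAfter converts a Retry-After value given in whole seconds.
// The second result is false when the value is empty, negative, or not a number.
func parseRetryAfter(v string) (time.Duration, bool) {
	if v == "" {
		return 0, false
	}
	secs, err := strconv.Atoi(v)
	if err != nil || secs < 0 {
		return 0, false
	}
	return time.Duration(secs) * time.Second, true
}

// retryAfter reads the delay requested by a response's headers.
func retryAfter(h http.Header) (time.Duration, bool) {
	return parseRetryAfter(h.Get("Retry-After"))
}

func main() {
	h := http.Header{}
	h.Set("Retry-After", "1") // what serviceB sends with a 503
	d, ok := retryAfter(h)
	fmt.Println(d, ok) // prints "1s true"
}
```

Returning a boolean alongside the duration lets the caller fall back to its own default delay when the server gives no usable hint.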

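But what do you do when serviceB can no longer respond, for whatever reason? In these cases, we can use a fallback.

Fallback

The idea of this policy is to have an alternative when the desired service has a more serious problem and takes a long time to come back. To do this, we will change the code of serviceA: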

package main

import (
    "bytes"
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/fallback"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        fallback := newFallback(logger)

        roundTripper := failsafehttp.NewRoundTripper(nil, fallback)
        client := &http.Client{Transport: roundTripper}

        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        // Close the body on every return path, including a failed read
        defer resp.Body.Close()
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        type response struct {
            Message string `json:"message"`
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newFallback(logger *slog.Logger) fallback.Fallback[*http.Response] {
    resp := &http.Response{
        StatusCode: http.StatusOK,
        Header:     map[string][]string{"Content-Type": {"application/json"}},
        Body:       io.NopCloser(bytes.NewBufferString(`{"message": "error accessing service B"}`)),
    }
    return fallback.BuilderWithResult[*http.Response](resp).
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        OnFallbackExecuted(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Fallback executed result")
        }).
        Build()
}

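In the newFallback function we create an http.Response, which the lib returns in place of the real response when serviceB answers with 503 Service Unavailable (the condition in HandleIf).

This feature lets us respond to the client while the team responsible for serviceB has time to get the service up and running again.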
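Stripped of the HTTP details, a fallback is a wrapper that inspects the outcome of a call and substitutes a prepared result when a condition matches. The following sketch shows that shape with plain strings; withFallback and errUnavailable are hypothetical names used only here, and the condition function plays the role that HandleIf plays in the code above.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnavailable is a hypothetical error standing in for a failing serviceB.
var errUnavailable = errors.New("service B unavailable")

// withFallback runs call and returns fallbackValue when shouldFallback
// accepts the outcome; otherwise the original result is passed through.
func withFallback(
	call func() (string, error),
	shouldFallback func(result string, err error) bool,
	fallbackValue string,
) (string, error) {
	result, err := call()
	if shouldFallback(result, err) {
		return fallbackValue, nil
	}
	return result, err
}

func main() {
	onError := func(_ string, err error) bool { return err != nil }

	failing := func() (string, error) { return "", errUnavailable }
	msg, err := withFallback(failing, onError, "error accessing service B")
	fmt.Println(msg, err)

	healthy := func() (string, error) { return "hello from service B", nil }
	msg, err = withFallback(healthy, onError, "error accessing service B")
	fmt.Println(msg, err)
}
```

Because the fallback hides the failure from the caller, it is worth logging when it fires, as the OnFallbackExecuted listener does above.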

The output of serviceA is similar to this:

❯ go run main.go
{"time":"2024-08-20T08:55:27.326475-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:27.31306-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63772","referer":"","length":0},"response":{"time":"2024-08-20T08:55:27.326402-03:00","latency":13343208,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:31.756765-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:31.754348-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63774","referer":"","length":0},"response":{"time":"2024-08-20T08:55:31.756753-03:00","latency":2404750,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:34.091845-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:33.086273-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63775","referer":"","length":0},"response":{"time":"2024-08-20T08:55:34.091812-03:00","latency":1005580625,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:37.386512-03:00","level":"INFO","msg":"Fallback executed result"}
{"time":"2024-08-20T08:55:37.386553-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:37.38415-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63777","referer":"","length":0},"response":{"time":"2024-08-20T08:55:37.386544-03:00","latency":2393916,"status":200,"length":76},"id":""}

In the next step, we will combine the concepts we've seen to create a more resilient application.

Policy composition

To do this, we need to change the code of serviceA so that it makes use of the policies we have seen so far:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/circuitbreaker"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/fallback"
    "github.com/failsafe-go/failsafe-go/retrypolicy"
    "github.com/failsafe-go/failsafe-go/timeout"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        retryPolicy := newRetryPolicy(logger)
        fallback := newFallback(logger)
        circuitBreaker := newCircuitBreaker(logger)
        timeout := newTimeout(logger)

        roundTripper := failsafehttp.NewRoundTripper(nil, fallback, retryPolicy, circuitBreaker, timeout)
        client := &http.Client{Transport: roundTripper}

        sendGet := func() (*http.Response, error) {
            resp, err := client.Get("http://localhost:3001")
            return resp, err
        }
        maxRetries := 3
        resp, err := sendGet()
        for i := 0; i < maxRetries; i++ {
            if err == nil && resp != nil && resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusTooManyRequests {
                break
            }
            time.Sleep(circuitBreaker.RemainingDelay()) // Wait for circuit breaker's delay, provided by the Retry-After header
            resp, err = sendGet()
        }
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        // Close the body on every return path, including a failed read
        defer resp.Body.Close()
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newTimeout(logger *slog.Logger) timeout.Timeout[*http.Response] {
    return timeout.Builder[*http.Response](10 * time.Second).
        OnTimeoutExceeded(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Connection timed out")
        }).Build()
}

func newFallback(logger *slog.Logger) fallback.Fallback[*http.Response] {
    resp := &http.Response{
        StatusCode: http.StatusOK,
        Header:     map[string][]string{"Content-Type": {"application/json"}},
        Body:       io.NopCloser(bytes.NewBufferString(`{"message": "error accessing service B"}`)),
    }
    return fallback.BuilderWithResult[*http.Response](resp).
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        OnFallbackExecuted(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Fallback executed result")
        }).
        Build()
}

func newRetryPolicy(logger *slog.Logger) retrypolicy.RetryPolicy[*http.Response] {
    return retrypolicy.Builder[*http.Response]().
        HandleIf(func(response *http.Response, _ error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithBackoff(time.Second, 10*time.Second).
        OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[*http.Response]) {
            logger.Info(fmt.Sprintf("Retry %d after delay of %d", e.Attempts(), e.Delay))
        }).Build()
}

func newCircuitBreaker(logger *slog.Logger) circuitbreaker.CircuitBreaker[*http.Response] {
    return circuitbreaker.Builder[*http.Response]().
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithDelayFunc(failsafehttp.DelayFunc).
        OnStateChanged(func(event circuitbreaker.StateChangedEvent) {
            logger.Info(fmt.Sprintf("circuit breaker state changed from %s to %s", event.OldState.String(), event.NewState.String()))
        }).
        Build()
}


In the code:

roundTripper := failsafehttp.NewRoundTripper(nil, fallback, retryPolicy, circuitBreaker, timeout)

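Here all the defined policies are used together. The lib composes them so that the first policy listed is the outermost and the last (rightmost) is the innermost, which means a result is handled starting from the rightmost policy, that is: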

timeout -> circuitBreaker -> retryPolicy -> fallback
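One way to picture this ordering is to treat each policy as a wrapper around the next one. The sketch below is only an illustration of that nesting, not the library's code: the policy type and the named, compose and handlingOrder helpers are hypothetical, and each wrapper simply records its name once the inner call has returned.

```go
package main

import (
	"fmt"
	"strings"
)

// call is the wrapped operation; policy decorates a call with extra behavior.
type call func() string
type policy func(next call) call

// named returns a policy that records its name after the inner call returns,
// which is the moment a policy gets to handle the result.
func named(name string, trace *[]string) policy {
	return func(next call) call {
		return func() string {
			result := next()
			*trace = append(*trace, name)
			return result
		}
	}
}

// compose wraps fn so that the first policy is outermost and the last is innermost.
func compose(fn call, policies ...policy) call {
	for i := len(policies) - 1; i >= 0; i-- {
		fn = policies[i](fn)
	}
	return fn
}

// handlingOrder composes policies with the given names, runs the call once,
// and returns the order in which they handled the result.
func handlingOrder(names ...string) string {
	var trace []string
	policies := make([]policy, 0, len(names))
	for _, n := range names {
		policies = append(policies, named(n, &trace))
	}
	compose(func() string { return "response" }, policies...)()
	return strings.Join(trace, " -> ")
}

func main() {
	fmt.Println(handlingOrder("fallback", "retryPolicy", "circuitBreaker", "timeout"))
	// prints "timeout -> circuitBreaker -> retryPolicy -> fallback"
}
```

Seen this way, placing the fallback first makes sense: it is the last to see the outcome, so it only steps in after the retries have been used up.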

We can see the execution of the policies by observing the serviceA output:

go run main.go
{"time":"2024-08-19T10:15:29.226553-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-19T10:15:29.226841-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-19T10:15:30.227941-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:30.234182-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:30.234258-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-19T10:15:32.235282-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:42.23622-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-19T10:15:42.237942-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to closed"}
{"time":"2024-08-19T10:15:42.238043-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:15:29.215709-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52527","referer":"","length":0},"response":{"time":"2024-08-19T10:15:42.238008-03:00","latency":13022704750,"status":500,"length":45},"id":""}
{"time":"2024-08-19T10:15:56.53476-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-19T10:15:56.534803-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-19T10:15:57.535108-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:57.53889-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:57.538911-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-19T10:15:59.539948-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:59.544425-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:59.544575-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:15:56.5263-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52542","referer":"","length":0},"response":{"time":"2024-08-19T10:15:59.544557-03:00","latency":3018352000,"status":500,"length":245},"id":""}
{"time":"2024-08-19T10:16:11.044207-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-19T10:16:11.046026-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:16:01.043317-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52544","referer":"","length":0},"response":{"time":"2024-08-19T10:16:11.045601-03:00","latency":10002596334,"status":500,"length":45},"id":""}

Conclusion

One of the advantages of microservices architecture is that we can break a complex domain into smaller, specialized services that communicate with each other to complete the necessary logic. Ensuring that this communication is resilient and will continue to work even in the face of failures and unforeseen events is fundamental. Using libraries such as failsafe-go makes this process easier.

You can find the code presented in this post on my GitHub.

Originally published at https://eltonminetto.dev on August 24, 2024

Source: dev.to