ホームページ > バックエンド開発 > Golang > ailsafe-go lib を使用したマイクロサービス間の通信の回復力

ailsafe-go lib を使用したマイクロサービス間の通信の回復力

PHPz
リリース: 2024-08-27 06:04:02
オリジナル
657 人が閲覧しました

Resilience in communication between microservices using the failsafe-go lib

最初から始めましょう。レジリエンスとは何ですか?この投稿の定義が気に入っています:

予想される条件と予想外の条件の両方で必要な動作を維持できるように、変化や障害の発生前、発生中、または発生後にシステムの機能を調整するシステム本来の能力

これは広範な用語なので、この投稿ではマイクロサービス間の通信に焦点を当てます。これを行うために、Go を使用して 2 つのサービス、serviceA と serviceB を作成しました (この投稿を書いているとき、私の創造性はそれほど高くありませんでした)。

両方の初期コードは次のとおりです:

package main

// serviceA
import (
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"

    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        resp, err := http.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

ログイン後にコピー
package main

//serviceB
import (
    "net/http"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

ログイン後にコピー

コードでわかるように、serviceB に問題がある場合、serviceA は通信障害を処理しないため、serviceA の機能に影響します。 lib failsafe-go を使用してこれを改善します。

公式ウェブサイトのドキュメントによると:

Failsafe-go は、回復力とフォールト トレラントな Go アプリケーションを構築するためのライブラリです。これは、必要に応じて組み合わせて構成できる 1 つ以上の復元ポリシーで関数をラップすることによって機能します。

利用可能なポリシーをいくつか適用し、その構成をテストすることから始めましょう。

タイムアウト

テストする最初のポリシーは最も単純なもので、サービス B の応答に時間がかかりすぎ、クライアントがその理由を知っている場合に接続を確実に中断するためのタイムアウトが含まれています。

最初のステップは、シナリオのデモを容易にするために遅延が含まれるようにサービス B を変更することでした。

package main
//serviceB
import (
    "net/http"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        time.Sleep(5 * time.Second) //add a delay to simulate a slow service
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

ログイン後にコピー

failsafe-go をインストールした後、次のコマンドを使用します:

❯ cd serviceA
❯ go get github.com/failsafe-go/failsafe-go
ログイン後にコピー

serviceA/main.go のコードは次のとおりです:

package main

import (
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/timeout"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a Timeout for 1 second
        timeout := newTimeout(logger)

        // Use the Timeout with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, timeout)
        client := &http.Client{Transport: roundTripper}
        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newTimeout(logger *slog.Logger) timeout.Timeout[*http.Response] {
    return timeout.Builder[*http.Response](1 * time.Second).
        OnTimeoutExceeded(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Connection timed out")
        }).Build()
}

ログイン後にコピー

動作をテストするために、curl を使用してサービス A にアクセスしました:

❯ curl -v http://localhost:3000
* Host localhost:3000 was resolved.
* IPv6: ::1
* IPv4: 127.0.0.1
*   Trying [::1]:3000...
* Connected to localhost (::1) port 3000
> GET / HTTP/1.1
> Host: localhost:3000
> User-Agent: curl/8.7.1
> Accept: */*
>
* Request completely sent off
< HTTP/1.1 500 Internal Server Error
< Date: Fri, 23 Aug 2024 19:43:23 GMT
< Content-Length: 45
< Content-Type: text/plain; charset=utf-8
<
* Connection #0 to host localhost left intact
Get "http://localhost:3001": timeout exceeded⏎
ログイン後にコピー

次の出力がサービス A によって生成されます:

go run main.go
{"time":"2024-08-20T08:37:36.852886-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-20T08:37:36.856079-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-20T08:37:35.851262-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63409","referer":"","length":0},"response":{"time":"2024-08-20T08:37:36.856046-03:00","latency":1004819000,"status":500,"length":45},"id":""}
ログイン後にコピー

このようにして、クライアント (この場合はカール) が効果的な応答を示し、serviceA が大きな影響を与えていないことがわかります。

別の有益なポリシーである再試行を調査して、アプリケーションの復元力を向上させましょう。

リトライ

ここでも、サービス B に変更を加えてランダム エラーを追加する必要がありました。

package main

import (
    "math/rand"
    "net/http"
    "strconv"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        retryAfterDelay := 1 * time.Second
        if fail() {
            w.Header().Add("Retry-After", strconv.Itoa(int(retryAfterDelay.Seconds())))
            w.WriteHeader(http.StatusServiceUnavailable)
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

func fail() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

ログイン後にコピー

わかりやすくするために、一度に 1 つのポリシーを表示しています。これが、serviceA がタイムアウトのあるバージョンではなく、元のバージョンに変更された理由です。後で、アプリケーションの復元力を高めるために複数のポリシーを構成する方法を検討します。

コード serviceA/main.go は次のようになります:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/retrypolicy"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a RetryPolicy that only handles 500 responses, with backoff delays between retries
        retryPolicy := newRetryPolicy(logger)

        // Use the RetryPolicy with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, retryPolicy)
        client := &http.Client{Transport: roundTripper}

        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newRetryPolicy(logger *slog.Logger) retrypolicy.RetryPolicy[*http.Response] {
    return retrypolicy.Builder[*http.Response]().
        HandleIf(func(response *http.Response, _ error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithBackoff(time.Second, 10*time.Second).
        OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[*http.Response]) {
            logger.Info(fmt.Sprintf("Retry %d after delay of %d", e.Attempts(), e.Delay))
        }).Build()
}

ログイン後にコピー

このように、serviceB がステータス StatusServiceUnavailable (コード 503) を返した場合、関数設定 WithBackoff のおかげで、徐々に接続が再試行されます。 carl 経由でアクセスした場合、serviceA の出力は次のようになります。

go run main.go
{"time":"2024-08-20T08:43:38.297621-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:38.283715-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63542","referer":"","length":0},"response":{"time":"2024-08-20T08:43:38.297556-03:00","latency":13840708,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:43:39.946562-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:39.943394-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63544","referer":"","length":0},"response":{"time":"2024-08-20T08:43:39.946545-03:00","latency":3151000,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:43:40.845862-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-20T08:43:41.85287-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-20T08:43:43.860694-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:40.841468-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63545","referer":"","length":0},"response":{"time":"2024-08-20T08:43:43.860651-03:00","latency":3019287458,"status":200,"length":71},"id":""}
ログイン後にコピー

この例では、serviceB へのアクセス時にエラーが発生し、成功するまでライブラリが接続を再実行したことがわかります。接続でエラーが発生し続ける場合、クライアントはエラー メッセージ「http://localhost:3001」: retries completed.

を受け取ります。

プロジェクトにサーキット ブレーカーを追加して、レジリエンスをさらに深く掘り下げてみましょう。

サーキットブレーカー

サーキット ブレーカーの概念は、サービスへのアクセスに対するより高度な制御を提供する、より高度なポリシーです。パターン サーキット ブレーカーは、クローズ (エラーなし)、オープン (エラーあり、送信中断)、セミオープン (回復テストが困難な場合に限られた数のリクエストをサービスに送信する) の 3 つの状態で動作します。

このポリシーを使用するために、サービス B の新しいバージョンを作成して、より多くのエラー シナリオと遅延を生成できるようにしました。

package main

import (
    "math/rand"
    "net/http"
    "strconv"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        retryAfterDelay := 1 * time.Second
        if fail() {
            w.Header().Add("Retry-After", strconv.Itoa(int(retryAfterDelay.Seconds())))
            w.WriteHeader(http.StatusServiceUnavailable)
            return
        }
        if sleep() {
            time.Sleep(1 * time.Second)
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

func fail() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

func sleep() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

ログイン後にコピー

serviceA のコード:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go/circuitbreaker"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a CircuitBreaker that handles 503 responses and uses a half-open delay based on the Retry-After header
        circuitBreaker := newCircuitBreaker(logger)

        // Use the RetryPolicy with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, circuitBreaker)
        client := &http.Client{Transport: roundTripper}

        sendGet := func() (*http.Response, error) {
            resp, err := client.Get("http://localhost:3001")
            return resp, err
        }
        maxRetries := 3
        resp, err := sendGet()
        for i := 0; i < maxRetries; i++ {
            if err == nil && resp != nil && resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusTooManyRequests {
                break
            }
            time.Sleep(circuitBreaker.RemainingDelay()) // Wait for circuit breaker's delay, provided by the Retry-After header
            resp, err = sendGet()
        }
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newCircuitBreaker(logger *slog.Logger) circuitbreaker.CircuitBreaker[*http.Response] {
    return circuitbreaker.Builder[*http.Response]().
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithDelayFunc(failsafehttp.DelayFunc).
        OnStateChanged(func(event circuitbreaker.StateChangedEvent) {
            logger.Info(fmt.Sprintf("circuit breaker state changed from %s to %s", event.OldState.String(), event.NewState.String()))
        }).
        Build()
}

ログイン後にコピー

serviceA の出力でわかるように、サーキット ブレーカーが動作しています。

❯ go run main.go
{"time":"2024-08-20T08:51:37.770611-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-20T08:51:38.771682-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:38.776743-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-20T08:51:39.777821-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:39.784897-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-20T08:51:40.786209-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:40.792457-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to closed"}
{"time":"2024-08-20T08:51:40.792733-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:51:37.756947-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63699","referer":"","length":0},"response":{"time":"2024-08-20T08:51:40.792709-03:00","latency":3036065875,"status":200,"length":71},"id":""}
ログイン後にコピー

このポリシーにより、エラーの制御が強化され、問題が発生した場合にサービス B を回復できるようになります。

しかし、何らかの理由でサービス B が返せなくなったらどうすればよいでしょうか?このような場合、フォールバックを使用できます。

後退する

このポリシーの考え方は、希望するサービスにさらに深刻な問題があり、復帰までに時間がかかる場合に代替手段を用意することです。これを行うには、コードserviceA:
を変更します。

package main

import (
    "bytes"
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/fallback"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        fallback := newFallback(logger)

        roundTripper := failsafehttp.NewRoundTripper(nil, fallback)
        client := &http.Client{Transport: roundTripper}

        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        type response struct {
            Message string `json:"message"`
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newFallback(logger *slog.Logger) fallback.Fallback[*http.Response] {
    resp := &http.Response{
        StatusCode: http.StatusOK,
        Header:     map[string][]string{"Content-Type": {"application/json"}},
        Body:       io.NopCloser(bytes.NewBufferString(`{"message": "error accessing service B"}`)),
    }
    return fallback.BuilderWithResult[*http.Response](resp).
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        OnFallbackExecuted(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Fallback executed result")
        }).
        Build()
}

ログイン後にコピー

関数 newFallback では、ユーザー serviceB が応答しない場合にライブラリが使用する http.Response が 1 つ作成されていることがわかります。

この機能により、サービス B を担当するチームがサービスを再開する時間を確保している間に、クライアントに対応することができます。

serviceA の出力は次のようになります。

❯ go run main.go
{"time":"2024-08-20T08:55:27.326475-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:27.31306-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63772","referer":"","length":0},"response":{"time":"2024-08-20T08:55:27.326402-03:00","latency":13343208,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:31.756765-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:31.754348-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63774","referer":"","length":0},"response":{"time":"2024-08-20T08:55:31.756753-03:00","latency":2404750,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:34.091845-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:33.086273-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63775","referer":"","length":0},"response":{"time":"2024-08-20T08:55:34.091812-03:00","latency":1005580625,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:37.386512-03:00","level":"INFO","msg":"Fallback executed result"}
{"time":"2024-08-20T08:55:37.386553-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:37.38415-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63777","referer":"","length":0},"response":{"time":"2024-08-20T08:55:37.386544-03:00","latency":2393916,"status":200,"length":76},"id":""}
ログイン後にコピー

In the next step, we will combine the concepts we've seen to create a more resilient application.

Policy composition

To do this, we need to change the code of serviceA so that it makes use of the policies we have seen so far:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/circuitbreaker"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/fallback"
    "github.com/failsafe-go/failsafe-go/retrypolicy"
    "github.com/failsafe-go/failsafe-go/timeout"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        retryPolicy := newRetryPolicy(logger)
        fallback := newFallback(logger)
        circuitBreaker := newCircuitBreaker(logger)
        timeout := newTimeout(logger)

        roundTripper := failsafehttp.NewRoundTripper(nil, fallback, retryPolicy, circuitBreaker, timeout)
        client := &http.Client{Transport: roundTripper}

        sendGet := func() (*http.Response, error) {
            resp, err := client.Get("http://localhost:3001")
            return resp, err
        }
        maxRetries := 3
        resp, err := sendGet()
        for i := 0; i < maxRetries; i++ {
            if err == nil && resp != nil && resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusTooManyRequests {
                break
            }
            time.Sleep(circuitBreaker.RemainingDelay()) // Wait for circuit breaker's delay, provided by the Retry-After header
            resp, err = sendGet()
        }
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newTimeout(logger *slog.Logger) timeout.Timeout[*http.Response] {
    return timeout.Builder[*http.Response](10 * time.Second).
        OnTimeoutExceeded(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Connection timed out")
        }).Build()
}

func newFallback(logger *slog.Logger) fallback.Fallback[*http.Response] {
    resp := &http.Response{
        StatusCode: http.StatusOK,
        Header:     map[string][]string{"Content-Type": {"application/json"}},
        Body:       io.NopCloser(bytes.NewBufferString(`{"message": "error accessing service B"}`)),
    }
    return fallback.BuilderWithResult[*http.Response](resp).
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        OnFallbackExecuted(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Fallback executed result")
        }).
        Build()
}

func newRetryPolicy(logger *slog.Logger) retrypolicy.RetryPolicy[*http.Response] {
    return retrypolicy.Builder[*http.Response]().
        HandleIf(func(response *http.Response, _ error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithBackoff(time.Second, 10*time.Second).
        OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[*http.Response]) {
            logger.Info(fmt.Sprintf("Retry %d after delay of %d", e.Attempts(), e.Delay))
        }).Build()
}

func newCircuitBreaker(logger *slog.Logger) circuitbreaker.CircuitBreaker[*http.Response] {
    return circuitbreaker.Builder[*http.Response]().
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithDelayFunc(failsafehttp.DelayFunc).
        OnStateChanged(func(event circuitbreaker.StateChangedEvent) {
            logger.Info(fmt.Sprintf("circuit breaker state changed from %s to %s", event.OldState.String(), event.NewState.String()))
        }).
        Build()
}


ログイン後にコピー

In the code:

roundTripper := failsafehttp.NewRoundTripper(nil, fallback, retryPolicy, circuitBreaker, timeout)
ログイン後にコピー

It is possible to view the use of all defined policies. The lib will execute it in the "rightmost" order, that is:

timeout -> circuitBreaker -> retryPolicy -> fallback
ログイン後にコピー

We can see the execution of the policies by observing the serviceA output:

go run main.go
{"time":"2024-08-19T10:15:29.226553-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-19T10:15:29.226841-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-19T10:15:30.227941-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:30.234182-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:30.234258-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-19T10:15:32.235282-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:42.23622-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-19T10:15:42.237942-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to closed"}
{"time":"2024-08-19T10:15:42.238043-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:15:29.215709-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52527","referer":"","length":0},"response":{"time":"2024-08-19T10:15:42.238008-03:00","latency":13022704750,"status":500,"length":45},"id":""}
{"time":"2024-08-19T10:15:56.53476-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-19T10:15:56.534803-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-19T10:15:57.535108-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:57.53889-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:57.538911-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-19T10:15:59.539948-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:59.544425-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:59.544575-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:15:56.5263-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52542","referer":"","length":0},"response":{"time":"2024-08-19T10:15:59.544557-03:00","latency":3018352000,"status":500,"length":245},"id":""}
{"time":"2024-08-19T10:16:11.044207-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-19T10:16:11.046026-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:16:01.043317-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52544","referer":"","length":0},"response":{"time":"2024-08-19T10:16:11.045601-03:00","latency":10002596334,"status":500,"length":45},"id":""}
ログイン後にコピー

Conclusion

One of the advantages of microservices architecture is that we can break a complex domain into smaller, specialized services that communicate with each other to complete the necessary logic. Ensuring that this communication is resilient and will continue to work even in the face of failures and unforeseen events is fundamental. Using libraries such as failsafe-go makes this process easier.

You can find the codes presented in this post on my Github.

Originally published at https://eltonminetto.dev on August 24, 2024

以上がailsafe-go lib を使用したマイクロサービス間の通信の回復力の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート