首頁 > web前端 > js教程 > 10需要知道的RXJS功能與示例

10需要知道的RXJS功能與示例

Joseph Gordon-Levitt
發布: 2025-02-17 10:08:10
原創
194 人瀏覽過

10 Need-to-Know RxJS Functions with Examples

本文由 Florian Rappl 和 Moritz Kröger 共同評審。感謝所有 SitePoint 的同行評審員,使 SitePoint 的內容盡善盡美!

隨著對函數式反應式編程 (FRP) 興趣的增長,RxJS 已成為此範例中最流行的 JavaScript 庫之一。在本文中,我們將探討我認為 RxJS 中十大必知函數。

注意:本文假定您熟悉 RxJS 的基礎知識,如文章《使用 RxJS 入門函數式反應式編程》中所述。

關鍵要點

  • RxJS 利用類似於隨時間推移填充的數組的可觀察對象來促進函數式反應式編程 (FRP),從而允許在應用程序中進行更聲明式和強大的錯誤處理。
  • RxJS 中簡單流的核心操作,例如 map()filter()reduce()take(),鏡像數組操作,但應用於隨時間發出值的數流。
  • flatMap()switch() 這樣的專用函數對於分別處理複雜數據結構和管理多個流至關重要,這使得它們對於高級反應式編程任務至關重要。
  • 可以使用 concat()merge()combineLatest() 等運算符有效地組合多個流,每個運算符在流管理和數據同步中發揮不同的作用。
  • takeUntil() 函數提供了一種基於外部條件取消訂閱的機制,這說明了 RxJS 在流控制和資源管理方面的靈活性。

反應式編程

反應式編程是一種編程範例,它將稱為可觀察對象的數據流作為其基本的編程單元。

流——或 RxJS 行話中的可觀察對象——類似於事件監聽器:兩者都等待某些事情發生,並在發生時通知您。從 onClick 監聽器獲得的一系列異步通知是數據流的完美示例。

換句話說,可觀察對像只不過是一個隨時間推移填充的數組。

該數組的元素可以來自幾乎任何地方:文件系統、DOM 事件、API 調用,甚至轉換後的同步數據,如數組。從根本上說,反應式編程只不過是用可觀察對像作為程序的構建塊。

與數組的關係

數組很簡單,因為它們的內容是最終的,除非明確更改。從這個意義上說,數組中沒有什麼本質上的時間性。

另一方面,可觀察對象由時間定義。您最多只能知道流到目前為止已接收 [1, 2, 3]。您不能確定您是否會得到 4——或者不會——並且是數據源,而不是您的程序,決定了這一點。

流和數組之間的關係是如此深刻,以至於大多數反應式擴展都源於函數式編程的世界,其中列表操作是核心。

熟悉 RxJS

考慮一下常見的待辦事項應用程序。讓我們看看使用 RxJS 如何顯示用戶未完成任務的名稱的問題:

const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

到目前為止,這只不過是數組擴展,但它演示了反應式編程的函數式風格。

通過添加更複雜、“現實世界”的功能,其聲明性性質變得清晰。假設我們想要:

  • 響應用戶選擇查看已完成或未完成的任務來啟動請求;
  • 每秒只發送對上次選擇的請求一次,以免在用戶快速更改選擇時浪費帶寬;
  • 最多重試三次失敗的請求;以及
  • 只有當服務器發送與上次不同的響應時才重繪視圖。
const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

逐步分解:

  • parameter_stream 告訴我們用戶是否想要已完成或未完成的任務,並將選擇存儲在 parameter 中;
  • debounce() 確保我們每秒鐘只關注最後一次按鈕點擊;
  • getTasks() 周圍的部分與之前相同;
  • distinctUntilChanged() 確保我們只在服務器的響應與上次不同時才關注;以及
  • update() 負責更新 UI 以反映我們從服務器獲得的內容。

在命令式、基於回調的樣式中處理 debounce、retry 和“distinct until changed”邏輯是有效的,但它既脆弱又復雜。

關鍵在於,使用 RxJS 進行編程允許:

  1. 聲明式程序;
  2. 可擴展的系統;以及
  3. 簡單直接、強大的錯誤處理。

在瀏覽 RxJS 十大必知函數的過程中,我們將遇到上述示例中的每個函數。

簡單流操作

簡單流(發出簡單值,如字符串的流)的基本函數包括:

  • map()
  • filter()
  • reduce()
  • take() / takeWhile()

除了 take() 和 takeWhile() 之外,這些都類似於 JavaScript 的高階數組函數。

我們將通過解決一個示例問題來應用這些函數:查找數據庫中所有具有 .com 或 .org 網站的用戶,併計算其網站名稱的平均長度。

JSONPlaceholder 將作為我們的用戶來源。這是我們將使用的用戶數據的 JSON 表示。

1. 使用 map() 轉換數據

在可觀察對像上使用 map() 與在數組上使用它相同。它:

  1. 接受回調作為參數;
  2. 在您調用的數組的每個元素上執行它;以及
  3. 返回一個新數組,其中原始數組的每個元素都被調用回調在其上產生的結果所替換。

在可觀察對像上使用 map() 時,唯一的區別是:

  1. 它返回一個新的可觀察對象,而不是一個新的數組;以及
  2. 它在可觀察對象發出新項目時執行,而不是立即全部執行一次。

我們可以使用 map() 將我們的用戶數據流轉換為僅包含其網站名稱的列表:

const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

在這裡,我們使用 map 將傳入流中的每個用戶對象替換為每個用戶的網站。

RxJS 還允許您調用 map() as select()。這兩個名稱都指代相同的函數。

2. 過濾結果

像 map() 一樣,filter() 在可觀察對像上的作用與在數組上的作用大致相同。要查找每個具有 .net 或 .org 網站地址的用戶,我們可以這樣寫:

const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

這將只選擇其網站以“net”或“org”結尾的用戶。

filter() 也有別名 where()。

3. 使用 reduce() 收集結果

reduce() 允許我們使用所有單個值並將它們轉換為單個結果。

reduce() 往往是最令人困惑的基本列表操作,因為與 filter() 或 map() 不同,它的行為因使用而異。

通常,reduce() 獲取值的集合,並將其轉換為單個數據點。在我們的例子中,我們將向它提供一個網站名稱流,並使用 reduce() 將該流轉換為一個對象,該對象計算我們找到的網站數量以及其名稱長度的總和。

source.
  map((user) => user.website)
登入後複製
登入後複製
登入後複製

在這裡,我們將流簡化為單個對象,它跟踪:

  1. 我們已經看到了多少個站點;以及
  2. 所有名稱的總長度。

請記住,reduce() 只有在源可觀察對象完成時才返回結果。如果您想在每次流接收新項目時都知道累加器的狀態,請改用 scan()。

4. 使用 take() 限制結果

take() 和 takeWhile() 補充了簡單流的基本函數。

take(n) 從流中讀取 n 個值,然後取消訂閱。

我們可以使用 scan() 在每次我們收到網站時發出我們的對象,並且只 take() 前兩個值。

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'));
})
登入後複製
登入後複製

RxJS 還提供 takeWhile(),它允許您在某個布爾測試成立之前獲取值。我們可以這樣使用 takeWhile() 來編寫上述流:

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  reduce((data, website) => {
    return {
      count       : data.count += 1,
      name_length : data.name_length += website.length
    }
  }, { count : 0, name_length : 0 })
登入後複製

高階流操作

除了它們在可觀察對象而不是數組上工作之外,這些函數幾乎與熟悉的列表操作相同。

“[I]f you know how to program against Arrays using the Array#extras, then you already know how to use RxJS!” ~ RxJS 文檔

正如數組可以包含比簡單值(如數組或對象)更複雜的數據一樣,可觀察對像也可以發出高階數據,如 Promise 或其他可觀察對象。這就是更專業的工具發揮作用的地方。

5. 使用 flatMap() 壓平流

……事實上,我們已經在使用了!

當我們定義源流時,我們調用了 fromPromise() 和 flatMap():

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  scan((data, website) => {
      return {
        count       : data.count += 1,
        name_length : data.name_length += website.length
      }
    }, { count : 0, name_length : 0 }).
  take(2);
登入後複製

這使用了三個新的機制:

  1. fromPromise;
  2. Rx.Observable.from;以及
  3. flatMap。

來自 promise 的可觀察對象

Promise 代表我們將異步獲得的單個未來值——例如,對服務器的調用的結果。

Promise 的一個定義特徵是它只代表一個未來的值。它不能返回多個異步數據;這是可觀察對象所做的,也是兩者之間的一個根本區別。

這意味著,當我們使用 Rx.Observable.fromPromise() 時,我們得到一個可觀察對象,它發出單個值——或者:

  1. Promise 解析到的值;或
  2. Promise 拒絕的值。

當 Promise 返回字符串或數字時,我們不需要做任何特殊的事情。但是,當它返回數組時(在我們的例子中就是這樣),我們更希望創建一個可觀察對象,該對象發出數組的內容,而不是數組本身作為單個值。

6. 使用 flatMap()

此過程稱為扁平化,flatMap() 會處理它。它有很多重載,但我們只使用最簡單和最常用的重載。

使用 flatMap() 時,我們:

  1. 在發出 Promise 的單值解析或拒絕的可觀察對像上調用 flatMap();以及
  2. 傳遞一個函數來創建一個新的可觀察對象。

在我們的例子中,我們傳遞 Rx.Observable.from(),它從數組的值創建一個序列:

const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

這涵蓋了我們簡短序言中的代碼:

const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

RxJS 也為 flatMap() 提供了一個別名:selectMany()。

組合多個流

通常,我們將有多個需要組合的流。組合流的方法有很多,但有一些比其他的出現頻率更高。

7. 使用 concat() 和 merge() 組合流

連接和合併是組合流的兩種最常見方法。

連接通過發出第一個流的值直到它完成,然後發出第二個流的值來創建一個新流。

合併通過發出任何活動流的值來從多個流創建新流

想想在 Facebook Messenger 上同時與兩個人交談。 concat() 是您從雙方收到消息,但在回复另一個人之前完成與一個人的對話的情況。 merge() 就像創建一個群聊並同時接收兩條消息流。

source.
  map((user) => user.website)
登入後複製
登入後複製
登入後複製

concat() 流將首先打印 source1 的所有值,並且只有在 source1 完成後才開始打印 source2 的值。

merge() 流將根據接收到的順序打印 source1 和 source2 的值:它不會等待第一個流完成,然後再發出第二個流的值。

8. 使用 switch()

通常,我們想監聽發出可觀察對象的可觀察對象,但只關注來自源的最新發射。

為了進一步擴展 Facebook Messenger 的類比,switch() 是您……好吧,根據當前正在發送消息的人來切換您回复的人。

為此,RxJS 提供了 switch。

用戶界面為 switch() 提供了幾個很好的用例。如果我們的應用程序每次用戶選擇他們想要搜索的內容時都會發出請求,我們可以假設他們只想查看最新選擇的結果。因此,我們使用 switch() 只監聽最新選擇的結果。

順便說一下,我們應該確保不要浪費帶寬,而只針對用戶每秒進行的最後一次選擇訪問服務器。我們為此使用的函數稱為 debounce()

如果您想朝另一個方向前進,並且只遵守第一次選擇,則可以使用 throttle()。它具有相同的 API,但行為相反。

9. 協調流

如果我們想允許用戶搜索具有特定 ID 的帖子或用戶怎麼辦?

為了演示,我們將創建另一個下拉菜單,並允許用戶選擇他們想要檢索的項目的 ID。

有兩種情況。當用戶:

  1. 更改任一選擇;或
  2. 更改兩個選擇。

使用 combineLatest() 響應任一流的更改

在第一種情況下,我們需要創建一個流,該流使用以下內容啟動網絡請求:

  1. 用戶最近選擇的端點;以及
  2. 用戶最近選擇的 ID。

……並在用戶更新任一選擇時執行此操作。

這就是 combineLatest() 的用途:

const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

每當任一流發出值時,combineLatest() 都會獲取發出的值並將其與其他流發出的最後一個項目配對,並將該對以數組的形式發出。

這在圖表中更容易可視化:

const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

使用 zip 只響應兩個流的更改

要等到用戶更新其對 id 和端點字段的選擇後,請將 combineLatest() 替換為 zip()。

同樣,這在圖表中更容易理解:

source.
  map((user) => user.website)
登入後複製
登入後複製
登入後複製

與 combineLatest() 不同,zip() 會等到兩個可觀察對像都發出新內容後才會發出其更新值的數組。

10. takeUntil

最後,takeUntil() 允許我們監聽第一個流,直到第二個流開始發出值。

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'));
})
登入後複製
登入後複製

當您需要協調流但不需要組合它們時,這很有用。

總結

僅僅向數組添加時間維度就開啟了對程序進行全新思考的大門。

RxJS 的內容遠不止我們在這裡看到的這些,但這已經足夠走得很遠了。

從 RxJS Lite 開始,隨時準備參考文檔,並抽出時間動手實踐。在您不知不覺中,一切都會看起來像一個流……因為一切都是。

關於 RxJS 函數的常見問題解答 (FAQ)

RxJS 與傳統 JavaScript 的主要區別是什麼?

RxJS 是一個使用可觀察對象的反應式編程庫,用於簡化異步或基於回調的代碼的組合。這與使用更命令式編程風格的傳統 JavaScript 相比。關鍵區別在於它們如何處理數據——RxJS 將數據視為流,可以使用各種運算符對其進行操作和轉換,而傳統 JavaScript 則以更線性的方式處理數據。

如何在 RxJS 中創建可觀察對象?

在 RxJS 中,您可以使用新的 Observable() 構造函數創建可觀察對象。此構造函數將一個函數作為參數,稱為訂閱者函數,該函數在最初訂閱可觀察對象時執行。這是一個基本示例:

const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

RxJS 中的主要運算符是什麼以及它們如何工作?

RxJS 具有廣泛的運算符,可用於控制數據在可觀察對象和觀察者之間的流動方式。一些主要運算符包括 map()、filter()、reduce()、merge() 和 concat()。這些運算符中的每一個都以不同的方式操作數據流,例如轉換數據、過濾掉某些值或組合多個流。

如何在 RxJS 中處理錯誤?

RxJS 提供了幾個處理錯誤的運算符,例如 catchError()、retry() 和 retryWhen()。 catchError() 運算符用於捕獲可觀察流上的錯誤並返回新的可觀察對像或拋出錯誤。 retry() 運算符可用於在發生錯誤時重新訂閱可觀察對象。 retryWhen() 運算符類似,但它提供了對何時重試的更多控制。

如何取消 RxJS 中可觀察對象的訂閱?

當您訂閱可觀察對象時,您會收到一個 Subscription,它有一個 unsubscribe() 方法。您可以調用此方法來取消可觀察對象的執行並清理正在使用的任何資源。這是一個示例:

const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()
登入後複製
登入後複製
登入後複製
登入後複製
登入後複製

RxJS 中熱可觀察對象和冷可觀察對象的區別是什麼?

在 RxJS 中,可觀察對象可以是熱的或冷的。冷可觀察對像在訂閱時開始運行,而熱可觀察對象即使在訂閱之前也會產生值。換句話說,冷可觀察對像是惰性的,而熱可觀察對像不是。

如何在 RxJS 中組合多個可觀察對象?

RxJS 提供了幾個組合多個可觀察對象的運算符,例如 merge()、concat()、combineLatest() 和 zip()。這些運算符中的每一個都以不同的方式組合數據流,具體取決於您的特定需求。

RxJS 中主題的目的是什麼?

RxJS 中的主題是一種特殊類型的可觀察對象,它允許將值多播到多個觀察者。與普通可觀察對像不同,主題維護許多監聽器的註冊表。

如何將 RxJS 與 Angular 一起使用?

Angular 內置支持 RxJS,並在內部將其用於各種功能。您也可以在自己的代碼中使用 RxJS 來處理異步操作並實現自動完成、去抖動、節流、輪詢等功能。

RxJS 的一些常見用例是什麼?

RxJS 可用於需要處理異步數據的各種場景。一些常見的用例包括處理用戶輸入、發出 HTTP 請求、使用 WebSockets 和處理動畫。

以上是10需要知道的RXJS功能與示例的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板