首頁 > Java > java教程 > Java Flink視窗觸發器Trigger怎麼使用

Java Flink視窗觸發器Trigger怎麼使用

王林
發布: 2023-05-03 13:10:10
轉載
1720 人瀏覽過

定義

Trigger確定視窗(由視窗分配器形成)何時準備好由視窗函數處理。每個WindowAssigner都帶有一個預設值Trigger。如果預設觸發器不符合您的需求,您可以使用trigger(…)。

Trigger 原始碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

public abstract class Trigger<T, W extends Window> implements Serializable {

    /**

     只要有元素落⼊到当前窗⼝, 就会调⽤该⽅法

     * @param element 收到的元素

     * @param timestamp 元素抵达时间.

     * @param window 元素所属的window窗口.

     * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.

     */

    public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;

     

     /**

     * processing-time 定时器回调函数

     *

     * @param time 定时器触发的时间.

     * @param window 定时器触发的窗口对象.

     * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.

     */

    public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

 

    /**

     * event-time 定时器回调函数

     *

     * @param time 定时器触发的时间.

     * @param window 定时器触发的窗口对象.

     * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.

     */

    public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

 

     /**

     * 当 多个窗口合并到⼀个窗⼝的时候,调用该方法法,例如系统SessionWindow

     *

     * @param window 合并后的新窗口对象

     * @param ctx ⼀个上下⽂对象,通常用该对象注册 timer(ProcessingTime/EventTime)回调以及访问状态

     */

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {

        throw new UnsupportedOperationException("This trigger does not support merging.");

    }

     

    /**

     * 当窗口被删除后执⾏所需的任何操作。例如:可以清除定时器或者删除状态数据

     */

    public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;

    }

登入後複製

TriggerResult 原始碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

public enum TriggerResult {

    // 表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。

    CONTINUE(false, false),

    // 触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。

    FIRE_AND_PURGE(true, true),

    // 触发窗口计算,但是保留窗口元素

    FIRE(true, false),

    // 不触发窗口计算,丢弃窗口,并且删除窗口的元素。

    PURGE(false, true);

 

    private final boolean fire;

    private final boolean purge;

 

    private TriggerResult(boolean fire, boolean purge) {

        this.purge = purge;

        this.fire = fire;

    }

 

    public boolean isFire() {

        return this.fire;

    }

 

    public boolean isPurge() {

        return this.purge;

    }

}

登入後複製

一旦觸發器確定視窗已準備好進行處理,就會觸發,返回狀態可以是FIRE或FIRE_AND_PURGE。其中FIRE是觸發視窗計算並保留視窗內容,而FIRE_AND_PURGE是觸發視窗計算並刪除視窗內容。預設情況下,預實現的觸發器只是簡單地FIRE不清除視窗狀態。

Flink 預設的Trigger

  • EventTimeTrigger:透過比較EventTime和視窗的Endtime決定是否觸發視窗計算,如果EventTime大於Window EndTime觸發,否則不觸發,視窗將繼續等待。

  • ProcessTimeTrigger:透過比較ProcessTime和視窗EndTme確定是否觸發窗口,如果ProcessTime大於EndTime則觸發計算,否則窗口繼續等待。

  • ContinuousEventTimeTrigger:根據間隔時間週期性觸發視窗或Window的結束時間小於目前EndTime觸發視窗計算。

  • ContinuousProcessingTimeTrigger:根據間隔時間週期性觸發視窗或Window的結束時間小於目前ProcessTime觸發視窗計算。

  • CountTrigger:根據存取資料量是否超過設定的闕值判斷是否觸發視窗計算。

  • DeltaTrigger:根據存取資料計算出來的Delta指標是否超過指定的Threshold去判斷是否觸發視窗計算。

  • PurgingTrigger:可以將任意觸發器作為參數轉換為Purge類型的觸發器,計算完成後資料將會被清理。

  • NeverTrigger:任何時候都不觸發視窗運算

Java Flink視窗觸發器Trigger怎麼使用

#主要看看EventTimeTrigger和ProcessingTimeTrigger的原始碼。

EventTimeTrigger原始碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

 

    private EventTimeTrigger() {

    }

 

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {

            return TriggerResult.FIRE;

        } else {

            ctx.registerEventTimeTimer(window.maxTimestamp());

            return TriggerResult.CONTINUE;

        }

    }

 

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {

        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;

    }

 

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {

        return TriggerResult.CONTINUE;

    }

 

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {

        ctx.deleteEventTimeTimer(window.maxTimestamp());

    }

 

    public boolean canMerge() {

        return true;

    }

 

    public void onMerge(TimeWindow window, OnMergeContext ctx) {

        long windowMaxTimestamp = window.maxTimestamp();

        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {

            ctx.registerEventTimeTimer(windowMaxTimestamp);

        }

 

    }

 

    public String toString() {

        return "EventTimeTrigger()";

    }

 

    public static EventTimeTrigger create() {

        return new EventTimeTrigger();

    }

}

登入後複製

ProcessingTimeTrigger原始碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

 

    private ProcessingTimeTrigger() {

    }

 

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {

        ctx.registerProcessingTimeTimer(window.maxTimestamp());

        return TriggerResult.CONTINUE;

    }

 

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {

        return TriggerResult.CONTINUE;

    }

 

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {

        return TriggerResult.FIRE;

    }

 

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {

        ctx.deleteProcessingTimeTimer(window.maxTimestamp());

    }

 

    public boolean canMerge() {

        return true;

    }

 

    public void onMerge(TimeWindow window, OnMergeContext ctx) {

        long windowMaxTimestamp = window.maxTimestamp();

        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {

            ctx.registerProcessingTimeTimer(windowMaxTimestamp);

        }

 

    }

 

    public String toString() {

        return "ProcessingTimeTrigger()";

    }

 

    public static ProcessingTimeTrigger create() {

        return new ProcessingTimeTrigger();

    }

}

登入後複製

在onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會註冊一個ProcessingTime計時器,時間Processing參數是window.maxTimestamp(),也就是視窗的最終時間,當時間到達這個視窗最終時間,計時器觸發並呼叫onProcessingTime()方法,在onProcessingTime() 方法中,return TriggerResult.FIRE 即傳回FIRE,觸發視窗中資料的計算,但是會保留視窗元素。

要注意的是ProcessingTimeTrigger類別只會在視窗的最終時間到達的時候觸發視窗函數的計算,計算完成後並不會清除視窗中的數據,這些資料儲存在記憶體中,除非呼叫PURGE或FIRE_AND_PURGE,否則資料將一直存在記憶體中。實際上,Flink中提供的Trigger類,除了PurgingTrigger類,其他的都不會對視窗中的資料進行清除。

常見窗口的Trigger

滾動窗口

1

2

3

4

5

6

TumblingEventTimeWindows :EventTimeTrigger

public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {

            return EventTimeTrigger.create();

        }

}

登入後複製

TumblingProcessingTimeWindows :ProcessingTimeTrigger

1

2

3

4

5

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {

    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {

        return ProcessingTimeTrigger.create();

    }

}

登入後複製

滑動窗口

1

2

3

4

5

6

SlidingEventTimeWindows:EventTimeTrigger

public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {

        return EventTimeTrigger.create();

    }

}

登入後複製

SlidingProcessingTimeWindows :ProcessingTimeTrigger

1

2

3

4

5

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {

    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {

            return ProcessingTimeTrigger.create();

        }

}

登入後複製

會話視窗

1

2

3

4

5

6

EventTimeSessionWindows:EventTimeTrigger

public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {

    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {

        return EventTimeTrigger.create();

    }

}

登入後複製

ProcessingTimeSessionWindows:ProcessingTimeTrigger

1

2

3

4

5

public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {

    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {

        return ProcessingTimeTrigger.create();

    }

}

登入後複製

全域視窗

1

2

3

4

5

6

GlobalWindows :NeverTrigger

public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {

     public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {

            return new GlobalWindows.NeverTrigger();

        }

}

登入後複製

以上是Java Flink視窗觸發器Trigger怎麼使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板