Apache FlinkCEP でタイムアウトステータス監視を実装するための詳細な手順

Apache FlinkCEP でタイムアウトステータス監視を実装するための詳細な手順

CEP - 複合イベント処理。

ご注文後、一定期間内にお支払いの確認が取れませんでした。

タクシーの配車依頼は出されましたが、一定時間内に乗客が乗車することが確認されませんでした。

テイクアウト商品は、配達予定時間を過ぎても一定時間内に配達されることが確認できません。

Apache FlinkCEP API

CEPタイムアウトイベントジョブ

FlinkCEP ソースコードの簡単な分析

データストリームとパターンストリーム

DataStream は通常、同じタイプのイベントまたは要素で構成されます。DataStream は、Filter や Map などの一連の変換操作を通じて別の DataStream に変換できます。

PatternStream は、CEP パターン マッチング ストリームの抽象化であり、DataStream と Pattern を組み合わせ、select や flatSelect などのメソッドを提供します。 PatternStream は DataStream ではありません。一致するパターン シーケンスとそれに関連付けられたイベント (つまり、Map<パターン名、List<イベント>>) で構成されるマップを、DataStream である SingleOutputStreamOperator に送信するメソッドを提供します。

CEPOperatorUtils ツール クラスのメソッドと変数は、「PatternStream」を使用して名前が付けられます。次に例を示します。

公共
 
静的
 <イン、アウト> 
シングル出力ストリーム演算子
<OUT> createPatternStream(...){...}
公共

静的
 <入力、出力1、出力2> 
シングル出力ストリーム演算子
<OUT1> タイムアウトパターンストリームを作成します(...){...}

ファイナル
 
シングル出力ストリーム演算子
<OUT> パターンストリーム;

シングル出力ストリーム演算子

@公共

公共
 
クラス
 
シングル出力ストリーム演算子
<T> 
拡張する
 
データストリーム
<T> {...}

PatternStream構築方法:

パターンストリーム
(
ファイナル
 
データストリーム
<T> 入力ストリーム、 
ファイナル
 
パターン
<T, ?> パターン) {

  
これ
.inputStream = 入力ストリーム;

  
これ
.pattern = パターン;

  
これ
.コンパレータ = 
ヌル
;

}



パターンストリーム
(
ファイナル
 
データストリーム
<T> 入力ストリーム、 
ファイナル
 
パターン
<T, ?> パターン、 
ファイナル
 
イベントコンパレータ
<T> コンパレータ) {

  
これ
.inputStream = 入力ストリーム;

  
これ
.pattern = パターン;

  
これ
.comparator = コンパレータ;

}

パターン、量指定子、イベント比較子

パターンは、パターン定義 (ビルダー モード) の基本クラスです。定義されたパターンは、NFACompiler によって NFA を生成するために使用されます。

timeEnd など、next や followedBy に似たメソッドを独自に実装したい場合は、Pattern を拡張して書き直すことで実現できるはずです。

公共
クラス
パターン
<T,F 
拡張する
 T>
/** モード名 */
プライベート
ファイナル
弦
 名前;
/** 前のパターン*/
プライベート
ファイナル
パターン
<T、? 
拡張する
 T>前へ;
/** 現在のパターンに一致するイベントが満たす必要がある制約 */
プライベート
反復条件
<F> 条件;
/** 時間ウィンドウの長さ、時間長内​​でのパターンマッチング*/
プライベート
時間
 ウィンドウ時間;
/** パターン量指定子。パターンが複数のイベントに一致することを意味します。デフォルトでは 1 つに一致します。*/
プライベート
数量詞
 量指定子 = 
数量詞
。1つ(
消費戦略
。厳しい);
/** ループ状態へのイベントの収集を停止するためにイベントが満たす必要がある条件 */
プライベート
反復条件
<F> 条件まで;
/**
   * {@code times} モードに適用可能。モードでイベントが連続して発生する回数を管理するために使用されます*/
プライベート
タイムズ
 回;
// イベントに一致した後の戦略をスキップする private
ファイナル
試合後のスキップ戦略
 afterMatchSkipStrategy;
  ...
}

量指定子は特定のパターン動作を記述するために使用され、主に次の 3 つのカテゴリがあります。

シングル-シングルマッチ、ループ-ループマッチ、タイム-特定の回数または時間範囲内でのマッチ。

各パターンはオプション (単一一致またはループ一致) であり、ConsumingStrategy セットを持つことができます。

ループと時間には、パターンで受信されたイベント間で使用される追加の内部 ConsumingStrategy もあります。

公共
クラス
数量詞
 {
  ...
/**
   * 5 つの属性を組み合わせることができますが、すべての組み合わせが有効というわけではありません */
公共
列挙型
量指定子プロパティ
 {
    シングル、
    ループ、
    タイムズ、
    オプション、
    よく深い
  }
/**
   * このパターンでどのイベントにマッチするかを記述する戦略 */
公共
列挙型
消費戦略
 {
    厳しい、
    次へスキップ、
    SKIP_TILL_ANY、
    フォローしない、
    次ではない
  }
/**
   * 現在のパターンでイベントが連続して発生する回数を記述します。たとえば、パターン条件はブール値にすぎず、真の条件を満たすイベントは連続して発生するか、2 回から 4 回などの回数の範囲で発生します。2 回、3 回、4 回はすべて現在のパターンに一致するため、同じイベントが繰り返し一致します。*/
公共
静的
クラス
タイムズ
 {
プライベート
ファイナル
整数
 から;
プライベート
ファイナル
整数
 に;
プライベート
タイムズ
(
整数
 から、 
整数
 に)
前提条件
.checkArgument(から> 
0
、 
「from は 0 より大きい正の数である必要があります。」
);
前提条件
.checkArgument(to >= from, 
「to は from 以上の数値である必要があります:」
 + から + 
「。」
);
これ
.from = から;
これ
.to = 〜へ;
    }
公共
整数
 getFrom() {
戻る
 から;
    }
公共
整数
 getTo() {
戻る
 に;
    }
//数値範囲公開
静的
タイムズ
 の(
整数
 から、 
整数
 に)
戻る
新しい
タイムズ
(から、まで)
    }
//公開回数を指定する
静的
タイムズ
 の(
整数
 回)
戻る
新しい
タイムズ
(回、回)
    }
@オーバーライド
公共
ブール値
 等しい(
物体
 o) {
もし
 (
これ
 == o) {
戻る
真実
;
      }
もし
 (o == 
ヌル
 || getClass() != o.getClass()) {
戻る
間違い
;
      }
タイムズ
 回 = (
タイムズ
) o;
戻る
 from == 回.from &&
        to == times.to;
    }
@オーバーライド
公共
整数
 ハッシュコード() {
戻る
オブジェクト
.hash(から、へ);
    }
  }
  ...
}

カスタム イベント コンパレータである EventComparator は、EventComparator インターフェイスを実装します。

公共
 
インタフェース
 
イベントコンパレータ
<T> 
拡張する
 
コンパレータ
<T>、 
シリアル化可能
 {
長さ
 シリアルバージョンUID = 
1L
;
}

NFACompiler と NFA

NFACompiler は、パターンを NFA または NFAFactory にコンパイルするメソッドを提供します。NFAFactory を使用して複数の NFA を作成できます。

公共
クラス
NFAコンパイラ
 {
  ...
/**
   * NFAFactory は NFA のインターフェースを作成します*
   * @param <T> NFAによって処理される入力イベントのタイプ
   */
公共
インタフェース
NFAファクトリー
<T> 
拡張する
シリアル化可能
 {
    NFA<T> を作成します。
  }
  
/**
   * NFAFactory の具体的な実装 NFAFactoryImpl
   *
   * <p>実装は入力タイプシリアライザ、ウィンドウ時間、および
   * 状態とその遷移から NFA を作成できるようにします。
   *
   * @param <T> NFAによって処理される入力イベントのタイプ
   */
プライベート
静的
クラス
NFAファクトリ実装
<T> 
実装する
NFAファクトリー
<T> {
    
プライベート
静的
ファイナル
長さ
 シリアルバージョンUID = 
8939783698296714379L
;
    
プライベート
ファイナル
長さ
 ウィンドウ時間;
プライベート
ファイナル
コレクション
<
州
<T>> の状態;
プライベート
ファイナル
ブール値
 タイムアウト処理;
    
プライベート
NFAファクトリ実装
(
長さ
 ウィンドウ時間、
コレクション
<
州
<T>>は次のように述べています。
ブール値
 タイムアウト処理) {
      
これ
.windowTime = ウィンドウ時間;
これ
.states = 状態;
これ
.timeoutHandling = タイムアウト処理;
    }
    
@オーバーライド
公共
 NFA<T> 作成NFA() {
// NFA は、状態セット、時間ウィンドウの長さ、およびタイムアウトを処理するかどうかで構成されます。
新しい
 NFA<>(状態、ウィンドウ時間、タイムアウト処理);
    }
  }
}

NFA: 非決定性有限オートマトン - 非決定性有限(状態)オートマトン。

詳細については、

https://zh.wikipedia.org/wiki/非決定性有限状態オートマトン

公共
クラス
 NFA<T> {
/**
   * NFACompiler によって返されるすべての有効な NFA 状態のセット * これらは、ユーザー指定のパターンから直接派生されます。
   */
プライベート
ファイナル
地図
<
弦
、 
州
<T>> の状態;
  
/**
   * Pattern.within(Time) は時間ウィンドウの長さを指定します */
プライベート
ファイナル
長さ
 ウィンドウ時間;
  
/**
   * タイムアウトマッチマーカー */
プライベート
ファイナル
ブール値
 ハンドルタイムアウト;
  ...
}

パターン選択関数とパターンフラット選択関数

一致したイベントを含むマップがパターン名を介してアクセスできる場合、PatternSelectFunction の select() メソッドが呼び出されます。パターン名は、パターンを定義するときに指定されます。 select() メソッドは、正確に 1 つの結果を返します。複数の結果を返す必要がある場合は、PatternFlatSelectFunction を実装できます。

公共
 
インタフェース
 
パターン選択関数
<イン、アウト> 
拡張する
 
関数
、 
シリアル化可能
 {



  
/**

   * 指定されたイベント マップから結果を生成します。これらのイベントは、関連付けられているスキーマの名前によって一意に識別されます */

  OUT選択(
地図
<
弦
、 
リスト
<IN>>パターン) 
投げる
 
例外
;

}

PatternFlatSelectFunction は、OUT を返す代わりに、Collector を使用して一致したイベントを収集します。

公共
インタフェース
パターンフラット選択関数
<イン、アウト> 
拡張する
関数
、 
シリアル化可能
 {
  
/**
   * 1つ以上の結果を生成する */
空所
 フラット選択(
地図
<
弦
、 
リスト
<IN>> パターン、 
コレクタ
<OUT> アウト) 
投げる
例外
;
}

SelectTimeoutCepOperator、パターンタイムアウト関数

SelectTimeoutCepOperator は、CEPOperatorUtils で createTimeoutPatternStream() メソッドが呼び出されたときに作成されます。

演算子の反復によって呼び出される SelectTimeoutCepOperator 内のメソッドは、processMatchedSequences() と processTimedOutSequences() です。

テンプレート メソッドは、抽象クラス AbstractKeyedCEPPatternOperator の processEvent() メソッドと advanceTime() メソッドに対応します。

FlatSelectTimeoutCepOperator と対応する PatternFlatTimeoutFunction もあります。

公共
クラス
タイムアウトCep演算子を選択
<IN、OUT1、OUT2、キー>
拡張する
抽象キー付きCEPパターン演算子
<IN、キー、OUT1、 
タイムアウトCep演算子を選択
。
選択ラッパー
<IN、OUT1、OUT2>> {
プライベート
出力タグ
<OUT2> timedOut出力タグ;
公共
タイムアウトCep演算子を選択
(
タイプシリアライザー
<IN> 入力シリアライザー、
ブール値
 処理時間、
NFAコンパイラ
。
NFAファクトリー
<IN> nfaファクトリー、
ファイナル
イベントコンパレータ
<IN> コンパレータ、
試合後のスキップ戦略
 スキップ戦略、
// パラメータの命名によりフラットな名前が混乱します...SelectWrapper クラスのメンバーの命名も含みます...
パターン選択関数
<IN, OUT1> フラット選択関数、
パターンタイムアウト関数
<IN, OUT2> フラットタイムアウト関数、
出力タグ
<OUT2> 出力タグ、
出力タグ
<IN> 遅延データ出力タグ) {
素晴らしい
(
      入力シリアライザー、
      処理時間、
      nfaファクトリー、
      コンパレータ、
      スキップ戦略、
新しい
選択ラッパー
<>(フラット選択関数、フラットタイムアウト関数)、
      遅延データ出力タグ);
これ
.timedOutOutputTag = 出力タグ;
  }
  ...
}
公共
インタフェース
パターンタイムアウト関数
<イン、アウト> 
拡張する
関数
、 
シリアル化可能
 {
  OUTタイムアウト(
地図
<
弦
、 
リスト
<IN>> パターン、 
長さ
 タイムアウトタイムスタンプ) 
投げる
例外
;
}
公共
インタフェース
パターンフラットタイムアウト関数
<イン、アウト> 
拡張する
関数
、 
シリアル化可能
 {
空所
 タイムアウト(
地図
<
弦
、 
リスト
<IN>> パターン、 
長さ
 タイムアウトタイムスタンプ、 
コレクタ
<OUT> アウト) 
投げる
例外
;
}

CEP と CEPOperatorUtils

CEP は、PatternStream を作成するためのツール クラスです。PatternStream は、DataStream と Pattern を組み合わせたものです。

公共
クラス
 CEP {
  
公共
静的
 <T> 
パターンストリーム
<T> パターン(
データストリーム
<T> 入力、 
パターン
<T, ?> パターン) {
戻る
新しい
パターンストリーム
<>(入力、パターン);
  }
  
公共
静的
 <T> 
パターンストリーム
<T> パターン(
データストリーム
<T> 入力、 
パターン
<T, ?> パターン、 
イベントコンパレータ
<T> コンパレータ) {
戻る
新しい
パターンストリーム
<>(入力、パターン、比較子);
  }
}

CEPOperatorUtils は、PatternStream の select() メソッドと flatSelect() メソッドが呼び出されたときに、SingleOutputStreamOperator (DataStream) を作成します。

公共
クラス
CEP オペレータユーティリティ
 {
  ...
プライベート
静的
 <イン、アウト、K> 
シングル出力ストリーム演算子
<OUT> パターンストリームを作成します(
ファイナル
データストリーム
<IN> 入力ストリーム、
ファイナル
パターン
<IN, ?> パターン、
ファイナル
タイプ情報
<OUT> 出力タイプ情報、
ファイナル
ブール値
 タイムアウト処理、
ファイナル
イベントコンパレータ
<IN> コンパレータ、
ファイナル
オペレータビルダー
<IN, OUT> 演算子ビルダー) {
ファイナル
タイプシリアライザー
inputStream にシリアライザーを追加します。
    
// 処理時間を使用するかどうかをチェックする
ファイナル
ブール値
 isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == 
時間特性
。
処理時間
;
    
// パターンを NFAFactory にコンパイルして、後で NFA をインスタンス化します
ファイナル
NFAコンパイラ
。
NFAファクトリー
<IN> nfaファクトリー = 
NFAコンパイラ
.compileFactory(パターン、タイムアウト処理);
    
ファイナル
シングル出力ストリーム演算子
<OUT> パターンストリーム;
    
もし
 (入力ストリーム 
インスタンス
キーストリーム
){
キーストリーム
<IN, K> キー付きストリーム = (
キーストリーム
<IN, K>) 入力ストリーム;
      パターンストリーム = キー付きストリーム.transform(
        演算子ビルダー.getKeyedOperatorName()、
        出力タイプ情報、
        演算子ビルダー.ビルド(
          入力シリアライザー、
          処理時間、
          nfaファクトリー、
          コンパレータ、
          パターン.getAfterMatchSkipStrategy()));
    } 
それ以外
 {
キーセレクタ
<IN、 
バイト
> キーセレクタ = 
新しい
NullByteKeySelector
<>();
      パターンストリーム = inputStream.keyBy(keySelector).transform(
        演算子ビルダー.getOperatorName()、
        出力タイプ情報、
        演算子ビルダー.ビルド(
          入力シリアライザー、
          処理時間、
          nfaファクトリー、
          コンパレータ、
          パターン.getAfterMatchSkipStrategy()
        )).forceNonParallel();
    }
    
戻る
 パターンストリーム;
  }
  ...
}

FlinkCEP 実装手順

  1. IN: データソース -> データストリーム -> 変換 -> データストリーム
  2. パターン: Pattern.begin.where.next.where...times...
  3. パターンストリーム: CEP.pattern(データストリーム、パターン)
  4. データストリーム: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)
  5. 出力: データストリーム -> 変換 -> データストリーム -> データシンク

FlinkCEP マッチングタイムアウトの実装手順

TimeoutCEP のストリームには、keyBy、つまり KeyedStream が必要です。inputStream が KeyedStream でない場合は、新しい 0 バイトのキーが作成されます (上記の CEPOperatorUtils ソース コードに記載されています)。

キーセレクタ
<IN、 
バイト
> キーセレクタ = 
新しい
 
NullByteKeySelector
<>();

パターンは最終的に within を呼び出してウィンドウ時間を設定します。 主キーでグループ化すると、時間ウィンドウ内で一致するタイムアウト イベントは最大 1 つになるため、PatternStream.select(...) を使用できます。

  1. IN: データソース -> データストリーム -> 変換 -> データストリーム -> keyBy -> KeyedStream
  2. パターン: Pattern.begin.where.next.where...within(Time windowTime)
  3. パターンストリーム: CEP.pattern(KeyedStream, パターン)
  4. 出力タグ: 新しい出力タグ(...)
  5. 単一の出力ストリーム演算子: PatternStream.flatSelect(OutputTag、PatternFlatTimeoutFunction、PatternFlatSelectFunction)
  6. データストリーム: SingleOutputStreamOperator.getSideOutput(OutputTag)
  7. 出力: データストリーム -> 変換 -> データストリーム -> データシンク

FlinkCEPのタイムアウトが不十分です

Flink ウィンドウの集約と同様に、依存イベントによって生成されたイベント時間とウォーターマークを使用して前進する場合、ウィンドウがトリガーされて結果を計算して出力する前に、後続のイベントが到着する必要があります。

FlinkCEP タイムアウト完了デモ

公共
クラス
CEPタイムアウトイベントジョブ
 {
プライベート
静的
ファイナル
弦
 ローカルKAFKAブローカー = 
"ローカルホスト:9092"
;
プライベート
静的
ファイナル
弦
 グループID = 
CEPタイムアウトイベントジョブ
。
クラス
.getSimpleName();
プライベート
静的
ファイナル
弦
 GROUP_TOPIC = グループID;
  
公共
静的
空所
 主要(
弦
[] 引数) 
投げる
例外
 {
// パラメータツール
 パラメータ = 
パラメータツール
.fromArgs(引数);
    
ストリーム実行環境
 環境 = 
ストリーム実行環境
.getExecutionEnvironment();
// イベント時間を使用する env.setStreamTimeCharacteristic(
時間特性
。
イベントタイム
);
    env.enableチェックポイント(
5000
);
    env.getCheckpointConfig().enableExternalizedCheckpoints() を設定します。
チェックポイント設定
。
外部化されたチェックポイントクリーンアップ
.RETAIN_ON_CANCELLATION);
    env.getConfig().sysoutLogging() を無効にします。
    env.getConfig().setRestartStrategy() を設定します。
再起動戦略
.fixedDelayRestart() は、
5
、 
10000
));
    
// POJOタイムファイナルは使用しないでください
定期的なウォーターマークの割り当て
 抽出器 = 
新しい
摂取時間抽出
<POJO>();
    
// Kafka Topic の Partitionenv.setParallelism( との一貫性を保つ
3
);
    
プロパティ
 カフカプロパティ = 
新しい
プロパティ
();
    kafkaProps.setProperty() は、
「bootstrap.servers」
、LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty() は、
「グループID」
、GROUP_ID);
    
// Kafka メッセージにアクセスする FlinkKafkaConsumer011
<POJO> コンシューマー = 
新しい
フリンクカフカコンシューマー011
<>(GROUP_TOPIC、 
新しい
POJOスキーマ
(), kafkaProps);
データストリーム
<POJO> pojoDataStream = env.addSource(コンシューマー)
        .assignTimestampsAndWatermarks(抽出子);
    pojoDataStream.print();
    
// 主キーによるグループ化、つまり各 POJO イベントで一致検出を実行します [異なるタイプの POJO は異なる within 時間を使用できます]
// 1.
データストリーム
<POJO> キー付きPojos = pojoDataStream
        .keyBy() は、
"援助"
);
    
// 初期化から最終状態まで - 完全な POJO イベント シーケンス // 2.
パターン
<POJO, POJO> 完了Pojo =
パターン
.<POJO>開始(
「初期化」
)
            。どこ(
新しい
シンプル条件
<POJO>() {
プライベート
静的
ファイナル
長さ
 シリアルバージョンUID = -
6847788055093903603L
;
              
@オーバーライド
公共
ブール値
 フィルター(POJO pojo) 
投げる
例外
 {
戻る
「02」
.equals(pojo.getAstatus());
              }
            })
            。に続く(
"終わり"
)
// .next("終了")
            。どこ(
新しい
シンプル条件
<POJO>() {
プライベート
静的
ファイナル
長さ
 シリアルバージョンUID = -
2655089736460847552L
;
              
@オーバーライド
公共
ブール値
 フィルター(POJO pojo) 
投げる
例外
 {
戻る
「00」
.equals(pojo.getAstatus()) が返されます。 
「01」
.equals(pojo.getAstatus());
              }
            });
    
// 1 分以内に最終状態に到達していないイベント エイドを検索します [テスト目的]
// 異なるタイプで異なる時間がある場合、たとえば、タイムアウトが 1 分の場合もあれば、タイムアウトが 1 時間の場合もあり、複数の PatternStreams を生成します。
// 3.
パターンストリーム
<POJO> patternStream = CEP.pattern(keyedPojos, completePojo.within(
時間
。分(
1
)));
    
// サイド出力タイムアウトを定義する
// 4.
出力タグ
<POJO> タイムアウト = 
新しい
出力タグ
<POJO>(
「タイムアウト」
){
プライベート
静的
ファイナル
長さ
 シリアルバージョンUID = 
773503794597666247L
;
    };
    
// OutputTag<L> timeoutOutputTag、PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction、PatternFlatSelectFunction<T, R> patternFlatSelectFunction
// 5.
シングル出力ストリーム演算子
<POJO> timeoutPojos = patternStream.flatSelect(
        タイムアウト、
新しい
POJOタイムアウト
()、
新しい
フラット選択なし
()
    );
    
//タイムアウトPOJOを出力する
// 6.7.
    タイムアウトPojos.getSideOutput(タイムアウト).print();
    タイムアウトPojos.print();
    env.execute() を実行します。
CEPタイムアウトイベントジョブ
。
クラス
.getSimpleName());
  }
  
/**
   * タイムアウトイベントを収集する */
公共
静的
クラス
POJOタイムアウト
実装する
パターンフラットタイムアウト関数
<POJO、POJO> {
プライベート
静的
ファイナル
長さ
 シリアルバージョンUID = -
4214641891396057732L
;
    
@オーバーライド
公共
空所
 タイムアウト(
地図
<
弦
、 
リスト
<POJO>> マップ、 
長さ
 l、 
コレクタ
<POJO> コレクター) 
投げる
例外
 {
もし
 (
ヌル
 != マップ.get(
「初期化」
)) {
のために
 (POJO pojoInit: map.get(
「初期化」
)) {
システム
.out.println(
「タイムアウト初期化:」
 + pojoInit.getAid());
          コレクター。収集します(pojoInit);
        }
      }
// 終了がタイムアウトし、終了を受信して​​いないため、ここでは終了を取得できません。
.out.println(
「タイムアウト終了:」
 + マップを取得します(
"終わり"
));
    }
  }
  
/**
   * 通常は何もしませんが、一致するすべてのイベントを下流に送信することもできます。近接性が弱い場合は、無視されたイベントや侵入されたイベントを選択して下流に送信することはできません。* 1 分以内に開始データと終了データを完了します。*
   * @param <T>
   */
公共
静的
クラス
フラット選択なし
<T> 
実装する
パターンフラット選択関数
<T, T> {
プライベート
静的
ファイナル
長さ
 シリアルバージョンUID = -
3029589950677623844L
;
    
@オーバーライド
公共
空所
 フラット選択(
地図
<
弦
、 
リスト
<T>>パターン、 
コレクタ
<T> コレクター) {
システム
.out.println(
"フラット選択: "
 + パターン);
    }
  }
}

テスト結果 (followedBy):

3
> POJO{援助=
'ID000-0'
、スタイル=
'スタイル000-0'
、名前=
'名前-0'
、ログ時間=
1563419728242
、エネルギー=
529.00
、年齢=
0
、 tt=
2019
-
07
-
18
、ステータス=
'02'
、作成時間=
ヌル
、更新時間=
ヌル
}
3
> POJO{援助=
'ID000-1'
、スタイル=
'スタイル000-2'
、名前=
'名前-1'
、ログ時間=
1563419728783
、エネルギー=
348.00
、年齢=
26
、 tt=
2019
-
07
-
18
、ステータス=
'02'
、作成時間=
ヌル
、更新時間=
ヌル
}
3
> POJO{援助=
'ID000-0'
、スタイル=
'スタイル000-0'
、名前=
'名前-0'
、ログ時間=
1563419749259
、エネルギー=
492.00
、年齢=
0
、 tt=
2019
-
07
-
18
、ステータス=
'00'
、作成時間=
ヌル
、更新時間=
ヌル
}
flatSelect: {init=[POJO{aid=
'ID000-0'
、スタイル=
'スタイル000-0'
、名前=
'名前-0'
、ログ時間=
1563419728242
、エネルギー=
529.00
、年齢=
0
、 tt=
2019
-
07
-
18
、ステータス=
'02'
、createTime=
ヌル
、更新時間=
ヌル
}], 
終わり
=[POJO{援助=
'ID000-0'
、スタイル=
'スタイル000-0'
、名前=
'名前-0'
、ログ時間=
1563419749259
、エネルギー=
492.00
、年齢=
0
、 tt=
2019
-
07
-
18
、ステータス=
'00'
、作成時間=
ヌル
、更新時間=
ヌル
}]}
タイムアウト初期化:ID000-
1
3
> POJO{援助=
'ID000-1'
、スタイル=
'スタイル000-2'
、名前=
'名前-1'
、ログ時間=
1563419728783
、エネルギー=
348.00
、年齢=
26
、 tt=
2019
-
07
-
18
、ステータス=
'02'
、作成時間=
ヌル
、更新時間=
ヌル
}
タイムアウト 
終わり
: 
ヌル
3
> POJO{援助=
'ID000-2'
、スタイル=
'スタイル000-0'
、名前=
'名前-0'
、ログ時間=
1563419829639
、エネルギー=
467.00
、年齢=
0
、 tt=
2019
-
07
-
18
、ステータス=
'03'
、作成時間=
ヌル
、更新時間=
ヌル
}
3
> POJO{援助=
'ID000-2'
、スタイル=
'スタイル000-0'
、名前=
'名前-0'
、ログ時間=
1563419841394
、エネルギー=
107.00
、年齢=
0
、 tt=
2019
-
07
-
18
、ステータス=
'00'
、作成時間=
ヌル
、更新時間=
ヌル
}
3
> POJO{援助=
'ID000-3'
、スタイル=
'スタイル000-0'
、名前=
'名前-0'
、ログ時間=
1563419967721
、エネルギー=
431.00
、年齢=
0
、 tt=
2019
-
07
-
18
、ステータス=
'02'
、作成時間=
ヌル
、更新時間=
ヌル
}
3
> POJO{援助=
'ID000-3'
、スタイル=
'スタイル000-2'
、名前=
'名前-0'
、ログ時間=
1563419979567
、エネルギー=
32.00
、年齢=
26
、 tt=
2019
-
07
-
18
、ステータス=
'03'
、createTime=
ヌル
、更新時間=
ヌル
}
3
> POJO{援助=
'ID000-3'
、スタイル=
'スタイル000-2'
、名前=
'名前-0'
、ログ時間=
1563419993612
、エネルギー=
542.00
、年齢=
26
、 tt=
2019
-
07
-
18
、ステータス=
'01'
、作成時間=
ヌル
、更新時間=
ヌル
}
flatSelect: {init=[POJO{aid=
'ID000-3'
、スタイル=
'スタイル000-0'
、名前=
'名前-0'
、ログ時間=
1563419967721
、エネルギー=
431.00
、年齢=
0
、 tt=
2019
-
07
-
18
、ステータス=
'02'
、createTime=
ヌル
、更新時間=
ヌル
}], 
終わり
=[POJO{援助=
'ID000-3'
、スタイル=
'スタイル000-2'
、名前=
'名前-0'
、ログ時間=
1563419993612
、エネルギー=
542.00
、年齢=
26
、 tt=
2019
-
07
-
18
、ステータス=
'01'
、createTime=
ヌル
、更新時間=
ヌル
}]}
3
> POJO{援助=
'ID000-4'
、スタイル=
'スタイル000-0'
、名前=
'名前-0'
、ログ時間=
1563420063760
、エネルギー=
122.00
、年齢=
0
、 tt=
2019
-
07
-
18
、ステータス=
'02'
、作成時間=
ヌル
、更新時間=
ヌル
}
3
> POJO{援助=
'ID000-4'
、スタイル=
'スタイル000-0'
、名前=
'名前-0'
、ログ時間=
1563420078008
、エネルギー=
275.00
、年齢=
0
、 tt=
2019
-
07
-
18
、ステータス=
'03'
、作成時間=
ヌル
、更新時間=
ヌル
}
タイムアウト初期化:ID000-
4
3
> POJO{援助=
'ID000-4'
、スタイル=
'スタイル000-0'
、名前=
'名前-0'
、ログ時間=
1563420063760
、エネルギー=
122.00
、年齢=
0
、 tt=
2019
-
07
-
18
、ステータス=
'02'
、createTime=
ヌル
、更新時間=
ヌル
}
タイムアウト 
終わり
: 
ヌル

要約する

上記は、私が紹介した Apache FlinkCEP でタイムアウト ステータス監視を実装するための手順です。お役に立てば幸いです。ご質問がある場合は、メッセージを残してください。すぐに返信いたします。

以下もご興味があるかもしれません:
  • Flink エントリーレベルのアプリケーションドメイン名処理の例
  • Flinkのコア原則を分析し、コア抽象化を実装する
  • IDEA で Flink タスクを実行するための実践的なチュートリアル
  • IDEAでFlink開発環境を構築してテストする方法
  • Apache Hudi と Flink を組み合わせて数十億のデータをレイクに保存する実践の分析

<<:  MySQL 圧縮版 zip のインストールに関する問題の解決策

>>:  Vue印刷機能を実装する2つの方法の概要

推薦する

HTMLページの読み込みと解析プロセスの詳細な紹介

ブラウザがHTMLを読み込みレンダリングする順序1. IE は上から下へダウンロードし、上から下へレ...

tomcat8の最新のLinuxインストールプロセス

ダウンロード参考:ダウンロードするコアパッケージを選択してくださいダウンロード後、ファイルをサーバー...

ニューススタイルのウェブサイトデザイン例25選

bmi ボイジャーピッチフォークアルスター食料品店チャウ真/斜めポスタこれは偽のDIYですクリエイテ...

Vue3 がデータ監視を実装するためにプロキシを使用する理由の分析

Vue データの双方向バインディング原則ですが、この方法には欠点があり、配列とオブジェクトの部分的な...

シンプルなタブバー切り替えコンテンツバーを実装するJavaScript

この記事では、タブバーの切り替えコンテンツバーを簡単に実現するためのJavaScriptの具体的なコ...

Pythonで書かれたWebアプリケーションをDockerでデプロイする実践

目次1. Dockerをインストールする2. コードを書く3. Dockerfileを書く4. 画像...

CSS3 で Taobao に空白スペースを実装する方法

Taobao用の空白スペースを作成します。 ブラウザページを縮小すると、コンテンツ領域は縮小されませ...

Win10 64ビットMySQL8.0のダウンロードとインストールのチュートリアル図

公式サイトから MySQL をダウンロードしてインストールし、クライアントにログインするにはどうすれ...

docker compose を使用して consul クラスタ環境を構築する例

領事の基本概念サーバーモードとクライアントモードサーバー モードとクライアント モードは、consu...

Vue プロジェクトに ECharts を導入する

目次1. インストール2. はじめに3. 使用4. 必要に応じてEChartsチャートとコンポーネン...

MYSQL から MARIADB へのプロジェクト移行に関するチュートリアル

データベース (MySQL) を準備します。すでに MySQL をお持ちの場合は、これを無視できます...

jsとcssのブロッキング問題の詳細な分析

目次DOMContentLoadedとロードjs ブロッキングとは何ですか? CSS ブロッキングと...

どのような種類の MYSQL 接続クエリを知っていますか?

序文クエリ情報が複数のテーブルから取得される場合、クエリのためにこれらのテーブルを結合する必要があり...

Linux のユーザーとグループ管理によく使われるコマンドの概要

この記事では、Linux のユーザーとグループの管理によく使用されるコマンドをまとめます。ご参考まで...