Flink CEP does not work in event time, but works in processing time

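When I run my Flink CEP code in processing time (the default configuration), I get the pattern matches I expect. When I configure the environment to use event time, I do not get any pattern matches at all.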

    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      env.enableCheckpointing(3000) // checkpoint every 3000 ms

      val lines = env.addSource(consumerKafkaSource.consume("bank_transaction_2", "192.168.2.201:9092", "192.168.2.201:2181", "http://192.168.2.201:8081"))

      // Extract the event time from the "datetime" field (Unix format, 3 s out-of-orderness)
      val eventdate = ExtractAndAssignEventTime.assign(lines, "unix", "datetime", 3)

      val event = eventdate.keyBy(v => v.get("customer_id").toString.toInt)

      // Two consecutive FAILED events for the same customer
      val pattern1 = Pattern.begin[GenericRecord]("start")
        .where(v => v.get("state").toString == "FAILED")
        .next("d")
        .where(v => v.get("state").toString == "FAILED")
      val patternStream = CEP.pattern(event, pattern1)

      val warnID = patternStream.sideOutputLateData(latedata).select(value => {
        val v = value.mapValues(c => c.toList.toString)
        Json(DefaultFormats).write(v).replace("\\\"", "\"")
        //.replace("List(","{").replace(")","}")
      })
      val latedatastream = warnID.getSideOutput(latedata)
      latedatastream.print("late_data")

      warnID.print("warning")
      event.print("event")

Timestamp extraction code:

    object ExtractAndAssignEventTime {
      def assign(stream: DataStream[GenericRecord], timeFormat: String, timeColumn: String, OutofOrderTime: Int): DataStream[GenericRecord] = {
        if (!timeFormat.equalsIgnoreCase("Unix")) {
          // Formatted date string: parse it with the given pattern.
          // Note: this branch uses a fixed 3 s bound; OutofOrderTime is not used here.
          stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(3)) {
            override def extractTimestamp(t: GenericRecord): Long =
              new java.text.SimpleDateFormat(timeFormat).parse(t.get(timeColumn).toString).getTime
          })
        } else {
          // Unix timestamp: the field value is used as-is
          stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(OutofOrderTime)) {
            override def extractTimestamp(t: GenericRecord): Long =
              t.get(timeColumn).toString.toLong
          })
        }
      }
    }

Please help me solve this problem. Thanks in advance.


2 answers

  1. # Answer 1

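    Because you are using an AssignerWithPeriodicWatermarks, you also need to set the auto-watermark interval, so that Flink generates watermarks at that interval.

    You can do this by calling env.getConfig.setAutoWatermarkInterval([interval]).

    In event time, CEP is driven by watermarks, so if no watermarks are generated there will essentially be no output.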
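
    To make the last point concrete, here is a simplified, Flink-free sketch of how a bounded-out-of-orderness watermark gates event-time processing. It is only a model under stated assumptions, not Flink's implementation: `Event`, `WatermarkSketch`, `watermark`, and `releasable` are hypothetical names, the watermark is assumed to be the highest timestamp seen minus the allowed delay, and buffered events are assumed to be processed only once the watermark reaches their timestamp.

    ```scala
    // Simplified model for illustration only; none of these names are Flink APIs.
    final case class Event(id: String, timestampMs: Long)

    object WatermarkSketch {
      /** Watermark under bounded out-of-orderness: highest timestamp seen minus the allowed delay. */
      def watermark(seen: Seq[Event], maxDelayMs: Long): Long =
        if (seen.isEmpty) Long.MinValue else seen.map(_.timestampMs).max - maxDelayMs

      /** Events an event-time operator may process: those at or before the watermark, in timestamp order. */
      def releasable(seen: Seq[Event], maxDelayMs: Long): Seq[Event] = {
        val wm = watermark(seen, maxDelayMs)
        seen.filter(_.timestampMs <= wm).sortBy(_.timestampMs)
      }

      def main(args: Array[String]): Unit = {
        val events = Seq(Event("a", 1000L), Event("b", 2000L), Event("c", 3000L))

        // Watermark is 3000 - 3000 = 0, so nothing is released yet.
        println(releasable(events, 3000L).map(_.id)) // List()

        // A later event moves the watermark to 3000 and releases a, b and c.
        println(releasable(events :+ Event("d", 6000L), 3000L).map(_.id)) // List(a, b, c)
      }
    }
    ```

    With a 3-second bound, the first three events stay buffered until a later event pushes the watermark forward. In this model, a stream whose watermark never advances never releases anything, which matches the "no output" symptom described in the question.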

  2. # Answer 2

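    I had the same problem and have just "solved" it, but the fix does not make much sense (at least to me), as you will see.

    Explanation:

    In my original code, I had: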

    var env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    env.getConfig.setAutoWatermarkInterval(1)
    
    ...
    
    var stream : DataStream[String] = env.readTextFile("/home/luca/Desktop/input")
        
        
    var tupleStream = stream.map(new S2TMapFunction())
    tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner())
    
    val pattern = Pattern.begin[(String,Double,Double,String,Int,Int)]("follow").where(new SameRegionFunction())
    
    val patternStream = CEP.pattern(tupleStream,pattern)
    
    val result = patternStream.process(new MyPatternProcessFunction())
    

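    According to my logging, neither SameRegionFunction nor MyPatternProcessFunction was ever executed, which was very unexpected, to say the least.

    Answer:

    Since I had no clue what was going on, I decided to pass my stream through one more transformation, just to check whether my events were actually entering the stream. So I applied a map operation to tupleStream, producing newTupleStream, as follows: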

    var env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    env.getConfig.setAutoWatermarkInterval(1)
    
    ...
    
    var stream : DataStream[String] = env.readTextFile("/home/luca/Desktop/input")
    
    
    var tupleStream = stream.map(new S2TMapFunction())
    /* I created 'DoNothingMapFunction', where the output event = input event */
    var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner()).map(new DoNothingMapFunction())
    
    
    val pattern = Pattern.begin[(String,Double,Double,String,Int,Int)]("follow").where(new SameRegionFunction())
    
    val patternStream = CEP.pattern(newTupleStream,pattern)
    
    val result = patternStream.process(new MyPatternProcessFunction())
    

    After that, SameRegionFunction and MyPatternProcessFunction did run.

    Note:

    I then changed this line:

    var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner()).map(new DoNothingMapFunction())
    

    to this:

    var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner())
    

    and it still worked. Apparently, one more level of indirection was enough to make it work, although it is not clear to me why.
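
    One possible explanation, offered as an assumption about the snippets above and not as a confirmed diagnosis: `assignTimestampsAndWatermarks` does not modify the stream it is called on. It returns a new stream, so if the result is discarded and the original stream is what reaches `CEP.pattern`, that stream carries no timestamps or watermarks. Assigning the result to a variable and passing that variable on is what makes the difference. The toy model below shows the same pitfall without Flink; `ToyStream` and `DiscardedResultDemo` are hypothetical names, not Flink classes.

    ```scala
    // Toy stand-in for a stream: every transformation returns a NEW value
    // and leaves the receiver untouched. Not a Flink API.
    final case class ToyStream[A](elements: List[A], hasTimestamps: Boolean = false) {
      def assignTimestamps: ToyStream[A] = copy(hasTimestamps = true)
      def map[B](f: A => B): ToyStream[B] = ToyStream(elements.map(f), hasTimestamps)
    }

    object DiscardedResultDemo {
      def main(args: Array[String]): Unit = {
        val tupleStream = ToyStream(List("a", "b"))

        // Result discarded: tupleStream itself is unchanged.
        tupleStream.assignTimestamps
        println(tupleStream.hasTimestamps) // false

        // Result kept: the new value carries the timestamps.
        val newTupleStream = tupleStream.assignTimestamps
        println(newTupleStream.hasTimestamps) // true
      }
    }
    ```

    Under this reading, the extra map was never needed; what mattered was using the stream returned by the timestamp assignment.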