Unable to understand the TwoPhaseCommitSinkFunction lifecycle

Problem description (0 votes · 1 answer)

I need a sink to a Postgres DB, so I started building a custom Flink SinkFunction. Since FlinkKafkaProducer implements TwoPhaseCommitSinkFunction, I decided to do the same. As described in O'Reilly's book Stream Processing with Apache Flink, you just have to implement the abstract methods, enable checkpointing, and you are good to go. However, when I run my code, what actually happens is that the commit method is called only once, and it is called before invoke, which is completely unexpected, because at that point there is no set of ready-to-commit rows and the transaction should be empty. Even worse, after that commit, invoke is called for every transaction line present in my file, and then abort is called, which is even more unexpected.

When the sink is initialized, as far as I understand, the following is supposed to happen:

  1. beginTransaction is called and sends an identifier to invoke
  2. invoke adds the lines to the transaction, according to the identifier it received
  3. preCommit makes all the final modifications to the current transaction's data
  4. commit handles the final transaction of the pre-committed data

So, I don't understand why my program does not show this behavior.
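To see which callback fires when, a stripped-down sink that does nothing but log its lifecycle methods can be dropped into the same pipeline. This is only a sketch (the class name LoggingTwoPhaseSink is made up, and it assumes the same Flink version as the code below); the comments reflect my understanding of which runtime event drives each callback:

import org.apache.flink.api.common.typeutils.base.{StringSerializer, VoidSerializer}
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}
import org.slf4j.LoggerFactory

// Minimal sink that only logs the lifecycle callbacks, to observe the order
// in which Flink actually invokes them at runtime.
class LoggingTwoPhaseSink
  extends TwoPhaseCommitSinkFunction[String, String, Void](
    StringSerializer.INSTANCE, VoidSerializer.INSTANCE) {

  private val LOG = LoggerFactory.getLogger(classOf[LoggingTwoPhaseSink])

  // Called when the sink starts up, and again after every checkpoint to open
  // the next transaction, so it runs before any element is processed.
  override def beginTransaction(): String = {
    val id = java.util.UUID.randomUUID.toString
    LOG.info(s"beginTransaction -> $id")
    id
  }

  // Called once per element, with the identifier of the currently open transaction.
  override def invoke(transaction: String, value: String, context: SinkFunction.Context[_]): Unit =
    LOG.info(s"invoke($transaction, $value)")

  // Driven by snapshotState(), i.e. when a checkpoint barrier reaches the sink.
  override def preCommit(transaction: String): Unit =
    LOG.info(s"preCommit($transaction)")

  // Driven by notifyCheckpointComplete(), i.e. only after a checkpoint has succeeded.
  override def commit(transaction: String): Unit =
    LOG.info(s"commit($transaction)")

  // Called for transactions that could not be committed (e.g. on failure or close).
  override def abort(transaction: String): Unit =
    LOG.info(s"abort($transaction)")
}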

Here is my sink code:

package PostgresConnector

import java.sql.{BatchUpdateException, DriverManager, PreparedStatement, SQLException, Timestamp}
import java.text.ParseException
import java.util.{Date, Properties, UUID}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}
import org.apache.flink.streaming.api.scala._
import org.slf4j.{Logger, LoggerFactory}




class PostgreSink(props : Properties, config : ExecutionConfig) extends TwoPhaseCommitSinkFunction[(String,String,String,String),String,String](createTypeInformation[String].createSerializer(config),createTypeInformation[String].createSerializer(config)){

    private var transactionMap : Map[String,Array[(String,String,String,String)]] = Map()

    private var parsedQuery : PreparedStatement = _

    private val insertionString : String = "INSERT INTO mydb (field1,field2,point) values (?,?,point(?,?))"

    override def invoke(transaction: String, value: (String,String,String,String), context: SinkFunction.Context[_]): Unit = {

        val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        val res = this.transactionMap.get(transaction)

        if(res.isDefined){

            var array = res.get

            array = array ++ Array(value)

            this.transactionMap += (transaction -> array)

        }else{

            val array = Array(value)

            this.transactionMap += (transaction -> array)


        }

        LOG.info("\n\nPassing through invoke\n\n")

        ()

    }

    override def beginTransaction(): String = {

        val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        val identifier = UUID.randomUUID.toString

        LOG.info("\n\nPassing through beginTransaction\n\n")

        identifier


    }

    override def preCommit(transaction: String): Unit = {

        val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        try{

            val tuple : Option[Array[(String,String,String,String)]]= this.transactionMap.get(transaction)

            if(tuple.isDefined){

                tuple.get.foreach( (value : (String,String,String,String)) => {

                    LOG.info("\n\n"+value.toString()+"\n\n")

                    this.parsedQuery.setString(1,value._1)
                    this.parsedQuery.setString(2,value._2)
                    this.parsedQuery.setString(3,value._3)
                    this.parsedQuery.setString(4,value._4)
                    this.parsedQuery.addBatch()

                })

            }

        }catch{

            case e : SQLException =>
                LOG.info("\n\nError when adding transaction to batch: SQLException\n\n")

            case f : ParseException =>
                LOG.info("\n\nError when adding transaction to batch: ParseException\n\n")

            case g : NoSuchElementException =>
                LOG.info("\n\nError when adding transaction to batch: NoSuchElementException\n\n")

            case h : Exception =>
                LOG.info("\n\nError when adding transaction to batch: Exception\n\n")

        }

        this.transactionMap = this.transactionMap.empty

        LOG.info("\n\nPassing through preCommit...\n\n")
    }

    override def commit(transaction: String): Unit = {

        val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        if(this.parsedQuery != null) {
            LOG.info("\n\n" + this.parsedQuery.toString+ "\n\n")
        }

        try{

            this.parsedQuery.executeBatch
            val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
            LOG.info("\n\nExecuting batch\n\n")

        }catch{

            case e : SQLException =>
                val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
                LOG.info("\n\n"+"Error : SQLException"+"\n\n")

        }

        this.transactionMap = this.transactionMap.empty

        LOG.info("\n\nPassing through commit...\n\n")

    }

    override def abort(transaction: String): Unit = {

        val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        this.transactionMap = this.transactionMap.empty

        LOG.info("\n\nPassing through abort...\n\n")

    }

    override def open(parameters: Configuration): Unit = {

        val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        val driver = props.getProperty("driver")
        val url = props.getProperty("url")
        val user = props.getProperty("user")
        val password = props.getProperty("password")
        Class.forName(driver)
        val connection = DriverManager.getConnection(url + "?user=" + user + "&password=" + password)
        this.parsedQuery = connection.prepareStatement(insertionString)

        LOG.info("\n\nConfiguring BD conection parameters\n\n")
    }
}

And here is my main program:

package FlinkCEPClasses

import PostgresConnector.PostgreSink
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import java.util.Properties

import org.apache.flink.api.common.ExecutionConfig
import org.slf4j.{Logger, LoggerFactory}

class FlinkCEPPipeline {

  val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPPipeline])
  LOG.info("\n\nStarting the pipeline...\n\n")

  var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  env.enableCheckpointing(10)
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  env.setParallelism(1)

  //var input : DataStream[String] = env.readFile(new TextInputFormat(new Path("/home/luca/Desktop/lines")),"/home/luca/Desktop/lines",FileProcessingMode.PROCESS_CONTINUOUSLY,1)

  var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines").name("Raw stream")

  var tupleStream : DataStream[(String,String,String,String)] = input.map(new S2PMapFunction()).name("Tuple Stream")

  var properties : Properties = new Properties()

  properties.setProperty("driver","org.postgresql.Driver")
  properties.setProperty("url","jdbc:postgresql://localhost:5432/mydb")
  properties.setProperty("user","luca")
  properties.setProperty("password","root")

  tupleStream.addSink(new PostgreSink(properties,env.getConfig)).name("Postgres Sink").setParallelism(1)
  tupleStream.writeAsText("/home/luca/Desktop/output",FileSystem.WriteMode.OVERWRITE).name("File Sink").setParallelism(1)

  env.execute()


}

My S2PMapFunction code:

package FlinkCEPClasses

import org.apache.flink.api.common.functions.MapFunction

case class S2PMapFunction() extends MapFunction[String,(String,String,String,String)] {

    override def map(value: String): (String, String, String,String) = {


            var tuple = value.replaceAllLiterally("(","").replaceAllLiterally(")","").split(',')

            (tuple(0),tuple(1),tuple(2),tuple(3))

    }
}

My pipeline works like this: I read lines from a file, map them to a tuple of strings, and use the data in the tuple to save it in a Postgres DB.

If you want to simulate the data, just create a file with lines in the following format: (field1,field2,pointx,pointy)
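
For example (the values here are purely illustrative), a file with the following lines would produce valid tuples for S2PMapFunction:

(sensor1,temperature,10.5,20.3)
(sensor2,humidity,11.0,21.7)
(sensor3,pressure,-3.2,45.8)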


scala intellij-idea stream apache-flink
1 Answer

0 votes

I'm not an expert on this topic, but here are two guesses:
