火花2.2结构流的foreach作家JDBC水槽滞后

问题描述 投票:2回答:3

我在一个项目中使用的火花2.2结构流读卡夫卡味精到Oracle数据库。消息流进卡夫卡大约是每秒4000-6000消息。


使用时,HDFS文件系统作为沉目的地,它只是正常工作。使用的foreach JDBC作家的时候,它会随着时间而有巨大的延迟。我认为,延迟是由foreach循环引起的。

JDBC的接收器类(独立的类文件):

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "oracle.jdbc.driver.OracleDriver"
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  val v_sql = "insert INTO sparkdb.t_cf(EntityId,clientmac,stime,flag,id) values(?,?,to_date(?,'YYYY-MM-DD HH24:MI:SS'),?,stream_seq.nextval)"

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    connection.setAutoCommit(false)
    statement = connection.prepareStatement(v_sql)
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.setString(1, value(0).toString)
    statement.setString(2, value(1).toString)
    statement.setString(3, value(2).toString)
    statement.setString(4, value(3).toString)
    statement.executeUpdate()        
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.commit()
    connection.close
  }
}

水槽部分:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "namenode:9092").option("fetch.message.max.bytes", "50000000").option("kafka.max.partition.fetch.bytes", "50000000")
  .option("subscribe", "rawdb.raw_data")
  .option("startingOffsets", "latest")
  .load()
  .select($"value".as[Array[Byte]])
  .map(avroDeserialize(_))
  .filter(some logic).select(some logic) 
  .writeStream.format("csv").option("checkpointLocation", "/user/root/chk").option("path", "/user/root/testdir").start()

如果我改变的最后一行

.writeStream.format("csv")...

成JDBC的foreach水槽如下:

val url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.x)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=fastdb)))"
val user = "user";
val pwd = "password";
val writer = new JDBCSink(url, user, pwd)
.writeStream.foreach(writer).outputMode("append").start()

滞后出现。

我想最有可能通过foreach循环机制,它不是在批处理模式下处理像几千行的批次,作为Oracle DBA要么,我已经微调Oracle数据库方面,主要是数据库正在等待空闲事件发生的原因。过度提交正试图通过已经设定connection.setAutoCommit(false)被避免,任何建议将非常感激。

apache-spark jdbc foreach spark-streaming sink
3个回答
2
投票

我虽然不具备什么花费最长的时间在你的应用程序的实际轮廓,我会以为这是由于这样的事实,使用ForeachWriter将有效地关闭并重新打开每次运行你的JDBC连接,因为这是如何工作的ForeachWriter

我会建议,而不是使用它,在那里你控制如何连接被打开或关闭编写JDBC自定义Sink

还有,你可以采取偷看看到一个可能的方法来实施开放pull request to add a JDBC driver to Spark


2
投票

由结果注入到另一个话题卡夫卡解决的问题,然后写在新的话题读取另一个程序并将其写入到数据库批次。

我想在明年发布的火花,他们可能提供的JDBC水槽,并有一些参数设置批量大小。

主代码是如下:

写一个话题:

  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "x.x.x.x:9092")
  .option("topic", "fastdbtest")
  .option("checkpointLocation", "/user/root/chk")
  .start()

阅读的主题和写入数据库,我使用的C3P0连接池

lines.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
      //get a connection from connection pool
      val conn = ConnManager.getManager.getConnection
      val ps = conn.prepareStatement("insert into sparkdb.t_cf(ENTITYID,CLIENTMAC,STIME,FLAG) values(?,?,?,?)")
      try {
        conn.setAutoCommit(false)
        partitionRecords.foreach(record => {
          insertIntoDB(ps, record)
        }
        )
        ps.executeBatch()
        conn.commit()
      } catch {
        case e: Exception =>{}
        // do some log
      } finally {
        ps.close()
        conn.close()
      }
    })
  }
})

0
投票

你有没有使用触发器试过吗?

我注意到,当我没有用一个触发我的foreach水槽打开和关闭几次连接到数据库。

writeStream.foreach(writer).start()

但是,当我使用的触发,在Foreach只有打开和关闭连接一次,处理例如200次的查询,当微批次结束,直到一个新的微批次收到关闭了连接。

writeStream.trigger(Trigger.ProcessingTime("3 seconds")).foreach(writer).start()

我用例是从只有一个分区的卡夫卡话题读书,所以星火我想用一个分区。我不知道这是否解决方案的工作原理相同多个星火分区,但在这里,我的结论是在这个过程中方法的foreach过程中的所有微批次在时间(逐行),不调用open()和close()像很多人每一行认为。

© www.soinside.com 2019 - 2024. All rights reserved.