Flink is not adding any data to Elasticsearch, but there are no errors

Problem description · Votes: 0 · Answers: 1

Folks, I'm new to this whole streaming data processing thing, but I was able to build and submit a Flink job that reads some CSV data from Kafka, aggregates it, and then writes it to Elasticsearch.

I got the first two parts working and printed my aggregations to STDOUT. But when I added the code to write them to Elasticsearch, nothing seems to happen (no data is added). I looked at the Flink job manager log, and it looks fine (no errors) and says:

2020-03-03 16:18:03,877 INFO 
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge
- Created Elasticsearch RestHighLevelClient connected to [http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200]

Here is my code:

/*
 * This Scala source file was generated by the Gradle 'init' task.
 */
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}

object Demo {
  /**
   * MapFunction to generate Transfers POJOs from parsed CSV data.
   */
  class TransfersMapper extends RichMapFunction[String, Transfers] {
    // Typed explicitly: `var formatter = null` would infer the unusable type Null in Scala
    private var formatter: java.time.format.DateTimeFormatter = _

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    }

    @throws[Exception]
    override def map(csvLine: String): Transfers = {
      //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
      var splitCsv = csvLine.stripLineEnd.split(",")

      val arrLength = splitCsv.length
      // Pad short rows out to 13 columns: empty strings for missing fields,
      // and "0.0" for the final numeric `los` column
      if (arrLength != 13) {
        for (i <- arrLength + 1 to 13) {
          if (i == 13) {
            splitCsv = splitCsv :+ "0.0"
          } else {
            splitCsv = splitCsv :+ ""
          }
        }
      }
      var trans = new Transfers()
      trans.rowId = splitCsv(0)
      trans.subjectId = splitCsv(1)
      trans.hadmId = splitCsv(2)
      trans.icuStayId = splitCsv(3)
      trans.dbSource = splitCsv(4)
      trans.eventType = splitCsv(5)
      trans.prev_careUnit = splitCsv(6)
      trans.curr_careUnit = splitCsv(7)
      trans.prev_wardId = splitCsv(8)
      trans.curr_wardId = splitCsv(9)
      trans.inTime = splitCsv(10)
      trans.outTime = splitCsv(11)
      trans.los = splitCsv(12).toDouble

      return trans
    }
  }

  def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform CSV (with a header row per Kafka event) into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    println("***** NEW EXECUTION STARTED AT " + LocalDateTime.now() + " *****")

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("7")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
        .index("transfers-sum")
        .documentType("_doc")
        .keyNullLiteral("n/a")
    )
      .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.DOUBLE())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    // Elasticsearch elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}

I'm not sure how to debug this beast... Wondering if anyone can help me figure out why the Flink job is not adding data to Elasticsearch :( From my Flink cluster, I can query Elasticsearch just fine (manually) and add records to my index:

curl -XPOST "http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200/transfers-sum/_doc" -H 'Content-Type: application/json' -d'{"curr_careUnit":"TEST123","sum":"123"}'
elasticsearch apache-flink flink-streaming
1 Answer
0 votes

A kind soul on the Flink mailing list pointed out that it might be Elasticsearch buffering my records... and indeed it was. ;)

I added the following options to the Elasticsearch connector:

.bulkFlushMaxActions(2)
.bulkFlushInterval(1000L)
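
In the descriptor-based API used in the question, these calls go on the `Elasticsearch` descriptor itself. A minimal sketch of where they fit, reusing the host, index, and schema from the question's code (connector configuration only, not a complete job):

```scala
// Same Elasticsearch descriptor as in the question, with the bulk-flush
// options added so buffered records are written out promptly.
tEnv.connect(
  new Elasticsearch()
    .version("7")
    .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
    .index("transfers-sum")
    .documentType("_doc")
    .keyNullLiteral("n/a")
    .bulkFlushMaxActions(2)   // flush after every 2 buffered actions
    .bulkFlushInterval(1000L) // ...and at least once per second
)
  .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
  .withSchema(new Schema()
    .field("curr_careUnit", DataTypes.STRING())
    .field("sum", DataTypes.DOUBLE())
  )
  .inUpsertMode()
  .createTemporaryTable("transfersSum")
```

Note that such small flush thresholds are fine for debugging a trickle of test data, but for production volumes you would normally keep the defaults (or larger batches) so that indexing stays efficient.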