使用 Spark Listener 获取 Spark 作业中进行的所有读/写查询

问题描述 投票:0回答:2

我想获取当前 Spark 作业中已进行的所有读/写查询(使用数据集 API)的列表。例如,

Dataset<Row> readDataFrame = spark.read()
            .format("jdbc")
            .option("url", drivingUrl)
            .option("dbtable", "Select * from A where country_code='US'")
            .option("driver", driver)
            .load();

我希望捕获查询:

Select * from A where country_code='US'
。我尝试使用侦听器来实现此目的,以便我可以捕获我正在运行的任何 Spark-Submit 作业的信息,而无需更改主代码本身。

我尝试过的

  1. 查询执行监听器
@Override
    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        SparkPlan sparkPlan = qe.executedPlan();
        //Tried to search the methods/properties inside it, but couldn't find anything
    }

我尝试在 SQLMetrics、子 Spark 计划等中查找,但无法获取我正在搜索的信息。

  1. SparkListenerSQLExecutionStart
@Override
    public void onOtherEvent(SparkListenerEvent event) {
        if (event instanceof SparkListenerSQLExecutionStart) {
            SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart = (SparkListenerSQLExecutionStart) event;
            SparkPlanInfo sparkPlanInfo = sparkListenerSQLExecutionStart.sparkPlanInfo();


            System.out.println(sparkListenerSQLExecutionStart.description());
            System.out.println(sparkListenerSQLExecutionStart.details());
            System.out.println(sparkListenerSQLExecutionStart.physicalPlanDescription());
    }

同样,这些详细信息(以及我查看的其他详细信息)没有我正在寻找的查询信息。

我相信捕获这些信息是可能的,因为我已经看到了像 SparkSplineAgent 这样的项目以及像 this 这样的 StackOverflow 中的问题,但我一直无法弄清楚如何实现。

有人可以帮我吗?

apache-spark apache-spark-sql listener
2个回答
4
投票

经过多次尝试和错误,我终于找到了一种方法来做到上述。在实现 QueryExecutionListener 的监听器中,我添加了

@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
    LogicalPlan executedPlan = qe.analyzed();

    //maintain a queue to keep track of plans to process
    Queue<LogicalPlan> queue = new LinkedList<>();
    queue.add(executedPlan);

    while (!queue.isEmpty()) {
        //get the first plan from queue
        LogicalPlan curPlan = queue.remove();

        if (curPlan instanceof LogicalRelation) {
            LogicalRelation logicalRelation = (LogicalRelation) curPlan;
            BaseRelation baseRelation = logicalRelation.relation();

            if (baseRelation instanceof JDBCRelation) {
                JDBCRelation jdbcRelation = (JDBCRelation) baseRelation;
                System.out.println(jdbcRelation.jdbcOptions().table());
            }
            System.out.println(logicalRelation.relation());
           
        }

        //add all child plans to the queue
        Iterator<LogicalPlan> childItr = curPlan.children().iterator();
        while (childItr.hasNext()) {
            LogicalPlan logicalPlan = childItr.next();
            queue.add(logicalPlan);
        }
    }
}

这给了我想要的输出

SELECT * from A where country_code='US'

0
投票

老问题,但它是黄金问题....:-)

我发现启用 log4j 配置的唯一便宜且最好的方法 注意:我在 Spark 3.3.2 和 3.1.1 中测试了这一点,希望它能与 Spark 3.5(今天最新)一起使用

有一个名为 org.apache.spark.sql.execution.SparkSqlParser 的类,它首先包含此 sql。

下面是 log4j 配置,您可以通过 Spark-submit 或 Spark 驱动程序代码进行配置

log4j.rootLogger=INFO, console

# Console appender configuration
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1} - %m%n

# Spark logging configuration
log4j.logger.org.apache.spark.sql.execution=DEBUG
log4j.logger.org.apache.spark.sql.catalyst.parser=DEBUG
log4j.logger.org.apache.spark.sql.execution.WholeStageCodegenExec=OFF

输出SQL:

DEBUG org.apache.spark.sql.execution.SparkSqlParser - Parsing command: CREATE DATABASE IF NOT EXISTS sample_db
 DEBUG org.apache.spark.sql.execution.SparkSqlParser - Parsing command: 
      INSERT INTO sample_table
      VALUES (1, 'Ram ', 30), (2, 'Bharanee', 25)
© www.soinside.com 2019 - 2024. All rights reserved.