我想获取当前 Spark 作业中已进行的所有读/写查询(使用数据集 API)的列表。例如,
Dataset<Row> readDataFrame = spark.read()
.format("jdbc")
.option("url", drivingUrl)
.option("dbtable", "Select * from A where country_code='US'")
.option("driver", driver)
.load();
我希望捕获查询:
Select * from A where country_code='US'
。我尝试使用侦听器来实现此目的,以便我可以捕获我正在运行的任何 Spark-Submit 作业的信息,而无需更改主代码本身。
我尝试过的
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
SparkPlan sparkPlan = qe.executedPlan();
//Tried to search the methods/properties inside it, but couldn't find anything
}
我尝试在 SQLMetrics、子 Spark 计划等中查找,但无法获取我正在搜索的信息。
@Override
public void onOtherEvent(SparkListenerEvent event) {
if (event instanceof SparkListenerSQLExecutionStart) {
SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart = (SparkListenerSQLExecutionStart) event;
SparkPlanInfo sparkPlanInfo = sparkListenerSQLExecutionStart.sparkPlanInfo();
System.out.println(sparkListenerSQLExecutionStart.description());
System.out.println(sparkListenerSQLExecutionStart.details());
System.out.println(sparkListenerSQLExecutionStart.physicalPlanDescription());
}
同样,这些详细信息(以及我查看的其他详细信息)没有我正在寻找的查询信息。
我相信捕获这些信息是可能的,因为我已经看到了像 SparkSplineAgent 这样的项目以及像 this 这样的 StackOverflow 中的问题,但我一直无法弄清楚如何实现。
有人可以帮我吗?
经过多次尝试和错误,我终于找到了一种方法来做到上述。在实现 QueryExecutionListener 的监听器中,我添加了
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
LogicalPlan executedPlan = qe.analyzed();
//maintain a queue to keep track of plans to process
Queue<LogicalPlan> queue = new LinkedList<>();
queue.add(executedPlan);
while (!queue.isEmpty()) {
//get the first plan from queue
LogicalPlan curPlan = queue.remove();
if (curPlan instanceof LogicalRelation) {
LogicalRelation logicalRelation = (LogicalRelation) curPlan;
BaseRelation baseRelation = logicalRelation.relation();
if (baseRelation instanceof JDBCRelation) {
JDBCRelation jdbcRelation = (JDBCRelation) baseRelation;
System.out.println(jdbcRelation.jdbcOptions().table());
}
System.out.println(logicalRelation.relation());
}
//add all child plans to the queue
Iterator<LogicalPlan> childItr = curPlan.children().iterator();
while (childItr.hasNext()) {
LogicalPlan logicalPlan = childItr.next();
queue.add(logicalPlan);
}
}
}
这给了我想要的输出
SELECT * from A where country_code='US'
老问题,但试图以 log4j 方式回答....:-)
我发现启用 log4j 配置的唯一便宜且最好的方法 注意:我在 Spark 3.3.2 和 3.1.1 中测试了这一点,希望它能与 Spark 3.5(今天最新)一起使用
有一个名为
org.apache.spark.sql.execution.SparkSqlParser
的类,它首先有这个sql。
下面是 log4j 配置,您可以通过它进行配置
spark-提交:
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties" \
或例如 pyspark 的 Spark 驱动程序代码
logger = spark._jvm.org.apache.log4j
logger.LogManager.getLogger("org.apache.spark.sql.execution").setLevel(logger.Level.DEBUG)
logger.LogManager.getLogger("org.apache.spark.sql.catalyst.parser").setLevel(logger.Level.DEBUG)
logger.LogManager.getLogger("org.apache.spark.sql.execution.WholeStageCodegenExec").setLevel(logger.Level.OFF)
我使用的log4j.rootLogger=INFO, console
# Console appender configuration
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1} - %m%n
# Spark logging configuration
log4j.logger.org.apache.spark.sql.execution=DEBUG
log4j.logger.org.apache.spark.sql.catalyst.parser=DEBUG
log4j.logger.org.apache.spark.sql.execution.WholeStageCodegenExec=OFF
DEBUG org.apache.spark.sql.execution.SparkSqlParser - Parsing command: CREATE DATABASE IF NOT EXISTS sample_db
DEBUG org.apache.spark.sql.execution.SparkSqlParser - Parsing command:
INSERT INTO sample_table
VALUES (1, 'Ram ', 30), (2, 'Bharanee', 25)