I need to suppress a set of keywords, coming from an API, across all columns of a dataset. Right now I build a LIKE clause similar to the following.
Spark version: 2.1.0
where lower(c) not like '%gun%' and lower(c) not like '%torture%' and lower(c) not like '%tough-mudder%' and lower(c) not like '%health & beauty - first aid%' and lower(c) not like '%hippie-butler%'
I get the following error (whole-stage code generation compiles the filter into a single Java method, and the JVM caps a method at 64 KB of bytecode):
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage0" grows beyond 64 KB
To mitigate this, I tried to break the problem into sub-problems: apply the first 15 keywords, take the result, apply the next 15 keywords to that result, and so on. So for every batch of 15 keywords I call a method that applies the current batch and feeds the result back into the same function as input, until all the words have been used.
dataSet = getTransformedDataset(dataSet, word);
My query looks like this:
select uuid , c from cds where lower(c) not like '%gun%' and lower(c) not like '%kill%' and lower(c) not like '%murder%' ..
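A rough sketch of that chunking loop (the batch size of 15 is from the description above; the keyword list and a variant of getTransformedDataset that accepts a batch of words are assumed here, the full method is shown further below):
private Dataset<Row> suppressInBatches(Dataset<Row> dataSet, final List<String> allWords) {
    final int batchSize = 15;
    for (int from = 0; from < allWords.size(); from += batchSize) {
        final List<String> batch = allWords.subList(from, Math.min(from + batchSize, allWords.size()));
        // each call adds "lower(c) not like '%word%'" for the words in this batch
        dataSet = getTransformedDataset(dataSet, batch);
    }
    return dataSet;
}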
This works fine for smaller datasets, but for larger datasets it needs more memory than we have configured:
Job aborted due to stage failure: Task 5 in stage 13.0 failed 4 times, most recent failure: Lost task 5.3 in stage 13.0 (TID 2093, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 50.0 GB of 49 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
I've added the code below. Any help would be appreciated.
private Dataset<Row> getTransformedRowDataset(final Dataset<Row> input, Dataset<Row> concatenatedColumnsDS,
                                              final List<String> regexToSuppress, final List<String> wordsToSuppress) {
    // "(regex1|regex2|...)" for a single rlike filter
    final String regexString = regexToSuppress.stream().collect(Collectors.joining("|", "(", ")"));
    // "lower(c) not like '%word1%' and lower(c) not like '%word2%' and ..."
    final String likeString = wordsToSuppress
            .stream()
            .map(s -> " lower(c) not like '%" + s.toLowerCase() + "%' ")
            .collect(Collectors.joining(" and "));
    if (!likeString.isEmpty()) {
        concatenatedColumnsDS = concatenatedColumnsDS.where(likeString);
    }
    // join the filtered rows back to the input on uuid
    final Dataset<Row> joinedDs = input.join(concatenatedColumnsDS, "uuid");
    // apply the regex filter, if any, and drop the helper column "c"
    return "()".equals(regexString) || regexString.isEmpty() ? joinedDs.drop("c") :
            joinedDs.where(" c not rlike '" + regexString + "'").drop("c");
}
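For reference, concatenatedColumnsDS (the second argument above) is expected to carry the uuid plus a single searchable column c concatenated from the other columns, roughly along these lines (the column names here are placeholders):
Dataset<Row> concatenatedColumnsDS = input.select(
        functions.col("uuid"),
        functions.concat_ws(" ", functions.col("col1"), functions.col("col2"), functions.col("col3")).as("c"));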
Try a filter:
package spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col}
object filterWorld extends App {
val spark = SparkSession.builder()
.master("local")
.appName("Mapper")
.getOrCreate()
import spark.implicits._
case class Person(
ID: Int,
firstName: String,
lastName: String,
description: String,
comment: String
)
val personDF = Seq(
Person(1, "FN1", "LN1", "TEST", "scala"),
Person(2, "FN2", "LN2", "develop", "spark"),
Person(3, "FN3", "LN3", "test", "sql"),
Person(4, "FN4", "LN4", "develop", "java"),
Person(5, "FN5", "LN5", "test", "c#"),
Person(6, "FN6", "LN6", "architect", "python"),
Person(7, "FN7", "LN7", "test", "spark"),
Person(8, "FN8", "LN8", "architect", "scala"),
Person(9, "FN9", "LN9", "qa", "hql"),
Person(10, "FN10", "LN10", "manager", "haskell")
).toDF()
personDF.show(false)
// +---+---------+--------+-----------+-------+
// |ID |firstName|lastName|description|comment|
// +---+---------+--------+-----------+-------+
// |1 |FN1 |LN1 |TEST |scala |
// |2 |FN2 |LN2 |develop |spark |
// |3 |FN3 |LN3 |test |sql |
// |4 |FN4 |LN4 |develop |java |
// |5 |FN5 |LN5 |test |c# |
// |6 |FN6 |LN6 |architect |python |
// |7 |FN7 |LN7 |test |spark |
// |8 |FN8 |LN8 |architect |scala |
// |9 |FN9 |LN9 |qa |hql |
// |10 |FN10 |LN10 |manager |haskell|
// +---+---------+--------+-----------+-------+
//
// Column-based filter: keep rows whose description does not contain "e" and whose comment does not contain "s"
val fltr = !col("description").like("%e%") && !col("comment").like("%s%")
val res = personDF.filter(fltr)
res.show(false)
// +---+---------+--------+-----------+-------+
// |ID |firstName|lastName|description|comment|
// +---+---------+--------+-----------+-------+
// |9 |FN9 |LN9 |qa |hql |
// +---+---------+--------+-----------+-------+
}
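The same idea carries over to the original Java code: instead of assembling one big SQL string, the predicate can be built as a Column and applied with filter. A sketch (untested; the method name and keyword list are illustrative) — note that a long keyword list multiplied over many columns still produces a large generated predicate, so it may also need to be applied in batches:
import java.util.List;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// Build a single Column predicate that keeps only rows where no column contains any keyword.
private Column suppressionFilter(final Dataset<Row> ds, final List<String> keywords) {
    Column keep = functions.lit(true);
    for (final String colName : ds.columns()) {
        for (final String word : keywords) {
            keep = keep.and(functions.not(functions.lower(functions.col(colName)).contains(word.toLowerCase())));
        }
    }
    return keep;
}

// usage: Dataset<Row> clean = dataSet.filter(suppressionFilter(dataSet, keywords));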