I'm using Spark Streaming to read CSV files from S3. The files have three columns; one of them is called movie_plot, and I need to filter out records whose value in that column matches certain queries. A query can consist of one or more terms (with a word-distance constraint between them). For example, this plot:

This is a sample of plot that is bad because includes John Smith as actor. Also it is a drama romantic adventure war movie.

should be filtered out because of any of these OR conditions:

- the term "sample"
- the term "John Smith" as actor
- the prefix "dram-" (which could be "drama", "dramatic", "dramatisation", etc.)
- "romantic" and "war" with only one word between them ("adventure" in the middle)

To stream the files (currently just printing the results to the console), I wrote this:
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.streaming._
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator

object MoviesStream extends App {

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .config("spark.serializer", classOf[KryoSerializer].getName)
    .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
    .appName("MoviesStream")
    .getOrCreate()

  val ssc = new StreamingContext(spark.sparkContext, Seconds(30))

  val movieFilesSchema = new StructType()
    .add("movie_id", "string")
    .add("movie_name", "string")
    .add("movie_plot", "string")

  val moviesReadStream = spark.readStream
    .schema(movieFilesSchema)
    // .option("latestFirst", "true")
    .option("maxFilesPerTrigger", "20")
    .csv("s3a://movies-bucket/*")
    .select("*")

  val moviesWriteStream = moviesReadStream
    .writeStream
    .foreach(
      new ForeachWriter[Row] {
        def open(partitionId: Long, version: Long): Boolean = true

        def process(row: Row): Unit = {
          val values = row.getValuesMap[String](row.schema.fieldNames)
          // do something here to filter out by plot
        }

        def close(errorOrNull: Throwable): Unit = {
          // Close the connection
        }
      }
    )
    .start()

  moviesWriteStream.awaitTermination()
}
How can I complete the def process(row: Row) method in the write filter to do this? For several reasons, regular expressions don't work well in my case, but I'm not sure what else to use: Spark MLlib's NGram doesn't seem to cover this requirement. I thought about Lucene, but I'm not sure that's the right approach; ideally I'd do some inline filtering in that method. Any ideas?
For a given sentence (an array of strings), you can create n-grams as shown in the example here: https://spark.apache.org/docs/latest/ml-features.html#n-gram

Basically:
read the row and split it on standard delimiters (space, comma, semicolon)
pass the resulting array of strings to org.apache.spark.ml.feature.NGram
NGram.transform will create the n-grams for that sentence and add them as a new column to your input DataFrame
you can then filter on this n-gram column
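The steps above can be sketched in plain Scala (no Spark dependency), which also makes the predicate easy to reuse inside def process(row: Row). The n-grams are built the same way org.apache.spark.ml.feature.NGram builds them (sliding windows of n tokens joined by a single space); PlotFilterSketch and its method names are hypothetical, and the query terms are hard-coded from the question's example:

```scala
// Minimal sketch, assuming the four OR conditions from the question.
object PlotFilterSketch {

  // Split on standard delimiters (whitespace, comma, semicolon, period)
  def tokenize(plot: String): Array[String] =
    plot.toLowerCase.split("[\\s,;.]+").filter(_.nonEmpty)

  // Same construction as Spark ML's NGram: sliding windows of n tokens
  // joined by a single space (drop trailing windows shorter than n)
  def ngrams(words: Array[String], n: Int): List[String] =
    words.sliding(n).filter(_.length == n).map(_.mkString(" ")).toList

  // True when the plot matches any of the OR conditions
  def isBad(plot: String): Boolean = {
    val words = tokenize(plot)
    words.contains("sample") ||                              // single term
    ngrams(words, 2).contains("john smith") ||               // exact phrase
    words.exists(_.startsWith("dram")) ||                    // prefix "dram-"
    ngrams(words, 3).exists(g =>                             // one-word distance
      g.startsWith("romantic ") && g.endsWith(" war"))
  }
}
```

Inside process(row: Row) you could then call PlotFilterSketch.isBad(row.getAs[String]("movie_plot")) and skip the row when it returns true; alternatively, do the same thing before the sink by attaching the n-gram columns with NGram.transform and filtering the DataFrame, so Spark handles the predicate in parallel.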