我知道我的 Spark Scala 数据帧的第 n 行存在一些问题(假设数据类型不正确)。当我尝试使用 Spark 结构化流在 cassandra 中写入此数据帧时,它失败并且整个过程停止在那里。现在,我希望在这种情况下,错误的记录应该被过滤并插入到其他数据库中,并继续写入其余记录的 cassandra 。需要这样做,因为在我们识别并删除错误记录之前,流程不会向前推进,并且会在 Kafka 生产者中造成巨大的滞后。 是否可以识别和过滤此类记录,因为我在互联网上找不到任何此类解决方案。
谢谢,
没有得到任何有用的尝试。
找到了解决方案。我们可以使用 foreachPartitions 与 Cassandra 连接来更快地处理每个记录并分离出错误的记录。