I'm working in Spark, and I have many CSV files containing rows; a row looks like this:
2017,16,16,51,1,1,4,-79.6,-101.90,-98.900
A row can contain more or fewer fields, depending on the CSV file.
Each file corresponds to a Cassandra table into which I need to insert every row the file contains, so what I basically do is take the row, split its elements, and put them into a List[Double]:
sc.stop
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext(conf)
val nameTable = "artport"
val ligne = "20171,16,165481,51,1,1,4,-79.6000,-101.7000,-98.9000"
val linetoinsert : List[String] = ligne.split(",").toList
val ainserer: Array[Double] = new Array[Double](linetoinsert.length)
for (l <- 0 until linetoinsert.length) { ainserer(l) = linetoinsert(l).toDouble } // until, not to: 0 to length would step one index past the end
val liste = ainserer.toList
val rdd = sc.parallelize(liste)
rdd.saveToCassandra("db", nameTable) //db is the name of my keyspace in cassandra
When I run the code, I get this error:
java.lang.IllegalArgumentException: requirement failed: Columns not found in Double: [collecttime, sbnid, enodebid, rackid, shelfid, slotid, channelid, c373910000, c373910001, c373910002]
at scala.Predef$.require(Predef.scala:224)
at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForWriting(DefaultColumnMapper.scala:108)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:37)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:28)
at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:17)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:31)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:29)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:382)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:35)
... 60 elided
I found that the insert works if my RDD has the type:
rdd: org.apache.spark.rdd.RDD[(Double, Double, Double, Double, Double, Double, Double, Double, Double, Double)]
But what I get from my code is an RDD of type:
org.apache.spark.rdd.RDD[Double]
I can't use a Scala Tuple9, for example, because I don't know how many elements the list will contain before execution, and that approach doesn't fit my problem anyway, because some of my CSV files have more than 100 columns and tuples stop at Tuple22.
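For reference, a sketch of the tuple version that does work for this particular 10-column line (it clearly cannot scale, since tuples stop at 22 fields):
//build a Tuple10 by hand from the parsed fields
val fields = ligne.split(",").map(_.toDouble)
val row = (fields(0), fields(1), fields(2), fields(3), fields(4),
  fields(5), fields(6), fields(7), fields(8), fields(9))
sc.parallelize(Seq(row)).saveToCassandra("db", nameTable)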
Thanks for your help.
As @SergGr mentioned, a Cassandra table has a schema with a known set of columns, so you need to map your Array onto that schema before saving to the database. You can use a case class for this. Try the code below; I'm assuming every column in your Cassandra table is of type Double.
//create a case class equivalent to your Cassandra table
case class Schema(collecttime: Double,
                  sbnid: Double,
                  enodebid: Double,
                  rackid: Double,
                  shelfid: Double,
                  slotid: Double,
                  channelid: Double,
                  c373910000: Double,
                  c373910001: Double,
                  c373910002: Double)
object test {
  import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
    val sc = new SparkContext(conf)
    val nameTable = "artport"
    val ligne = "20171,16,165481,51,1,1,4,-79.6000,-101.7000,-98.9000"
    //parse the ligne string into a Schema case class
    val schema = parseString(ligne)
    //get an RDD[Schema]
    val rdd = sc.parallelize(Seq(schema))
    //now you can save this RDD to cassandra
    rdd.saveToCassandra("db", nameTable)
  }

  //function to parse a string into the Schema case class
  def parseString(s: String): Schema = {
    //extract each field from the split-and-converted array
    val Array(collecttime, sbnid, enodebid, rackid, shelfid, slotid,
      channelid, c373910000, c373910001, c373910002, _*) = s.split(",").map(_.toDouble)
    //map those fields to the Schema class
    Schema(collecttime, sbnid, enodebid, rackid, shelfid, slotid,
      channelid, c373910000, c373910001, c373910002)
  }
}
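The case class above works when the column set is fixed and known. Since you mention files with more than 100 columns, here is a minimal sketch of a more generic route via the DataFrame API (assuming your Spark Cassandra Connector version ships the org.apache.spark.sql.cassandra data source); the column names in colNames and the input path are placeholders to fill in from your own table:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val spark = SparkSession.builder()
  .config("spark.cassandra.connection.host", "localhost")
  .getOrCreate()

//column names of the target Cassandra table, known only at runtime (placeholders here)
val colNames = Seq("collecttime", "sbnid", "enodebid", "rackid", "shelfid",
  "slotid", "channelid", "c373910000", "c373910001", "c373910002")

//build a schema with one Double column per name
val schema = StructType(colNames.map(StructField(_, DoubleType, nullable = true)))

//parse every CSV line into a Row with the same arity as the schema
val rows = spark.sparkContext
  .textFile("file.csv") //placeholder path
  .map(line => Row.fromSeq(line.split(",").map(_.toDouble)))

spark.createDataFrame(rows, schema)
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "db", "table" -> "artport"))
  .mode("append")
  .save()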
When you see this error on join columns, you sometimes need to wrap the join value in a Tuple1, for example:
map { case a => Tuple1(a.p: BigInt) }
  .joinWithCassandraTable("keyspacename", "tableName", joinColumns = SomeColumns("columnName"))
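For context, a minimal self-contained sketch of that pattern (the Item case class and its field p are hypothetical stand-ins for your own key type):
import com.datastax.spark.connector._

//hypothetical key type; p stands in for whatever your join key is
case class Item(p: BigInt)

val keys = sc.parallelize(Seq(Item(1), Item(2)))
val joined = keys
  .map { a => Tuple1(a.p) } //a bare BigInt has no column mapping; Tuple1 gives it one
  .joinWithCassandraTable("keyspacename", "tableName",
    joinColumns = SomeColumns("columnName"))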