我正在使用 HBase-Spark 连接器在 Hbase 中编写内容,但我想根据 DS 中的列为每一行分配一个时间戳。有可能做这样的事情吗?这是我想做的一个例子
case class Person(name: String,
email: String,
date: Long)
import spark.implicits._
val personDS = Seq(Person("alice", "[email protected]", 1577836800000L), Person("bob", "[email protected]", 1609459200000L)).toDS()
personDS.write.format("org.apache.hadoop.hbase.spark")
.option("hbase.columns.mapping", "name STRING :key, email STRING c:email, date Long c:date_to_remove")
.option("hbase.table", "person")
.option("hbase.spark.use.hbasecontext", false)
.save()
每一行必须有日期值的时间戳
问候
这就是插入代码
alice column=c:date_to_remove, timestamp=1695042159830, value=\x00\x00\x01\x8A\xA7\xF2\xA3\x11
alice column=c:email, timestamp=1695042159830, [email protected]
bob column=c:date_to_remove, timestamp=1695042159674, value=\x00\x00\x01\x8A\xA7\xF2\xA3\x11
bob column=c:email, timestamp=1695042159674, [email protected]
我想要(我现在只插入 date_to_remove 因为所有列都必须在映射中)
alice column=c:email, timestamp=1577836800000, [email protected]
bob column=c:email, timestamp=1609459200000, [email protected]
@ziya-mert-karakas 以及您的答案,映射需要是
.option("hbase.columns.mapping", "name STRING :key, email STRING c:email, timestamp Long c:date_to_remove, date Long c:date")
结果是
alice column=c:date, timestamp=1695210586700, value=\x00\x00\x01o^f\xE8\x00
alice column=c:date_to_remove, timestamp=1695210586700, value=\x00\x00\x01o^f\xE8\x00
alice column=c:email, timestamp=1695210586700, [email protected]
bob column=c:date, timestamp=1695210586753, value=\x00\x00\x01v\xBB>p\x00
bob column=c:date_to_remove, timestamp=1695210586753, value=\x00\x00\x01v\xBB>p\x00
bob column=c:email, timestamp=1695210586753, [email protected]
在HBase中,您可以在使用HBase-Spark连接器写入数据时为每个单元设置自定义时间戳;为了实现这一点,您需要在将数据帧写入 HBase 之前对其进行修改。本示例中的时间戳基于您的日期列。
import org.apache.spark.sql.functions.col
val personDS = Seq(Person("alice", "[email protected]", 1577836800000L), Person("bob", "[email protected]", 1609459200000L)).toDS()
// this creates a new dF with the custom timestamp column using withColumn.
val personWithTimestamp = personDS.withColumn("timestamp", col("date"))
// writes the dF to Hbase with the custom timestamp
personWithTimestamp.write
.format("org.apache.hadoop.hbase.spark")
.option("hbase.columns.mapping", "name STRING :key, email STRING c:email, timestamp Long c:date_to_remove")
.option("hbase.table", "person")
.option("hbase.spark.use.hbasecontext", false)
.save()
我们通过基于日期列添加自定义时间戳列来创建一个新的 DataFrame personWithTimestamp。此时间戳列将用于设置 HBase 单元的时间戳。然后我们使用自定义时间戳列写入 Hbase。这样,HBase 表中的每一行都将具有 date_to_remove 列族中单元格的指定时间戳,如您所愿。