使用Spark 2.3写入Hbase并分配时间戳

问题描述 投票:0回答:1

我正在使用 HBase-Spark 连接器在 Hbase 中编写内容,但我想根据 DS 中的列为每一行分配一个时间戳。有可能做这样的事情吗?这是我想做的一个例子

case class Person(name: String,
email: String,
date: Long)

import spark.implicits._

val personDS =  Seq(Person("alice", "[email protected]", 1577836800000L), Person("bob", "[email protected]", 1609459200000L)).toDS()
personDS.write.format("org.apache.hadoop.hbase.spark")
.option("hbase.columns.mapping", "name STRING :key, email STRING c:email, date Long c:date_to_remove")
.option("hbase.table", "person")
.option("hbase.spark.use.hbasecontext", false)
.save()

每一行必须有日期值的时间戳

问候

这就是插入代码

alice                                    column=c:date_to_remove, timestamp=1695042159830, value=\x00\x00\x01\x8A\xA7\xF2\xA3\x11                              
 alice                                    column=c:email, timestamp=1695042159830, [email protected]                                                        
 bob                                      column=c:date_to_remove, timestamp=1695042159674, value=\x00\x00\x01\x8A\xA7\xF2\xA3\x11                              
 bob                                      column=c:email, timestamp=1695042159674, [email protected]    

我想要(我现在只插入 date_to_remove 因为所有列都必须在映射中)

 alice                                    column=c:email, timestamp=1577836800000, [email protected]                                                                              
 bob                                      column=c:email, timestamp=1609459200000, [email protected]     

@ziya-mert-karakas 以及您的答案,映射需要是

        .option("hbase.columns.mapping", "name STRING :key, email STRING c:email, timestamp Long c:date_to_remove, date Long c:date")

结果是

 alice                                    column=c:date, timestamp=1695210586700, value=\x00\x00\x01o^f\xE8\x00                                                 
 alice                                    column=c:date_to_remove, timestamp=1695210586700, value=\x00\x00\x01o^f\xE8\x00                                       
 alice                                    column=c:email, timestamp=1695210586700, [email protected]                                                        
 bob                                      column=c:date, timestamp=1695210586753, value=\x00\x00\x01v\xBB>p\x00                                                 
 bob                                      column=c:date_to_remove, timestamp=1695210586753, value=\x00\x00\x01v\xBB>p\x00                                       
 bob                                      column=c:email, timestamp=1695210586753, [email protected]    
scala apache-spark hbase
1个回答
0
投票

在HBase中,您可以在使用HBase-Spark连接器写入数据时为每个单元设置自定义时间戳;为了实现这一点,您需要在将数据帧写入 HBase 之前对其进行修改。本示例中的时间戳基于您的日期列。

import org.apache.spark.sql.functions.col

val personDS = Seq(Person("alice", "[email protected]", 1577836800000L), Person("bob", "[email protected]", 1609459200000L)).toDS()

// this creates a new dF with the custom timestamp column using withColumn.
val personWithTimestamp = personDS.withColumn("timestamp", col("date"))

// writes the dF to Hbase with the custom timestamp
personWithTimestamp.write
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", "name STRING :key, email STRING c:email, timestamp Long c:date_to_remove")
  .option("hbase.table", "person")
  .option("hbase.spark.use.hbasecontext", false)
  .save()

我们通过基于日期列添加自定义时间戳列来创建一个新的 DataFrame personWithTimestamp。此时间戳列将用于设置 HBase 单元的时间戳。然后我们使用自定义时间戳列写入 Hbase。这样,HBase 表中的每一行都将具有 date_to_remove 列族中单元格的指定时间戳,如您所愿。

© www.soinside.com 2019 - 2024. All rights reserved.