I am using bulkLoadHFiles.bulkLoad. I have an org.apache.spark.sql.Dataset with two String columns (a key and a value), which I convert to a JavaPairRDD:
JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
        inputDataset.toJavaRDD().mapToPair(row -> convertToKV(row, "cf", "column"));
BulkLoadHFiles bulkLoadHFiles = BulkLoadHFiles.create(jobConfiguration);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
// Write HFiles to the staging directory, then load them into the table.
pairsToBulkLoad.saveAsNewAPIHadoopFile(output.toString(), ImmutableBytesWritable.class,
        KeyValue.class, HFileOutputFormat2.class, jobConfiguration);
bulkLoadHFiles.bulkLoad(TableName.valueOf(hbaseFullTableName), output);
public Tuple2<ImmutableBytesWritable, KeyValue> convertToKV(final Row row,
        final String columnFamily, final String column) {
    final String key = row.getString(0);
    final String value = row.getString(1);
    return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(key)),
            new KeyValue(Bytes.toBytes(key), Bytes.toBytes(columnFamily),
                    Bytes.toBytes(column), Bytes.toBytes(value)));
}
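For completeness: the snippet uses jobConfiguration, job, table, regionLocator, output and hbaseFullTableName without showing their setup. Roughly, the wiring around them looks like this (just a sketch; the job name and staging path are placeholders I invented):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.mapreduce.Job;

Configuration jobConfiguration = HBaseConfiguration.create();
Job job = Job.getInstance(jobConfiguration, "hbase-bulk-load"); // job name assumed

try (Connection connection = ConnectionFactory.createConnection(jobConfiguration)) {
    TableName tableName = TableName.valueOf(hbaseFullTableName);
    Table table = connection.getTable(tableName);
    RegionLocator regionLocator = connection.getRegionLocator(tableName);
    Path output = new Path("/tmp/bulkload-staging"); // HDFS staging dir, assumed

    // ... the bulk-load code above runs here ...
}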
This code runs reliably if I submit a pre-sorted dataset. In practice, however, in a production environment an unsorted dataset can arrive here. I tried inserting pairsToBulkLoad = pairsToBulkLoad.sortByKey(true);:
JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
        inputDataset.toJavaRDD().mapToPair(row -> convertToKV(row, "cf", "column"));
pairsToBulkLoad = pairsToBulkLoad.sortByKey(true);
BulkLoadHFiles bulkLoadHFiles = BulkLoadHFiles.create(jobConfiguration);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
pairsToBulkLoad.saveAsNewAPIHadoopFile(output.toString(), ImmutableBytesWritable.class,
        KeyValue.class, HFileOutputFormat2.class, jobConfiguration);
bulkLoadHFiles.bulkLoad(TableName.valueOf(hbaseFullTableName), output);
In that case I get a different error: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable)
I don't understand how to approach a solution.
(Java 8, Spark 3, HBase 2.4.2)
I would be grateful for any advice.
I managed to solve the problem myself. The trick is to sort while the keys are still plain Strings, which are Serializable, and to convert to ImmutableBytesWritable/KeyValue only after the sort:
JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
        inputDataset
                .toJavaRDD()
                // Sort on plain String keys: String is Serializable, so the
                // shuffle triggered by sortByKey succeeds.
                .mapToPair(row -> new Tuple2<>(row.getString(0), row.getString(1)))
                .sortByKey(true)
                // Convert to the non-serializable HBase types only after the shuffle.
                .mapToPair(pair -> convertToKVCol("cf", "column", pair._1, pair._2));
private static Tuple2<ImmutableBytesWritable, KeyValue> convertToKVCol(
final String columnFamily,
final String column,
final String key,
final String value) {
return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(key)),
new KeyValue(Bytes.toBytes(key), Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)));
}
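For anyone who hits the same error: the root cause is that ImmutableBytesWritable is a Hadoop Writable and does not implement java.io.Serializable, while sortByKey triggers a shuffle that serializes the pairs with Spark's default Java serializer. An alternative workaround that is often suggested (just a sketch here; I did not end up using it) is to switch Spark to Kryo serialization and register the HBase classes:

import org.apache.spark.SparkConf;

SparkConf sparkConf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// Register the non-Serializable HBase classes with Kryo so the pairs
// can cross the shuffle triggered by sortByKey.
sparkConf.registerKryoClasses(new Class<?>[] {
        org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
        org.apache.hadoop.hbase.KeyValue.class
});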