hBase java api,批量加载时出错添加了一个词法上不大于先前排序的键(使用 JavaPairRDD<ImmutableBytesWritable, KeyValue>)

问题描述 投票:0回答:1

我使用bulkLoadHFiles.bulkLoad。我有 org.apache.spark.sql.Dataset,其中包含两列字符串(键和值)。我会将这段文本转换为 JavaPairRDD。如果数据集未预先排序,则会收到错误“IOException:添加了一个在词法上不大于先前的键...”

JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
    inputDataset.toJavaRDD().mapToPair(row -> convertToKV (row, "cf", "column"));

BulkLoadHFiles bulkLoadHFiles = BulkLoadHFiles.create(jobConfiguration);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
pairsToBulkLoad.saveAsNewAPIHadoopFile(output.toString(), ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, jobConfiguration);
bulkLoadHFiles.bulkLoad(TableName.valueOf(hbaseFullTableName), output);

public Tuple2<ImmutableBytesWritable, KeyValue> convertToKV (final Row row, final String columnFamily,final String column) {
        final String key = row.getString(0);
        final String value = row.getString(1);
        return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(key)),
                new KeyValue(Bytes.toBytes(key), Bytes.toBytes(columnFamily), 
                             Bytes.toBytes(column), Bytes.toBytes(value)));
}

如果我提交预排序的数据集,那么此代码可以稳定运行。但实际上,在工业环境中,无序的数据集可能会到达这里。 我尝试插入:pairsToBulkLoad =pairsToBulkLoad.sortByKey(true);

JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
    inputDataset.toJavaRDD().mapToPair(row -> convertToKV (row, "cf", "column"));

pairsToBulkLoad = pairsToBulkLoad.sortByKey(true);

BulkLoadHFiles bulkLoadHFiles = BulkLoadHFiles.create(jobConfiguration);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
pairsToBulkLoad.saveAsNewAPIHadoopFile(output.toString(), ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, jobConfiguration);
bulkLoadHFiles.bulkLoad(TableName.valueOf(hbaseFullTableName), output);

在这种情况下,我收到另一个错误:作业由于阶段失败而中止:阶段 0.0 中的任务 0.0 (TID 0) 有一个不可序列化的结果:org.apache.hadoop.hbase.io.ImmutableBytesWritable - 对象不可序列化(类: org.apache.hadoop.hbase.io.ImmutableBytesWritable

我不明白如何寻找解决方案。

  1. 如果我使用 Spark 对数据集进行排序,这是否可以保证 JavaPairRDD 的排序
  2. 或者第二种解决方案是对 JavaPairRDD 进行排序,但是为什么它会落在排序“对象不可序列化(类:org.apache.hadoop.hbase.io.ImmutableBytesWritable”?

(Java 8、Spark 3.HBase 2.4.2)

我将不胜感激任何建议)

java apache-spark hbase java-pair-rdd
1个回答
0
投票

我自己设法解决了这个问题。您可以按如下顺序对对象进行排序:

JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
    inputDataset
    .rdd()
    .toJavaRDD()
    .mapToPair(row -> new Tuple2<>(row.getString("0"),row.getString("1"))));
    .sortByKey(true)
    .mapToPair(pair -> convertToKV (row, "cf", "column", pair._1, pair._2));
    
private static Tuple2<ImmutableBytesWritable, KeyValue> convertToKVCol(
                final String columnFamily,
                final String column,
                final String key,
                final String value) {
return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(key)),
                    new KeyValue(Bytes.toBytes(key), Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)));
    
}
© www.soinside.com 2019 - 2024. All rights reserved.