I'm trying to read and parse a ResultScanner, but I get an exception when calling next().
Here is the relevant part of my code:
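import org.apache.hadoop.hbase.client.{ResultScanner, Scan}
import org.apache.hadoop.hbase.filter.{RegexStringComparator, RowFilter}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp

val scan: Scan = new Scan()
// Match row keys against the regular expression
val keyRegEx: RegexStringComparator = new RegexStringComparator("^.*" + "123123123123")
val rowFilter: RowFilter = new RowFilter(CompareOp.EQUAL, keyRegEx)
scan.setFilter(rowFilter)
scan.setCaching(3000)
// Apply the scan to the Table
val scanner = table.getScanner(scan)
val scanOutput: Seq[(String, String)] = iterateScannerAddingRowkey[T](scanner, Seq())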

def iterateScannerAddingRowkey[T](scanner: ResultScanner, acc: Seq[(String, String)])(implicit m: Manifest[T]): Seq[(String, String)] = {
  // **Line below is triggering the exception**
  val result = scanner.next()
  if (result == null) acc
  else {
    val rowKey = result.rawCells().head.toString.split("/")(0)
    // Parsing the rawCells content into a JSONObject
    val response: JSONObject = getJson[T](result.rawCells())
    iterateScannerAddingRowkey[T](scanner, Seq((rowKey, response.toString)) ++ acc)
  }
}
This is the exception:
java.lang.RuntimeException: java.io.InterruptedIOException
    at org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:97)
    at com.myproject.framework.hbase.HBaseUtils.iterateScannerAddingRowkey(HBaseUtils.scala:85)
    at com.myproject.framework.hbase.HBaseAPI.hbaseGetRowByRegEx(HBaseAPI.scala:323)
    at com.myproject.core.ComparePrefixVsRegex$.main(App.scala:46)
    at com.myproject.core.ComparePrefixVsRegex.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:686)
Caused by: java.io.InterruptedIOException
    at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:203)
    at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:61)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
    at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
    at org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:401)
    at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:364)
    at org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)
As far as I know, next() returns null when the scanner has no more rows.
Does anyone know what I'm missing?
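After trying a few things, I was able to solve the problem.
The main cause was that the table I was querying is really large, so a timeout was reached while the scanner was being processed. To fix it, I made two changes:
I increased the Spark broadcast timeout (spark.sql.broadcastTimeout, given in seconds):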
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .config("spark.sql.broadcastTimeout", "36000")
  .getOrCreate()
And I restricted the scan to the columns I actually need to read, to reduce the size of the result:
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("C1"))
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("C2"))
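
For reference, here is a minimal sketch of how the narrowed scan could be put together with the row filter from the question. It assumes the HBase 1.x client API that the stack trace suggests; the helper name scanBySuffix, its parameters, and the choice to read only CF:C1 are illustrative and not part of my actual code. It also reads the row key with Result.getRow instead of parsing the cell's string form, and closes the scanner when done.

```scala
import org.apache.hadoop.hbase.client.{Scan, Table}
import org.apache.hadoop.hbase.filter.{RegexStringComparator, RowFilter}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// Hypothetical helper: returns (rowKey, value of CF:C1) for every row
// whose key matches the regex built from `suffix`.
def scanBySuffix(table: Table, suffix: String): Seq[(String, String)] = {
  val cf = Bytes.toBytes("CF")
  val c1 = Bytes.toBytes("C1")

  val scan = new Scan()
  scan.setFilter(new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^.*" + suffix)))
  scan.addColumn(cf, c1)  // only fetch the column that is needed
  scan.setCaching(3000)   // same caching value as in the question

  val scanner = table.getScanner(scan)
  try {
    // toList forces the whole iteration before the scanner is closed
    scanner.iterator().asScala.map { result =>
      val rowKey = Bytes.toString(result.getRow)
      // getValue returns null when the row has no cell for CF:C1
      val value = Option(result.getValue(cf, c1)).map(b => Bytes.toString(b)).getOrElse("")
      (rowKey, value)
    }.toList
  } finally {
    scanner.close()
  }
}
```

Calling scanBySuffix(table, "123123123123") would then take the place of the recursive helper, assuming table is an already opened Table.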