I need to read Phoenix data into PySpark.
Edit: I am using the Spark HBase converter.
Here is a snippet of the code:
port = "2181"
host = "zookeperserver"
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
cmdata_conf = {
    "hbase.zookeeper.property.clientPort": port,
    "hbase.zookeeper.quorum": host,
    "hbase.mapreduce.inputtable": "camel",
    "hbase.mapreduce.scan.columns": "data:a",
}
sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=cmdata_conf,
)
Traceback:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/hdp/2.3.0.0-2557/spark/python/pyspark/context.py", line 547, in newAPIHadoopRDD
jconf, batchSize)
File "/usr/hdp/2.3.0.0-2557/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/hdp/2.3.0.0-2557/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.io.IOException: No table was provided.
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:130)
Any help would be much appreciated.
Thanks! /Tina
I would suggest using the Spark Phoenix plugin. See the Phoenix Spark plugin details here.
Environment: tested with AWS EMR 5.10 and PySpark.
Here are the steps:
- Build the ZooKeeper URL, substituting your cluster's ZooKeeper quorum; you can look it up in hbase-site.xml.
emrMaster = "ZooKeeper URL"
df = sqlContext.read \
.format("org.apache.phoenix.spark") \
.option("table", "TableName") \
.option("zkUrl", emrMaster) \
.load()
df.show()
df.columns
df.printSchema()
df1 = df.replace(['foo'], ['foo1'], 'DOMAIN')
df1.show()
df1.write \
.format("org.apache.phoenix.spark") \
.mode("overwrite") \
.option("table", "TableName") \
.option("zkUrl", emrMaster) \
.save()
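The ZooKeeper quorum and client port used for `emrMaster` above can be pulled out of `hbase-site.xml` programmatically. A minimal sketch, using an inline XML sample in place of the real file (on EMR the file is typically under the HBase conf directory; the path and hostname here are assumptions):

```python
import xml.etree.ElementTree as ET

# Inline sample standing in for your cluster's hbase-site.xml.
sample = """
<configuration>
  <property><name>hbase.zookeeper.quorum</name><value>zookeperserver</value></property>
  <property><name>hbase.zookeeper.property.clientPort</name><value>2181</value></property>
</configuration>
"""

def zk_url_from_hbase_site(xml_text):
    # Collect all <property> name/value pairs, then join quorum and port as host:port.
    props = {}
    for prop in ET.fromstring(xml_text).findall("property"):
        props[prop.find("name").text] = prop.find("value").text
    return "%s:%s" % (props["hbase.zookeeper.quorum"],
                      props["hbase.zookeeper.property.clientPort"])

emrMaster = zk_url_from_hbase_site(sample)
print(emrMaster)  # zookeperserver:2181
```

In practice you would read the XML from disk instead of the inline sample and pass the resulting string as the `zkUrl` option.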
There are two ways to do this:
1) Since Phoenix has a JDBC layer, you can use Spark's JdbcRDD to read Phoenix data from Spark: https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/rdd/JdbcRDD.html
2) Use the Spark HBase converters: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala https://github.com/apache/spark/tree/master/examples/src/main/python
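As a sketch of option 1) from PySpark (where the Scala JdbcRDD is awkward to reach), Phoenix's JDBC layer can be used through the DataFrame JDBC source instead. The hostname and table name below are placeholders, and the commented-out read requires the phoenix-client jar on the Spark classpath:

```python
def phoenix_jdbc_url(zk_quorum, port=2181):
    # Phoenix JDBC URLs take the form jdbc:phoenix:<zookeeper quorum>:<port>.
    return "jdbc:phoenix:%s:%d" % (zk_quorum, port)

url = phoenix_jdbc_url("zookeperserver")
print(url)  # jdbc:phoenix:zookeperserver:2181

# With a live cluster and the phoenix-client jar on the classpath, one would then:
# df = sqlContext.read.format("jdbc") \
#     .option("url", url) \
#     .option("dbtable", "CAMEL") \
#     .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver") \
#     .load()
```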