我是GC Dataflow的新手,并没有找到相关的答案。如果我发现这已经得到回答,请道歉。
我正在尝试使用v2.0 SDK创建一个简单的管道,并且无法使用BigQueryIO将数据读入我的PCollection。我正在使用.withQuery方法,我已经在BigQuery界面中测试了查询,它似乎工作正常。最初的PCollection似乎没有任何问题,但是当我想设置一个简单的ParDo函数将TableRow中的值转换为PCollection时,我在TableRow对象上执行.get的代码行上得到NullPointerException。
这是我的代码。 (我可能错过了一些简单的东西。我是Pipeline编程的新手。任何输入都会非常感激。)
public class ClientAutocompletePipeline {
private static final Logger LOG = LoggerFactory.getLogger(ClientAutocompletePipeline.class);
public static void main(String[] args) {
// create the pipeline
Pipeline p = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
// A step to read in the product names from a BigQuery table
p.apply(BigQueryIO.read().fromQuery("SELECT name FROM [beaming-team-169321:Products.raw_product_data]"))
.apply("ExtractProductNames", ParDo.of(new DoFn<TableRow, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
// Grab a row from the BigQuery Results
TableRow row = c.element();
// Get the value of the "name" column from the table row.
//NOTE: This is the line that is giving me the NullPointerException
String productName = row.get("name").toString();
// Make sure it isn't empty
if (!productName.isEmpty()) {
c.output(productName);
}
}
}))
查询肯定在BigQuery UI中工作,并且在测试查询时返回名为“name”的列。为什么我在这一行上得到NullPointerException:
String productName = row.get("name").toString();
有任何想法吗?
使用BigQuery和Dataflow时,这是一个常见问题(很可能该字段确实是null
)。如果你可以使用Scala,你可以看看Scio(这是一个Scala DSL for Dataflow)及其BigQuery IO。
只需使您的代码null
安全。替换这个:
String productName = row.get("name").toString();
有这样的事情:
String productName = String.valueOf(row.get("name"));
我想我迟到了,但你可以做一些像if(row.containsKey(“column-name”))。这基本上会告诉您字段是否为空。在BigQuery中,发生的是,在读取数据时,如果列值为null,则它不能作为该特定TableRow的一部分。因此,你得到了那个错误。您还可以执行if(null == row.get(“column-name”))之类的操作来检查字段是否为null。