我使用Pub / Sub到BigQuery template来传输发送到Pub / Sub主题的JSON数据。通过Dataflow,我希望将数据展平以匹配BigQuery架构并对其进行流式处理。
以下是Dataflow流程的Javascript UDF:
function transform(inJson) {
var obj = JSON.parse(inJson);
// variable declarations
// ...
data['domain'] = obj['data']['domain']; // line 18
...
return JSON.stringify(data);
}
我也尝试过:
data.domain = obj.data.domain;
我刚刚从here复制了这个例子并将其扩展为扁平化JSON数据。
这是错误消息:
TypeError: Cannot read property "domain" from undefined in <eval> at line number 18
和堆栈跟踪:
javax.script.ScriptException: TypeError: Cannot read property "domain" from undefined in <eval> at line number 18
at jdk.nashorn.api.scripting.NashornScriptEngine.throwAsScriptException(NashornScriptEngine.java:470)
at jdk.nashorn.api.scripting.NashornScriptEngine.invokeImpl(NashornScriptEngine.java:392)
at jdk.nashorn.api.scripting.NashornScriptEngine.invokeFunction(NashornScriptEngine.java:190)
at com.google.cloud.teleport.templates.common.JavascriptTextTransformer$JavascriptRuntime.invoke(JavascriptTextTransformer.java:156)
at com.google.cloud.teleport.templates.common.JavascriptTextTransformer$FailsafeJavascriptUdf$1.processElement(JavascriptTextTransformer.java:315)
at com.google.cloud.teleport.templates.common.JavascriptTextTransformer$FailsafeJavascriptUdf$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:609)
at com.google.cloud.teleport.templates.PubSubToBigQuery$PubsubMessageToFailsafeElementFn.processElement(PubSubToBigQuery.java:412)
at com.google.cloud.teleport.templates.PubSubToBigQuery$PubsubMessageToFailsafeElementFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: <eval>:18 TypeError: Cannot read property "domain" from undefined
at jdk.nashorn.internal.runtime.ECMAErrors.error(ECMAErrors.java:57)
at jdk.nashorn.internal.runtime.ECMAErrors.typeError(ECMAErrors.java:213)
at jdk.nashorn.internal.runtime.ECMAErrors.typeError(ECMAErrors.java:185)
at jdk.nashorn.internal.runtime.ECMAErrors.typeError(ECMAErrors.java:172)
at jdk.nashorn.internal.runtime.Undefined.get(Undefined.java:157)
at jdk.nashorn.internal.scripts.Script$Recompilation$1$7667A$\^eval\_.transform(<eval>:18)
at jdk.nashorn.internal.runtime.ScriptFunctionData.invoke(ScriptFunctionData.java:639)
at jdk.nashorn.internal.runtime.ScriptFunction.invoke(ScriptFunction.java:494)
at jdk.nashorn.internal.runtime.ScriptRuntime.apply(ScriptRuntime.java:393)
at jdk.nashorn.api.scripting.ScriptObjectMirror.callMember(ScriptObjectMirror.java:199)
at jdk.nashorn.api.scripting.NashornScriptEngine.invokeImpl(NashornScriptEngine.java:386)
... 42 more
当我通过传递一些示例数据在本地尝试Javascript时,它按预期工作,没有任何错误。
UPDATE
事实证明,Pub / Sub发送的数据包含在“所以我必须从字符串的开头和最后删除它们。而且JSON中的每一个”都被转义为\因此我也必须删除它们才能继续没有任何错误。
事实证明Pub / Sub发送包含在"
中的数据,所以我必须从字符串的开头和最后删除它们。此外,JSON中的每个"
都使用\
进行了转义,因此我也必须删除它们才能继续运行而不会出现任何错误。
好像您的JavaScript代码中存在错误。
data['domain'] = obj['data']['domain']; // line 18
引发以下错误
TypeError: Cannot read property "domain" from undefined in <eval> at line number 18
如果变量没有在某处重新初始化,您可以尝试在发布到Pub / Sub时将JSON放在一行中,即
{"name":"Sam","age":21}