我想在依赖于 Numba (>= 0.50) 的 UDF 中使用 Python 库(pyod,最新)。我用 Python 创建了一个聚合 UDF,我对这个概念并不陌生。
在作业执行后立即启动作业时出现错误。
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
return getattr(self, request_type)(
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in process_bundle
bundle_processor = self.bundle_processor_cache.get(
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 463, in get
processor = bundle_processor.BundleProcessor(
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 921, in create_execution_tree
return collections.OrderedDict([(
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
get_operation(transform_id))) for transform_id in sorted(
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation
transform_consumers = {
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <dictcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 908, in get_operation
return transform_factory.create_operation(
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 89, in create_group_window_aggregate_function
return _create_user_defined_function_operation(
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 174, in _create_user_defined_function_operation
return beam_operation_cls(
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 210, in pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 214, in pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.generate_operation
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 446, in __init__
super(StreamGroupWindowAggregateOperation, self).__init__(
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 309, in __init__
super(AbstractStreamGroupAggregateOperation, self).__init__(
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 281, in __init__
super(BaseStatefulOperation, self).__init__(serialized_fn)
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 80, in __init__
self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 329, in generate_func
extract_user_defined_aggregate_function(
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 221, in extract_user_defined_aggregate_function
user_defined_agg = load_aggregate_function(user_defined_function_proto.payload)
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 281, in load_aggregate_function
return pickle.loads(payload)
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
return cloudpickle.loads(payload)
File "/tmp/python-dist-ca64683e-f3c8-4ff9-b2a8-8c95c5d508bd/python-files/blob_p-1eee456524b0a216bf998cb36288df034d60c922-5797c5572fd29f1e17b5dd686b627324/dbscan_udf.py", line 31, in <module>
from pyod.models.ecod import ECOD
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyod/__init__.py", line 4, in <module>
from . import utils
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyod/utils/__init__.py", line 12, in <module>
from .stat_models import pairwise_distances_no_broadcast
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyod/utils/stat_models.py", line 11, in <module>
from numba import njit
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/__init__.py", line 38, in <module>
from numba.core.decorators import (cfunc, generated_jit, jit, njit, stencil,
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/decorators.py", line 12, in <module>
from numba.stencils.stencil import stencil
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/stencils/stencil.py", line 11, in <module>
from numba.core import types, typing, utils, ir, config, ir_utils, registry
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/registry.py", line 4, in <module>
from numba.core import utils, typing, dispatcher, cpu
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/dispatcher.py", line 13, in <module>
from numba.core import (
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/compiler.py", line 6, in <module>
from numba.core import (utils, errors, typing, interpreter, bytecode, postproc,
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/callconv.py", line 12, in <module>
from numba.core.base import PYOBJECT, GENERIC_POINTER
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/base.py", line 24, in <module>
from numba.cpython import builtins
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/cpython/builtins.py", line 524, in <module>
from numba.core.typing.builtins import IndexValue, IndexValueType
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/typing/builtins.py", line 22, in <module>
@infer_global(print)
File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/typing/templates.py", line 1278, in register_global
if getattr(mod, val.__name__) is not val:
AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' has no attribute 'print'
我的图书馆版本(重要的):
由于这是 Beam 引起的一个非常奇怪的错误,我无法解释它。
有人知道吗?
添加以下行使代码能够运行。
但是!我不知道这是否是想要的,或者副作用可能是什么:
import pyflink.fn_execution.beam.beam_sdk_worker_main
pyflink.fn_execution.beam.beam_sdk_worker_main.print = print
例如,使用
KeyedProcessFunction
,您可以将其添加到 open 中:
def open(self, runtime_context: RuntimeContext):
import pyflink.fn_execution.beam.beam_sdk_worker_main
pyflink.fn_execution.beam.beam_sdk_worker_main.print = print
######### <ONLY under here import packages that uses Numba>
from mylib.that.do.numba import foo
foo()
相关Flink slack线程:https://apache-flink.slack.com/archives/C065944F9M2/p1708943670843189