Using Numba in a Flink Python UDF


I want to use a Python library (pyod, latest version) that depends on Numba (>= 0.50) inside a UDF. I have written an aggregate UDF in Python, so I'm not new to the concept.

As soon as the job is started, it fails with the following error:

Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 463, in get
    processor = bundle_processor.BundleProcessor(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 921, in create_execution_tree
    return collections.OrderedDict([(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation
    transform_consumers = {
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 908, in get_operation
    return transform_factory.create_operation(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 89, in create_group_window_aggregate_function
    return _create_user_defined_function_operation(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 174, in _create_user_defined_function_operation
    return beam_operation_cls(
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 210, in pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 214, in pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.generate_operation
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 446, in __init__
    super(StreamGroupWindowAggregateOperation, self).__init__(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 309, in __init__
    super(AbstractStreamGroupAggregateOperation, self).__init__(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 281, in __init__
    super(BaseStatefulOperation, self).__init__(serialized_fn)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 80, in __init__
    self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/table/operations.py", line 329, in generate_func
    extract_user_defined_aggregate_function(
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 221, in extract_user_defined_aggregate_function
    user_defined_agg = load_aggregate_function(user_defined_function_proto.payload)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 281, in load_aggregate_function
    return pickle.loads(payload)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
    return cloudpickle.loads(payload)
  File "/tmp/python-dist-ca64683e-f3c8-4ff9-b2a8-8c95c5d508bd/python-files/blob_p-1eee456524b0a216bf998cb36288df034d60c922-5797c5572fd29f1e17b5dd686b627324/dbscan_udf.py", line 31, in <module>
    from pyod.models.ecod import ECOD
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyod/__init__.py", line 4, in <module>
    from . import utils
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyod/utils/__init__.py", line 12, in <module>
    from .stat_models import pairwise_distances_no_broadcast
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/pyod/utils/stat_models.py", line 11, in <module>
    from numba import njit
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/__init__.py", line 38, in <module>
    from numba.core.decorators import (cfunc, generated_jit, jit, njit, stencil,
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/decorators.py", line 12, in <module>
    from numba.stencils.stencil import stencil
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/stencils/stencil.py", line 11, in <module>
    from numba.core import types, typing, utils, ir, config, ir_utils, registry
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/registry.py", line 4, in <module>
    from numba.core import utils, typing, dispatcher, cpu
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/dispatcher.py", line 13, in <module>
    from numba.core import (
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/compiler.py", line 6, in <module>
    from numba.core import (utils, errors, typing, interpreter, bytecode, postproc,
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/callconv.py", line 12, in <module>
    from numba.core.base import PYOBJECT, GENERIC_POINTER
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/base.py", line 24, in <module>
    from numba.cpython import builtins
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/cpython/builtins.py", line 524, in <module>
    from numba.core.typing.builtins import IndexValue, IndexValueType
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/typing/builtins.py", line 22, in <module>
    @infer_global(print)
  File "/home/ubuntu/miniconda3/envs/py38/lib/python3.8/site-packages/numba/core/typing/templates.py", line 1278, in register_global
    if getattr(mod, val.__name__) is not val:
AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' has no attribute 'print'

My library versions (the relevant ones):

  • numba==0.55.1
  • numpy==1.19.5
  • apache-beam==2.27.0
  • apache-flink==1.15.1
  • pyod==1.0.4

This is a very strange error coming from inside the Beam worker, and I can't explain it.

Does anyone have an idea?
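One way to see what Numba is tripping over is to log, from inside the UDF and before the pyod import, whether `builtins.print` passes the "globally reachable" check shown at the bottom of the traceback (`getattr(mod, val.__name__) is not val` in numba/core/typing/templates.py). The helper `describe_print` below is a hypothetical diagnostic sketch, not part of PyFlink or Numba.

```python
import builtins
import sys


def describe_print():
    """Report whether builtins.print passes a check like Numba's:
    look up the module named by print.__module__ and verify that
    getattr(module, print.__name__) is the very same object."""
    fn = builtins.print
    module_name = getattr(fn, "__module__", None)
    name = getattr(fn, "__name__", None)
    module = sys.modules.get(module_name) if module_name else None
    reachable = (
        module is not None
        and name is not None
        and getattr(module, name, None) is fn
    )
    return {"module": module_name, "name": name, "reachable": reachable}


if __name__ == "__main__":
    # In a plain interpreter: module 'builtins', name 'print', reachable True
    print(describe_print())
```

In a normal interpreter this reports `builtins` and `reachable: True`; the error message suggests that inside the worker the module would instead be `pyflink.fn_execution.beam.beam_sdk_worker_main`.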

apache-flink apache-beam apache-beam-internals
1 Answer

Adding the following lines makes the code run.

However, I don't know whether this is the intended approach or what side effects it might have:

import pyflink.fn_execution.beam.beam_sdk_worker_main
pyflink.fn_execution.beam.beam_sdk_worker_main.print = print

For example, with a KeyedProcessFunction you can add it to open():

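    def open(self, runtime_context: RuntimeContext):
        # Expose `print` on the worker module so that Numba's check
        # `getattr(mod, val.__name__) is not val` no longer raises AttributeError
        import pyflink.fn_execution.beam.beam_sdk_worker_main
        pyflink.fn_execution.beam.beam_sdk_worker_main.print = print
        ######### <ONLY import packages that use Numba below this line>
        from mylib.that.do.numba import foo
        foo()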
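If the same UDF code may also run outside the PyFlink worker, the patch could be wrapped in a guarded helper. This is a hedged sketch: `ensure_worker_print` and `WORKER_MAIN` are hypothetical names, and the only PyFlink-specific detail used is the module path from the answer. It only assigns `print` when the module can be imported and does not already expose the current `builtins.print`.

```python
import builtins
import importlib

# Module path taken from the error message and the answer
WORKER_MAIN = "pyflink.fn_execution.beam.beam_sdk_worker_main"


def ensure_worker_print(module_name=WORKER_MAIN):
    """Expose the current builtins.print on the given module, if importable.

    Returns True when the module was found (and patched if needed), and
    False when it cannot be imported, e.g. outside a PyFlink installation.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return False
    # Identity check: Numba requires getattr(module, "print") to be the same object
    if getattr(module, "print", None) is not builtins.print:
        module.print = builtins.print
    return True
```

Calling `ensure_worker_print()` as the first statement of `open()`, before importing any Numba-dependent package, would keep the ordering the answer stresses. Using `builtins.print` explicitly is equivalent to the bare `print` in the answer as long as the UDF module does not define its own `print`.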

Related Flink Slack thread: https://apache-flink.slack.com/archives/C065944F9M2/p1708943670843189
