AWS Apache Flink: application runs locally but not in AWS. What could be the problem?


I am building an application that takes data from a Kinesis data stream and sends it to an external database through an endpoint. I implemented it with the PyFlink Table API, and the code below works on my local system. Since I have multiple jar dependencies, I combined them into a single fat jar. However, when I deploy the application to AWS Managed Apache Flink, the job does not run. What could the underlying problem be? The CloudWatch logs are not very conclusive.
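For reference, here is a minimal sketch of the shape this script expects in application_properties.json; the property group and key names are taken from the code below, while the values are hypothetical placeholders:

# Hypothetical contents of application_properties.json, mirroring the
# property group and keys read in main() below (values are placeholders).
[
    {
        "PropertyGroupId": "consumer.config.0",
        "PropertyMap": {
            "input.stream.name": "my-input-stream",
            "aws.region": "us-east-1",
            "flink.stream.initpos": "LATEST"
        }
    }
]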

from pyflink.table import EnvironmentSettings, TableEnvironment
import os
import json


# 1. Create a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

statement_set = table_env.create_statement_set()

APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"  # on kda

is_local = bool(os.environ.get("IS_LOCAL"))  # set this env var in your local environment

if is_local:
    # only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/mydep.jar",
    )

def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]

def create_source_table(table_name, stream_name, region, stream_initpos):
    """
    Defines the Kinesis source table schema and configuration.
    """
    return f"""
    CREATE TABLE {table_name} (
        Trial_ID VARCHAR(255),
        PC_Time_s DOUBLE,
        Arduino_Time_ms BIGINT,
        Resistivity_ohm DOUBLE,
        Temperature_C DOUBLE,
        PWM_value BIGINT
    )
    PARTITIONED BY (Trial_ID)
    WITH (
        'connector' = 'kinesis',
        'stream' = '{stream_name}',
        'aws.region' = '{region}',
        'scan.stream.initpos' = '{stream_initpos}',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )
    """

def create_jdbc_sink_table(table_name, url, username, password):
    """
    Defines the JDBC sink table schema and connection configuration.
    """
    return f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        Trial_ID VARCHAR(255),
        PC_Time_s DOUBLE,
        Arduino_Time_ms BIGINT,
        Resistivity_ohm DOUBLE,
        Temperature_C DOUBLE,
        PWM_value BIGINT
    ) WITH (
        'connector' = 'jdbc',
        'url' = '{url}',
        'table-name' = '{table_name}',
        'username' = '{username}',
        'password' = '{password}'
    )
    """

def main():

    # Application Property Keys
    input_property_group_key = "consumer.config.0"

    # Table names
    input_table_name = "input_table"
    output_table_name = "trainingdata"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    # get application properties
    props = get_application_properties()

    input_property_map = property_map(props, input_property_group_key)

    # Replace with your MySQL connection details
    DATABASE_URL = "jdbc:mysql://***string**/db_name"
    DATABASE_USERNAME = "username"
    DATABASE_PASSWORD = "password"


    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    # 4. Create a Kinesis source table
    table_env.execute_sql(
        create_source_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 5. Create a JDBC sink table for MySQL database
    table_env.execute_sql(
        create_jdbc_sink_table(output_table_name, DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD)
    )

    # 6. Insert data from Kinesis stream to MySQL table
    table_result = table_env.execute_sql(
        f"INSERT INTO {output_table_name} SELECT * FROM {input_table_name}"
    )
    # 7. Wait for job completion (local mode) or print job status
    if is_local:
        table_result.wait()
    else:
        # get job status through TableResult
        print(table_result.get_job_client().get_job_status())


if __name__ == "__main__":
    main()

I tried using the fat jar and uploading a zip of the whole Python directory to S3 and then to Managed Apache Flink, and it does not work. However, if I run the example code from GitHub, it works.
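For reference, on Managed Flink the entry point and the fat jar are declared in the application's runtime properties rather than via pipeline.jars. As far as I understand the AWS docs, the property group looks roughly like this (file names are placeholders and must match the layout inside the uploaded zip):

# Hedged sketch of the Managed Flink run-options property group.
# "python" and "jarfile" must point at paths inside the uploaded zip.
{
    "PropertyGroupId": "kinesis.analytics.flink.run.options",
    "PropertyMap": {
        "python": "main.py",
        "jarfile": "lib/mydep.jar"
    }
}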

amazon-web-services dependencies apache-flink pyflink amazon-kinesis-analytics
1 Answer

I don't see the error logs for your case, but I ran into a similar problem running my application on AWS Managed Flink. My application worked fine locally but failed when deployed to AWS with the following message:

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
kinesis
print
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:608)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:794)

I solved it by modifying the maven-shade-plugin configuration to include the following transformer. Without the ServicesResourceTransformer, the META-INF/services files from the individual connector jars overwrite one another when they are merged into the fat jar, so Flink's SPI-based factory discovery can no longer find the JDBC connector; the transformer concatenates those files instead:

<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    ...
</transformers>
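To sanity-check the fix before redeploying, a quick local inspection of the shaded jar can confirm that the merged SPI file Flink's factory discovery reads now lists the JDBC factory. A minimal sketch, assuming the jar sits at a hypothetical target/mydep.jar:

# Print the merged service file from the shaded jar; after the
# ServicesResourceTransformer fix it should list the JDBC factory class
# alongside the Kinesis one.
import zipfile

with zipfile.ZipFile("target/mydep.jar") as jar:
    spi = "META-INF/services/org.apache.flink.table.factories.Factory"
    print(jar.read(spi).decode())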