I am building an application that reads data from a Kinesis data stream and sends it to an external database through an endpoint. I am using the Python Table API for this, and the code below works on my local system. Since I have several jar files, I bundled them into a single fat jar so the dependencies can be picked up. However, when I deploy the application to AWS managed Apache Flink, the job does not run. What could the underlying problem be? The CloudWatch logs are not very conclusive.
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
import json
# 1. Creates a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
statement_set = table_env.create_statement_set()
APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" # on kda
is_local = (
    True if os.environ.get("IS_LOCAL") else False
)  # set this env var in your local environment
if is_local:
    # only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local
    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/mydep.jar",
    )
def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))
def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]
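# For reference, the application properties JSON read by get_application_properties() and
# property_map() has the shape below; the group id and keys match the ones used in main(),
# while the values shown are placeholders, not real settings:
# [
#   {
#     "PropertyGroupId": "consumer.config.0",
#     "PropertyMap": {
#       "input.stream.name": "ExampleInputStream",
#       "aws.region": "us-east-1",
#       "flink.stream.initpos": "LATEST"
#     }
#   }
# ]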
def create_source_table(table_name, stream_name, region, stream_initpos):
    """
    Defines the Kinesis source table schema and configuration.
    """
    return f"""
        CREATE TABLE {table_name} (
            Trial_ID VARCHAR(255),
            PC_Time_s DOUBLE,
            Arduino_Time_ms BIGINT,
            Resistivity_ohm DOUBLE,
            Temperature_C DOUBLE,
            PWM_value BIGINT
        )
        PARTITIONED BY (Trial_ID)
        WITH (
            'connector' = 'kinesis',
            'stream' = '{stream_name}',
            'aws.region' = '{region}',
            'scan.stream.initpos' = '{stream_initpos}',
            'format' = 'json',
            'json.timestamp-format.standard' = 'ISO-8601'
        )
    """
def create_jdbc_sink_table(table_name, url, username, password):
    return f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            Trial_ID VARCHAR(255),
            PC_Time_s DOUBLE,
            Arduino_Time_ms BIGINT,
            Resistivity_ohm DOUBLE,
            Temperature_C DOUBLE,
            PWM_value BIGINT
        ) WITH (
            'connector' = 'jdbc',
            'url' = '{url}',
            'table-name' = '{table_name}',
            'username' = '{username}',
            'password' = '{password}'
        )
    """
def main():
    # Application Property Keys
    input_property_group_key = "consumer.config.0"
    # Table names
    input_table_name = "input_table"
    output_table_name = "trainingdata"
    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"
    # get application properties
    props = get_application_properties()
    input_property_map = property_map(props, input_property_group_key)
    # Replace with your MySQL connection details
    DATABASE_URL = "jdbc:mysql://***string**/db_name"
    DATABASE_USERNAME = "username"
    DATABASE_PASSWORD = "password"
    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]
    # 4. Create a Kinesis source table
    table_env.execute_sql(
        create_source_table(input_table_name, input_stream, input_region, stream_initpos)
    )
    # 5. Create a JDBC sink table for MySQL database
    table_env.execute_sql(
        create_jdbc_sink_table(output_table_name, DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD)
    )
    # 6. Insert data from Kinesis stream to MySQL table
    table_result = table_env.execute_sql(
        "INSERT INTO {0} SELECT * FROM {1}".format(output_table_name, input_table_name)
    )
    # 7. Wait for job completion (local mode) or print job status
    if is_local:
        table_result.wait()
    else:
        # get job status through TableResult
        print(table_result.get_job_client().get_job_status())

if __name__ == "__main__":
    main()
I tried using the fat jar and uploading a zip of the whole Python directory to S3 and then to Apache Flink, but it does not work. However, if I run the code from GitHub, it works.
I don't see any error logs for your issue, but I ran into a similar problem when running my application on AWS Managed Flink. My application worked fine locally but failed when deployed to AWS with the following message:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
kinesis
print
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:608)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:794)
I solved this problem by modifying the maven-shade-plugin configuration to include the following lines:
<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    ...
</transformers>
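For context: Flink discovers table connectors via Java's ServiceLoader, which reads the service files under META-INF/services in the jar. When several connector jars are shaded into one fat jar without a merge strategy, those service files overwrite each other, so only one connector's factories remain discoverable, which is why the error above lists only a subset of factory identifiers as available. As a minimal sketch of where the transformer sits in the pom.xml (the plugin version and the surrounding executions layout are assumptions, adapt them to your own build):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- merge META-INF/services entries from all shaded jars so Flink's
                         factory discovery can see every bundled connector -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

After rebuilding the fat jar with this transformer and redeploying, the merged META-INF/services file should contain the jdbc factory entry as well, so the 'jdbc' connector can be found at runtime.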