How to run an Apache Flink streaming job continuously on a Flink server

Problem description (votes: 0, answers: 2)

Hello, I have written code for a streaming job where both the source and the sink are PostgreSQL databases. I use JDBCInputFormat/JDBCOutputFormat to read and write records (following the reference example). Code:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(JDBCConfig.DRIVER_CLASS)
        .setDBUrl(JDBCConfig.DB_URL)
        .setQuery(JDBCConfig.SELECT_FROM_SOURCE)
        .setRowTypeInfo(JDBCConfig.ROW_TYPE_INFO);

SingleOutputStreamOperator<Row> source = environment.createInput(inputBuilder.finish())
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Row>() {
            @Override
            public long extractAscendingTimestamp(Row row) {
                Date dt = (Date) row.getField(2);
                return dt.getTime();
            }
        })
        .keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .fold(null, new FoldFunction<Row, Row>() {
            @Override
            public Row fold(Row row1, Row row) throws Exception {
                return row;
            }
        });

source.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(JDBCConfig.DRIVER_CLASS)
        .setDBUrl(JDBCConfig.DB_URL)
        .setQuery("insert into tablename(id, name) values (?,?)")
        .setSqlTypes(new int[]{Types.BIGINT, Types.VARCHAR})
        .finish());

This code executes correctly, but it does not run continuously on the Flink server (the SELECT query is executed only once). I expect it to run continuously on the Flink server.

java postgresql streaming apache-flink
2 Answers
1 vote

You will probably have to define your own Flink Source or JDBCInputFormat, because the one you are using here stops the SourceTask once all results have been fetched from the database. One way to solve this is to create your own JDBC input format based on JDBCInputFormat, and re-execute the SQL query in nextRecord once the last row has been read from the database.
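
A minimal sketch of that idea, assuming the legacy org.apache.flink.api.java.io.jdbc.JDBCInputFormat from the old flink-jdbc module. The class name ContinuousJDBCInputFormat and the poll interval are illustrative, not from the answer; also note that the format's builder cannot configure a subclass, so in practice you would copy the format's source and patch it rather than literally extend it:

import java.io.IOException;

import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.core.io.InputSplit;

public class ContinuousJDBCInputFormat extends JDBCInputFormat {

    private static final long POLL_INTERVAL_MS = 1000; // illustrative back-off

    private transient InputSplit currentSplit;

    @Override
    public void open(InputSplit inputSplit) throws IOException {
        // Remember the split so the query can be re-executed later.
        this.currentSplit = inputSplit;
        super.open(inputSplit);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        // Never signal end-of-input: whenever the current result set is
        // exhausted, wait briefly and re-execute the query until rows appear.
        // Caveat: this re-emits all rows matching the query on every poll;
        // narrow the query (e.g. by id or timestamp) to avoid duplicates.
        while (super.reachedEnd()) {
            try {
                Thread.sleep(POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return true; // stop cleanly if the task is interrupted
            }
            super.close();            // close the exhausted result set
            super.open(currentSplit); // re-run the SQL query
        }
        return false;
    }
}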


0 votes

Solution 1: If you keep using the deprecated DataSet API, the trick to making it keep querying lies in the query parameters. Here is an example:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class FlinkPostgresDataSource {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // PostgreSQL connection parameters
        String driverName = "org.postgresql.Driver";
        String dbUrl = "jdbc:postgresql://localhost:5432/mydatabase";
        String username = "username";
        String password = "password";

        // Query parameters
        String query = "SELECT id, name FROM your_table WHERE id >= ? AND id < ?";
        int batchSize = 100; // Number of records per batch
        int startId = 0; // Initial start id

        // Build JDBCInputFormat for PostgreSQL
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(driverName)
                .setDBUrl(dbUrl)
                .setQuery(query)
                .setUsername(username)
                .setPassword(password)
                .setRowTypeInfo(new RowTypeInfo(
                        org.apache.flink.api.common.typeinfo.Types.INT,
                        org.apache.flink.api.common.typeinfo.Types.STRING))
                .setFetchSize(batchSize)
                // Each inner Object[] holds the parameter set for one split of
                // the query: here a single split covering [startId, endId).
                .setParametersProvider(new ParameterValuesProvider() {
                    @Override
                    public Object[][] getParameterValues() {
                        int endId = startId + batchSize;
                        return new Object[][] { { startId, endId } };
                    }
                })
                .finish();

        // Create DataSet from JDBCInputFormat
        env.createInput(jdbcInputFormat)
                .map(new org.apache.flink.api.common.functions.MapFunction<Row, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map(Row row) throws Exception {
                        return Tuple2.of((Integer) row.getField(0), (String) row.getField(1));
                    }
                })
                .print(); // print() on a DataSet triggers execution itself

        // Note: no env.execute() here -- print() above already runs the job,
        // and a second execute() would fail with "No new data sinks defined".
    }
}
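
One caveat on this approach: the parameter values provider is evaluated once at job submission, so it only defines a fixed set of query splits, and each split's query still runs exactly once per job run. To keep scanning new id ranges you would either generate enough split ranges up front or resubmit the job; for a job that genuinely keeps polling the table, the DataStream variant below is the better fit.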

Solution 2: Use the DataStream API instead

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;

public class FlinkJdbcExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Add JDBC source function
        env.addSource(new JdbcSourceFunction())
           .print();

        // Execute the program
        env.execute("Flink JDBC Example");
    }

    public static class JdbcSourceFunction implements SourceFunction<Row> {
        private static final int FETCH_SIZE = 100;

        private volatile boolean isRunning = true;
        private transient Connection connection;
        private transient PreparedStatement preparedStatement;

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            connection = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/your_database", "your_username", "your_password");
            connection.setAutoCommit(false); // PostgreSQL honours fetchSize only with autocommit off
            preparedStatement = connection.prepareStatement("SELECT * FROM your_table");
            preparedStatement.setFetchSize(FETCH_SIZE); // stream results in batches

            while (isRunning) {
                try (ResultSet resultSet = preparedStatement.executeQuery()) {
                    while (resultSet.next()) {
                        // Convert ResultSet row to Flink Row object
                        Row row = Row.of(resultSet.getInt("id"), resultSet.getString("name"));
                        ctx.collect(row);
                    }
                }
                Thread.sleep(1000); // Example: Sleep for 1 second before querying again
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
            try {
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (SQLException e) {
                // Log or handle the exception
            }
        }
    }
}
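
As written, this source re-executes SELECT * every second and therefore re-emits every row in the table on each poll. A common refinement, assuming ids are positive and monotonically increasing (my assumption, not something the answer specifies), is to remember the highest id already emitted and fetch only newer rows. A drop-in replacement for run() under that assumption:

@Override
public void run(SourceContext<Row> ctx) throws Exception {
    connection = DriverManager.getConnection(
            "jdbc:postgresql://localhost:5432/your_database", "your_username", "your_password");
    // Only fetch rows that have not been emitted yet.
    preparedStatement = connection.prepareStatement(
            "SELECT id, name FROM your_table WHERE id > ? ORDER BY id");
    int lastId = -1; // assumes ids are positive

    while (isRunning) {
        preparedStatement.setInt(1, lastId);
        try (ResultSet resultSet = preparedStatement.executeQuery()) {
            while (resultSet.next()) {
                lastId = resultSet.getInt("id");
                ctx.collect(Row.of(lastId, resultSet.getString("name")));
            }
        }
        Thread.sleep(1000); // poll interval
    }
}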

From Flink troubleshooting
