你好，我为一个流处理作业编写了代码，其源和目标都是 PostgreSQL 数据库。我使用 JDBCInputFormat / JDBCOutputFormat 来读取和写入记录（参考了示例）。代码如下：
// Streaming job: read rows from PostgreSQL, keep the latest row per key in each 5-second
// event-time window, and insert it into `tablename`.
// NOTE: JDBCInputFormat is a bounded input. It runs SELECT_FROM_SOURCE once and the source
// task finishes after the last row, which is why this job does not keep polling the table.
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(JDBCConfig.DRIVER_CLASS)
        .setDBUrl(JDBCConfig.DB_URL)
        .setQuery(JDBCConfig.SELECT_FROM_SOURCE)
        .setRowTypeInfo(JDBCConfig.ROW_TYPE_INFO);

SingleOutputStreamOperator<Row> latestPerWindow = environment.createInput(inputBuilder.finish())
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Row>() {
            @Override
            public long extractAscendingTimestamp(Row row) {
                // Field 2 is the event time. Assumes the query returns it as a Date (sub)type,
                // in ascending order. TODO confirm against SELECT_FROM_SOURCE.
                Date dt = (Date) row.getField(2);
                return dt.getTime();
            }
        })
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        // Keep only the last row seen for each key and window. This is what
        // fold(null, (acc, row) -> row) produced, but without the deprecated fold
        // API and its null seed.
        .reduce((previous, current) -> current);

// Each window result is written with a two-placeholder INSERT.
// NOTE(review): the output Row should carry exactly (id, name) to match the SQL types below.
latestPerWindow.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(JDBCConfig.DRIVER_CLASS)
        .setDBUrl(JDBCConfig.DB_URL)
        .setQuery("insert into tablename(id, name) values (?,?)")
        .setSqlTypes(new int[]{Types.BIGINT, Types.VARCHAR})
        .finish());
这段代码可以正确执行，但不会在 Flink 服务器上持续运行（SELECT 查询只执行一次）。我希望它能在 Flink 服务器上持续运行。
您可能需要自己实现 Flink Source，或者自定义 JDBCInputFormat，因为这里使用的 JDBCInputFormat 在从数据库取完所有结果后就会结束 SourceTask。一种解决办法是基于 `JDBCInputFormat` 编写自己的 JDBC 输入格式：在 `nextRecord` 中读到数据库返回的最后一行时重新执行 SQL 查询，而不是结束输入。
解决方案 1：如果继续使用已弃用的 DataSet API，可以用带参数的查询语句按 id 范围分片读取数据，示例如下。注意：DataSet API 是批处理模式，JDBCInputFormat 读完所有分片后作业就会结束，因此这种写法本身并不能让查询持续执行。
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
public class FlinkPostgresDataSource {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// PostgreSQL connection parameters
String driverName = "org.postgresql.Driver";
String dbUrl = "jdbc:postgresql://localhost:5432/mydatabase";
String username = "username";
String password = "password";
// Query parameters
String query = "SELECT id, name FROM your_table WHERE id >= ? AND id < ?";
int batchSize = 100; // Number of records per batch
int startId = 0; // Initial start id
// Build JDBCInputFormat for PostgreSQL
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(driverName)
.setDBUrl(dbUrl)
.setQuery(query)
.setUsername(username)
.setPassword(password)
.setRowTypeInfo(new RowTypeInfo(org.apache.flink.api.common.typeinfo.Types.INT, org.apache.flink.api.common.typeinfo.Types.STRING))
.setFetchSize(batchSize)
.setParametersProvider(new JDBCInputFormat.ParameterProvider() {
@Override
public Object[][] getParameters() {
int endId = startId + batchSize;
return new Object[][] { { startId }, { endId } };
}
})
.finish();
// Create DataSet from JDBCInputFormat
env.createInput(jdbcInputFormat)
.map(new org.apache.flink.api.common.functions.MapFunction<Row, Tuple2<Integer, String>>() {
@Override
public Tuple2<Integer, String> map(Row row) throws Exception {
return Tuple2.of(row.getField(0), row.getField(1));
}
})
.print(); // Example: Output to stdout
// Execute the Flink job
env.execute("Flink PostgreSQL DataSource");
}
}
解决方案 2：改用 DataStream API，实现一个自定义的 SourceFunction，在循环中定期重新执行查询：
public class FlinkJdbcExample {
/**
 * Entry point: wires {@link JdbcSourceFunction} straight into a stdout sink and submits the job
 * under the name "Flink JDBC Example".
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job cannot be built or executed
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // Source -> print sink, with no transformations in between.
    streamEnv
            .addSource(new JdbcSourceFunction())
            .print();

    // Submit the job.
    streamEnv.execute("Flink JDBC Example");
}
/**
 * Source that repeatedly queries a PostgreSQL table and emits each row as a {@link Row} of
 * (id, name), pausing between polls.
 *
 * <p>The whole table is read again on every poll, so unchanged rows are emitted again each time.
 * NOTE(review): for incremental reads, remember the last emitted id and query
 * {@code WHERE id > ? ORDER BY id} instead. This assumes ids only grow; confirm for your table.
 *
 * <p>All JDBC resources are opened and closed inside {@link #run}. {@link #cancel} only clears the
 * running flag, so it never closes a connection the source thread is still using.
 */
public static class JdbcSourceFunction implements SourceFunction<Row> {

    /** Example fetch size used by the no-arg constructor. */
    private static final int DEFAULT_FETCH_SIZE = 100;

    /** Pause between two polls of the table, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 1000L;

    /** JDBC fetch-size hint applied to the polling statement. */
    private final int fetchSize;

    /** Cleared by {@link #cancel()}; volatile because cancel runs on a different thread than run. */
    private volatile boolean isRunning = true;

    /** Creates a source that uses {@link #DEFAULT_FETCH_SIZE}. */
    public JdbcSourceFunction() {
        this(DEFAULT_FETCH_SIZE);
    }

    /**
     * Creates a source with an explicit fetch size.
     *
     * @param fetchSize JDBC fetch-size hint; 0 means no hint
     * @throws IllegalArgumentException if {@code fetchSize} is negative
     */
    public JdbcSourceFunction(int fetchSize) {
        if (fetchSize < 0) {
            throw new IllegalArgumentException("fetchSize must be >= 0: " + fetchSize);
        }
        this.fetchSize = fetchSize;
    }

    /**
     * Polls the table until {@link #cancel()} is called, emitting every row of each result.
     *
     * @param ctx context used to emit rows downstream
     * @throws Exception on JDBC failure or if the sleep between polls is interrupted
     */
    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        // try-with-resources closes the statement and connection however the loop ends:
        // through cancel() or through an exception.
        try (Connection connection = DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/your_database", "your_username", "your_password");
                PreparedStatement preparedStatement = connection.prepareStatement("SELECT * FROM your_table")) {
            // NOTE(review): PgJDBC presumably streams rows in fetchSize chunks only when autocommit
            // is off, so with the default connection this hint may be ignored. Confirm.
            preparedStatement.setFetchSize(fetchSize);
            while (isRunning) {
                // Each poll gets its own ResultSet, closed before the next poll starts.
                try (ResultSet resultSet = preparedStatement.executeQuery()) {
                    while (isRunning && resultSet.next()) {
                        // Convert ResultSet row to Flink Row object
                        Row row = Row.of(resultSet.getInt("id"), resultSet.getString("name"));
                        ctx.collect(row);
                    }
                }
                if (isRunning) {
                    Thread.sleep(POLL_INTERVAL_MS); // Wait before querying again
                }
            }
        }
    }

    /** Asks {@link #run} to stop after its current row or sleep; JDBC cleanup happens there. */
    @Override
    public void cancel() {
        // Only signal the loop. Closing JDBC objects here would race with the source thread.
        isRunning = false;
    }
}