I've run into a problem that is really bothering me, and I'm writing this post to look for a solution.
In my old architecture everything worked fine:
We have some data accessible over JDBC, reachable from a specific subnet
We use a Cloud Function to extract the data and write it to BigQuery
My new setup:
The same database, but with extra security: I have to connect using a certificate
I added some parameters to the connection string: useSSL=true&requireSSL=true&verifyServerCertificate=true&serverTimezone=UTC&trustCertificateKeyStoreUrl=file:/extra_files/ca-root.pem
The rest of the setup is the same
The error I'm getting and cannot get past: java.sql.SQLException: Cannot create PoolableConnectionFactory (Could not open file: /extra_files/ca-root.pem [/extra_files/ca-root.pem (No such file or directory)])
According to https://cloud.google.com/dataflow/docs/guides/templates/ssl-certificates it should be that simple, but my setup is a bit more complex.
I have added the template parameters, but the job still does not work: the file does not exist, and I am missing something I cannot pin down.
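For reference, the same SSL settings appended to the URL above could instead be passed through the template's connectionProperties parameter, semicolon-separated as the option description in CustomPipelineOptions below suggests; either way the driver still has to find /extra_files/ca-root.pem locally on the worker. A sketch of that parameter value:

useSSL=true;requireSSL=true;verifyServerCertificate=true;serverTimezone=UTC;trustCertificateKeyStoreUrl=file:/extra_files/ca-root.pem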
My job setup: Dataflow - MySQLToBigQuery.java:
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
/**
* MySQL to BigQuery pipeline.
*/
public class MySQLToBigQuery {
public static void main(String[] args) {
// Create pipeline
CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(CustomPipelineOptions.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return PipelineResult
*/
private static PipelineResult run(CustomPipelineOptions options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
* 2) Append TableRow to BigQuery via BigQueryIO
*/
pipeline
/*
* Step 1: Read records via JDBC and convert to TableRow
* via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
*/
.apply(
"Read from MySQL",
JdbcIO.<TableRow>read()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(
StaticValueProvider.of("com.mysql.cj.jdbc.Driver"), options.getConnectionURL())
.withUsername(options.getUsername())
.withPassword(options.getPassword())
.withConnectionProperties(options.getConnectionProperties()))
.withQuery(options.getQuery())
.withCoder(TableRowJsonCoder.of())
.withRowMapper(getResultSetToTableRow()))
/*
* Step 2: Append TableRow to an existing BigQuery table
*/
.apply(
"Write to BigQuery",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
.to(options.getOutputTable()));
// Execute the pipeline.
return pipeline.run();
}
/**
* Factory method for {@link ResultSetToTableRow}.
*/
private static JdbcIO.RowMapper<TableRow> getResultSetToTableRow() {
return new ResultSetToTableRow();
}
/**
* {@link JdbcIO.RowMapper} implementation to convert Jdbc ResultSet rows to UTF-8 encoded JSONs.
*/
private static class ResultSetToTableRow implements JdbcIO.RowMapper<TableRow> {
@Override
public TableRow mapRow(ResultSet resultSet) throws Exception {
ResultSetMetaData metaData = resultSet.getMetaData();
TableRow outputTableRow = new TableRow();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
outputTableRow.set(metaData.getColumnName(i), resultSet.getObject(i));
}
return outputTableRow;
}
}
}
Dataflow: CustomPipelineOptions.java
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
public interface CustomPipelineOptions extends PipelineOptions {
@Description(
"The JDBC connection URL string. " + "for example: jdbc:mysql://some-host:3306/sampledb")
ValueProvider<String> getConnectionURL();
void setConnectionURL(ValueProvider<String> connectionURL);
@Description(
"JDBC connection property string. " + "for example: unicode=true;characterEncoding=UTF-8")
ValueProvider<String> getConnectionProperties();
void setConnectionProperties(ValueProvider<String> connectionProperties);
@Description("JDBC connection user name. ")
ValueProvider<String> getUsername();
void setUsername(ValueProvider<String> username);
@Description("JDBC connection password. ")
ValueProvider<String> getPassword();
void setPassword(ValueProvider<String> password);
@Description("Source data query string. " + "for example: select * from sampledb.sample_table")
ValueProvider<String> getQuery();
void setQuery(ValueProvider<String> query);
@Description(
"BigQuery Table spec to write the output to"
+ "for example: some-project-id:somedataset.sometable")
ValueProvider<String> getOutputTable();
void setOutputTable(ValueProvider<String> outputTable);
@Description("Temporary directory for BigQuery loading process")
ValueProvider<String> getBigQueryLoadingTemporaryDirectory();
void setBigQueryLoadingTemporaryDirectory(ValueProvider<String> bigQueryLoadingTemporaryDirectory);
}
My Cloud Function index.js:
/**
* Triggered from a message on a Cloud Pub/Sub topic.
*
* @param {!Object} event Event payload.
* @param {!Object} context Metadata for the event.
*/
exports.startDataflow = async (event, context) => {
// general constants
const REGION = process.env.REGION;
const ZONE = process.env.ZONE;
const BUCKET = process.env.BUCKET;
const SUBNET = process.env.SUBNET;
const NETWORK = process.env.NETWORK;
const TOPIC_MONITOR = process.env.TOPIC_MONITOR;
// MySQL config
const mySqlConfig = {
host: process.env.MYSQL_HOST,
port: process.env.MYSQL_PORT,
username: process.env.MYSQL_USERNAME,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE,
properties: process.env.MYSQL_PROPERTIES,
timezone: process.env.MYSQL_TIMEZONE
};
// import modules
const DataflowUtil = require('./dataflow-util');
const dataflowUtil = new DataflowUtil();
const BqUtil = require('./bq-util.js');
const bqUtil = new BqUtil();
const QueryUtil = require('./query-util');
const queryUtil = new QueryUtil(BUCKET);
const SchedulerUtil = require('./scheduler-util');
const schedulerUtil = new SchedulerUtil();
const userData = Buffer.from(event.data, 'base64').toString();
console.log(`Input data received: ${userData}`);
const inputData = JSON.parse(userData);
async function main() {
// find Project ID
console.log('Finding current project ID.');
const projectId = await dataflowUtil.getProjectId();
// create Big Query staging and destination table
let stagingTableExists = await bqUtil.tableExists(inputData.stagingTable);
if (stagingTableExists) {
console.log('Deleting Big Query staging table.');
await bqUtil.deleteTable(inputData.stagingTable);
stagingTableExists = false;
}
const destinationTableExists = await bqUtil.tableExists(inputData.destinationTable);
if (!stagingTableExists || !destinationTableExists) {
// extract schema from MySQL
let fields; // declared outside the if/else so the schema stays visible when creating the tables below
if (typeof inputData.schema !== "undefined") {
console.log('Extracting Big Query schema from input data.');
fields = inputData.schema;
} else {
console.log('Extracting Big Query schema from MySQL.');
fields = await bqUtil.extractFieldsFromMySql(inputData.columns, inputData.sourceTable, mySqlConfig);
}
if (!stagingTableExists) {
console.log('Creating Big Query staging table.');
await bqUtil.createTable(inputData.stagingTable, fields);
}
if (!destinationTableExists) {
console.log('Creating Big Query destination table.');
await bqUtil.createTable(inputData.destinationTable, fields);
}
}
// assemble source MySQL query
const queryFields = inputData.columns.join("`, `");
let query = `SELECT \`${queryFields}\` FROM \`${inputData.sourceTable}\``;
// get the latest offset in order to compose the query
if (typeof inputData.limitColumn !== 'undefined' && inputData.limitColumn.length > 0) {
console.log('Reading current offset from Cloud Storage.');
var limitStart = await queryUtil.lastOffset(inputData.jobName, inputData.limitColumn, '0000-00-00 00:00:00');
var limitEnd = queryUtil.flexibleUTC(-5);
var extendedLimitEnd = queryUtil.flexibleUTC(+5);
query += ` WHERE \`${inputData.limitColumn}\` >= '${limitStart}' AND \`${inputData.limitColumn}\` <= '${extendedLimitEnd}'`;
}
console.log(`MySQL query: ${query}`);
// run dataflow job
console.log('Starting Dataflow job from template.');
const dataflowConfig = {
projectId: projectId,
region: REGION,
jobName: inputData.jobName,
templatePath: `gs://${BUCKET}/templates/MySQLToBigQuery`,
connection: mySqlConfig,
query: query,
outputTable: inputData.stagingTable,
bqLoadTempLocation: `gs://${BUCKET}/bq-temp`,
tempLocation: `gs://${BUCKET}/temp`,
network: NETWORK,
subnetwork: SUBNET,
zone: ZONE,
machineType: inputData.machineType || 'n1-standard-1',
};
await dataflowUtil.startJobFromTemplate(dataflowConfig).then((dataflowJobId) => {
// setup Cloud Scheduler monitoring cron
console.log('Setting up Cloud Scheduler cron for monitoring Dataflow job.');
inputData.dataflowJobId = dataflowJobId; // pass the Dataflow job ID to the monitoring function
const config = {
projectId: projectId,
region: REGION,
jobName: `monitor-dataflow-${dataflowJobId}`,
description: 'Triggers a Cloud Function for monitoring a Dataflow job.',
topic: TOPIC_MONITOR,
payload: inputData,
schedule: '* * * * *',
timezone: "UTC"
};
return schedulerUtil.createJob(config);
}).then((data)=>{
// save new offset
if (typeof limitEnd !== "undefined" && typeof limitStart !== "undefined") {
console.log('Saving new offset in Cloud Storage.');
return queryUtil.saveOffset(inputData.jobName, inputData.limitColumn, limitEnd, limitStart);
} else {
return null;
}
}).then((data) => {
console.log('Done starting dataflow job.');
});
}
await main();
};
/**
* Triggered from a message on a Cloud Pub/Sub topic.
*
* @param {!Object} event Event payload.
* @param {!Object} context Metadata for the event.
*/
exports.monitorDataflow = async (event, context) => {
// general constants
const REGION = process.env.REGION;
const BUCKET = process.env.BUCKET;
// import modules
const DataflowUtil = require('./dataflow-util');
const dataflowUtil = new DataflowUtil();
const SchedulerUtil = require('./scheduler-util');
const schedulerUtil = new SchedulerUtil();
const BqUtil = require('./bq-util');
const bqUtil = new BqUtil();
const QueryUtil = require('./query-util');
const queryUtil = new QueryUtil(BUCKET);
const userData = Buffer.from(event.data, 'base64').toString();
console.log(`Input data received: ${userData}`);
const inputData = JSON.parse(userData);
/**
* Returns the MERGE query
*
* @param {string} stagingTable
* @param {string} destinationTable
* @param {string} columns
* @param {string} pkColumn
*/
function assembleMergeQuery(stagingTable, destinationTable, columns, pkColumn) {
let updateStatement = [];
columns.forEach(element => {
updateStatement.push(`D.\`${element}\` = S.\`${element}\``);
});
const query = `MERGE \`${destinationTable.replace(':', '.')}\` D
USING \`${stagingTable.replace(':', '.')}\` S ON S.\`${pkColumn}\` = D.\`${pkColumn}\`
WHEN NOT MATCHED THEN
INSERT (\`${columns.join("`, `")}\`) VALUES (\`${columns.join("`, `")}\`)
WHEN MATCHED THEN
UPDATE SET ${updateStatement.join(", ")}`;
return query;
}
async function main() {
const projectId = await dataflowUtil.getProjectId();
const status = await dataflowUtil.getJobStatus(projectId, REGION, inputData.dataflowJobId);
if (status === 'JOB_STATE_DONE') {
console.log('Deleting Cloud Scheduler job.');
await schedulerUtil.deleteJob(inputData.csJobName);
console.log('Running Big Query MERGE query.');
const query = assembleMergeQuery(inputData.stagingTable, inputData.destinationTable, inputData.columns, inputData.pkColumn);
let queryJobId = await bqUtil.runQuery(query, projectId);
console.log(`Big Query query ${queryJobId} has been started. Done.`);
} else if(status === 'JOB_STATE_FAILED' || status === 'JOB_STATE_CANCELLED'){
console.log(`Deleting Cloud Scheduler job for failed/canceled dataflow job (${status}).`);
await schedulerUtil.deleteJob(inputData.csJobName);
console.log('Rolling back offset.');
await queryUtil.rollbackOffset(inputData.jobName, inputData.limitColumn);
console.log('Done.');
} else {
console.log('Job is still in progress (' + status + '). Exiting.');
}
}
await main();
};
The procedure described at https://cloud.google.com/dataflow/docs/guides/templates/ssl-certificates only applies to the Google-provided templates.
Are you using the Google-provided MySQL to BigQuery template? If so, this should work. To double-check, you can also look at the job logs in the Dataflow console; there should be a log entry indicating that the file was staged.
If you are using your own custom template, you have to implement a JvmInitializer like this one and make sure it is on the classpath. In the JvmInitializer's beforeProcessing method you can copy the certificate from GCS and save it locally. The JvmInitializer runs on every worker before data processing starts, so the certificate will be present on the worker before any connection to the database is created.
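A minimal sketch of what such an initializer could look like, assuming the certificate lives at gs://my-bucket/certs/ca-root.pem (a placeholder path) and is copied to /extra_files/ca-root.pem so the existing trustCertificateKeyStoreUrl keeps working; the @AutoService registration assumes the com.google.auto.service dependency, otherwise list the class in META-INF/services/org.apache.beam.sdk.harness.JvmInitializer:

import com.google.auto.service.AutoService;
import java.io.File;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;

/** Stages the CA certificate on each worker before any JDBC connection is opened. */
@AutoService(JvmInitializer.class)
public class CaCertificateInitializer implements JvmInitializer {

  // Placeholder locations -- adjust the bucket path, and keep the local path in sync
  // with the trustCertificateKeyStoreUrl used in the JDBC connection string.
  private static final String GCS_CERT = "gs://my-bucket/certs/ca-root.pem";
  private static final String LOCAL_CERT = "/extra_files/ca-root.pem";

  @Override
  public void beforeProcessing(PipelineOptions options) {
    try {
      // Register the gs:// filesystem with the worker's pipeline options.
      FileSystems.setDefaultPipelineOptions(options);
      // Assumes the worker user is allowed to create /extra_files.
      new File(LOCAL_CERT).getParentFile().mkdirs();
      try (ReadableByteChannel source =
          FileSystems.open(FileSystems.matchNewResource(GCS_CERT, false))) {
        Files.copy(
            Channels.newInputStream(source),
            Paths.get(LOCAL_CERT),
            StandardCopyOption.REPLACE_EXISTING);
      }
    } catch (Exception e) {
      throw new RuntimeException("Failed to stage CA certificate from " + GCS_CERT, e);
    }
  }
}

Beam discovers the initializer through ServiceLoader, so it only needs to be packaged and staged together with the pipeline code; the worker's service account also needs read access to the certificate object in GCS.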