我一般来说是 BigQuery 的新手,我正在尝试将多行插入到 bigquery 表中,同时还在单个事务中修改另一个表。
我在 kotlin 中使用 BigQuery java 客户端,但在为
insertAll
请求设置会话 ID 时遇到问题。
我通过创建一个事务并创建一个新的会话 ID 开始
val queryConfig = QueryJobConfiguration.newBuilder("BEGIN TRANSACTION;")
.setCreateSession(true)
.build()
// runQuery just creates a job and waits for execution to end
val sessionId = runQuery(queryConfig).getStatistics<JobStatistics>().sessionInfo.sessionId
然后我为第一个请求设置会话 ID,更新另一个表
val queryConfig2 = QueryJobConfiguration.newBuilder("SOME_SQL_STATEMENT")
.setCreateSession(false)
.setConnectionProperties(listOf(ConnectionProperty.of("session_id", sessionId)))
.setNamedParameters(values)
.setUseLegacySql(false)
.build()
runQuery(queryConfig2)
最后我发送了InsertAllRequest,但我不确定如何在这里设置会话id?
bigquery.insertAll(
InsertAllRequest.newBuilder(tableId)
.setRows(rowContents)
.build()
)
所有这些目前都包含在这个小例程中:
suspend fun runRoutine(source: String, : String, name: String, records: List<RowToInsert>)
val context = "context"
val id = "id"
val latestRefreshAt = "latestRefreshAt"
transaction { configureSession ->
query(
"""
UPDATE `${googleTableId1.toSqlTableName()}`
SET $latestRefreshAt = CURRENT_TIMESTAMP()
WHERE $context = @$context AND $id = @$id
""",
mapOf(
context to QueryParameterValue.string(source),
id to QueryParameterValue.string(name),
),
configureSession
)
tableInsertRows(
tableId = googleTableId2,
rowContents = records,
)
}
}
suspend fun transaction(block: suspend BigQuery.(QueryJobConfiguration.Builder.() -> Unit) -> Unit) {
val sessionId = beginTransaction()
val prepareQuery: QueryJobConfiguration.Builder.() -> Unit = {
setCreateSession(false)
setConnectionProperties(
listOf(ConnectionProperty.of("session_id", sessionId))
)
}
try {
bigquery.block(prepareQuery)
commitTransaction(prepareQuery)
} catch (throwable: Throwable) {
rollbackTransaction(prepareQuery)
closeSession(prepareQuery)
throw throwable
}
}
private suspend fun beginTransaction(): String {
val queryConfig = QueryJobConfiguration.newBuilder("BEGIN TRANSACTION;")
.setCreateSession(true)
.build()
return runQuery(queryConfig).getStatistics<JobStatistics>().sessionInfo.sessionId
}
private suspend fun commitTransaction(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
val queryConfig = QueryJobConfiguration.newBuilder("COMMIT TRANSACTION;")
.apply(prepareQuery)
.build()
runQuery(queryConfig)
}
private suspend fun rollbackTransaction(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
val queryConfig = QueryJobConfiguration.newBuilder("ROLLBACK TRANSACTION;")
.apply(prepareQuery)
.build()
runQuery(queryConfig)
}
private suspend fun closeSession(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
val queryConfig = QueryJobConfiguration.newBuilder("CALL BQ.ABORT_SESSION();")
.apply(prepareQuery)
.build()
runQuery(queryConfig)
}
private suspend fun runQuery(queryJobConfiguration: QueryJobConfiguration): Job {
val jobId = JobId.newBuilder().setProject(projectId.value).build()
val queryJob =
bigquery.create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build())
.waitFor()
if (queryJob == null) {
throw RuntimeException("Job no longer exists")
} else if (queryJob.getStatus().getError() != null) {
throw RuntimeException(queryJob.getStatus().getError().toString())
}
return queryJob
}
suspend fun query(
sql: String,
values: Map<String, QueryParameterValue>,
configure: QueryJobConfiguration.Builder.() -> Unit = {},
): TableResult {
val queryConfig = QueryJobConfiguration.newBuilder(sql)
.apply(configure)
.setNamedParameters(values)
.setUseLegacySql(false)
.build()
return runQuery(queryConfig).getQueryResults()
}
suspend fun tableInsertRows(
tableId: TableId,
rowContents: Iterable<InsertAllRequest.RowToInsert>,
) {
bigquery.insertAll(
InsertAllRequest.newBuilder(tableId)
.setRows(rowContents)
.build()
)
}
private val bigquery by lazy {
BigQueryOptions.getDefaultInstance().getService()
};
所以,我想知道如何为 insertAll 请求设置 sessionId?
据我所知,您无法使用 BigQuery Java 客户端直接为 insertAll 请求设置 sessionID。 insertAll 专为流式插入而设计,不会像查询那样参与显式事务。
我认为事务中的参数化 INSERT 语句将是一个不错的选择。这样,您就可以保证操作的原子性,因此要么应用所有更改,要么不应用任何更改。这是在 BigQuery 中管理单个事务中的多个更改的更好方法。