Google BigQuery InsertAll and transactions?


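I'm new to BigQuery in general, and I'm trying to insert multiple rows into a BigQuery table while also modifying another table, all within a single transaction.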

I'm using the BigQuery Java client from Kotlin, but I'm having trouble setting the session ID for the insertAll request.

I start by beginning a transaction, which also creates a new session and gives me its ID:

val queryConfig = QueryJobConfiguration.newBuilder("BEGIN TRANSACTION;")
    .setCreateSession(true)
    .build()

// runQuery just creates a job and waits for execution to end
val sessionId = runQuery(queryConfig).getStatistics<JobStatistics>().sessionInfo.sessionId

Then I set the session ID on the first request, which updates the other table:

val queryConfig2 = QueryJobConfiguration.newBuilder("SOME_SQL_STATEMENT")
    .setCreateSession(false)
    .setConnectionProperties(listOf(ConnectionProperty.of("session_id", sessionId)))
    .setNamedParameters(values)
    .setUseLegacySql(false)
    .build()
runQuery(queryConfig2)

Finally I send the InsertAllRequest, but I'm not sure how to set the session ID here:

bigquery.insertAll(
    InsertAllRequest.newBuilder(tableId)
        .setRows(rowContents)
        .build()
)

All of this is currently wrapped in this small routine:

    suspend fun runRoutine(source: String, : String, name: String, records: List<RowToInsert>) {
        val context = "context"
        val id = "id"
        val latestRefreshAt = "latestRefreshAt"
        transaction { configureSession ->
            query(
                """
                    UPDATE `${googleTableId1.toSqlTableName()}` 
                    SET $latestRefreshAt = CURRENT_TIMESTAMP()
                    WHERE $context = @$context AND $id = @$id
                """,
                mapOf(
                    context to QueryParameterValue.string(source),
                    id to QueryParameterValue.string(name),
                ),
                configureSession
            )

            tableInsertRows(
                tableId = googleTableId2,
                rowContents = records,
            )
        }
    }

    suspend fun transaction(block: suspend BigQuery.(QueryJobConfiguration.Builder.() -> Unit) -> Unit) {
        val sessionId = beginTransaction()
        val prepareQuery: QueryJobConfiguration.Builder.() -> Unit = {
            setCreateSession(false)
            setConnectionProperties(
                listOf(ConnectionProperty.of("session_id", sessionId))
            )
        }

        try {
            bigquery.block(prepareQuery)
            commitTransaction(prepareQuery)
        } catch (throwable: Throwable) {
            rollbackTransaction(prepareQuery)
            closeSession(prepareQuery)
            throw throwable
        }
    }

    private suspend fun beginTransaction(): String {
        val queryConfig = QueryJobConfiguration.newBuilder("BEGIN TRANSACTION;")
            .setCreateSession(true)
            .build()

        return runQuery(queryConfig).getStatistics<JobStatistics>().sessionInfo.sessionId
    }

    private suspend fun commitTransaction(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
        val queryConfig = QueryJobConfiguration.newBuilder("COMMIT TRANSACTION;")
            .apply(prepareQuery)
            .build()

        runQuery(queryConfig)
    }

    private suspend fun rollbackTransaction(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
        val queryConfig = QueryJobConfiguration.newBuilder("ROLLBACK TRANSACTION;")
            .apply(prepareQuery)
            .build()

        runQuery(queryConfig)
    }

    private suspend fun closeSession(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
        val queryConfig = QueryJobConfiguration.newBuilder("CALL BQ.ABORT_SESSION();")
            .apply(prepareQuery)
            .build()

        runQuery(queryConfig)
    }

    private suspend fun runQuery(queryJobConfiguration: QueryJobConfiguration): Job {
        val jobId = JobId.newBuilder().setProject(projectId.value).build()
        val queryJob =
            bigquery.create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build())
                .waitFor()

        if (queryJob == null) {
            throw RuntimeException("Job no longer exists")
        } else if (queryJob.getStatus().getError() != null) {
            throw RuntimeException(queryJob.getStatus().getError().toString())
        }

        return queryJob
    }

    suspend fun query(
        sql: String,
        values: Map<String, QueryParameterValue>,
        configure: QueryJobConfiguration.Builder.() -> Unit = {},
    ): TableResult {
        val queryConfig = QueryJobConfiguration.newBuilder(sql)
            .apply(configure)
            .setNamedParameters(values)
            .setUseLegacySql(false)
            .build()

        return runQuery(queryConfig).getQueryResults()
    }


    suspend fun tableInsertRows(
        tableId: TableId,
        rowContents: Iterable<InsertAllRequest.RowToInsert>,
    ) {
        bigquery.insertAll(
            InsertAllRequest.newBuilder(tableId)
                .setRows(rowContents)
                .build()
        )
    }

    private val bigquery by lazy {
        BigQueryOptions.getDefaultInstance().getService()
    };

So, how can I set the session ID for the insertAll request?

session google-bigquery transactions batch-insert
1 Answer

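As far as I know, you can't set a session ID directly on an insertAll request with the BigQuery Java client. insertAll is designed for streaming inserts and does not participate in explicit transactions the way queries do.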

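I think a parameterized INSERT statement inside the transaction would be a good option. Because it runs as a query in the same session, it is covered by the transaction's atomicity: either all of the changes are applied or none are. That is a better way to manage multiple changes within a single transaction in BigQuery.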
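As a rough sketch of that idea, the helpers below build one multi-row INSERT with named parameters. The Record class, its id and payload columns, and the function names buildInsertSql and buildInsertParams are all hypothetical, since the question does not show the target table's schema. Only QueryParameterValue.string comes from the BigQuery client.

```kotlin
import com.google.cloud.bigquery.QueryParameterValue

// Hypothetical row shape: the real columns of the target table are not shown in the question.
data class Record(val id: String, val payload: String)

// Builds "INSERT INTO `table` (id, payload) VALUES (@id0, @payload0), (@id1, @payload1), ..."
// with one tuple of named parameters per row.
fun buildInsertSql(tableName: String, rowCount: Int): String {
    require(rowCount > 0) { "at least one row is required" }
    val tuples = (0 until rowCount).joinToString(", ") { i -> "(@id$i, @payload$i)" }
    return "INSERT INTO `$tableName` (id, payload) VALUES $tuples"
}

// Builds the named parameters that match the placeholders produced by buildInsertSql.
fun buildInsertParams(records: List<Record>): Map<String, QueryParameterValue> {
    val params = mutableMapOf<String, QueryParameterValue>()
    records.forEachIndexed { i, record ->
        params["id$i"] = QueryParameterValue.string(record.id)
        params["payload$i"] = QueryParameterValue.string(record.payload)
    }
    return params
}
```

Inside the question's transaction { configureSession -> ... } block, the tableInsertRows call could then be replaced with something like query(buildInsertSql(googleTableId2.toSqlTableName(), rows.size), buildInsertParams(rows), configureSession), where rows is a List<Record>. That way the INSERT carries the same session_id connection property as the UPDATE. BigQuery limits query size and the number of parameters, so large batches would probably need to be split into several INSERT statements within the same transaction.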
