How to configure the GCP Spanner ChangeStream read duration in Java

Question · Votes: 0 · Answers: 1

We are using a GCP Spanner ChangeStream in an Apache Beam Java Dataflow job, configured through the SpannerIO connector. The code is as follows:

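// Required imports (place at the top of the enclosing file).
// Options, getSpannerProjectId and getRequiredData are defined elsewhere in the job.
import com.google.cloud.Timestamp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

static class Read extends PTransform<PBegin, PCollection<DataChangeRecord>> {

    @Override
    public PCollection<DataChangeRecord> expand(PBegin input) {
        Pipeline pipeline = input.getPipeline();
        Options options = pipeline.getOptions().as(Options.class);

        // Retrieve and parse the startTimestamp and endTimestamp.
        // An empty start means "start now"; an empty end means "read indefinitely".
        // (Defaulting the end to Timestamp.now() would close the read window almost immediately.)
        Timestamp startTimestamp =
            options.getStartTimestamp().isEmpty()
                ? Timestamp.now()
                : Timestamp.parseTimestamp(options.getStartTimestamp());
        Timestamp endTimestamp =
            options.getEndTimestamp().isEmpty()
                ? Timestamp.MAX_VALUE
                : Timestamp.parseTimestamp(options.getEndTimestamp());

        SpannerConfig spannerConfig =
            SpannerConfig.create()
                .withProjectId(getSpannerProjectId(options))
                .withInstanceId(getRequiredData(options.getSpannerInstanceId()))
                .withDatabaseId(getRequiredData(options.getSpannerDatabase()))
                .withRpcPriority(options.getSpannerRpcPriority());

        SpannerIO.ReadChangeStream stream =
            SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withMetadataInstance(getRequiredData(options.getSpannerMetadataInstanceId()))
                .withMetadataDatabase(getRequiredData(options.getSpannerMetadataDatabase()))
                .withChangeStreamName(options.getSpannerChangeStreamName())
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp)
                .withRpcPriority(options.getSpannerRpcPriority());

        // Use a custom metadata table name only if one was supplied.
        String spannerMetadataTableName = options.getSpannerMetadataTableName();
        if (spannerMetadataTableName != null) {
            stream = stream.withMetadataTable(spannerMetadataTableName);
        }

        return pipeline.apply("Read from Spanner", stream);
    }
}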

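It works fine, but it queries Spanner every second looking for changes, and we need to configure how often it does so. After the application starts, we see the following log entries every second: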

INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z)
Jun 28, 2024 7:30:10 PM org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction getAllPartitionsCreatedAfter
INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z)
Jun 28, 2024 7:30:10 PM org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction getAllPartitionsCreatedAfter
INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z)
Jun 28, 2024 7:30:10 PM org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction getAllPartitionsCreatedAfter
INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z)

Does anyone know how to configure this? We don't want to use the Spanner database client. Is there any way to do this with SpannerIO?

Thanks in advance :)

google-cloud-platform apache-beam google-cloud-spanner changestream
1 Answer · Votes: 0

The log you are seeing comes from this line of code: https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java#L132

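This code has nothing to do with the Spanner database client that executes the change stream query. It comes from DetectNewPartitionsDoFn, which polls the metadata table for new change stream partitions every 100 milliseconds.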
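To make the polling mechanism more concrete, here is a minimal, self-contained sketch of the same pattern: a loop that checks a metadata table on a fixed 100 ms interval and logs how many new partitions it found. This is not Beam's DetectNewPartitionsDoFn, which runs inside the pipeline and differs in detail. The class name PartitionPollerSketch, the in-memory queue standing in for the metadata table, and the use of ScheduledExecutorService are all illustrative assumptions.

```java
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of partition discovery; NOT Beam's DetectNewPartitionsDoFn.
class PartitionPollerSketch {
    // Poll interval taken from the answer (100 ms); an assumption of this sketch.
    static final Duration POLL_INTERVAL = Duration.ofMillis(100);

    // Hypothetical stand-in for the metadata table: partitions waiting to be scheduled.
    static final Queue<String> pendingPartitions = new ConcurrentLinkedQueue<>();

    // Number of metadata-table queries issued per second at the given interval.
    static long pollsPerSecond(Duration interval) {
        return Duration.ofSeconds(1).toMillis() / interval.toMillis();
    }

    // One poll: drain every pending partition, log the count, and return it.
    static int pollOnce() {
        int found = 0;
        while (pendingPartitions.poll() != null) {
            found++;
        }
        System.out.println("Found " + found + " to be scheduled");
        return found;
    }

    // Runs pollOnce() with a fixed delay for roughly runTime, then returns how many polls ran.
    static int runFor(Duration runTime) {
        AtomicInteger polls = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            pollOnce();
            polls.incrementAndGet();
        }, 0, POLL_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(runTime.toMillis());
            scheduler.shutdownNow();
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return polls.get();
    }

    public static void main(String[] args) {
        System.out.println("Polls per second: " + pollsPerSecond(POLL_INTERVAL));
        pendingPartitions.add("partition-token-1"); // hypothetical newly created partition
        System.out.println("Polls in ~550 ms: " + runFor(Duration.ofMillis(550)));
    }
}
```

At a 100 ms interval this works out to about 10 metadata-table queries per second, which is consistent with the question's log showing several "Found 0 to be scheduled" lines within the same second (7:30:10 PM). These queries go to the metadata table, not to the change stream itself.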

Could you explain your problem in more detail?
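If the concern is only the volume of these INFO lines in a local run, one hedged option is to raise the level of that single logger. This does not change how often the metadata table is queried; it only hides the log lines. The sketch assumes Beam's SLF4J output is routed to java.util.logging (JUL), which the "Jun 28, 2024 7:30:10 PM ... INFO:" format in the question suggests; other runners or logging backends may need different configuration. QuietPartitionLogs and quiet() are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper that hides the "Found N to be scheduled" INFO lines.
// Assumes SLF4J output is bridged to java.util.logging (JUL).
class QuietPartitionLogs {
    // Logger name: the fully qualified class name printed in the log lines.
    static final String DETECT_NEW_PARTITIONS_LOGGER =
        "org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction";

    // JUL holds loggers only weakly, so keep a strong reference; otherwise the
    // logger, and the level set on it, could be garbage-collected.
    private static final Logger DETECT_LOGGER = Logger.getLogger(DETECT_NEW_PARTITIONS_LOGGER);

    // Drop INFO (and finer) messages from this logger; WARNING and SEVERE still pass.
    static Logger quiet() {
        DETECT_LOGGER.setLevel(Level.WARNING);
        return DETECT_LOGGER;
    }

    public static void main(String[] args) {
        Logger logger = quiet();                // call once at startup, before the pipeline runs
        logger.info("Found 0 to be scheduled"); // filtered out by the WARNING level
        logger.warning("Still visible");        // passes the WARNING level
    }
}
```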
