We are using GCP Spanner change streams in an Apache Beam Java Dataflow job, configured with the SpannerIO connector. The code is as follows:
```java
import com.google.cloud.Timestamp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Options, getSpannerProjectId and getRequiredData are defined elsewhere in our job.
static class Read extends PTransform<PBegin, PCollection<DataChangeRecord>> {
  @Override
  public PCollection<DataChangeRecord> expand(PBegin input) {
    Pipeline pipeline = input.getPipeline();
    Options options = pipeline.getOptions().as(Options.class);

    // Retrieve and parse the startTimestamp and endTimestamp.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    // An empty end timestamp means "read indefinitely"; Timestamp.now() here
    // would end the stream almost as soon as it starts.
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withProjectId(getSpannerProjectId(options))
            .withInstanceId(getRequiredData(options.getSpannerInstanceId()))
            .withDatabaseId(getRequiredData(options.getSpannerDatabase()))
            .withRpcPriority(options.getSpannerRpcPriority());

    SpannerIO.ReadChangeStream stream =
        SpannerIO.readChangeStream()
            .withSpannerConfig(spannerConfig)
            .withMetadataInstance(getRequiredData(options.getSpannerMetadataInstanceId()))
            .withMetadataDatabase(getRequiredData(options.getSpannerMetadataDatabase()))
            .withChangeStreamName(options.getSpannerChangeStreamName())
            .withInclusiveStartAt(startTimestamp)
            .withInclusiveEndAt(endTimestamp)
            .withRpcPriority(options.getSpannerRpcPriority());

    // Optionally use a custom metadata table name.
    String spannerMetadataTableName = options.getSpannerMetadataTableName();
    if (spannerMetadataTableName != null) {
      stream = stream.withMetadataTable(spannerMetadataTableName);
    }

    return pipeline.apply("Read from Spanner", stream);
  }
}
```
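It works fine, but it queries Spanner every second looking for changes. We need to configure how often it polls Spanner. After starting the application, we see log lines like these every second: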
```
INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z)
Jun 28, 2024 7:30:10 PM org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction getAllPartitionsCreatedAfter
INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z)
Jun 28, 2024 7:30:10 PM org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction getAllPartitionsCreatedAfter
INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z)
Jun 28, 2024 7:30:10 PM org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction getAllPartitionsCreatedAfter
INFO: Found 0 to be scheduled (readTimestamp = 2024-06-28T23:30:09.117068001Z)
```
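Does anyone know how to configure this? We don't want to use the Spanner database client directly. Is there a way to do it through SpannerIO?

Thanks in advance :)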
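If the main nuisance is the log volume, note that quieting the logger does not change how often Spanner is queried; it only hides the per-poll INFO lines. Assuming the pipeline runs locally and Beam's SLF4J output is routed to `java.util.logging` (the timestamp-then-class-name layout above is JUL's default format), a minimal sketch could raise the threshold for that one logger. The logger name is copied from the log header above; the class `QuietPartitionLogs` is hypothetical. On Dataflow workers logging is configured separately, so this only affects the JVM it runs in.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: silences INFO output from one Beam class, assuming
// Beam's logs reach java.util.logging (e.g. via the slf4j-jdk14 binding).
class QuietPartitionLogs {
  static final String DETECT_NEW_PARTITIONS_LOGGER =
      "org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction";

  // Keep a strong reference: java.util.logging may garbage-collect a logger
  // (and lose its configured level) if nothing else holds on to it.
  static final Logger LOGGER = Logger.getLogger(DETECT_NEW_PARTITIONS_LOGGER);

  /** Drops INFO and below for this logger; WARNING and SEVERE still pass. */
  static void quiet() {
    LOGGER.setLevel(Level.WARNING);
  }

  public static void main(String[] args) {
    quiet();
    System.out.println(LOGGER.isLoggable(Level.INFO));    // false
    System.out.println(LOGGER.isLoggable(Level.WARNING)); // true
  }
}
```

Call `QuietPartitionLogs.quiet()` near the start of `main`, before `pipeline.run()`, so the level is in place when the polling begins.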
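These log lines have nothing to do with the Spanner database client that executes the change stream query. They come from DetectNewPartitionsDoFn, which polls the metadata table for new change stream partitions every 100 milliseconds.

Could you elaborate on what problem this is causing for you?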
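To put the 100 ms interval into numbers: a fixed-interval poller issues at most one query per interval, so up to 10 metadata-table queries per second, or 864,000 per day, and fewer in practice because each query takes time. The sketch below is a hypothetical plain-Java illustration of that "query, sleep, repeat" pattern; the class and method names are made up and the `IntSupplier` stands in for the metadata-table query. It is not Beam's `DetectNewPartitionsDoFn`, and we have not found a setter for its interval on `SpannerIO.ReadChangeStream`, so check the Javadoc for your Beam version.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

// Hypothetical, simplified fixed-interval poller; NOT Beam's implementation.
class FixedIntervalPoller {
  private final IntSupplier newPartitionQuery; // stands in for the metadata-table query
  private final Duration interval;             // pause between consecutive polls

  FixedIntervalPoller(IntSupplier newPartitionQuery, Duration interval) {
    this.newPartitionQuery = newPartitionQuery;
    this.interval = interval;
  }

  /** Polls `rounds` times and returns the total number of partitions found. */
  int run(int rounds) {
    int total = 0;
    for (int i = 0; i < rounds; i++) {
      int found = newPartitionQuery.getAsInt();
      System.out.println("Found " + found + " to be scheduled");
      total += found;
      if (i < rounds - 1) {
        try {
          Thread.sleep(interval.toMillis()); // wait one interval before the next poll
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve interrupt status and stop polling
          break;
        }
      }
    }
    return total;
  }

  /** Upper bound on polls in a time window, ignoring how long each query takes. */
  static long maxPolls(Duration window, Duration interval) {
    return window.toMillis() / interval.toMillis();
  }

  public static void main(String[] args) {
    Duration interval = Duration.ofMillis(100); // the interval quoted in the answer
    System.out.println(maxPolls(Duration.ofSeconds(1), interval)); // 10
    System.out.println(maxPolls(Duration.ofDays(1), interval));    // 864000
    new FixedIntervalPoller(() -> 0, interval).run(3);
  }
}
```

A fixed pause after each query (rather than a fixed schedule) means a slow query never causes polls to pile up; the trade-off is that the actual rate drifts below the nominal one.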