I have a service deployed on multiple instances, and all instances report their metrics to a common collector. Each instance is identified by the triple
(service.namespace, service.name, service.instance.id)
However, this setup produces a very large number of time series, because the service.instance.id values can have very high cardinality.
I never use this label for grouping, so I could simply drop it, but in that case the export to Google Cloud Stackdriver fails with time-series collisions. Is there a way to aggregate the metrics in the collector before they are exported to GCP?
I looked at the metricstransform processor, but it seems to aggregate only over data point labels. I also tried deleting the service.instance.id label (with the resource processor) and then using metricstransform, but this leads to errors during export (roughly the combination sketched after the error below):
2024-11-22T15:25:27.422Z error exporterhelper/queue_sender.go:128 Exporting failed. No more retries left. Dropping data.
{
"kind": "exporter",
"data_type": "metrics",
"name": "googlecloud",
"error": "rpc error:
code = InvalidArgument
desc = One or more TimeSeries could not be written:
timeSeries[0-4,8-14]:
write for resource=k8s_cluster{location:us-central1,cluster_name:gb-rgke-usc1-production} failed with: Points must be written in order. One or more of the points specified had an older start time than the most recent point.
error details: name = Unknown desc = total_point_count:27 success_point_count:2 errors:{status:{code:3} point_count:10}", "dropped_items": 27
}
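For reference, the kind of combination I mean looks roughly like this (an illustrative sketch, not my exact configuration; the regexp include and the empty label_set are just there to collapse everything per metric):
processors:
  resource/drop_instance_id:
    attributes:
      - key: service.instance.id
        action: delete
  metricstransform:
    transforms:
      - include: .*                    # apply to every metric (illustrative)
        match_type: regexp
        action: update
        operations:
          - action: aggregate_labels   # merges data points that end up with identical label sets
            label_set: []              # keep no extra labels, i.e. aggregate everything per metric
            aggregation_type: sum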
Is there a way to achieve this?
Kind regards,
Alexis
Following @Jeff's comment, the interval processor looked promising. I built a custom collector distribution and deployed it. While it seems to work in the test environment (low traffic), dropping the instance_id and aggregating the values, as soon as it was deployed to production I ran into multiple errors, and the charts in Stackdriver did not reflect the actual behaviour (only about 10% of the requests were counted...).
Here is my otelcol configuration (partial):
receivers:
  otlp:
    protocols:
      http:
        endpoint: ${env:POD_NAME}:4318
processors:
  resourcedetection:
    detectors: [gcp]
    timeout: 10s
  batch:
  memory_limiter:
    check_interval: 1s
    limit_percentage: 65
    spike_limit_percentage: 20
  resource/merge_instances:
    attributes:
      - key: service.instance.id
        action: delete
  interval:
    interval: 15s
exporters:
  googlecloud:
    project: mygcpproject
service:
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch, resource/merge_instances, interval, resourcedetection]
      exporters: [googlecloud]
Output of the collector:
2024-11-28T15:12:16.349Z info service@v0.114.0/service.go:166 Setting up own telemetry...
2024-11-28T15:12:16.349Z info telemetry/metrics.go:70 Serving metrics {"address": "localhost:8888", "metrics level": "Normal"}
2024-11-28T15:12:16.350Z info builders/builders.go:26 Development component. May change in the future. {"kind": "processor", "name": "interval", "pipeline": "metrics"}
2024-11-28T15:12:16.351Z info memorylimiter@v0.114.0/memorylimiter.go:151 Using percentage memory limiter {"kind": "processor", "name": "memory_limiter", "pipeline": "logs", "total_memory_mib": 3923, "limit_percentage": 65, "spike_limit_percentage": 20}
2024-11-28T15:12:16.351Z info memorylimiter@v0.114.0/memorylimiter.go:75 Memory limiter configured {"kind": "processor", "name": "memory_limiter", "pipeline": "logs", "limit_mib": 2550, "spike_limit_mib": 784, "check_interval": 1}
2024-11-28T15:12:16.353Z info service@v0.114.0/service.go:238 Starting otelcol-custom... {"Version": "0.114.0", "NumCPU": 2}
2024-11-28T15:12:16.353Z info extensions/extensions.go:39 Starting extensions...
2024-11-28T15:12:16.353Z info extensions/extensions.go:42 Extension is starting... {"kind": "extension", "name": "health_check"}
2024-11-28T15:12:16.353Z info healthcheckextension@v0.114.0/healthcheckextension.go:32 Starting health_check extension {"kind": "extension", "name": "health_check", "config": {"Endpoint":"otel-collector-7698d7ddb-sncck:13133","TLSSetting":null,"CORS":null,"Auth":null,"MaxRequestBodySize":0,"IncludeMetadata":false,"ResponseHeaders":null,"CompressionAlgorithms":null,"ReadTimeout":0,"ReadHeaderTimeout":0,"WriteTimeout":0,"IdleTimeout":0,"Path":"/","ResponseBody":null,"CheckCollectorPipeline":{"Enabled":false,"Interval":"5m","ExporterFailureThreshold":5}}}
2024-11-28T15:12:16.354Z info extensions/extensions.go:59 Extension started. {"kind": "extension", "name": "health_check"}
2024-11-28T15:12:17.023Z info internal/resourcedetection.go:126 began detecting resource information {"kind": "processor", "name": "resourcedetection", "pipeline": "metrics"}
2024-11-28T15:12:17.035Z info internal/resourcedetection.go:140 detected resource information {"kind": "processor", "name": "resourcedetection", "pipeline": "metrics", "resource": {"cloud.account.id":"mygcpproject","cloud.platform":"gcp_kubernetes_engine","cloud.provider":"gcp","cloud.region":"us-central1","host.id":"xxxx","host.name":"xxxx","k8s.cluster.name":"xxxx"}}
2024-11-28T15:12:17.037Z info otlpreceiver@v0.114.0/otlp.go:169 Starting HTTP server {"kind": "receiver", "name": "otlp", "data_type": "metrics", "endpoint": "otel-collector-7698d7ddb-sncck:4318"}
2024-11-28T15:12:17.037Z info healthcheck/handler.go:132 Health Check state change {"kind": "extension", "name": "health_check", "status": "ready"}
2024-11-28T15:12:17.037Z info service@v0.114.0/service.go:261 Everything is ready. Begin running and processing data.
2024-11-28T15:12:32.274Z error internal/queue_sender.go:92 Exporting failed. Dropping data. {"kind": "exporter", "data_type": "metrics", "name": "googlecloud", "error": "rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written: timeSeries[0-11]: write for resource=k8s_cluster{location:us-central1,cluster_name:xxxx} failed with: Points must be written in order. One or more of the points specified had an older start time than the most recent point.\nerror details: name = Unknown desc = total_point_count:12 success_point_count:7 errors:{status:{code:3} point_count:5}", "dropped_items": 12}
go.opentelemetry.io/collector/exporter/exporterhelper/internal.NewQueueSender.func1
go.opentelemetry.io/collector/exporter@v0.114.0/exporterhelper/internal/queue_sender.go:92
go.opentelemetry.io/collector/exporter/internal/queue.(*Consumers[...]).Start.func1
go.opentelemetry.io/collector/exporter@v0.114.0/internal/queue/consumers.go:43
[...]
2024-11-28T15:13:00.226Z error internal/queue_sender.go:92 Exporting failed. Dropping data. {"kind": "exporter", "data_type": "metrics", "name": "googlecloud", "error": "rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written: timeSeries[4]: Field timeSeries[4] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.; timeSeries[6]: Field timeSeries[6] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.; timeSeries[7]: Field timeSeries[7] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.; timeSeries[5]: Field timeSeries[5] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.; timeSeries[9]: Field timeSeries[9] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.; timeSeries[8]: Field timeSeries[8] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.\nerror details: name = Unknown desc = total_point_count:10 errors:{status:{code:3} point_count:6}", "dropped_items": 10}
go.opentelemetry.io/collector/exporter/exporterhelper/internal.NewQueueSender.func1
go.opentelemetry.io/collector/exporter@v0.114.0/exporterhelper/internal/queue_sender.go:92
go.opentelemetry.io/collector/exporter/internal/queue.(*Consumers[...]).Start.func1
go.opentelemetry.io/collector/exporter@v0.114.0/internal/queue/consumers.go:43
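As a side note, to check what the collector actually exports (independently of what GCP accepts or rejects), it can help to temporarily add the debug exporter to the metrics pipeline. A minimal sketch, assuming the component is compiled into the custom distribution:
exporters:
  debug:
    verbosity: detailed   # prints every exported metric and data point to stdout
service:
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch, resource/merge_instances, interval, resourcedetection]
      exporters: [googlecloud, debug]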
I believe I found the right setup for my use case.
receivers:
  otlp:
    protocols:
      http:
        endpoint: ${env:POD_NAME}:4318
processors:
  memory_limiter:
    check_interval: 1s
    limit_percentage: 65
    spike_limit_percentage: 20
  interval:
    interval: 10s
  resourcedetection:
    detectors: [gcp]
    timeout: 10s
  cumulativetodelta:
    max_staleness: 24h
  transform/resource:
    error_mode: ignore
    metric_statements:
      - context: "resource"
        statements:
          - set(attributes["service.instance.id"], attributes["service.namespace"]) # Override instance ID to allow aggregation
          - set(attributes["k8s.namespace.name"], "GMP namespace") # Set namespace for Managed Prometheus export
  groupbyattrs: # Group all metrics from the same group of publishers (discard the service instance ID)
    keys:
      - service.name
      - service.version
      - service.namespace
  transform/aggregate: # Aggregate metrics
    error_mode: ignore
    metric_statements:
      - context: resource
        statements: []
      - context: datapoint
        statements:
          - set(time, TruncateTime(Now(), Duration("10s"))) # Align timestamps to allow aggregation
          - set(start_time, TruncateTime(start_time, Duration("10s"))) # Align timestamps to allow aggregation
          - delete_key(attributes, "http.host")
          - delete_key(attributes, "net.host.port")
          - delete_key(attributes, "http.server_name")
          - delete_key(attributes, "server.address")
          - delete_key(attributes, "server.port")
      - context: metric
        statements:
          - aggregate_on_attributes("sum") where type != METRIC_DATA_TYPE_GAUGE
          - aggregate_on_attributes("mean") where type == METRIC_DATA_TYPE_GAUGE
  deltatocumulative:
exporters:
  googlemanagedprometheus:
service:
  pipelines:
    metrics:
      receivers: [otlp]
      processors:
        - memory_limiter
        - interval # Throttle publish rate to match Google Managed Prometheus limits
        - resourcedetection # Add some required resource attributes for GCP
        - cumulativetodelta # Convert to delta to support horizontal downscaling of the service
        - transform/resource # Drop (update) the service.instance.id to group all the producers
        - groupbyattrs # Group all timeseries coming from the same group of producers
        - transform/aggregate # Aggregate timeseries in each group (requires aligning timestamps and dropping unused attributes)
        - deltatocumulative # Convert back to cumulative to match GCP expectations
      exporters:
        - googlemanagedprometheus
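For completeness, most of these components (interval, cumulativetodelta, deltatocumulative, transform, groupbyattrs, googlemanagedprometheus) live in collector-contrib, so they have to be compiled into the custom distribution. A builder (ocb) manifest along these lines should cover the pipeline above (a sketch; the v0.114.0 module versions are assumed to match the collector version shown in the logs):
dist:
  name: otelcol-custom
  output_path: ./otelcol-custom
receivers:
  - gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.114.0
processors:
  - gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.114.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.114.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.114.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.114.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.114.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.114.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.114.0
exporters:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlemanagedprometheusexporter v0.114.0
  - gomod: go.opentelemetry.io/collector/exporter/debugexporter v0.114.0   # optional, for local verification
extensions:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.114.0
The distribution is then built with the ocb binary (ocb --config <manifest>.yaml).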
I hope this helps someone.