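I'm trying to do a simple upload to Google Cloud Storage using gRPC and the generated protos, but I'm running into a problem with WriteObjectRequest. The error I get is:
An x-goog-request-params request metadata property must be provided for this request.
Oddly, it doesn't even seem to check whether x-goog-request-params is present in the request metadata. However, when I use the same set of headers with ReadObjectRequest, it correctly recognizes x-goog-request-params and processes the request successfully. I can verify the contents of the ReadObjectRequest without any problem.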
use gcloud_sdk::google::storage::v2::{
    storage_client::StorageClient, write_object_request, ChecksummedData, Object,
    ReadObjectRequest, WriteObjectRequest, WriteObjectSpec,
};
use tonic::{metadata::MetadataValue, transport::Channel, Request};
use futures::stream;
use hyper_rustls;
use tokio_stream::StreamExt;
use tonic::transport::ClientTlsConfig;

const BUCKET_NAME: &str = "test-bucket";
const GOOGLE_AUTH_TOKEN: &str = "token-xxxx";

struct GcsClient {
    client: StorageClient<Channel>,
    bucket: String,
}

impl GcsClient {
    async fn new(bucket: String) -> Result<Self, Box<dyn std::error::Error>> {
        let channel = Channel::from_static("https://storage.googleapis.com")
            .connect_timeout(std::time::Duration::from_secs(5))
            .timeout(std::time::Duration::from_secs(30))
            .tcp_nodelay(true)
            .http2_adaptive_window(true)
            .http2_keep_alive_interval(std::time::Duration::from_secs(30))
            .tls_config(ClientTlsConfig::new().with_native_roots())?
            .connect()
            .await?;
        Ok(Self {
            client: StorageClient::new(channel),
            bucket,
        })
    }

    fn get_formatted_bucket(&self) -> String {
        format!("projects/_/buckets/{}", self.bucket)
    }

    async fn get_token() -> Result<String, Box<dyn std::error::Error>> {
        Ok(GOOGLE_AUTH_TOKEN.to_string())
    }

    async fn auth_request<T>(&self, request: T) -> Request<T> {
        let token = Self::get_token().await.expect("Failed to get authentication token");
        let mut request = Request::new(request);
        // Authorization header
        request.metadata_mut().insert(
            "authorization",
            MetadataValue::try_from(&format!("Bearer {}", token)).unwrap(),
        );
        let formatted_bucket = self.get_formatted_bucket();
        let encoded_bucket = urlencoding::encode(&formatted_bucket);
        // Adding required x-goog-request-params based on request type
        let params = if std::any::type_name::<T>().contains("WriteObjectRequest") {
            format!("write_object_spec.resource.bucket={}", encoded_bucket)
        } else if std::any::type_name::<T>().contains("ReadObjectRequest") {
            format!("bucket={}", encoded_bucket)
        } else if std::any::type_name::<T>().contains("StartResumableWriteRequest") {
            format!("write_object_spec.resource.bucket={}", encoded_bucket)
        } else if std::any::type_name::<T>().contains("QueryWriteStatusRequest") {
            format!("upload_id={}", encoded_bucket)
        } else {
            format!("bucket={}", encoded_bucket)
        };
        request.metadata_mut().insert(
            "x-goog-request-params",
            MetadataValue::try_from(&params).unwrap(),
        );
        request
    }

    async fn simple_upload(&mut self, object_name: &str, data: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
        // Create write specification
        let write_spec = WriteObjectSpec {
            resource: Some(Object {
                name: object_name.to_string(),
                bucket: self.get_formatted_bucket(),
                ..Default::default()
            }),
            ..Default::default()
        };
        // Create a single request with both spec and data
        let write_request = WriteObjectRequest {
            first_message: Some(write_object_request::FirstMessage::WriteObjectSpec(write_spec)),
            write_offset: 0,
            data: Some(write_object_request::Data::ChecksummedData(ChecksummedData {
                content: data,
                crc32c: None,
            })),
            finish_write: true,
            ..Default::default()
        };
        // Create authenticated request and convert to stream
        let auth_request = self.auth_request(write_request).await;
        // Print the request metadata for debugging
        println!("Request metadata: {:#?}", auth_request.metadata());
        // Output from above:
        // Request metadata: MetadataMap {
        //     headers: {
        //         "authorization": "Bearer token-xxxx",
        //         "x-goog-request-params": "write_object_spec.resource.bucket=projects%2F_%2Fbuckets%2Ftest-bucket",
        //     },
        // }
        // Send the request
        match self.client.write_object(stream::iter(vec![auth_request.into_inner()])).await {
            Ok(response) => {
                println!("Upload successful! Response: {:?}", response);
                Ok(())
            }
            Err(e) => {
                println!("Error details: {:#?}", e);
                // Output from above:
                // Error details: Status {
                //     code: InvalidArgument,
                //     message: "An x-goog-request-params request metadata property must be provided for this request.",
                //     details: b"\x08\x03\x12UAn x-goog-request-params request metadata property must be provided for this request.\x1ah\n(type.googleapis.com/google.rpc.ErrorInfo\x12<\n\"GRPC_INVALID_X_GOOG_REQUEST_PARAMS\x12\x16storage.googleapis.com",
                //     metadata: MetadataMap {
                //         headers: {
                //             "grpc-server-stats-bin": "AAAzYToAAAAAAA",
                //             "google.rpc.errorinfo-bin": "CiJHUlBDX0lOVkFMSURfWF9HT09HX1JFUVVFU1RfUEFSQU1TEhZzdG9yYWdlLmdvb2dsZWFwaXMuY29t",
                //             "endpoint-load-metrics-bin": "MbFRfe/oHjJASbtUC3gdlq0/",
                //             "content-type": "application/grpc",
                //             "grpc-accept-encoding": "identity, deflate, gzip",
                //             "content-length": "0",
                //             "date": "Tue, 07 Jan 2025 21:58:42 GMT",
                //             "alt-svc": "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
                //         },
                //     },
                //     source: None,
                // }
                Err(Box::new(e))
            }
        }
    }

    async fn download_object(&mut self, object_name: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let request = ReadObjectRequest {
            bucket: self.get_formatted_bucket(),
            object: object_name.to_string(),
            ..Default::default()
        };
        let request = self.auth_request(request).await;
        println!("Request metadata: {:#?}", request.metadata());
        // Output from above:
        // Request metadata: MetadataMap {
        //     headers: {
        //         "authorization": "Bearer token-xxxx",
        //         "x-goog-request-params": "bucket=projects%2F_%2Fbuckets%2Ftest-bucket",
        //     },
        // }
        let response = self.client.read_object(request).await?;
        let mut stream = response.into_inner();
        let mut content = Vec::new();
        while let Some(chunk) = StreamExt::next(&mut stream).await {
            let chunk = chunk?;
            if let Some(data) = chunk.checksummed_data {
                content.extend(data.content);
            }
        }
        Ok(content)
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let bucket = BUCKET_NAME.to_string();
    let mut client = GcsClient::new(bucket).await?;
    // Test small file upload
    println!("\n=== Testing Small File Upload ===");
    let small_content = b"Hello, GCS! This is a test.".to_vec();
    let small_object_name = "test-small-file.txt";
    println!("Uploading small file: {}", small_object_name);
    client.simple_upload(small_object_name, small_content.clone()).await?;
    println!("Small file upload complete!");
    // The above output is failing with the error specified in the function.
    // Download and verify small file
    println!("\n=== Downloading Small File ===");
    let downloaded = client.download_object(small_object_name).await?;
    println!("Downloaded content: {}", String::from_utf8_lossy(&downloaded));
    println!("Content verification: {}", downloaded == small_content);
    // Output from above:
    // Downloaded content: Hello, GCS! This is a test.
    // Content verification: true
    Ok(())
}
I've spent more than 24 hours trying different approaches to fix this, but I'm still stuck. Any help would be greatly appreciated!
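The error you're seeing appears to be expected behavior. The WriteObject method is request-streaming but not response-streaming: there is only a single response at the end.
The same point is explained here:
When opening a ClientStream, there is no guarantee that the BucketName can be derived from the request message. For unary or server-streaming calls, the request message must be provided before the request is initiated.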
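Because the bucket cannot be derived from the first streamed message, the client has to build the routing header value itself. The following is a minimal sketch of that step using only the standard library. It assumes the `key=percent-encoded-resource` format visible in the question's debug output; `RoutingKey`, `percent_encode`, and `request_params` are hypothetical names chosen for illustration, not part of tonic or any Google API.

```rust
/// Percent-encode every byte that is not an RFC 3986 "unreserved" character.
fn percent_encode(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for byte in input.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                out.push(byte as char)
            }
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}

/// Hypothetical enum: the routing key used for each kind of request,
/// taken from the keys that appear in the question's code.
enum RoutingKey {
    /// Used for ReadObjectRequest in the question.
    Bucket,
    /// Used for WriteObjectRequest in the question.
    WriteSpecBucket,
}

impl RoutingKey {
    fn as_str(&self) -> &'static str {
        match self {
            RoutingKey::Bucket => "bucket",
            RoutingKey::WriteSpecBucket => "write_object_spec.resource.bucket",
        }
    }
}

/// Build the x-goog-request-params value for a bare bucket name.
fn request_params(key: RoutingKey, bucket: &str) -> String {
    let resource = format!("projects/_/buckets/{}", bucket);
    format!("{}={}", key.as_str(), percent_encode(&resource))
}
```

Choosing the key through an explicit enum instead of matching on `std::any::type_name` keeps the decision visible at the call site, for example `request_params(RoutingKey::WriteSpecBucket, "test-bucket")`.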
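Google Cloud Storage requires x-goog-request-params on the stream (for WriteObject): unlike ReadObject, the Google Cloud Storage gRPC API appears to strictly require that the stream carrying the WriteObjectRequest messages be sent with the x-goog-request-params metadata header.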
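One thing worth checking in the question's code is that the headers are set on a wrapper around a single message, and `into_inner()` then hands only the message to the stream. The sketch below models this with a simplified stand-in type. This `Request` is NOT tonic's real type, just a hypothetical struct with the same shape (metadata plus payload), and `headers_lost` / `headers_kept` are illustrative names.

```rust
use std::collections::BTreeMap;

const ROUTING: &str = "write_object_spec.resource.bucket=projects%2F_%2Fbuckets%2Ftest-bucket";

/// Simplified stand-in for a gRPC request wrapper (not tonic's type):
/// call-level metadata plus one payload.
struct Request<T> {
    metadata: BTreeMap<String, String>,
    message: T,
}

impl<T> Request<T> {
    fn new(message: T) -> Self {
        Request { metadata: BTreeMap::new(), message }
    }

    /// Returns only the payload; the metadata map is dropped here.
    fn into_inner(self) -> T {
        self.message
    }
}

/// Mirrors the question's flow: headers go on a per-message wrapper,
/// which is unwrapped before the stream (modeled as a Vec) is built.
fn headers_lost(payload: &str) -> Request<Vec<String>> {
    let mut per_message = Request::new(payload.to_string());
    per_message
        .metadata
        .insert("x-goog-request-params".to_string(), ROUTING.to_string());
    // into_inner() discards the metadata set above.
    Request::new(vec![per_message.into_inner()])
}

/// Alternative flow: wrap the whole stream first, then attach the headers
/// to that outer wrapper so they travel with the call.
fn headers_kept(payload: &str) -> Request<Vec<String>> {
    let mut call = Request::new(vec![payload.to_string()]);
    call.metadata
        .insert("x-goog-request-params".to_string(), ROUTING.to_string());
    call
}
```

With tonic, the analogous change would presumably be to build `Request::new(stream::iter(vec![write_request]))`, insert `authorization` and `x-goog-request-params` through its `metadata_mut()`, and pass that request to `write_object`. This has not been tested against the live service.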