I'm running into a strange problem when trying to run my Azure Function (both locally and on Azure). I get:
[2024-11-27T08:47:28.789Z] Error receiving event: Error { context: Custom(Custom { kind: Other, error: Fe2o3 Error: Receiver attach error RemoteClosedWithError(Error { condition: AmqpError(UnauthorizedAccess), description: Some("Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://backendiothub.servicebus.windows.net/backendiothubns/consumergroups/backendiotcg/partitions/0'. TrackingId:f5b003e1e2c54481adbcafaebc948090_G30, SystemTracker:gateway5, Timestamp:2024-11-27T08:47:28"), info: None }) }) }
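But I'm fairly sure I have granted my function full access:
1/ Azure portal - role assignments 2/ Azure portal - Event Hubs namespace 3/ Azure portal - environment variables
Update - here is the code:
config.rs: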
use std::{fmt, sync::Arc};

use anyhow::Result;
use azure_identity::DefaultAzureCredential;
use azure_messaging_eventhubs::consumer::ConsumerClient;
use figment::{
    providers::Env,
    Figment,
};
use serde::Deserialize;
use tracing::debug;

/// Structure containing the config of the Azure EventHub.
#[derive(Deserialize, Clone, Default)]
pub struct EventHub {
    host: String,
    name: String,
    #[serde(alias = "consumergroup")]
    consumer_group: String,
    #[serde(skip)]
    pub client: Option<Arc<ConsumerClient>>,
}

impl fmt::Debug for EventHub {
    /// Custom Debug implementation, as ConsumerClient does not implement it.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventHub")
            .field("host", &self.host)
            .field("name", &self.name)
            .field("consumer_group", &self.consumer_group)
            .finish_non_exhaustive()
    }
}

/// Structure containing the configuration of the application.
/// derive:
/// - Deserialize: can deserialize data to this structure.
/// - Clone: can be explicitly cloned if needed.
/// - Default: can be initialized empty (mostly for testing).
/// - Debug: can be printed in debug output.
/// All the sub-structures must implement or derive those traits.
#[derive(Deserialize, Clone, Default, Debug)]
pub struct Config {
    pub eventhub: EventHub,
}

impl Config {
    /// Create a new Config from the environment.
    ///
    /// It uses the following environment variables:
    /// - EVENTHUB_HOST (fully qualified namespace of the EventHub)
    /// - EVENTHUB_NAME (name of the EventHub)
    /// - EVENTHUB_CONSUMERGROUP (consumer group, see the serde alias above)
    /// The function then generates the credential from the default Azure
    /// environment variables and uses it to create a client that is added to
    /// the config.
    pub fn new() -> Result<Self> {
        let mut config: Config = Figment::new()
            .merge(
                Env::raw()
                    .filter(|k| k.starts_with("EVENTHUB_"))
                    .split("_"),
            )
            .extract()?;
        let credential = DefaultAzureCredential::new()?;
        let client = ConsumerClient::new(
            config.eventhub.host.clone(),
            config.eventhub.name.clone(),
            Some(config.eventhub.consumer_group.clone()),
            credential,
            None,
        );
        config.eventhub.client = Some(Arc::new(client));
        debug!("Config: {config:?}");
        Ok(config)
    }
}
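As a side note on how the variable names map to the struct fields: the sketch below imitates, in plain std Rust, the lowercasing and splitting on "_" that `Env::raw().split("_")` is expected to perform. It is only an illustration and not figment's actual implementation; `nested_key_path` is a hypothetical helper, and `EVENTHUB_CONSUMER_GROUP` is included purely to show why the serde alias is `consumergroup`.

```rust
/// Imitates how a prefixed environment variable name is lowercased and
/// split on "_" into a nested key path.
/// Hypothetical helper for illustration; not figment's real code.
fn nested_key_path(var_name: &str) -> Vec<String> {
    var_name
        .split('_')
        .map(|segment| segment.to_lowercase())
        .collect()
}

fn main() {
    let names = [
        "EVENTHUB_HOST",
        "EVENTHUB_NAME",
        "EVENTHUB_CONSUMERGROUP",
        // An extra underscore produces one more nesting level, which would
        // no longer match a flat `consumer_group` field.
        "EVENTHUB_CONSUMER_GROUP",
    ];
    for name in names {
        println!("{name} -> {}", nested_key_path(name).join("."));
    }
}
```

Under this assumption, the consumer group has to be supplied as `EVENTHUB_CONSUMERGROUP`, without an underscore between the two words.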
main.rs:
use anyhow::{bail, Result};
use tracing::info;
use futures::{pin_mut, StreamExt};
use tracing_subscriber::{fmt, EnvFilter};

mod config;
use config::Config;

async fn receive_events(config: &Config) -> Result<()> {
    let Some(ref client) = config.eventhub.client else {
        bail!("EventHub not correctly initialized!")
    };
    client.open().await.unwrap();
    info!("Connected to the event hub.");
    let event_stream = client
        .receive_events_on_partition(
            "0".to_string(),
            Some(
                azure_messaging_eventhubs::consumer::ReceiveOptions {
                    start_position: Some(azure_messaging_eventhubs::consumer::StartPosition {
                        location: azure_messaging_eventhubs::consumer::StartLocation::Earliest,
                        ..Default::default()
                    }),
                    ..Default::default()
                },
            ),
        )
        .await;
    pin_mut!(event_stream);
    while let Some(event_result) = event_stream.next().await {
        match event_result {
            Ok(event) => {
                // Process the received event
                println!("Received event: {:?}", event);
            }
            Err(err) => {
                // Handle the error
                eprintln!("Error receiving event: {:?}", err);
            }
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    fmt()
        .with_target(false)
        .with_level(true)
        .with_env_filter(EnvFilter::from_default_env())
        .init();
    let config = Config::new()?;
    receive_events(&config).await?;
    println!("function terminated");
    Ok(())
}
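To double-check which resource the client is actually asking for, it may help to rebuild the path from the configured values and compare it with the one in the error message and with the scope of the role assignment. The sketch below assumes the path has the shape seen in that error (`sb://<host>/<event hub name>/consumergroups/<group>/partitions/<id>`); the format is inferred from the message only, and `resource_path` is a hypothetical helper, not part of any Azure crate.

```rust
/// Rebuilds a resource path in the shape shown in the error message.
/// The layout is an assumption inferred from that message, not taken from
/// service documentation.
fn resource_path(host: &str, event_hub: &str, consumer_group: &str, partition_id: &str) -> String {
    format!("sb://{host}/{event_hub}/consumergroups/{consumer_group}/partitions/{partition_id}")
}

fn main() {
    // Values copied from the error message; in the application they would
    // come from EVENTHUB_HOST, EVENTHUB_NAME and EVENTHUB_CONSUMERGROUP.
    let path = resource_path(
        "backendiothub.servicebus.windows.net",
        "backendiothubns",
        "backendiotcg",
        "0",
    );
    println!("{path}");
}
```

If the printed path differs from what the portal shows for the namespace and event hub, the environment variables are the first thing to check.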
Any help is very welcome. Thanks in advance.
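I tried your code and got the same error, which comes from the azure_messaging_eventhubs crate. I suggest switching to azeventhubs.
I followed this documentation to create an Azure Function in Rust using Visual Studio Code.
I also used this documentation to implement azeventhubs in Rust.
The Rust code below uses the Warp framework to create an HTTP server that integrates with Azure Event Hubs and sends a message for each HTTP GET request.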
use std::collections::HashMap;
use std::env;
use std::net::Ipv4Addr;
use std::sync::Arc;

use azeventhubs::producer::{EventHubProducerClient, EventHubProducerClientOptions, SendEventOptions};
use tokio::sync::Mutex;
use warp::{http::Response, Filter};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut producer_client = EventHubProducerClient::new_from_connection_string(
        "<CONNECTION_STRING>",
        "<EVENT_HUB_NAME>".to_string(),
        EventHubProducerClientOptions::default(),
    )
    .await?;
    let partition_ids = producer_client.get_partition_ids().await?;
    let first_partition = partition_ids[0].clone();

    // The handler runs once per request and send_event needs mutable access,
    // so the client is shared behind Arc<Mutex<..>>.
    let producer_client = Arc::new(Mutex::new(producer_client));
    let handler_client = Arc::clone(&producer_client);

    let example1 = warp::get()
        .and(warp::path("api"))
        .and(warp::path("HttpExample"))
        .and(warp::query::<HashMap<String, String>>())
        .map(move |p: HashMap<String, String>| {
            if let Some(name) = p.get("name") {
                // Prepare and send event to the first partition
                let event = format!("Hello, {} from Warp Server!", name);
                let options = SendEventOptions::new().with_partition_id(&first_partition);
                let client = Arc::clone(&handler_client);
                tokio::spawn(async move {
                    if let Err(e) = client.lock().await.send_event(event, options).await {
                        eprintln!("Error sending event: {}", e);
                    }
                });
                Response::builder().body(format!("Hello, {}. This HTTP triggered function executed successfully.", name))
            } else {
                Response::builder().body(String::from("This HTTP triggered function executed successfully. Pass a name in the query string for a personalized response."))
            }
        });

    let port_key = "FUNCTIONS_CUSTOMHANDLER_PORT";
    let port: u16 = match env::var(port_key) {
        Ok(val) => val.parse().expect("Custom Handler port is not a number!"),
        Err(_) => 3000,
    };
    warp::serve(example1).run((Ipv4Addr::LOCALHOST, port)).await;

    // Close the client only if no request task still holds a reference to it.
    if let Ok(client) = Arc::try_unwrap(producer_client) {
        client.into_inner().close().await?;
    }
    Ok(())
}
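The port lookup at the end of the code above can also be pulled into a small function that reports a bad value instead of panicking. This is only a sketch using the standard library; `resolve_port` is a hypothetical helper name, and the default of 3000 is the one used in the code above.

```rust
use std::env;
use std::num::ParseIntError;

/// Returns the parsed port when a value is present, or `default` when the
/// variable is not set. A value that is not a valid u16 yields an error.
fn resolve_port(raw: Option<&str>, default: u16) -> Result<u16, ParseIntError> {
    match raw {
        Some(value) => value.trim().parse::<u16>(),
        None => Ok(default),
    }
}

fn main() {
    // FUNCTIONS_CUSTOMHANDLER_PORT is the variable read in the code above.
    let raw = env::var("FUNCTIONS_CUSTOMHANDLER_PORT").ok();
    match resolve_port(raw.as_deref(), 3000) {
        Ok(port) => println!("listening on port {port}"),
        Err(err) => eprintln!("invalid FUNCTIONS_CUSTOMHANDLER_PORT: {err}"),
    }
}
```

Keeping the parsing separate from the environment lookup makes the fallback behaviour easy to check without setting any variables.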