未经授权的访问。需要“聆听”声明 - 使用 RUST(事件中心)的 Azure 函数

问题描述 投票:0回答:1

我在尝试(本地和 Azure 上)运行我的 Azure Func 时遇到了一个奇怪的问题。我明白了:

[2024-11-27T08:47:28.789Z] Error receiving event: Error { context: Custom(Custom { kind: Other, error: Fe2o3 Error: Receiver attach error RemoteClosedWithError(Error { condition: AmqpError(UnauthorizedAccess), description: Some("Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://backendiothub.servicebus.windows.net/backendiothubns/consumergroups/backendiotcg/partitions/0'. TrackingId:f5b003e1e2c54481adbcafaebc948090_G30, SystemTracker:gateway5, Timestamp:2024-11-27T08:47:28"), info: None }) }) }

但我很确定,我已授予对我的功能的完全访问权限:

1/Azure 门户 - 角色分配 Azure Portal Role assignment 2/Azure 门户 - 事件中心命名空间 Event Hub Namespace 3/Azure门户-环境变量 Environment variables

更新 - 这是代码:

配置.rs:

use std::{fmt, sync::Arc};

use anyhow::Result;
use azure_identity::DefaultAzureCredential;
use azure_messaging_eventhubs::consumer::ConsumerClient;
use figment::{
    providers::Env,
    Figment,
};
use serde::Deserialize;
use tracing::debug;

/// Structure contaning the config of the Azure EventHub.
#[derive(Deserialize, Clone, Default)]
pub struct EventHub {
    host: String,
    name: String,
    #[serde(alias="consumergroup")]
    consumer_group: String,
    #[serde(skip)]
    pub client: Option<Arc<ConsumerClient>>,
}

impl fmt::Debug for EventHub {
/// Custom Debug implementation as ConsumerClient do not implement it.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventHub")
            .field("host", &self.host)
            .field("name", &self.name)
            .field("consumer_group", &self.name)
            .finish_non_exhaustive()
    }
}

/// Structure containing the configuaration of the application.
/// derive:
///     - Deserialize: can deserialize data to this structure.
///     - Clone: can be explicitly cloned if needed.
///     - Default: can be initialized empty. (mostly for testing)
///     - Debug: can be printed in debug output.
/// all the sub structure must implement or derive those trait;
#[derive(Deserialize, Clone, Default, Debug)]
pub struct Config {
    pub eventhub: EventHub,
}

impl Config {
    /// Create a new Config from the environment.
    ///
    /// It use the folowing environment variable:
    ///     - EVENTHUB_HOST (fully qualified namespace of the EventHub)
    ///     - EVENTHUB_NAME (name of the EventHub)
    /// the funcion then generates the credential from the default azure
    /// environment variables and use it to create a client that is added to
    /// the config.
    pub fn new() -> Result<Self> {
        let mut config: Config = Figment::new()
            .merge(
                Env::raw()
                .filter(|k| k.starts_with("EVENTHUB_"))
                .split("_")
            ).extract()?;
        let credential = DefaultAzureCredential::new()?;
        let client = ConsumerClient::new(
            config.eventhub.host.clone(),
            config.eventhub.name.clone(),
            Some(config.eventhub.consumer_group.clone()),
            credential,
            None,
        );
        config.eventhub.client = Some(Arc::new(client));
        debug!("Config: {config:?}");
        Ok(config)
    }
}

main.rs :

use anyhow::{bail, Result};
use tracing::info;
use futures::{pin_mut, StreamExt};
use tracing_subscriber::{fmt, EnvFilter};

mod config;
use config::Config;

async fn receive_events(config: &Config) -> Result<()>{
    let Some(ref client) = config.eventhub.client else {
        bail!("EventHub not correctly initialized!")
    };
    client.open().await.unwrap();
    info!("Connected to the event hub.");
    let event_stream = client
        .receive_events_on_partition(
            "0".to_string(),
            Some(
                azure_messaging_eventhubs::consumer::ReceiveOptions{
                    start_position: Some(azure_messaging_eventhubs::consumer::StartPosition{
                        location: azure_messaging_eventhubs::consumer::StartLocation::Earliest,
                        ..Default::default()
                    }),
                    ..Default::default()
                },
            ))
        .await;

    pin_mut!(event_stream);
    while let Some(event_result) = event_stream.next().await {
        match event_result {
            Ok(event) => {
                // Process the received event
                println!("Received event: {:?}", event);
            }
            Err(err) => {
                // Handle the error
                eprintln!("Error receiving event: {:?}", err);
            }
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    fmt()
        .with_target(false)
        .with_level(true)
        .with_env_filter(EnvFilter::from_default_env())
        .init();
    let config = Config::new()?;
    receive_events(&config).await?;
    println!("funcion terminated");
    Ok(())
}

我使用的库:https://github.com/Azure/azure-sdk-for-rust/blob/main/sdk/eventhubs/azure_messaging_eventhubs/README.md

非常欢迎任何帮助。 预先感谢

azure function rust iot hub
1个回答
0
投票

我尝试了你的代码,由于

azure_messaging_eventhubs
包而发生了同样的错误。我建议切换到
azeventhubs

我参考了此文档,使用 Visual Studio Code 在 Rust 中创建了一个 Azure 函数。

此外,我在 Rust 中使用了这个 documentation 来实现

azeventhubs

下面的 Rust 代码使用 Warp 框架创建一个 HTTP 服务器,该服务器与 Azure 事件中心集成以基于 HTTP GET 请求发送消息。


use std::collections::HashMap;
use std::env;
use std::net::Ipv4Addr;
use warp::{http::Response, Filter};
use azeventhubs::producer::{EventHubProducerClient, EventHubProducerClientOptions, SendEventOptions};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
   
    let mut producer_client = EventHubProducerClient::new_from_connection_string(
        "<CONNECTION_STRING>",
        "<EVENT_HUB_NAME>".to_string(),
        EventHubProducerClientOptions::default()
    ).await?;

    let partition_ids = producer_client.get_partition_ids().await?;
    
    let example1 = warp::get()
        .and(warp::path("api"))
        .and(warp::path("HttpExample"))
        .and(warp::query::<HashMap<String, String>>())
        .map(|p: HashMap<String, String>| {
          
            if let Some(name) = p.get("name") {
                // Prepare and send event to the first partition
                let event = format!("Hello, {} from Warp Server!", name);
                let options = SendEventOptions::new().with_partition_id(&partition_ids[0]);
                tokio::spawn(async move {
                    if let Err(e) = producer_client.send_event(&event, options).await {
                        eprintln!("Error sending event: {}", e);
                    }
                });
                
                Response::builder().body(format!("Hello, {}. This HTTP triggered function executed successfully.", name))
            } else {
                Response::builder().body(String::from("This HTTP triggered function executed successfully. Pass a name in the query string for a personalized response."))
            }
        });
    let port_key = "FUNCTIONS_CUSTOMHANDLER_PORT";
    let port: u16 = match env::var(port_key) {
        Ok(val) => val.parse().expect("Custom Handler port is not a number!"),
        Err(_) => 3000,
    };
    warp::serve(example1).run((Ipv4Addr::LOCALHOST, port)).await;


    producer_client.close().await?;

    Ok(())
}


Output in function app

Out put of azurre event

© www.soinside.com 2019 - 2024. All rights reserved.