How do I add traces/spans to an async function?


I'm having trouble adding tracing to specific functions in a Rust project. Here are the relevant files:

// chain/ethereum/src/ingestor.rs

    #[tracing::instrument(skip(self), name = "PollingBlockIngestor::do_poll")]
    async fn do_poll(&self) -> Result<(), IngestorError> {

        // Get chain head ptr from store
        let head_block_ptr_opt = self.chain_store.cheap_clone().chain_head_ptr().await?;

        // To check if there is a new block or not, fetch only the block header since that's cheaper
        // than the full block. This is worthwhile because most of the time there won't be a new
        // block, as we expect the poll interval to be much shorter than the block time.
        info_span!("latest_block");
        let latest_block = self.latest_block().await?;

        if let Some(head_block) = head_block_ptr_opt.as_ref() {
            // If latest block matches head block in store, nothing needs to be done
            if &latest_block == head_block {
                return Ok(());
            }

            if latest_block.number < head_block.number {
                // An ingestor might wait or move forward, but it never
                // wavers and goes back. More seriously, this keeps us from
                // later trying to ingest a block with the same number again
                warn!(self.logger,
                    "Provider went backwards - ignoring this latest block";
                    "current_block_head" => head_block.number,
                    "latest_block_head" => latest_block.number);
                return Ok(());
            }
        }

        // Compare latest block with head ptr, alert user if far behind
        match head_block_ptr_opt {
            None => {
                info!(
                    self.logger,
                    "Downloading latest blocks from Ethereum, this may take a few minutes..."
                );
            }
            Some(head_block_ptr) => {
                let latest_number = latest_block.number;
                let head_number = head_block_ptr.number;
                let distance = latest_number - head_number;
                let blocks_needed = (distance).min(self.ancestor_count);
                let code = if distance >= 15 {
                    LogCode::BlockIngestionLagging
                } else {
                    LogCode::BlockIngestionStatus
                };
                if distance > 0 {
                    info!(
                        self.logger,
                        "Syncing {} blocks from Ethereum",
                        blocks_needed;
                        "current_block_head" => head_number,
                        "latest_block_head" => latest_number,
                        "blocks_behind" => distance,
                        "blocks_needed" => blocks_needed,
                        "code" => code,
                    );
                }
            }
        }

        let mut missing_block_hash = self.ingest_block(&latest_block.hash).await?;

        
        while let Some(hash) = missing_block_hash {
            missing_block_hash = self.ingest_block(&hash).await?;
        }
        Ok(())
    }

    async fn latest_block(&self) -> Result<BlockPtr, IngestorError> {
        info_span!("latest_block_header");
        self.eth_adapter
            .latest_block_header(&self.logger)
            .compat()
            .await
            .map(|block| block.into())
    }

// chain/ethereum/src/ethereum_adapter.rs

impl EthereumAdapterTrait for EthereumAdapter {

    #[tracing::instrument(skip_all, name = "testest")]
    fn latest_block_header(
        &self,
        logger: &Logger,
    ) -> Box<dyn Future<Item = web3::types::Block<H256>, Error = IngestorError> + Send> {
        let s = info_span!("latest_block_header in eth adapter");
        let web3 = self.web3.clone();
        Box::new(
            retry("eth_getBlockByNumber(latest) no txs RPC call", logger)
                .no_limit()
                .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
                .run(move || {
                    let web3 = web3.cheap_clone();
                    async move {
                        let block_opt = web3
                            .eth()
                            .block(Web3BlockNumber::Latest.into())
                            .await
                            .map_err(|e| {
                                anyhow!("could not get latest block from Ethereum: {}", e)
                            })?;

                        block_opt
                            .ok_or_else(|| anyhow!("no latest block returned from Ethereum").into())
                    }
                })
                .map_err(move |e| {
                    e.into_inner().unwrap_or_else(move || {
                        anyhow!("Ethereum node took too long to return latest block").into()
                    })
                })
                .boxed()
                .compat(),
        )
    }
// lots of other functions
}

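When I run this and send the traces to Jaeger, I don't see either of the spans from the `latest_block_header` function in the adapter. I see `PollingBlockIngestor::do_poll`, `latest_block` and `latest_block_header`, but not `testest` or `latest_block_header in eth adapter`.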


How do I correctly create a span for the `latest_block_header` function?

Edit: I also tried modifying the `async move` block as shown here, calling `instrument` on the future, but that didn't work either.

                    let web3 = web3.cheap_clone();
                    let s = info_span!("latest_block_header in eth adapter");
                    async move {
                        let block_opt = web3
                            .eth()
                            .block(Web3BlockNumber::Latest.into())
                            .instrument(s)
                            .await
                            .map_err(|e| {
                                anyhow!("could not get latest block from Ethereum: {}", e)
                            })?;

                        block_opt
                            .ok_or_else(|| anyhow!("no latest block returned from Ethereum").into())
                    }
Tags: rust, trace, opentracing, rust-tracing

1 Answer

Simply creating a span does not "enter" it. I'm not sure whether a span that is never entered ends up missing from Jaeger (via opentelemetry), but regardless...

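To "enter" a span, you need to do one of the following:

  • Use `.enter()` or `.entered()`, which return an RAII guard type that exits the span when it is dropped. The only difference between the two is whether the span is consumed or merely borrowed.
  • Use `.in_scope(|| { ... })`, which wraps the enclosed code in the span.
  • Use the `.instrument(span)` extension method on a `Future` / `async { ... }` block. It automatically enters and exits the span each time the task is polled.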
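
To make the "created but not entered" distinction concrete, here is a minimal std-only sketch of the guard pattern behind `.enter()` and `.in_scope(...)`. It is a simplified model, not the `tracing` crate's implementation: `Span`, `Entered`, `ACTIVE` and `current` are hypothetical names invented for this illustration.

```rust
use std::cell::RefCell;

thread_local! {
    // Stack of currently entered span names (stand-in for subscriber state).
    static ACTIVE: RefCell<Vec<&'static str>> = RefCell::new(Vec::new());
}

/// Hypothetical, simplified stand-in for a tracing span.
pub struct Span {
    name: &'static str,
}

/// RAII guard: the span is exited when this value is dropped.
pub struct Entered<'a> {
    _span: &'a Span,
}

impl Span {
    /// Creating a span does not enter it.
    pub fn new(name: &'static str) -> Self {
        Span { name }
    }

    /// Borrowing variant, modelled on `.enter()`.
    pub fn enter(&self) -> Entered<'_> {
        ACTIVE.with(|a| a.borrow_mut().push(self.name));
        Entered { _span: self }
    }

    /// Closure variant, modelled on `.in_scope(|| { ... })`.
    pub fn in_scope<T>(&self, f: impl FnOnce() -> T) -> T {
        let _guard = self.enter();
        f()
    }
}

impl Drop for Entered<'_> {
    fn drop(&mut self) {
        ACTIVE.with(|a| {
            a.borrow_mut().pop();
        });
    }
}

/// Name of the innermost entered span, if any.
pub fn current() -> Option<&'static str> {
    ACTIVE.with(|a| a.borrow().last().copied())
}

fn main() {
    let span = Span::new("latest_block");
    assert_eq!(current(), None); // created, but never entered

    {
        let _guard = span.enter();
        assert_eq!(current(), Some("latest_block"));
    } // guard dropped here, so the span is exited

    assert_eq!(current(), None);
    assert_eq!(span.in_scope(current), Some("latest_block"));
}
```

In this model, a bare `Span::new(...)` statement whose result is discarded has no observable effect, which mirrors the `info_span!("latest_block");` lines in the question.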

You probably want the last option, since it is designed specifically for tracing async tasks.
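
As a rough illustration of why that option suits async code, the sketch below models a wrapper future that enters the span at the start of every poll and exits it at the end, so the span is not held open while the task is suspended. Everything here (`Instrumented`, `instrument`, `YieldOnce`, `block_on`, `EVENTS`) is a hypothetical std-only model written for this answer, not the `tracing` crate's code.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

thread_local! {
    // Event log standing in for what a tracing subscriber would observe.
    static EVENTS: RefCell<Vec<String>> = RefCell::new(Vec::new());
}

fn record(event: String) {
    EVENTS.with(|e| e.borrow_mut().push(event));
}

/// Hypothetical wrapper, loosely modelled on what `.instrument(span)` returns.
pub struct Instrumented<F> {
    name: &'static str,
    inner: Pin<Box<F>>,
}

pub fn instrument<F: Future>(name: &'static str, fut: F) -> Instrumented<F> {
    Instrumented { name, inner: Box::pin(fut) }
}

impl<F: Future> Future for Instrumented<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        // Enter on every poll, exit before returning to the executor.
        record(format!("enter {}", self.name));
        let result = self.inner.as_mut().poll(cx);
        record(format!("exit {}", self.name));
        result
    }
}

/// Future that returns `Pending` once before completing, like an await on I/O.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, enough for this demonstration.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Runs an instrumented task and returns the events it produced.
pub fn demo() -> Vec<String> {
    EVENTS.with(|e| e.borrow_mut().clear());
    block_on(instrument("latest_block_header", async {
        record("before await".to_string());
        YieldOnce(false).await;
        record("after await".to_string());
    }));
    EVENTS.with(|e| e.borrow().clone())
}

fn main() {
    for event in demo() {
        println!("{event}");
    }
}
```

The task is polled twice here, so the log contains two enter/exit pairs around the two halves of the `async` block. A guard from `.enter()` held across an `.await` would instead stay entered while other tasks run on the same thread.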

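Putting it on the inner `async` block means it only covers that part of the code, and if the block is re-run (which I assume is what `retry(...)` might do), you would get a separate "latest_block_header" span for each run.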

use tracing::Instrument;

let s = info_span!("latest_block_header in eth adapter");
async move {
    ...
}.instrument(s)

If you want it to cover the whole `retry(...)` operation, you can attach it to the outer future chain instead:

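let s = info_span!("latest_block_header in eth adapter");

Box::new(
    retry("eth_getBlockByNumber(latest) no txs RPC call", logger)
        .no_limit()
        .timeout_secs(...)
        .run(...)
        .map_err(...)
        // Assumption: with `tracing::Instrument`, the wrapper only implements
        // `std::future::Future`, so it is attached here, before `.compat()`
        // converts the chain into a futures 0.1 future.
        .instrument(s)
        .boxed()
        .compat(),
)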
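
The difference between the two placements can be sketched with a small synchronous model. This is only an illustration under assumptions: `Recorder`, `retry` and `flaky_call` are hypothetical helpers written for this answer, and the real `retry(...)` in the question is asynchronous and may behave differently.

```rust
use std::cell::RefCell;

/// Records span open/close events; stands in for a tracing backend.
#[derive(Default)]
pub struct Recorder {
    events: RefCell<Vec<String>>,
}

impl Recorder {
    /// Runs `f` inside a span named `name` (a simplified `in_scope`).
    pub fn in_span<T>(&self, name: &str, f: impl FnOnce() -> T) -> T {
        self.events.borrow_mut().push(format!("open {name}"));
        let out = f();
        self.events.borrow_mut().push(format!("close {name}"));
        out
    }

    /// Number of spans with this name that were opened.
    pub fn opened(&self, name: &str) -> usize {
        let needle = format!("open {name}");
        self.events.borrow().iter().filter(|e| **e == needle).count()
    }
}

/// Hypothetical retry helper: calls `attempt` until it succeeds.
pub fn retry<T, E>(mut attempt: impl FnMut() -> Result<T, E>) -> T {
    loop {
        if let Ok(value) = attempt() {
            return value;
        }
    }
}

/// Simulated RPC call that fails twice, then succeeds on the third attempt.
fn flaky_call(calls: &mut u32) -> Result<u64, &'static str> {
    *calls += 1;
    if *calls < 3 {
        Err("timeout")
    } else {
        Ok(42)
    }
}

/// Span inside the retried closure: one span per attempt.
pub fn span_per_attempt() -> usize {
    let rec = Recorder::default();
    let mut calls = 0;
    retry(|| rec.in_span("latest_block_header", || flaky_call(&mut calls)));
    rec.opened("latest_block_header")
}

/// Span around the whole retry: a single span covering every attempt.
pub fn span_around_retry() -> usize {
    let rec = Recorder::default();
    let mut calls = 0;
    rec.in_span("latest_block_header", || retry(|| flaky_call(&mut calls)));
    rec.opened("latest_block_header")
}

fn main() {
    println!("per attempt: {} span(s)", span_per_attempt());
    println!("around retry: {} span(s)", span_around_retry());
}
```

With three attempts, the first placement opens three spans and the second opens one. Which is preferable depends on whether you want to see each attempt's latency or the total time of the operation.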