Converting Arrow data to a Polars DataFrame

Question (votes: 0, answers: 1)

I am trying to pass Arrow data returned from a ClickHouse query into a Polars DataFrame.

One problem I have run into is converting an Arc into a Box, which is the type of one of the parameters of from_arrow.
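
The Arc-to-Box difficulty is not specific to Arrow: a shared `Arc<dyn Trait>` cannot simply be re-wrapped as an owning `Box<dyn Trait>`, because the `Arc` may have other owners. The following std-only sketch shows one common workaround, copying the value out through a trait method. The `Array` trait, its `to_boxed` method, the `Int32Column` type, and the `arc_to_box` helper are hypothetical stand-ins for illustration, not the arrow or polars types.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a dynamically typed column trait.
trait Array {
    fn len(&self) -> usize;
    // Produce an independently owned, boxed copy of this array.
    fn to_boxed(&self) -> Box<dyn Array>;
}

// Hypothetical concrete column type.
#[derive(Clone)]
struct Int32Column(Vec<i32>);

impl Array for Int32Column {
    fn len(&self) -> usize {
        self.0.len()
    }

    fn to_boxed(&self) -> Box<dyn Array> {
        Box::new(self.clone())
    }
}

// Convert a shared handle into an owned box by copying the data.
// The original Arc stays valid for any other owners.
fn arc_to_box(shared: &Arc<dyn Array>) -> Box<dyn Array> {
    shared.to_boxed()
}
```

Calling `arc_to_box(&shared)` on an `Arc<dyn Array>` yields a separate `Box<dyn Array>` while leaving the reference count of `shared` unchanged. The cost is a copy of the underlying data, which is the trade-off this sketch assumes is acceptable.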

What is an efficient way to read Arrow data into a Polars DataFrame? Is there a native way to read an Arrow query result in Polars (similar to Parquet/Feather)?

Here is a similar Stack Overflow question

My attempt so far, where I want to read the record_batches vector into a DataFrame:

use std::error::Error;
use arrow::ipc::reader::FileReader;
use std::io::Cursor;
use reqwest::Client as HttpClient;

// use clickhouse::Client;
// use polars::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    //columns().await?;
    factor_returns_arrow().await?;
    Ok(())
}

async fn factor_returns_arrow() -> Result<(), Box<dyn Error>> {
    let clickhouse_url = "http://localhost:8000"; 
    let query = "SELECT * FROM factor_returns FORMAT Arrow";
    let client = HttpClient::new();
    let response = client.get(clickhouse_url)
        .query(&[("query", query)])
        .send()
        .await?;
    
    if response.status().is_success() {
        let bytes = response.bytes().await?;
        let cursor = Cursor::new(bytes);
        let mut reader = FileReader::try_new(cursor, None)?;
        
        let schema = reader.schema();
        let fields = schema.fields();

        let _column_names = fields.iter().map(|field| field.name()).collect::<Vec<_>>();

        let mut record_batches = Vec::new(); // <--- these should be read into Polars
        
        while let Some(batch_result) = reader.next() {
            let batch = batch_result?;
            record_batches.push(batch);
        }

        
        println!("{:?}", record_batches);
        
    } else {
        eprintln!("Error: {:?}", response.status());
    }
    
    Ok(())
}
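
`FileReader::try_new` needs a reader that supports both reading and seeking, which is why the response bytes are wrapped in a `Cursor` in the code above. A std-only sketch of what `Cursor` provides over an in-memory buffer follows. The `read_footer` and `footer_of` helpers are hypothetical and only illustrate seeking; they do not parse the real Arrow file layout.

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};

// Read the last `n` bytes from any seekable reader, the kind of access a
// file-format reader needs when metadata is stored at the end of the data.
fn read_footer<R: Read + Seek>(reader: &mut R, n: usize) -> std::io::Result<Vec<u8>> {
    reader.seek(SeekFrom::End(-(n as i64)))?;
    let mut footer = vec![0u8; n];
    reader.read_exact(&mut footer)?;
    Ok(footer)
}

// Wrap an in-memory buffer in a Cursor so it can be used as a Read + Seek source.
fn footer_of(bytes: Vec<u8>, n: usize) -> std::io::Result<Vec<u8>> {
    let mut cursor = Cursor::new(bytes);
    read_footer(&mut cursor, n)
}
```

Asking for more bytes than the buffer holds makes the seek fail with an error rather than panic, so callers can handle truncated input through the returned `Result`.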
rust clickhouse apache-arrow rust-polars
1 answer
0 votes

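After installing polars-arrow, I was able to write a separate function that converts the Arrow data into a Polars DataFrame.

I used a function similar to the one in the question "Arrow RecordBatch as Polars DataFrame".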

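// Needed in addition to the imports from the question.
use arrow::record_batch::RecordBatch;
use polars::prelude::*; // concat, UnionArgs and .lazy() assume the polars "lazy" feature is enabled

async fn factor_returns_arrow() -> Result<(), Box<dyn Error>> {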
    let clickhouse_url = "http://localhost:8000"; 
    let query = "SELECT * FROM factor_exposures FORMAT Arrow";
    let client = HttpClient::new();
    let response = client.get(clickhouse_url)
        .query(&[("query", query)])
        .send()
        .await?;
    
    if response.status().is_success() {
        let bytes = response.bytes().await?;
        let cursor = Cursor::new(bytes);
        let mut reader = FileReader::try_new(cursor, None)?;
        
        let mut lazy_frames = Vec::new();
        
        while let Some(batch_result) = reader.next() {
            let batch = batch_result?;
            let df = record_batch_to_dataframe(&batch)?;
            lazy_frames.push(df.lazy());
        }
        
        if lazy_frames.len() > 1 {
            let combined_df = concat(lazy_frames, UnionArgs::default())?.collect()?;
            println!("{}", combined_df);

        } else if let Some(lazy_df) = lazy_frames.first() {
            let df = lazy_df.clone().collect()?;
            println!("{}", df);
        }
        
    } else {
        eprintln!("Error: {:?}", response.status());
    }
    
    Ok(())
}

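// Convert one Arrow RecordBatch into a Polars DataFrame, column by column.
fn record_batch_to_dataframe(batch: &RecordBatch) -> Result<DataFrame, PolarsError> {
    let schema = batch.schema();
    let columns = batch
        .columns()
        .iter()
        .enumerate()
        .map(|(i, column)| {
            // .into() converts the Arc-wrapped arrow array into the boxed
            // array type that Series::from_arrow expects.
            Series::from_arrow(schema.field(i).name(), column.clone().into())
        })
        .collect::<Result<Vec<_>, _>>()?;

    DataFrame::new(columns)
}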
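
The `collect::<Result<Vec<_>, _>>()` call in `record_batch_to_dataframe` turns an iterator of per-column results into a single result and stops at the first error. A small std-only sketch of the same pattern follows; the `parse_column` and `parse_all` helpers are hypothetical, with `parse_column` standing in for the fallible `Series::from_arrow` call.

```rust
use std::num::ParseIntError;

// Hypothetical stand-in for a fallible per-column conversion.
fn parse_column(raw: &str) -> Result<i64, ParseIntError> {
    raw.trim().parse::<i64>()
}

// Convert every column; return the first error if any conversion fails.
fn parse_all(raw_columns: &[&str]) -> Result<Vec<i64>, ParseIntError> {
    raw_columns
        .iter()
        .map(|raw| parse_column(raw))
        .collect::<Result<Vec<_>, _>>()
}
```

With this shape, a single bad column makes the whole conversion return `Err`, so no partially built DataFrame is produced.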