我正在尝试将从 clickhouse 查询返回的箭头数据传递到极坐标数据框。
我遇到的一个问题是尝试将 Arc 转换为 Box,这是 from_arrow 中的 dtype 参数之一。
将 Arrow 数据读入极坐标数据帧的有效方法是什么? 是否有一种本地方法可以读取极坐标中的箭头查询(类似于镶木地板/羽毛)?
这是一个类似的 stackoverflow 问题。
尝试将“record_batches”向量读入数据帧。
use std::error::Error;
use arrow::ipc::reader::FileReader;
use std::io::Cursor;
use reqwest::Client as HttpClient;
// use clickhouse::Client;
// use polars::prelude::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
//columns().await?;
factor_returns_arrow().await?;
Ok(())
}
async fn factor_returns_arrow() -> Result<(), Box<dyn Error>> {
let clickhouse_url = "http://localhost:8000";
let query = "SELECT * FROM factor_returns FORMAT Arrow";
let client = HttpClient::new();
let response = client.get(clickhouse_url)
.query(&[("query", query)])
.send()
.await?;
if response.status().is_success() {
let bytes = response.bytes().await?;
let cursor = Cursor::new(bytes);
let mut reader = FileReader::try_new(cursor, None)?;
let schema = reader.schema();
let fields = schema.fields();
let _column_names = fields.iter().map(|field| field.name()).collect::<Vec<_>>();
let mut record_batches = Vec::new(); // **<--- Reading into Polars**
while let Some(batch_result) = reader.next() {
let batch = batch_result?;
record_batches.push(batch);
}
println!("{:?}", record_batches);
} else {
eprintln!("Error: {:?}", response.status());
}
Ok(())
}
安装极坐标箭头后,我能够创建一个单独的函数来将箭头数据转换为极坐标数据框。
我使用了 Arrow RecordBatch 中类似的函数作为 Polars DataFrame。
async fn factor_returns_arrow() -> Result<(), Box<dyn Error>> {
let clickhouse_url = "http://localhost:8000";
let query = "SELECT * FROM factor_exposures FORMAT Arrow";
let client = HttpClient::new();
let response = client.get(clickhouse_url)
.query(&[("query", query)])
.send()
.await?;
if response.status().is_success() {
let bytes = response.bytes().await?;
let cursor = Cursor::new(bytes);
let mut reader = FileReader::try_new(cursor, None)?;
let mut lazy_frames = Vec::new();
while let Some(batch_result) = reader.next() {
let batch = batch_result?;
let df = record_batch_to_dataframe(&batch)?;
lazy_frames.push(df.lazy());
}
if lazy_frames.len() > 1 {
let combined_df = concat(lazy_frames, UnionArgs::default())?.collect()?;
println!("{}", combined_df);
} else if let Some(lazy_df) = lazy_frames.first() {
let df = lazy_df.clone().collect()?;
println!("{}", df);
}
} else {
eprintln!("Error: {:?}", response.status());
}
Ok(())
}
fn record_batch_to_dataframe(batch: &RecordBatch) -> Result<DataFrame, PolarsError> {
let schema = batch.schema();
let columns = batch
.columns()
.iter()
.enumerate()
.map(|(i, column)| {
Series::from_arrow(schema.field(i).name(), column.clone().into())
}).collect::<Result<Vec<_>, _>>();
DataFrame::new(columns?)
}