如何在Clickhouse中集成C UDF?

问题描述 投票:0回答:1

有一个用 C 语言编写的 UDF,文件名为 main.c:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

extern char *u_shaped_processor(char* camps_list_c);

void replacechar(char *s,char c1,char c2) {
    int i=0;

    for(i=0;s[i];i++) {
        if(s[i]==c1) {
            s[i]=c2;
        }
        if (s[i]=='\n'){
            s[i] = '\0';
        }

    }

}

void str_replace(
    char *target,
    const char *needle,
    const char *replacement
) {
    char buffer[1024] = { 0 };
    char *insert_point = &buffer[0];
    const char *tmp = target;
    size_t needle_len = strlen(needle);
    size_t repl_len = strlen(replacement);

    while (1) {
        const char *p = strstr(tmp, needle);

        // walked past last occurrence of needle; copy remaining part
        if (p == NULL) {
            strcpy(insert_point, tmp);
            break;
        }

        // copy part before needle
        memcpy(insert_point, tmp, p - tmp);
        insert_point += p - tmp;

        // copy replacement string
        memcpy(insert_point, replacement, repl_len);
        insert_point += repl_len;

        // adjust pointers, move on
        tmp = p + needle_len;
    }

    // write altered string back to target
    strcpy(target, buffer);
} 

int main(){
    char *line = NULL;
    size_t len = 0;
    ssize_t read;
    int index = 0;
    int range = 0;
    line = (char*) malloc(len);

    while ((read = getline(&line, &len, stdin)) != -1){
        range = atoi(line);
        index = 0;
        while (index < range){
            line = (char*) malloc(len);
            read = getline(&line, &len, stdin);
            char *final_string = u_shaped_processor(line);
            str_replace(final_string, "\"", "\\\"");
            //replacechar(line, '"', "'");
            printf("{\"result\": \"%s\"} \n", final_string);
            ++index;
        }
        fflush(stdout);
        line = (char*) malloc(len);
        index = 0;
    }
    //fclose(fp);
    free(line);
    return 0;
}

function.xml

<function>
    <type>executable_pool</type>
    <name>att_c</name>
    <execute_direct>1</execute_direct>
    <argument>
        <type>Array(Tuple(Nullable(String), Nullable(DateTime)))</type>
        <name>ts_list</name>
    </argument>
    <return_type>String</return_type>
    <return_name>result</return_name>
    <format>JSONEachRow</format>
    <send_chunk_header>1</send_chunk_header>
    <command>attribution_c</command>
</function>

查询:

SELECT att_c([tuple(CAST(number AS Nullable(String)), CAST(now() AS Nullable(DateTime))), tuple(CAST(number+1 AS Nullable(String)), CAST(now() AS Nullable(DateTime))), tuple(CAST(number+2 AS Nullable(String)), CAST(now() AS Nullable(DateTime))), tuple(CAST(number+5 AS Nullable(String)), CAST(now() AS Nullable(DateTime)))]) as att
FROM system.numbers LIMIT 10

本地运行结果:

1
{"ts_list":[["c1","2023-04-21 00:00:00"],["c2","2023-04-22 00:00:00"],["c3","2023-04-23 00:00:00"],["c4","2023-04-24 00:00:00"],["c5","2023-04-25 00:00:00"]]}
{"result": "{\"c4\":\"0.06666666666666667\",\"c2\":\"0.06666666666666667\",\"c5\":\"0.4\",\"c3\":\"0.06666666666666667\",\"c1\":\"0.4\"}"}

以上也是预期的结果,但在 ClickHouse 中执行时我得到了如下错误:

Code: 1. DB::Exception: Function 'att_c': wrong result, expected 10 row(s), actual 0: while executing 'FUNCTION att_c(array(tuple(CAST(number, 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 1), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 2), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 5), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)'))) :: 2) -> att_c(array(tuple(CAST(number, 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 1), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 2), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')), tuple(CAST(plus(number, 5), 'Nullable(String)'), CAST(now(), 'Nullable(DateTime)')))) String : 7'. (UNSUPPORTED_METHOD) (version 22.7.1.2484 (official build))

问题出在外部(extern)函数上:把 extern 函数的调用注释掉之后,代码可以正常工作并返回所需的全部内容。有没有办法通过打印日志行(例如写入 clickhouse.txt)来进行调试?我在哪里可以找到这些日志?

c user-defined-functions clickhouse
1个回答
0
投票

这是我的 Rust 函数

use std::collections::HashMap;

use libc::c_char;
use std::ffi::CStr;
use std::str;
use std::ffi::CString;

use serde::{Deserialize, Serialize};


#[derive(Serialize, Deserialize)]
struct InputFormat {
    ts_list: Vec<(String, String)>,
}

extern {
    fn str_c() -> *const c_char;
}

#[no_mangle]
pub unsafe extern "C" fn u_shaped_processor(camps_list_c: *const c_char) ->  *const c_char {
    let mut camps_map: HashMap<String, f64> = HashMap::new();
    //convert c string to rust string
    let c_str = CStr::from_ptr(camps_list_c);
    let rust_str = c_str.to_str().expect("Bad encoding");
    let camps_list_str: String = rust_str.to_owned();

    let value: InputFormat = match serde_json::from_str(&camps_list_str) {
                Ok(value) => value,
                Err(why) => panic!("{:?}", why)
        };
    let camps = value.ts_list.len();
    if camps == 1 {
        let record = camps_map
            .entry((*value.ts_list[0].0).to_string())
            .or_insert(0.0);
        *record += 1.0;
    } else if camps == 2 {
        let record = camps_map
            .entry((*value.ts_list[0].0).to_string())
            .or_insert(0.0);
        *record += 0.5;
        let record = camps_map
            .entry((*value.ts_list[1].0).to_string())
            .or_insert(0.0);
        *record += 0.5;
    } else {
        let mid_weight = 0.2 / (camps - 2) as f64;
        for i in 0..camps {
            let entry = camps_map
                .entry((*value.ts_list[i].0).to_string())
                .or_insert(0.0);
            if i == 0 || i == camps - 1 {
                *entry += 0.4;
            } else {
                *entry += mid_weight;
            }
        }
    }
   let mut result = HashMap::new();
   result.insert(
        "result".to_string(), 
        serde_json::to_string(&camps_map).unwrap()
    );
   let mut final_map = HashMap::new();
   final_map.insert(
        "result".to_string(),
        serde_json::to_string(&result).unwrap()
    );
    let c_string = CString::new(final_map["result"]).expect("CString::new failed");
    return c_string.into_raw(); // Move ownership to C
}

/// # Safety
/// The ptr should be a valid pointer to the string allocated by rust
#[no_mangle]
pub unsafe extern fn free_string(ptr: *const c_char) {
    // Take the ownership back to rust and drop the owner
    let _ = CString::from_raw(ptr as *mut _);
}
© www.soinside.com 2019 - 2024. All rights reserved.