如何通过多个Java线程使用只读借来的Rust数据?

问题描述 投票:1回答:1

我有一个结构FooFooRef,它们引用了Foo中的数据:

struct Foo { /* ... */ }

struct FooRef<'foo> { /* ... */ }

impl Foo {
    pub fn create_ref<'a>(&'a self) -> FooRef<'a> { /* ... */ }
}

现在Foo直接不能在逻辑中使用;我需要FooRef。创建FooRef需要大量的计算,因此在创建Foo实例后,我只需执行一次。 FooRef是不可变的;它仅用于读取数据。

多个线程需要访问此FooRef实例。我该如何实施?调用线程是Java线程,它将与JNI一起使用。例如,这可以防止使用作用域线程池。

[另一个麻烦是,当我必须刷新Foo实例以将新数据加载到其中时。然后,我还需要重新创建FooRef实例。

如何通过线程安全和内存安全实现?我尝试弄乱指针和RwLock,但是这导致了内存泄漏(每次重新加载时,内存使用量不断增加)。我是Java开发人员,是指针的新手。

Foo中的数据主要是文本,约为250Mb。 FooRef主要是str,是从str借用的Foo的结构。

我的Java使用说明

我在Java类中使用两个long变量来存储指向FooFooRef的指针。我使用静态ReentrantReadWriteLock保护这些指针。

如果需要在Foo中更新数据,则需要获取写锁定,删除FooRef,更新Foo,创建新的FooRef并更新Java中的ref指针。

[如果我需要读取数据(即,当我不更新Foo时,我将获得读取锁并使用FooRef。]]

仅当多个Java线程调用此代码时,内存泄漏才可见。

锈:

use jni::objects::{JClass, JString};
use jni::sys::{jlong, jstring};
use jni::JNIEnv;

use std::collections::HashMap;

macro_rules! foo_mut_ptr {
    ($env: expr, $class: expr) => {
        $env.get_field(*$class, "ptr", "J")
            .ok()
            .and_then(|j| j.j().ok())
            .and_then(|ptr| {
                if ptr == 0 {
                    None
                } else {
                    Some(ptr as *mut Foo)
                }
            })
    };
}

macro_rules! foo_ref_mut_ptr {
    ($env: expr, $class: expr) => {
        $env.get_field(*$class, "ptrRef", "J")
            .ok()
            .and_then(|j| j.j().ok())
            .and_then(|ptr| {
                if ptr == 0 {
                    None
                } else {
                    Some(ptr as *mut FooRef)
                }
            })
    };
}

macro_rules! foo_mut {
    ($env: expr, $class: expr) => {
        foo_mut_ptr!($env, $class).map(|ptr| &mut *ptr)
    };
}

macro_rules! foo_ref {
    ($env: expr, $class: expr) => {
        foo_ref_mut_ptr!($env, $class).map(|ptr| &*ptr)
    };
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_create(_env: JNIEnv, _class: JClass) -> jlong {
    Box::into_raw(Box::new(Foo::default())) as jlong
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_createRef(env: JNIEnv, class: JClass) -> jlong {
    let foo = foo_mut!(env, class).expect("createRef was called on uninitialized Data");
    let foo_ref = foo.create_ref();
    Box::into_raw(Box::new(foo_ref)) as jlong
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_reload(env: JNIEnv, class: JClass) {
    let foo = foo_mut!(env, class).expect("foo must be initialized");
    *foo = Foo {
        data: vec!["hello".to_owned(); 1024 * 1024],
    };
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroy(env: JNIEnv, class: JClass) {
    drop_ptr(foo_ref_mut_ptr!(env, class));
    drop_ptr(foo_mut_ptr!(env, class));
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroyRef(env: JNIEnv, class: JClass) {
    drop_ptr(foo_ref_mut_ptr!(env, class));
}

unsafe fn drop_ptr<T>(ptr: Option<*mut T>) {
    if let Some(ptr) = ptr {
        let _foo = Box::from_raw(ptr);
        // foo drops here
    }
}

#[derive(Default)]
struct Foo {
    data: Vec<String>,
}

#[derive(Default)]
struct FooRef<'a> {
    data: HashMap<&'a str, Vec<&'a str>>,
}

impl Foo {
    fn create_ref(&self) -> FooRef {
        let mut data = HashMap::new();
        for s in &self.data {
            let s = &s[..];
            data.insert(s, vec![s]);
        }
        FooRef { data }
    }
}

爪哇:

package test;

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

public class App implements AutoCloseable {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReadLock readLock = lock.readLock();
    private final WriteLock writeLock = lock.writeLock();

    private volatile long ptr;
    private volatile long ptrRef;
    private volatile boolean reload;

    static {
        System.loadLibrary("foo");
    }

    public static void main(String[] args) throws InterruptedException {
        try (App app = new App()) {
            for (int i = 0; i < 20; i++) {
                new Thread(() -> {
                    while (true) {
                        app.tryReload();
                    }
                }).start();
            }

            while (true) {
                app.setReload();
            }
        }
    }

    public App() {
        this.ptr = this.create();
    }

    public void setReload() {
        writeLock.lock();
        try {
            reload = true;
        } finally {
            writeLock.unlock();
        }
    }

    public void tryReload() {
        readLock.lock();
        debug("Got read lock");

        if (reload) {
            debug("Cache is expired");

            readLock.unlock();
            debug("Released read lock coz expired");

            writeLock.lock();
            debug("Got write lock");

            try {
                if (reload) {
                    fullReload();
                }

                readLock.lock();
                debug("Got read lock inside write");
            } finally {
                writeLock.unlock();
                debug("Released write lock");
            }
        }

        readLock.unlock();
        debug("Released read lock");
    }

    private void fullReload() {
        destroyRef();
        debug("Dropped ref");

        debug("Reloading");
        reload();
        debug("Reloading completed");

        updateRef();
        debug("Created ref");
        reload = false;
    }

    private void updateRef() {
        this.ptrRef = this.createRef();
    }

    private native void reload();

    private native long create();

    private native long createRef();

    private native void destroy();

    private native void destroyRef();

    @Override
    public void close() {
        writeLock.lock();
        try {
            this.destroy();
            this.ptrRef = 0;
            this.ptr = 0;
        } finally {
            writeLock.unlock();
        }
    }

    private static void debug(String s) {
        System.out.printf("%10s : %s%n", Thread.currentThread().getName(), s);
    }

}

我有一个结构Foo和FooRef,它们引用了Foo中的数据:struct Foo {/ * ... * /} struct FooRef {/ * ... * /} impl Foo {pub fn create_ref(&'a self )-> ...

rust java-native-interface borrowing
1个回答
0
投票

我当时认为是内存泄漏的问题实际上不是内存泄漏。问题是分配器正在使用线程本地竞技场。因此,无论正在重新加载250MB数据的线程如何,都将保留分配的空间,而不会将其返回给系统。这个问题不是JNI特有的,而是在纯安全的rust代码中发生的。参见Why multiple threads using too much memory when holding Mutex

© www.soinside.com 2019 - 2024. All rights reserved.