Multiprocessing with Gremlin: "OSError: [Errno 9] Bad file descriptor"


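I want to compute features for every vertex in my graph using gremlinpython. Iterating over the vertices one at a time is too slow. Batching could speed things up, but I'd like to try parallelizing the queries first.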

Roughly: 1. fetch the full vertex set, 2. split it across num_cores=x, 3. iterate over each vertex subset in parallel.
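Steps 2 and 3 need the ids split into roughly equal chunks. The question's code uses `np.array_split` for this. Below is a minimal NumPy-free sketch of the same splitting rule, where `split_evenly` is a hypothetical helper, not part of gremlinpython or NumPy. As with `np.array_split`, the first `len(items) % n` chunks get one extra element. Unlike it, the chunks hold the ids exactly as `toList()` returned them rather than NumPy scalars.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_evenly(items: Sequence[T], n: int) -> List[List[T]]:
    """Split items into n contiguous chunks whose sizes differ by at most one.

    Hypothetical helper mirroring how np.array_split distributes the
    remainder: the first len(items) % n chunks get one extra element.
    """
    if n < 1:
        raise ValueError("n must be at least 1")
    base, extra = divmod(len(items), n)
    chunks, start = [], 0
    for i in range(n):
        size = base + (1 if i < extra else 0)
        chunks.append(list(items[start:start + size]))
        start += size
    return chunks


# Example: 30 vertex ids over 4 workers gives chunk sizes 8, 8, 7, 7
ids = list(range(30))
print([len(c) for c in split_evenly(ids, 4)])
```

If `n` exceeds the number of ids, the trailing chunks are empty, so some workers would be started with nothing to do.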

But I get the error "OSError: [Errno 9] Bad file descriptor". The code below is my latest attempt at solving this.

import multiprocessing
import os
import sys

import numpy as np
from gremlin_python.structure.graph import Graph
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection


def create_traversal_object():
    graph = Graph()
    g = graph.traversal().withRemote(DriverRemoteConnection('ws://localhost:8182/gremlin', 'g'))
    return g

g = create_traversal_object()

num_cores = 1
vertex_lsts = np.array_split(g.V().limit(30).id().toList(), num_cores)


class FeatureClass():

    def __init__(self, g, vertex_list):
        self.g = g
        self.vertex_list = vertex_list

    def orchestrator(self):
        for vertex_id in self.vertex_list:
            self.compute_number_of_names(float(vertex_id))

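    def compute_number_of_names(self, vertex_id):
        # Number of distinct 'benef_nm' values on the vertex's incoming edges
        print(self.g.V(vertex_id).inE().values('benef_nm').dedup().count().next())

    def get_names(self, vertex_id):
        return self.g.V(vertex_id).inE().values('benef_nm').dedup().toList()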


class Simulation(multiprocessing.Process):
    def __init__(self, id, worker, *args, **kwargs):
        # must call this before anything else
        multiprocessing.Process.__init__(self)
        self.id = id
        self.worker = worker
        self.args = args
        self.kwargs = kwargs
        sys.stdout.write('[%d] created\n' % (self.id))

    def run(self):
        sys.stdout.write('[%d] running ...  process id: %s\n' % (self.id, os.getpid()))
        self.worker.orchestrator()
        sys.stdout.write('[%d] completed\n' % (self.id))

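# Each FeatureClass gets its own connection, but every connection is opened
# here, in the parent process, before sim.start() launches the child processes
list_of_objects = [FeatureClass(create_traversal_object(), vertex_lst) for vertex_lst in vertex_lsts]
list_of_sim = [Simulation(id=k, worker=obj) for k, obj in enumerate(list_of_objects)]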

for sim in list_of_sim:
    sim.start()

The full stack trace is below. It looks like the problem lies in tornado, which gremlinpython uses.

Process Simulation-1:
Traceback (most recent call last):
  File "/Users/greatora/anaconda3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "<ipython-input-4-b3177477fabe>", line 42, in run
    self.worker.orchestrator()
  File "<ipython-input-4-b3177477fabe>", line 23, in orchestrator
    self.compute_number_of_names(float(vertex_id))
  File "<ipython-input-4-b3177477fabe>", line 26, in compute_number_of_names
    print(self.g.V(vertex_id).inE().values('benef_nm').dedup().count().next())
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 88, in next
    return self.__next__()
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 47, in __next__
    self.traversal_strategies.apply_strategies(self)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 512, in apply_strategies
    traversal_strategy.apply(traversal)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/remote_connection.py", line 148, in apply
    remote_traversal = self.remote_connection.submit(traversal.bytecode)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/driver_remote_connection.py", line 53, in submit
    result_set = self._client.submit(bytecode)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/client.py", line 108, in submit
    return self.submitAsync(message, bindings=bindings).result()
  File "/Users/greatora/anaconda3/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/Users/greatora/anaconda3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/connection.py", line 63, in cb
    f.result()
  File "/Users/greatora/anaconda3/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/Users/greatora/anaconda3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/Users/greatora/anaconda3/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/protocol.py", line 74, in write
    self._transport.write(message)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/gremlin_python/driver/tornado/transport.py", line 37, in write
    lambda: self._ws.write_message(message, binary=True))
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/tornado/ioloop.py", line 453, in run_sync
    self.start()
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/tornado/ioloop.py", line 863, in start
    event_pairs = self._impl.poll(poll_timeout)
  File "/Users/greatora/anaconda3/lib/python3.6/site-packages/tornado/platform/kqueue.py", line 66, in poll
    kevents = self._kqueue.control(None, 1000, timeout)
OSError: [Errno 9] Bad file descriptor

I'm using Python 3.7, gremlinpython==3.4.6, and macOS.

python tornado gremlin janusgraph gremlinpython
1 Answer

I'm still not entirely sure what the problem is, but this works:

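import itertools
from multiprocessing import Pool

import numpy as np
from gremlin_python.structure.graph import Graph
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection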

def graph_function(vertex_id_list):
    # Runs inside a worker process, so the connection is opened there
    # rather than inherited from the parent
    conn = DriverRemoteConnection('ws://localhost:8182/gremlin', 'g')
    g = Graph().traversal().withRemote(conn)

    res = []
    try:
        for vertex_id in vertex_id_list:
            res.append(g.V(str(vertex_id)).inE().values('benef_nm').dedup().toList())
    finally:
        conn.close()
    return res



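if __name__ == '__main__':
    num_cores = 4
    # Traversal source used only in the parent, to fetch the vertex ids
    g = Graph().traversal().withRemote(DriverRemoteConnection('ws://localhost:8182/gremlin', 'g'))
    vertex_lst = g.V().limit(30).id().toList()
    vertex_lsts = np.array_split(vertex_lst, num_cores)
    with Pool(processes=num_cores) as pool:
        results = pool.map(graph_function, vertex_lsts)
    # Flatten the per-chunk result lists into a single list
    results = [*itertools.chain.from_iterable(results)]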
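A likely explanation, not confirmed in the thread: in the question, `create_traversal_object()` runs in the parent process. Each connection is then carried into a child when `sim.start()` forks it (fork is the default start method on macOS for Python 3.6/3.7). The traceback ends in tornado's `tornado/platform/kqueue.py`. The macOS/BSD `kqueue(2)` manual states that a kqueue is not inherited by a child created with `fork()`, so an event loop set up in the parent would poll an invalid descriptor in the child, which would produce `Errno 9` (EBADF). In the answer, `graph_function` opens its connection inside the worker, so the descriptor belongs to the process that uses it.

Below is a sketch of the same idea applied to the question's `Process` subclass: pass only plain vertex ids to the process and build the traversal in `run()`. The `make_traversal` and `query` parameters and the `results` queue are hypothetical additions so the class can be tested without a Gremlin Server.

```python
import multiprocessing


def make_remote_traversal(url="ws://localhost:8182/gremlin"):
    """Open a remote traversal source plus its connection.

    Call this in the process that will run the queries. gremlinpython is
    imported here so the module can be loaded without it installed.
    """
    from gremlin_python.structure.graph import Graph
    from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection

    conn = DriverRemoteConnection(url, "g")
    return Graph().traversal().withRemote(conn), conn


def count_names(g, vertex_id):
    """Per-vertex query from the question: distinct 'benef_nm' values on incoming edges."""
    return g.V(vertex_id).inE().values("benef_nm").dedup().count().next()


class Simulation(multiprocessing.Process):
    # Hypothetical parameters: make_traversal and query are injectable so the
    # class can be exercised without a Gremlin Server.
    def __init__(self, sim_id, vertex_ids, results,
                 make_traversal=make_remote_traversal, query=count_names):
        super().__init__()
        self.sim_id = sim_id
        self.vertex_ids = list(vertex_ids)  # plain data only, no connection yet
        self.results = results              # a multiprocessing.Queue
        self.make_traversal = make_traversal
        self.query = query

    def run(self):
        # run() executes in the child, so the connection (and its event loop)
        # is created after the fork, in the process that will use it.
        g, conn = self.make_traversal()
        try:
            for vertex_id in self.vertex_ids:
                self.results.put((self.sim_id, vertex_id, self.query(g, vertex_id)))
        finally:
            conn.close()
```

Usage mirrors the question. Build the chunks in the parent from `g.V().limit(30).id().toList()`, create one `Simulation(k, chunk, results)` per chunk, and start them. Then drain `results` with `results.get()` (one item per vertex id) before calling `join()`. Put the launching code under `if __name__ == '__main__':`, which the spawn start method (the macOS default from Python 3.8) requires.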