Klein应用程序延期

问题描述 投票:0回答:1

我正在探索克莱因和延期。在下面的示例中,我尝试使用子进程递增数字并通过Future返回它。我能够收到未来的回电。

问题是延迟对象从不调用cb()函数,并且对端点的请求永远不会返回。请帮我确定问题所在。

以下是我的server.py代码

from klein import Klein
from twisted.internet.defer import inlineCallbacks, returnValue
import Process4

if __name__ == '__main__':
    app = Klein()

    @app.route('/visit')
    @inlineCallbacks
    def get_num_visit(request):        
        try:
            resp = yield Process4.get_visitor_num()
            req.setResponseCode(200)
            returnValue('Visited = {}'.format(resp))
        except Exception as e:
            req.setResponseCode(500)
            returnValue('error {}'.format(e))

    print('starting server')
    app.run('0.0.0.0', 5005)

以下是Process4.py代码

from multiprocessing import Process
from concurrent.futures import Future
from time import sleep
from twisted.internet.defer import Deferred

def foo(x):
    result = x+1
    sleep(3)
    return result


class MyProcess(Process):

    def __init__(self, target, args):
        super().__init__()
        self.target = target
        self.args = args
        self.f = Future()
        self.visit = 0

    def run(self):
        r = foo(self.visit)
        self.f.set_result(result=r)

def cb(result):
    print('visitor number {}'.format(result))
    return result

def eb(err):
    print('error occurred {}'.format(err))
    return err


def future_to_deferred(future):
    d = Deferred()

    def callback(f):
        e = f.exception()
        if e:
            d.errback(e)
        else:
            d.callback(f.result())

    future.add_done_callback(callback)
    return d

def get_visitor_num():
    p1 = MyProcess(target=foo, args=None)
    d = future_to_deferred(p1.f)
    p1.start()
    d.addCallback(cb)
    d.addErrback(eb)
    sleep(1)
    return d

编辑1

在启动进程之前添加回调p1解决了调用cb()函数的问题。但是仍然向端点发出的http请求仍未返回。

python twisted python-multiprocessing deferred concurrent.futures
1个回答
0
投票

事实证明,在run()方法中设置未来结果self.f.set_result(result = r)会触发子进程中的callback()方法,其中没有线程在等待返回结果!

因此,要获得在MainProcess中触发的callback()函数,我必须使用MainProcess中的工作线程使用多进程队列从子进程获取结果,然后设置将来的结果。

@ notorious.no感谢您的回复。我注意到的一件事是reactor.callFromThread在我的修改后的代码中将工作线程的结果切换到MainThread但是d.callback(f.result())工作得很好但是从工作线程返回结果。

以下是修改后的工作代码

server.朋友

from klein import Klein
from twisted.internet.defer import inlineCallbacks, returnValue


import Process4

if __name__ == '__main__':
    app = Klein()
    visit_count = 0

    @app.route('/visit')
    @inlineCallbacks
    def get_num_visit(req):
        global visit_count
        try:
            resp = yield Process4.get_visitor_num(visit_count)
            req.setResponseCode(200)
            visit_count = resp
            returnValue('Visited = {}'.format(resp))
        except Exception as e:
            req.setResponseCode(500)
            returnValue('error {}'.format(e))

    print('starting server')
    app.run('0.0.0.0', 5005)

process4.朋友

from multiprocessing import Process, Queue
from concurrent.futures import Future
from time import sleep
from twisted.internet.defer import Deferred
import threading
from twisted.internet import reactor


def foo(x, q):
    result = x+1
    sleep(3)
    print('setting result, {}'.format(result))
    q.put(result)


class MyProcess(Process):

    def __init__(self, target, args):
        super().__init__()
        self.target = target
        self.args = args
        self.visit = 0

    def run(self):
        self.target(*self.args)


def future_to_deferred(future):
    d = Deferred()

    def callback(f):
        e = f.exception()
        print('inside callback {}'.format(threading.current_thread().name))
        if e:
            print('calling errback')
            d.errback(e)
            # reactor.callFromThread(d.errback, e)
        else:
            print('calling callback with result {}'.format(f.result()))
            # d.callback(f.result())
            reactor.callFromThread(d.callback, f.result())
    future.add_done_callback(callback)
    return d


def wait(q,f):
    r = q.get(block=True)
    f.set_result(r)


def get_visitor_num(x):

    def cb(result):
        print('inside cb visitor number {} {}'.format(result, threading.current_thread().name))
        return result

    def eb(err):
        print('inside eb error occurred {}'.format(err))
        return err

    f = Future()
    q = Queue()
    p1 = MyProcess(target=foo, args=(x,q,))

    wait_thread = threading.Thread(target=wait, args=(q,f,))
    wait_thread.start()

    defr = future_to_deferred(f)
    defr.addCallback(cb)
    defr.addErrback(eb)
    p1.start()
    print('returning deferred')
    return defr
© www.soinside.com 2019 - 2024. All rights reserved.