我很新的Python和一般的多线程编程。基本上,我有一个脚本,将文件复制到另一个位置。我想这是放置在另一个线程,所以我可以输出....
,表明脚本仍在运行。
那我遇到的问题是,如果这些文件无法复制它会抛出异常。如果在主线程中运行这是确定的;然而,具有下面的代码无法正常工作:
try:
threadClass = TheThread(param1, param2, etc.)
threadClass.start() ##### **Exception takes place here**
except:
print "Caught an exception"
在线程类本身,我试图重新抛出异常,但它不工作。我看到有人在这里问类似的问题,但他们似乎都做的比我试图做(我不太理解提供的解决方案)更具体的东西。我看到有人提到sys.exc_info()
的用法,但是我不知道在哪里或如何使用它。
所有的帮助感激!
编辑:对于线程类的代码如下:
class TheThread(threading.Thread):
def __init__(self, sourceFolder, destFolder):
threading.Thread.__init__(self)
self.sourceFolder = sourceFolder
self.destFolder = destFolder
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except:
raise
问题是,thread_obj.start()
立即返回。你催生子线程执行自己的上下文,有它自己的堆栈。出现有任何的例外是在子线程的上下文,它是在它自己的堆栈。我现在能想到的一种方式来传达这个信息给父线程是使用某种消息传递的,所以你可能会考虑这样做。
试试这个关于大小:
import sys
import threading
import Queue
class ExcThread(threading.Thread):
def __init__(self, bucket):
threading.Thread.__init__(self)
self.bucket = bucket
def run(self):
try:
raise Exception('An error occured here.')
except Exception:
self.bucket.put(sys.exc_info())
def main():
bucket = Queue.Queue()
thread_obj = ExcThread(bucket)
thread_obj.start()
while True:
try:
exc = bucket.get(block=False)
except Queue.Empty:
pass
else:
exc_type, exc_obj, exc_trace = exc
# deal with the exception
print exc_type, exc_obj
print exc_trace
thread_obj.join(0.1)
if thread_obj.isAlive():
continue
else:
break
if __name__ == '__main__':
main()
我知道我有点迟到了这里,但我有一个非常类似的问题,但它包括了使用Tkinter的作为GUI和主循环,不可能使用任何依赖。加入()的解决方案。因此,我在改编原来的问题的编辑给出的解决方案,但变得更加普遍,使之更容易理解别人。
下面是使用中的新线程类:
import threading
import traceback
import logging
class ExceptionThread(threading.Thread):
def __init__(self, *args, **kwargs):
threading.Thread.__init__(self, *args, **kwargs)
def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
except Exception:
logging.error(traceback.format_exc())
def test_function_1(input):
raise IndexError(input)
if __name__ == "__main__":
input = 'useful'
t1 = ExceptionThread(target=test_function_1, args=[input])
t1.start()
当然,你可以总是有它处理异常一些其他的方式,从伐木,如打印出来,或有它输出到控制台。
这使您可以使用ExceptionThread类酷似你会Thread类,没有任何特殊的修改。
一种方法是我喜欢的是基于observer pattern。我定义它我的线程使用发射例外听众的信号类。它也可以用来从线程返回值。例:
import threading
class Signal:
def __init__(self):
self._subscribers = list()
def emit(self, *args, **kwargs):
for func in self._subscribers:
func(*args, **kwargs)
def connect(self, func):
self._subscribers.append(func)
def disconnect(self, func):
try:
self._subscribers.remove(func)
except ValueError:
raise ValueError('Function {0} not removed from {1}'.format(func, self))
class WorkerThread(threading.Thread):
def __init__(self, *args, **kwargs):
super(WorkerThread, self).__init__(*args, **kwargs)
self.Exception = Signal()
self.Result = Signal()
def run(self):
if self._Thread__target is not None:
try:
self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
except Exception as e:
self.Exception.emit(e)
else:
self.Result.emit(self._return_value)
if __name__ == '__main__':
import time
def handle_exception(exc):
print exc.message
def handle_result(res):
print res
def a():
time.sleep(1)
raise IOError('a failed')
def b():
time.sleep(2)
return 'b returns'
t = WorkerThread(target=a)
t2 = WorkerThread(target=b)
t.Exception.connect(handle_exception)
t2.Result.connect(handle_result)
t.start()
t2.start()
print 'Threads started'
t.join()
t2.join()
print 'Done'
我没有使用线程声称这是一个完全安全的方法的足够的经验。但它很适合我,我喜欢的灵活性。
使用裸节选是不是一个好的做法,因为你通常捕获到比你讨价还价更多。
我建议修改except
只捕获你想处理异常。我不认为提高它预期的效果,因为当你走在外侧TheThread
实例try
,如果它抛出一个异常,分配永远不会发生。
相反,你可能只想警告就可以了,继续前进,比如:
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except OSError, err:
print err
然后,当异常被抓住了,你可以在那里处理它。然后,当外try
捕获来自TheThread
一个例外,你知道它会不会是你已经处理了一个,并会帮助您找出您的工艺流程。
捕获线程的异常,并传送回给调用者方法的一个简单的方法可能是通过使词典或列表worker
方法。
实施例(通过字典来辅助方法):
import threading
def my_method(throw_me):
raise Exception(throw_me)
def worker(shared_obj, *args, **kwargs):
try:
shared_obj['target'](*args, **kwargs)
except Exception as err:
shared_obj['err'] = err
shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"
th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()
if shared_obj['err']:
print(">>%s" % shared_obj['err'])
包装与贮存例外线程。
import threading
import sys
class ExcThread(threading.Thread):
def __init__(self, target, args = None):
self.args = args if args else []
self.target = target
self.exc = None
threading.Thread.__init__(self)
def run(self):
try:
self.target(*self.args)
raise Exception('An error occured here.')
except Exception:
self.exc=sys.exc_info()
def main():
def hello(name):
print(!"Hello, {name}!")
thread_obj = ExcThread(target=hello, args=("Jack"))
thread_obj.start()
thread_obj.join()
exc = thread_obj.exc
if exc:
exc_type, exc_obj, exc_trace = exc
print(exc_type, ':',exc_obj, ":", exc_trace)
main()
该concurrent.futures
模块可以很方便地做到在单独的线程(或程序)的工作,并处理由此产生的任何异常:
import concurrent.futures
import shutil
def copytree_with_dots(src_path, dst_path):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# Execute the copy on a separate thread,
# creating a future object to track progress.
future = executor.submit(shutil.copytree, src_path, dst_path)
while future.running():
# Print pretty dots here.
pass
# Return the value returned by shutil.copytree(), None.
# Raise any exceptions raised during the copy process.
return future.result()
concurrent.futures
包括在Python 3.2,并且可作为the backported futures
module早期版本。
虽然不可能直接赶在不同的线程抛出一个异常,这里有一个代码,相当透明获得某物非常接近这个功能。你的子线程必须继承的ExThread
类而不是threading.Thread
和等待线程完成其工作,当父线程必须改为调用child_thread.join_with_exception()
的child_thread.join()
方法。
这种实现的技术细节:当子线程抛出一个异常,它是通过一个Queue
传递给家长和父线程再次抛出。请注意,有没有忙等待的这种做法。
#!/usr/bin/env python
import sys
import threading
import Queue
class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()
def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError
def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except BaseException:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)
def wait_for_exc_info(self):
return self.__status_queue.get()
def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]
class MyException(Exception):
pass
class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)
def run_with_exception(self):
thread_name = threading.current_thread().name
raise MyException("An error in thread '{}'.".format(thread_name))
def main():
t = MyThread()
t.start()
try:
t.join_with_exception()
except MyException as ex:
thread_name = threading.current_thread().name
print "Caught a MyException in thread '{}': {}".format(thread_name, ex)
if __name__ == '__main__':
main()
如果在一个线程中发生了异常,最好的办法是join
过程中调用程序线程重新提出。您可以获取有关当前正在使用sys.exc_info()
功能处理的异常信息。此信息可以简单地被存储为螺纹对象的属性,直到join
被调用时,它指向可以重新升高处。
请注意,Queue.Queue
(在其他的答案建议)没有必要在该线程抛出至多异常并抛出异常后马上完成这个简单的例子。我们只需等待线程完成避免竞争状态。
例如,延伸ExcThread
(下图),覆盖excRun
(代替run
)。
Python的2.ch:
import threading
class ExcThread(threading.Thread):
def excRun(self):
pass
def run(self):
self.exc = None
try:
# Possibly throws an exception
self.excRun()
except:
import sys
self.exc = sys.exc_info()
# Save details of the exception thrown but don't rethrow,
# just complete the function
def join(self):
threading.Thread.join(self)
if self.exc:
msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
new_exc = Exception(msg)
raise new_exc.__class__, new_exc, self.exc[2]
Python的x.ch:
对于raise
3论证形式消失在Python 3,所以更改最后一行:
raise new_exc.with_traceback(self.exc[2])
有很多真正古怪复杂,这个问题的答案的。我是简单化这一点,因为这似乎足以满足大多数东西给我。
from threading import Thread
class PropagatingThread(Thread):
def run(self):
self.exc = None
try:
if hasattr(self, '_Thread__target'):
# Thread uses name mangling prior to Python 3.
self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
else:
self.ret = self._target(*self._args, **self._kwargs)
except BaseException as e:
self.exc = e
def join(self):
super(PropagatingThread, self).join()
if self.exc:
raise self.exc
return self.ret
如果您确定你永远只能在一个或另一个版本的Python运行,你可以下来减少的run()
方法只是错位版本(如果您只需要Python的版本3之前运行)或者只是干净的版本(如果你只与3开始的Python版本中运行)。
实例:
def f(*args, **kwargs)
print(args)
print(kwargs)
raise Exception('I suck')
t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()
你会看到所提出的其他线程时加入例外。
这是一个讨厌的小问题,我希望把我的解决方案中的一些其他的解决方案,我发现(async.io例如)看起来前途无量,但也提出了一个黑盒的。队列/事件循环的方式排序的领带,你一定实现。 The concurrent futures source code, however, is around only 1000 lines, and easy to comprehend。这让我很容易地解决我的问题:没有太多的设置创建特设工作线程,并能够捕捉异常在主线程。
我的解决方案采用了并发期货API和线程API。它允许你创建工作,让你的线程和未来同在。这样一来,就可以加入线程等待的结果:
worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())
...或者你可以让做当工人只发送一个回调:
worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))
...或者你可以循环,直到事件结束:
worker = Worker(test)
thread = worker.start()
while True:
print("waiting")
if worker.future.done():
exc = worker.future.exception()
print('exception?', exc)
result = worker.future.result()
print('result', result)
break
time.sleep(0.25)
下面的代码:
from concurrent.futures import Future
import threading
import time
class Worker(object):
def __init__(self, fn, args=()):
self.future = Future()
self._fn = fn
self._args = args
def start(self, cb=None):
self._cb = cb
self.future.set_running_or_notify_cancel()
thread = threading.Thread(target=self.run, args=())
thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
thread.start()
return thread
def run(self):
try:
self.future.set_result(self._fn(*self._args))
except BaseException as e:
self.future.set_exception(e)
if(self._cb):
self._cb(self.future.result())
...和测试功能:
def test(*args):
print('args are', args)
time.sleep(2)
raise Exception('foo')
concurrent.futures.as_completed
https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed
以下解决方案:
Queue
,除了其他添加在你的工作线程资源:
#!/usr/bin/env python3
import concurrent.futures
import time
def func_that_raises(do_raise):
for i in range(3):
print(i)
time.sleep(0.1)
if do_raise:
raise Exception()
for i in range(3):
print(i)
time.sleep(0.1)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
futures = []
futures.append(executor.submit(func_that_raises, False))
futures.append(executor.submit(func_that_raises, True))
for future in concurrent.futures.as_completed(futures):
print(repr(future.exception()))
可能的输出:
0
0
1
1
2
2
0
Exception()
1
2
None
这是不幸的是没有可能杀死期货取消别人为一个失败:
concurrent.features
; Python: concurrent.futures How to make it cancelable?threading
:Is there any way to kill a Thread?如果你是这样的:
for future in concurrent.futures.as_completed(futures):
if future.exception() is not None:
raise future.exception()
那么with
捕获它,并等待第二个线程继续之前完成。以下行为与此类似:
for future in concurrent.futures.as_completed(futures):
future.result()
因为future.result()
重新引发异常,如果发生一个。
如果你想退出整个Python过程中,你可能会逃脱os._exit(0)
,但是这可能意味着你需要重构。
具有完善的异常语义自定义类
我结束了在编码完美的界面为自己:The right way to limit maximum number of threads running at once?节“队列例如错误处理”。这个类的目标是既方便,让您对提交和结果/错误处理总量控制。
测试在Python的3.6.7,Ubuntu的18.04。
作为一个noobie到线程,我花了很长的时间来了解如何实现Mateusz科博斯的代码(上图)。这里有一个明确的版本,以帮助了解如何使用它。
#!/usr/bin/env python
import sys
import threading
import Queue
class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()
def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError
def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except Exception:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)
def wait_for_exc_info(self):
return self.__status_queue.get()
def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]
class MyException(Exception):
pass
class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)
# This overrides the "run_with_exception" from class "ExThread"
# Note, this is where the actual thread to be run lives. The thread
# to be run could also call a method or be passed in as an object
def run_with_exception(self):
# Code will function until the int
print "sleeping 5 seconds"
import time
for i in 1, 2, 3, 4, 5:
print i
time.sleep(1)
# Thread should break here
int("str")
# I'm honestly not sure why these appear here? So, I removed them.
# Perhaps Mateusz can clarify?
# thread_name = threading.current_thread().name
# raise MyException("An error in thread '{}'.".format(thread_name))
if __name__ == '__main__':
# The code lives in MyThread in this example. So creating the MyThread
# object set the code to be run (but does not start it yet)
t = MyThread()
# This actually starts the thread
t.start()
print
print ("Notice 't.start()' is considered to have completed, although"
" the countdown continues in its new thread. So you code "
"can tinue into new processing.")
# Now that the thread is running, the join allows for monitoring of it
try:
t.join_with_exception()
# should be able to be replace "Exception" with specific error (untested)
except Exception, e:
print
print "Exceptioon was caught and control passed back to the main thread"
print "Do some handling here...or raise a custom exception "
thread_name = threading.current_thread().name
e = ("Caught a MyException in thread: '" +
str(thread_name) +
"' [" + str(e) + "]")
raise Exception(e) # Or custom class of exception, such as MyException
类似的方式像RickardSjogren的没有队列,SYS等也没有一些听众信号:直接执行,其对应于一个不同的块中的异常处理程序。
#!/usr/bin/env python3
import threading
class ExceptionThread(threading.Thread):
def __init__(self, callback=None, *args, **kwargs):
"""
Redirect exceptions of thread to an exception handler.
:param callback: function to handle occured exception
:type callback: function(thread, exception)
:param args: arguments for threading.Thread()
:type args: tuple
:param kwargs: keyword arguments for threading.Thread()
:type kwargs: dict
"""
self._callback = callback
super().__init__(*args, **kwargs)
def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
except BaseException as e:
if self._callback is None:
raise e
else:
self._callback(self, e)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs, self._callback
仅self._callback和除块中的run()是附加到正常threading.Thread。