我正在序列化列数据,然后通过套接字连接发送它。就像是:
import array, struct, socket
## Socket setup
s = socket.create_connection((ip, addr))
## Data container setup
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)
for i in range(num_of_chunks):
## Binarize data
columns['col1'] = array.array('i', range(10000))
columns['col2'] = array.array('f', [float(num) for num in range(10000)])
.
.
.
## Send away
chunk = b''.join(columns[col_name] for col_name in ordered_col_list]
s.sendall(chunk)
s.recv(1000) #get confirmation
我希望将计算与发送分开,将它们放在单独的线程或进程上,这样我就可以在数据被发送时继续进行计算。
我把二值化部分作为生成器函数,然后将生成器发送到一个单独的线程,然后通过队列产生二进制块。
我从主线程中收集了数据并将其发送出去。就像是:
import array, struct, socket
from time import sleep
try:
import thread
from Queue import Queue
except:
import _thread as thread
from queue import Queue
## Socket and queue setup
s = socket.create_connection((ip, addr))
chunk_queue = Queue()
def binarize(num_of_chunks):
''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)
for i in range(num_of_chunks):
columns['col1'] = array.array('i', range(10000)).tostring()
columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
.
.
yield b''.join((columns[col_name] for col_name in ordered_col_list))
def chunk_yielder(queue):
''' Generate binary chunks and put them on a queue. To be used from a thread '''
while True:
try:
data_gen = queue.get_nowait()
except:
sleep(0.1)
continue
else:
for chunk in data_gen:
queue.put(chunk)
## Setup thread and data generator
thread.start_new_thread(chunk_yielder, (chunk_queue,))
num_of_chunks = 100
data_gen = binarize(num_of_chunks)
queue.put(data_gen)
## Get data back and send away
while True:
try:
binary_chunk = queue.get_nowait()
except:
sleep(0.1)
continue
else:
socket.sendall(binary_chunk)
socket.recv(1000) #Get confirmation
但是,我没有看到和性能改进 - 它没有更快的工作。
我不太了解线程/进程,我的问题是,是否有可能(在所有和Python中)从这种类型的分离中获益,以及使用线程或者线程或者什么是好的方法。处理(或任何其他方式 - 异步等)。
编辑:
据我所知 -
socket.send()
发送应释放GIL因此,我认为(如果我弄错了,请纠正我),线程解决方案是正确的方法。但是我不确定如何正确地做到这一点。
我知道cython可以释放线程的GIL,但由于其中一个只是socket.send / recv,我的理解是它不应该是必要的。
在Python中有两个并行运行的选项,可以使用multiprocessing
(docs)库,也可以在cython
中编写并行代码并释放GIL。一般而言,后者的工作量大得多,而且适用性较差。
Python线程受Global Interpreter Lock(GIL)的限制,我不会在这里详细介绍,因为你会在网上找到足够的信息。简而言之,顾名思义,GIL是CPython解释器中的全局锁,它确保多个线程不会同时修改在所述解释器范围内的对象。这就是为什么,例如,cython
程序可以并行运行代码,因为它们可以存在于GIL之外。
至于你的代码,一个问题是你在GIL中同时运行数字运算(binarize
)和socket.send
,这将严格按顺序运行它们。 queue
也非常奇怪,并且有一个NameError
,但让我们把它们放在一边。
考虑到Jeremy Friesner已经指出的警告,我建议你按照以下方式重新构造代码:你有两个进程(不是线程)一个用于二进制数据,另一个用于发送数据。除此之外,还有启动两个子进程的父进程,以及将子进程1连接到子进程2的队列。
socket.send
在代码中,设置看起来像
from multiprocessing import Process, Queue
work_queue = Queue()
p1 = Process(target=binarize, args=(100, work_queue))
p2 = Process(target=send_data, args=(ip, port, work_queue))
p1.start()
p2.start()
p1.join()
p2.join()
binarize
可以保留在你的代码中,除了最后代替yield
,你将元素添加到队列中
def binarize(num_of_chunks, q):
''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)
for i in range(num_of_chunks):
columns['col1'] = array.array('i', range(10000)).tostring()
columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
data = b''.join((columns[col_name] for col_name in ordered_col_list))
q.put(data)
send_data
应该只是代码底部的while
循环,具有连接打开/关闭功能
def send_data(ip, addr, q):
s = socket.create_connection((ip, addr))
while True:
try:
binary_chunk = q.get(False)
except:
sleep(0.1)
continue
else:
socket.sendall(binary_chunk)
socket.recv(1000) # Get confirmation
# maybe remember to close the socket before killing the process
现在,您有两个(实际上是三个,如果您计算父级)独立处理数据的进程。您可以通过将队列的max_size
设置为单个元素来强制这两个进程同步其操作。这两个独立进程的操作也很容易通过计算机上的进程管理器监控top
(Linux),Activity Monitor
(OsX),不记得在Windows下调用它的内容。
最后,Python 3提供了使用协同例程的选项,这些例程既不是进程也不是线程,而是完全不同的东西。从CS的角度来看,协同程序非常酷,但最初有点令人头疼。虽然有很多资源需要学习,比如媒体上的this帖子和David Beazley的this讲话。
更一般地说,如果您还不熟悉生产者/消费者模式,则可能需要查看生产者/消费者模式。
如果您尝试使用并发来提高CPython的性能,我强烈建议使用多处理库而不是多线程。这是因为GIL(Global Interpreter Lock)会对执行速度产生巨大影响(在某些情况下,它可能会导致代码运行速度比单线程版本慢)。另外,如果您想了解有关此主题的更多信息,我建议您阅读this presentation by David Beazley。多处理通过为每个进程生成一个新的Python解释器实例来绕过此问题,从而使您可以充分利用多核架构。