我尝试了使用以下实现提到here的方法:
from subprocess import PIPE, Popen
process = Popen(['/usr/bin/openssl', 'enc', '-aes-256-cbc', '-a', '-pass', 'pass:asdf'], stdin=PIPE, stdout=PIPE)
process.stdin.write('Hello this is it')
process.stdin.flush()
print(repr(process.stdout.readline()))
但它已经停留在readline()虽然我已经写过并且已经刷新了。我还尝试了一种非阻塞方法,提到here但是它也在readline()上阻塞。以下是我后续方法的代码:
import sys
import time
from subprocess import PIPE, Popen
from threading import Thread
from Queue import Queue, Empty
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
def write_to_stdin(process):
process.stdin.write(b'Hello this is it')
p = Popen(['/usr/bin/openssl', 'enc', '-aes-256-cbc', '-a', '-pass', 'pass:asdf'], stdin=PIPE, stdout=PIPE, bufsize=-1, close_fds=ON_POSIX)
q = Queue()
t2 = Thread(target=write_to_stdin, args=(p,))
t2.daemon = True
t2.start()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
try:
line = q.get(timeout=3) # or q.get(timeout=.1)
except Empty:
print('no output yet')
else:
print line
我没有输出作为输出。
唯一的工作方法是使用:
process.communicate
但这会关闭流程,我们必须重新打开流程。对于要加密的大量消息,这需要花费太多时间,我试图避免包含任何外部包来实现此任务。任何帮助将非常感谢谢谢。
用process.stdin.flush()
替换process.stdin.close()
,如下所示:
from subprocess import PIPE, Popen
process = Popen(['/usr/bin/openssl', 'enc', '-aes-256-cbc', '-a', '-pass', 'pass:asdf'], stdin=PIPE, stdout=PIPE)
process.stdin.write('Hello this is it')
process.stdin.close()
print(repr(process.stdout.readline()))
这个readline()
不再阻止。
解释是openssl enc
等待其标准输入结束,只有在调用进程关闭管道时才会到达。
使用ctypescrypto在linux中找到了另一种方法。这不需要任何额外的外部依赖(只需在代码中包含所有给定的源文件,因为许可证允许我们这样做)并且在性能方面它应该非常快,因为可用的函数是用C编译的。
在Linux上test.py我们必须替换:
crypto_dll = os.path.join(r'C:\Python24', 'libeay32.dll')
libcrypto = cdll.LoadLibrary(crypto_dll)
有:
from ctypes.util import find_library
crypto_dll = find_library('crypto') # In my case its 'libcrypto.so.1.0.0'
libcrypto = cdll.LoadLibrary(crypto_dll)
更改后,以下示例有效:
import cipher
from ctypes import cdll
from base64 import b64encode
from base64 import b64decode
libcrypto = cdll.LoadLibrary('libcrypto.so.1.0.0')
libcrypto.OpenSSL_add_all_digests()
libcrypto.OpenSSL_add_all_ciphers()
# Encryption
c = cipher.CipherType(libcrypto, 'AES-256', 'CBC')
ce = cipher.Cipher(libcrypto, c, '11111111111111111111111111111111', '1111111111111111', encrypt=True)
encrypted_text = b64encode(ce.finish("Four Five Six"))
print encrypted_text
# Decryption
cd = cipher.Cipher(libcrypto, c, '11111111111111111111111111111111', '1111111111111111', encrypt=False)
plain_text = cd.finish(b64decode(encrypted_text))
print plain_text