我已经四处寻找这个问题的答案,但我似乎只能找到可以为您做到这一点的软件。有谁知道如何在 python 中做到这一点?
我编写了一段 Python 代码,用于根据 .torrent 文件 中的内容验证 下载文件 的哈希值。假设您想检查下载内容是否损坏,您可能会发现这很有用。
您需要 bencode 包 才能使用它。 Bencode 是 .torrent 文件中使用的序列化格式。它可以像 JSON 一样整理列表、字典、字符串和数字。
代码采用
info['pieces']
字符串中包含的哈希值:
torrent_file = open(sys.argv[1], "rb")
metainfo = bencode.bdecode(torrent_file.read())
info = metainfo['info']
pieces = StringIO.StringIO(info['pieces'])
该字符串包含一系列 20 字节哈希值(每个部分一个)。然后将这些哈希值与磁盘上文件片段的哈希值进行比较。
此代码唯一复杂的部分是处理多文件 torrent,因为单个 torrent piece 可以跨越多个文件(BitTorrent 在内部将多文件下载视为单个连续文件)。我正在使用生成器函数
pieces_generator()
将其抽象出来。
您可能需要阅读 BitTorrent 规范 以更详细地了解这一点。
完整代码如下:
import sys, os, hashlib, StringIO, bencode
def pieces_generator(info):
"""Yield pieces from download file(s)."""
piece_length = info['piece length']
if 'files' in info: # yield pieces from a multi-file torrent
piece = ""
for file_info in info['files']:
path = os.sep.join([info['name']] + file_info['path'])
print path
sfile = open(path.decode('UTF-8'), "rb")
while True:
piece += sfile.read(piece_length-len(piece))
if len(piece) != piece_length:
sfile.close()
break
yield piece
piece = ""
if piece != "":
yield piece
else: # yield pieces from a single file torrent
path = info['name']
print path
sfile = open(path.decode('UTF-8'), "rb")
while True:
piece = sfile.read(piece_length)
if not piece:
sfile.close()
return
yield piece
def corruption_failure():
"""Display error message and exit"""
print("download corrupted")
exit(1)
def main():
# Open torrent file
torrent_file = open(sys.argv[1], "rb")
metainfo = bencode.bdecode(torrent_file.read())
info = metainfo['info']
pieces = StringIO.StringIO(info['pieces'])
# Iterate through pieces
for piece in pieces_generator(info):
# Compare piece hash with expected hash
piece_hash = hashlib.sha1(piece).digest()
if (piece_hash != pieces.read(20)):
corruption_failure()
# ensure we've read all pieces
if pieces.read():
corruption_failure()
if __name__ == "__main__":
main()
以下是我如何从 torrent 文件中提取 HASH 值:
#!/usr/bin/python
import sys, os, hashlib, StringIO
import bencode
def main():
# Open torrent file
torrent_file = open(sys.argv[1], "rb")
metainfo = bencode.bdecode(torrent_file.read())
info = metainfo['info']
print hashlib.sha1(bencode.bencode(info)).hexdigest()
if __name__ == "__main__":
main()
与运行命令相同:
transmissioncli -i test.torrent 2>/dev/null | grep "^hash:" | awk '{print $2}'
希望,有帮助:)
如果有人想知道如何从 BitTorrent v2 兼容种子中提取文件哈希值,您可以使用此命令行工具。
根据已接受的答案更新了 python3 的脚本。还添加了进度并且不需要外部库。
import sys, os, hashlib, io, re
decimal_match = re.compile(r'\d')
def bdecode(data):
'''Main function to decode bencoded data'''
chunks = [i.to_bytes() for i in data]
print(chunks[:100])
chunks.reverse()
root = _dechunk(chunks)
return root
def _dechunk(chunks):
item = chunks.pop()
#print(item)
if item == b'd':
item = chunks.pop()
hash = {}
while item != b'e':
chunks.append(item)
key = _dechunk(chunks)
hash[key.decode("utf-8")] = _dechunk(chunks)
item = chunks.pop()
return hash
elif item == b'l':
item = chunks.pop()
list = []
while item != b'e':
chunks.append(item)
list.append(_dechunk(chunks))
item = chunks.pop()
return list
elif item == b'i':
item = chunks.pop()
num = b''
while item != b'e':
num += item
item = chunks.pop()
return int(num)
elif decimal_match.search(item.decode("utf-8")):
num = b''
while decimal_match.search(item.decode("utf-8")):
num += item
item = chunks.pop()
line = b''
for i in range(int(num)):
line += chunks.pop()
return line
print(item)
raise ValueError("Invalid input!")
currentTestedPath = ""
def pieces_generator(info):
global currentTestedPath
"""Yield pieces from download file(s)."""
piece_length = info['piece length']
if 'files' in info: # yield pieces from a multi-file torrent
piece = b""
for file_info in info['files']:
path = os.sep.join([info['name'].decode("utf-8")] + [p.decode("utf-8") for p in file_info['path']])
currentTestedPath = path
print(currentTestedPath)
sfile = open(path, "rb")
while True:
piece += sfile.read(piece_length-len(piece))
if len(piece) != piece_length:
sfile.close()
break
yield piece
piece = b""
if piece != b"":
yield piece
else: # yield pieces from a single file torrent
path = info['name'].decode("utf-8")
print(path)
sfile = open(path.decode('UTF-8'), "rb")
while True:
piece = sfile.read(piece_length)
if not piece:
sfile.close()
return
yield piece
def corruption_failure():
"""Display error message and exit"""
print("download corrupted")
exit(1)
def main():
# Open torrent file
torrent_file = open(sys.argv[1], "rb")
metainfo = bdecode(torrent_file.read())
print(metainfo)
info = metainfo['info']
pieces = io.BytesIO(info['pieces'])
# Iterate through pieces
nbTestedPieces = 0
for piece in pieces_generator(info):
nbTestedPieces += 1
# Compare piece hash with expected hash
piece_hash = hashlib.sha1(piece).digest()
#if nbTestedPieces%100 == 0:
print("Testing hash "+piece_hash.hex()+" for '"+currentTestedPath+"' ("+str(nbTestedPieces)+"/"+str(int(len(info['pieces'])/20))+")")
if (piece_hash != pieces.read(20)):
corruption_failure()
# ensure we've read all pieces
if pieces.read():
corruption_failure()
if __name__ == "__main__":
main()