我正在使用Python框架Twisted编写一个FTP服务器。 Twisted 有自己的普通 FTP 实现 - 但它不支持 FTPS。我注意到大多数客户端连接后立即发出
AUTH TLS
命令,请求加密的 FTPS 连接。如果服务器响应不支持该命令,它们就会断开连接。
有一些第三方库实现显式 FTPS 服务器(即客户端立即通过 FTPS 连接),例如 this one - 但这不是我需要的。我需要隐式 FTPS 支持 - 即,当收到
AUTH TLS
命令时,从 FTP 连接内切换到 TLS 连接。
有什么想法可以做到这一点吗?
好吧,我混淆了 FTPS 类型;我实际上需要显式的 FTPS(即
AUTH TLS
命令的处理);感谢 Martin Prikryl 指出了这一点。
同时,我想出了如何解决我的问题。请注意,对于 FTPS,您需要一个证书(假设存储在下面代码中的文件
server.pem
中) - 特别是,如果您要将其与 Twisted 一起使用,则它必须包含 SAN 扩展。有关如何创建包含 SAN 扩展的自签名证书的信息,请参阅本指南(备份链接)。
这是实现我想要的基本代码:
from os.path import isfile
from sys import version_info
from twisted.internet.ssl import PrivateCertificate
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet.reactor import listenTCP, run
from twisted.python.log import msg
if version_info[0] >= 3:
def decode(x):
return x.decode('utf-8', errors='ignore')
def encode(x):
return x.encode()
else:
def decode(x):
return x
def encode(x):
return x
class MyFTPServer(LineReceiver):
def __init__(self, options):
self.options = options
self.is_fttps = False
def connectionMade(self):
self.transport.write(b'220 MyFTP server\r\n')
def lineReceived(self, line):
line = decode(line)
parts = line.split(None, 1)
if parts:
command = parts[0].upper()
args = parts[1] if len(parts) > 1 else ''
self.process_command(command, args)
def connectionLost(self, reason):
self.is_fttps = False
def process_command(self, command, args):
if command == 'AUTH':
if len(args) == 0:
self.transport.write(b'504 AUTH requires at least one argument\r\n')
elif args.upper().strip() not in ['TLS', 'TLS-C', 'SSL', 'TLS-P']:
self.transport.write(b'500 AUTH not understood\r\n')
elif self.is_fttps:
self.transport.write(b'200 User is already authenticated.\r\n')
elif self.options is not None:
self.transport.write(b'234 AUTH TLS successful\r\n')
self.transport.startTLS(self.options)
self.is_fttps = True
else:
self.transport.write(b'500 AUTH not understood\r\n')
elif command == 'CCC':
if not self.is_fttps:
self.transport.write(b'533 Command channel is alredy cleared\r\n')
else:
self.transport.write(b'200 Clear Command Channel OK\r\n')
self.transport.stopTLS()
self.is_fttps = False
# elif command == '...':
# # Process other commands
else:
self.transport.write(b'500 ' + encode(command) + b' not understood\r\n')
class MyFTPFactory(ServerFactory):
def __init__(self, certfile):
factory_options = None
if isfile(certfile):
cert_data = ''
try:
with open(certfile) as f:
cert_data += f.read()
except OSError:
msg('Could not read the file "{}".'.format(certfile))
if cert_data:
factory_options = PrivateCertificate.loadPEM(cert_data).options()
self.options = factory_options
def buildProtocol(self, addr):
return MyFTPServer(self.options)
def main():
certfile = 'server.pem'
listenTCP(21, MyFTPFactory(certfile))
run()
msg('Shutdown requested, exiting...')
if __name__ == '__main__':
main()