使用Twisted实现隐式FTPS服务器

问题描述 投票:0回答:1

我正在使用Python框架Twisted编写一个FTP服务器。 Twisted 有自己的普通 FTP 实现 - 但它不支持 FTPS。我注意到大多数客户端连接后立即发出

AUTH TLS
命令,请求加密的 FTPS 连接。如果服务器响应不支持该命令,它们就会断开连接。

有一些第三方库实现显式 FTPS 服务器(即客户端立即通过 FTPS 连接),例如 this one - 但这不是我需要的。我需要隐式 FTPS 支持 - 即,当收到

AUTH TLS
命令时,从 FTP 连接内切换到 TLS 连接。

有什么想法可以做到这一点吗?

python ftp twisted ftps
1个回答
0
投票

好吧,我混淆了 FTPS 类型;我实际上需要显式的 FTPS(即

AUTH TLS
命令的处理);感谢 Martin Prikryl 指出了这一点。

同时,我想出了如何解决我的问题。请注意,对于 FTPS,您需要一个证书(假设存储在下面代码中的文件

server.pem
中) - 特别是,如果您要将其与 Twisted 一起使用,则它必须包含 SAN 扩展。有关如何创建包含 SAN 扩展的自签名证书的信息,请参阅本指南备份链接)。

这是实现我想要的基本代码:


from os.path import isfile
from sys import version_info

from twisted.internet.ssl import PrivateCertificate
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet.reactor import listenTCP, run
from twisted.python.log import msg


if version_info[0] >= 3:
    def decode(x):
        return x.decode('utf-8', errors='ignore')
    def encode(x):
        return x.encode()
else:
    def decode(x):
        return x
    def encode(x):
        return x


class MyFTPServer(LineReceiver):
    def __init__(self, options):
        self.options = options
        self.is_fttps = False

    def connectionMade(self):
        self.transport.write(b'220 MyFTP server\r\n')

    def lineReceived(self, line):
        line = decode(line)
        parts = line.split(None, 1)
        if parts:
            command = parts[0].upper()
            args = parts[1] if len(parts) > 1 else ''
            self.process_command(command, args)

    def connectionLost(self, reason):
        self.is_fttps = False

    def process_command(self, command, args):
        if command == 'AUTH':
            if len(args) == 0:
                self.transport.write(b'504 AUTH requires at least one argument\r\n')
            elif args.upper().strip() not in ['TLS', 'TLS-C', 'SSL', 'TLS-P']:
                self.transport.write(b'500 AUTH not understood\r\n')
            elif self.is_fttps:
                self.transport.write(b'200 User is already authenticated.\r\n')
            elif self.options is not None:
                self.transport.write(b'234 AUTH TLS successful\r\n')
                self.transport.startTLS(self.options)
                self.is_fttps = True
            else:
                self.transport.write(b'500 AUTH not understood\r\n')
        elif command == 'CCC':
            if not self.is_fttps:
                self.transport.write(b'533 Command channel is alredy cleared\r\n')
            else:
                self.transport.write(b'200 Clear Command Channel OK\r\n')
                self.transport.stopTLS()
                self.is_fttps = False
        # elif command == '...':
        #     # Process other commands
        else:
            self.transport.write(b'500 ' + encode(command) + b' not understood\r\n')


class MyFTPFactory(ServerFactory):
    def __init__(self, certfile):
        factory_options = None
        if isfile(certfile):
            cert_data = ''
            try:
                with open(certfile) as f:
                    cert_data += f.read()
            except OSError:
                msg('Could not read the file "{}".'.format(certfile))
            if cert_data:
                factory_options = PrivateCertificate.loadPEM(cert_data).options()
        self.options = factory_options

    def buildProtocol(self, addr):
        return MyFTPServer(self.options)


def main():
    certfile = 'server.pem'
    listenTCP(21, MyFTPFactory(certfile))
    run()
    msg('Shutdown requested, exiting...')


if __name__ == '__main__':
    main()
© www.soinside.com 2019 - 2024. All rights reserved.