在 Golang 中使用 MQTT Paho 通过代理连接到 AWS IoT Core 时出现“Connection lost: EOF”和“connection lost before Subscribe completed”(订阅完成之前连接丢失)错误

问题描述 投票:0回答:1

我一直在尝试让一个使用 Paho 库 的 Golang 应用程序,通过运行在

localhost:3128
上的 Tinyproxy 代理实例连接到
AWS IoT Core MQTT broker
,但没有成功。

我已经能够在 Python 中使用 aws-iot-device-sdk-python-v2 实现这一场景,但不幸的是 aws-sdk-v2 for Golang 尚不支持 IoT Core,所以我一直在尝试用 Paho 来解决这个问题。

这是我的主函数

import (
    MQTT "github.com/eclipse/paho.mqtt.golang"
)

// MainFunc is the failing attempt described in the question. It builds Paho
// client options for the broker URL given by -server, opens the connection
// through a dialer obtained from proxy.FromEnvironment, subscribes to -topic
// once connected, and then blocks until SIGINT or SIGTERM is delivered.
// It panics if the initial Connect reports an error.
func MainFunc() {
    // Route Paho's debug and error logging to stdout and stderr.
    MQTT.DEBUG = log.New(os.Stdout, "", 0)
    MQTT.ERROR = log.New(os.Stderr, "", 0)

    // c receives the termination signals that end the blocking wait at the bottom.
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    // Command-line flags; every flag has a default so the function also runs without arguments.
    basePassword := `mypassword`
    server := flag.String("server", "ssl://iot.customdomain.io:443", "The full URL of the MQTT server to connect to")
    topic := flag.String("topic", "test/foo", "Topic to subscribe to")
    qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
    clientid := flag.String("clientid", "my-gw", "A clientid for the connection")
    username := flag.String("username", "CertificateAuth&my-user?x-amz-customauthorizer-name=iot-custom-authorizer", "A username to authenticate to the MQTT server")
    password := flag.String("password", basePassword, "Password to match username")
    token := flag.String("token", "", "An optional token credential to authenticate with")
    // skipVerify := flag.Bool("skipVerify", false, "Controls whether TLS certificate is verified")
    flag.Parse()

    // Point ALL_PROXY at the local proxy before proxy.FromEnvironment is called
    // further down; the variable is removed again when MainFunc returns.
    os.Setenv("ALL_PROXY", fmt.Sprintf("http://localhost:%s", "3128"))
    defer os.Unsetenv("ALL_PROXY")

    connOpts := MQTT.NewClientOptions().AddBroker(*server).
        SetClientID(*clientid).
        SetCleanSession(true).
        SetProtocolVersion(4)
    // Username/password take precedence; the token provider is only installed
    // when no username was supplied.
    if *username != "" {
        connOpts.SetUsername(*username)
        if *password != "" {
            connOpts.SetPassword(*password)
        }
    } else if *token != "" {
        connOpts.SetCredentialsProvider(func() (string, string) {
            return "unused", *token
        })
    }
    // NOTE(review): presumably these headers only matter for WebSocket
    // transports — confirm against the Paho documentation.
    connOpts.SetHTTPHeaders(http.Header{
        "x-amz-customauthorizer-name": {"iot-custom-authorizer"},
        "Proxy-Connection":            {"keep-alive"},
    })

    // caCertPool is filled here but not referenced again in this function.
    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM([]byte(AmazonRootCA1Cert))
    // The local variable TlsCerts shadows the TlsCerts type for the rest of the function.
    TlsCerts := TlsCerts{
        IotPrivateKey:     IotPrivateKey,
        IotCertificatePem: IotCertificatePem,
        AlnpProtocols:     []string{"mqtt", "x-amzn-mqtt-ca"},
    }
    tlsConfig := parseTlsConfig(TlsCerts)
    connOpts.SetTLSConfig(tlsConfig)

    // Subscribe each time the client connects; a subscribe error is printed, not returned.
    connOpts.OnConnect = func(c MQTT.Client) {
        if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil {
            fmt.Println(token.Error())
        }
    }

    // Illustrates customized TLS configuration prior to connection attempt
    connOpts.OnConnectAttempt = func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
        cfg := tlsCfg.Clone()
        cfg.ServerName = broker.Hostname()
        return cfg
    }
    // The dialer comes from the environment (ALL_PROXY set above); the custom
    // open-connection function hands the broker host to it, passing the URL
    // scheme as the network argument.
    dialer := proxy.FromEnvironment()
    connOpts.SetCustomOpenConnectionFn(func(uri *url.URL, options MQTT.ClientOptions) (net.Conn, error) {
        fmt.Printf("Custom dialer invoked for %s\n", uri.Host) // Debug log for verification
        address := uri.Host

        return dialer.Dial(uri.Scheme, address)
    })

    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    } else {
        fmt.Printf("Connected to %s\n", *server)
    }

    // Block until a termination signal arrives.
    <-c
}

用户名和密码会传递给在 IoT Core MQTT 代理(broker)前面运行的自定义授权方(custom authorizer)Lambda 函数(lambda function)。

按照 paho 库中关于 HTTP 代理的实现示例,下面是我的

dialer
的
Dial
实现,以及相关的 MQTT 代码。

// httpProxy is a proxy.Dialer that reaches its targets through an HTTP proxy.
type httpProxy struct {
    // host is the proxy's own host:port, taken from the proxy URL.
    host string
    // haveAuth reports whether the proxy URL carried user information.
    haveAuth bool
    // username and password are sent as basic auth when haveAuth is true.
    username, password string
    // forward is the dialer used to open the connection to the proxy itself.
    forward proxy.Dialer
}

// String returns a short description of the dialer that names the proxy host.
func (s httpProxy) String() string {
    const layout = "HTTP proxy dialer for %s"
    return fmt.Sprintf(layout, s.host)
}

// newHTTPProxy creates an httpProxy dialer for the proxy at uri. Connections to
// the proxy are opened with forward. User information in uri, when present, is
// stored for basic authentication. The returned error is always nil.
func newHTTPProxy(uri *url.URL, forward proxy.Dialer) (proxy.Dialer, error) {
    d := &httpProxy{
        host:    uri.Host,
        forward: forward,
    }
    if creds := uri.User; creds != nil {
        d.haveAuth = true
        d.username = creds.Username()
        // The "password was set" flag is not needed; an unset password stays "".
        d.password, _ = creds.Password()
    }

    return d, nil
}

// Dial opens a tunnel to addr through the HTTP proxy and returns a TLS client
// connection layered on top of that tunnel.
//
// It connects to the proxy with the forward dialer, sends an HTTP CONNECT
// request for addr (with basic auth when the proxy URL carried credentials) and
// requires a 200 response. The network argument is ignored. The TLS handshake
// is not performed here; it happens on first use of the returned connection.
//
// An error is returned when no TLS configuration can be built, when the proxy
// cannot be reached, when the CONNECT exchange fails, or when the proxy sends
// data before the TLS handshake has started.
func (s *httpProxy) Dial(_, addr string) (net.Conn, error) {
    // Build the TLS configuration first so that a configuration problem is
    // reported before any socket is opened.
    certs := TlsCerts{
        IotPrivateKey:     IotPrivateKey,
        IotCertificatePem: IotCertificatePem,
        AlnpProtocols:     []string{"mqtt", "x-amzn-mqtt-ca"},
    }
    tlsConfig := parseTlsConfig(certs)
    if tlsConfig == nil {
        return nil, fmt.Errorf("proxy dial %s: no TLS configuration (missing client certificate or private key)", addr)
    }

    reqURL := url.URL{
        Scheme: "https",
        Host:   addr,
    }

    req, err := http.NewRequest("CONNECT", reqURL.String(), nil)
    if err != nil {
        return nil, fmt.Errorf("building CONNECT request for %s: %w", addr, err)
    }
    req.Close = false
    if s.haveAuth {
        req.SetBasicAuth(s.username, s.password)
    }
    req.Header.Set("User-Agent", "paho.mqtt")

    // Dial and create the client connection.
    c, err := s.forward.Dial("tcp", s.host)
    if err != nil {
        return nil, fmt.Errorf("dialing proxy %s: %w", s.host, err)
    }

    if err := req.Write(c); err != nil {
        _ = c.Close() // best effort; the write error is the one worth reporting
        return nil, fmt.Errorf("sending CONNECT to proxy %s: %w", s.host, err)
    }

    br := bufio.NewReader(c)
    resp, err := http.ReadResponse(br, req)
    if err != nil {
        _ = c.Close()
        return nil, fmt.Errorf("reading CONNECT response from proxy %s: %w", s.host, err)
    }
    _ = resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        _ = c.Close()
        return nil, fmt.Errorf("proxied connection returned an error: %v", resp.Status)
    }
    // The buffered reader is discarded below. Anything it already read beyond
    // the response would be lost to the TLS layer, so fail loudly instead of
    // continuing on a corrupted stream.
    if n := br.Buffered(); n > 0 {
        _ = c.Close()
        return nil, fmt.Errorf("proxy %s sent %d unexpected bytes after the CONNECT response", s.host, n)
    }

    // Tunnel the communication over TLS.
    return tls.Client(c, tlsConfig), nil
}

// init registers newHTTPProxy for the "http" and "https" proxy URL schemes so
// that proxy.FromEnvironment can build this dialer.
func init() {
    for _, scheme := range []string{"http", "https"} {
        proxy.RegisterDialerType(scheme, newHTTPProxy)
    }
}

// onMessageReceived prints the topic and then the payload of a received message
// to standard output. The client argument is not used.
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
    topic := message.Topic()
    fmt.Printf("Received message on topic: %s\n", topic)

    body := message.Payload()
    fmt.Printf("Message: %s\n", body)
}

// messagePubHandler prints the topic of a received message and passes the
// payload to ProcessMessage.
var messagePubHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
    topic := msg.Topic()
    fmt.Println("Received message on topic: " + topic)

    payload := msg.Payload()
    ProcessMessage(payload)
}

// ProcessMessage writes payload, interpreted as text, to standard output
// followed by a newline.
func ProcessMessage(payload []byte) {
    line := string(payload)
    fmt.Println(line)
}

在启用日志的情况下运行上面的代码,我看到标题的错误,其中连接在订阅步骤完成之前丢失。

[client]   Connect()
[store]    memorystore initialized
[client]   about to write new connect msg
[client]   using custom onConnectAttempt handler...
Custom dialer invoked for iot.customdomain.io:443
[client]   socket connected to broker
[client]   Using MQTT 3.1.1 protocol
[net]      connect started
[net]      received connack
[client]   startCommsWorkers called
[client]   client is connected/reconnected
[net]      incoming started
[net]      startIncomingComms started
[net]      outgoing started
[net]      startComms started
[client]   enter Subscribe
[client]   startCommsWorkers done
[client]   exit startClient
[pinger]   keepalive starting
Connected to ssl://iot.customdomain.io:443
[net]      outgoing waiting for an outbound message
[net]      logic waiting for msg on ibound
[net]      startIncomingComms: inboundFromStore complete
[net]      logic waiting for msg on ibound
[client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [test/foo]
[client]   sending subscribe message, topic: test/foo
[client]   exit Subscribe
[net]      obound priority msg to write, type *packets.SubscribePacket
[net]      outgoing waiting for an outbound message
[net]      incoming complete
[net]      startIncomingComms: got msg on ibound
[net]      logic waiting for msg on ibound
[net]      startIncomingComms: ibound complete
[net]      startIncomingComms goroutine complete
[net]      outgoing waiting for an outbound message
[client]   Connect comms goroutine - error triggered EOF
[client]   internalConnLost called
[client]   stopCommsWorkers called
[pinger]   keepalive stopped
[router]   matchAndDispatch exiting
[client]   startCommsWorkers output redirector finished
[net]      outgoing waiting for an outbound message
[net]      outgoing waiting for an outbound message
[net]      outgoing comms stopping
[client]   stopCommsWorkers waiting for workers
[client]   stopCommsWorkers waiting for comms
[client]   internalConnLost waiting on workers
[net]      startComms closing outError
[client]   incoming comms goroutine done
[client]   stopCommsWorkers done
[client]   internalConnLost workers stopped
[msgids]   cleaned up subs
[client]   internalConnLost complete
connection lost before Subscribe completed
[client]   enter reconnect
[client]   about to write new connect msg
[client]   using custom onConnectAttempt handler...
Custom dialer invoked for iot.customdomain.io:443
Connection lost: EOF
[client]   socket connected to broker
[client]   Using MQTT 3.1.1 protocol
[net]      connect started
[net]      received connack
[client]   startCommsWorkers called
[client]   client is connected/reconnected
[net]      incoming started
[net]      startIncomingComms started
[net]      outgoing started
[net]      startComms started
[pinger]   keepalive starting
[client]   startCommsWorkers done
[store]    enter Resume
[store]    exit resume
[net]      outgoing waiting for an outbound message
[net]      logic waiting for msg on ibound
[net]      startIncomingComms: inboundFromStore complete
[net]      logic waiting for msg on ibound
[client]   enter Subscribe
[client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 2 topics: [test/foo]
[client]   sending subscribe message, topic: test/foo
[client]   exit Subscribe
[net]      obound priority msg to write, type *packets.SubscribePacket
[net]      outgoing waiting for an outbound message
[net]      incoming complete
[net]      startIncomingComms: got msg on ibound
[net]      logic waiting for msg on ibound
[net]      startIncomingComms: ibound complete
[net]      startIncomingComms goroutine complete
[net]      outgoing waiting for an outbound message
[client]   Connect comms goroutine - error triggered EOF
[client]   internalConnLost called
[client]   stopCommsWorkers called
[router]   matchAndDispatch exiting
[client]   startCommsWorkers output redirector finished
[net]      outgoing waiting for an outbound message
[net]      outgoing waiting for an outbound message
[net]      outgoing comms stopping
[net]      startComms closing outError
[client]   incoming comms goroutine done
[pinger]   keepalive stopped
[client]   stopCommsWorkers waiting for workers
[client]   stopCommsWorkers waiting for comms
[client]   stopCommsWorkers done
[client]   internalConnLost waiting on workers
[client]   internalConnLost workers stopped
[msgids]   cleaned up subs
[client]   internalConnLost complete
Connection lost: EOF
connection lost before Subscribe completed
[client]   enter reconnect
[client]   Detect continual connection lost after reconnect, slept for 1 seconds
[client]   about to write new connect msg
...
[client]   internalConnLost complete
Connection lost: EOF
connection lost before Subscribe completed
[client]   enter reconnect
panic: test timed out after 30s
    running tests:
        TestClientConnectWithTlsToIoTCoreCloudBrokerThroughProxy (30s)

错误继续循环,直到

TestClientConnectWithTlsToIoTCoreCloudBrokerThroughProxy
测试因超时错误而失败。

我还可以看到建立连接和关闭连接的tinyproxy日志

CONNECT   Nov 10 16:52:53.777 [45157]: Connect (file descriptor 4): 127.0.0.1
CONNECT   Nov 10 16:52:53.777 [45157]: Request (file descriptor 4): CONNECT iot.customdomain.io:443 HTTP/1.1
INFO      Nov 10 16:52:53.778 [45157]: No upstream proxy for iot.customdomain.io
INFO      Nov 10 16:52:53.778 [45157]: opensock: opening connection to iot.customdomain.io:443
INFO      Nov 10 16:52:53.920 [45157]: opensock: getaddrinfo returned for iot.customdomain.io:443
CONNECT   Nov 10 16:52:54.098 [45157]: Established connection to host "iot.customdomain.io" using file descriptor 8.
INFO      Nov 10 16:52:54.098 [45157]: Not sending client headers to remote machine
INFO      Nov 10 16:52:54.841 [45157]: Closed connection between local client (fd:4) and remote client (fd:8)

因此,我无法通过代理使用 Golang Paho 与 IoT Core 建立工作连接,并且我不知道发生了什么。

amazon-web-services go proxy mqtt paho
1个回答
0
投票

@hardillb 回答后,我意识到客户端确实无权订阅该主题,但发生了另一个连接问题。经过几次尝试使代码正常工作后,我最终想出了一个解决方案,该解决方案能够连接到 IoT Core MQTT 代理实例、发布和订阅该主题以及接收消息。这是工作代码。

import (
    "bufio"
    "crypto/ecdsa"
    "crypto/rsa"
    "crypto/tls"
    "crypto/x509"
    "encoding/pem"
    "fmt"
    "log"
    "net"
    "net/http"
    "net/url"
    "time"

    "golang.org/x/net/proxy"

    "os"
    "os/signal"
    "syscall"

    MQTT "github.com/eclipse/paho.mqtt.golang"
)

// TlsCerts bundles the PEM material and protocol names from which the client
// TLS configuration is built.
type TlsCerts struct {
    // IotPrivateKey and IotCertificatePem hold the client's private key and
    // certificate as PEM text.
    IotPrivateKey, IotCertificatePem string
    // CaCertificatePem is presumably the CA certificate as PEM text; the code
    // shown here does not read it.
    CaCertificatePem string
    // AlnpProtocols lists the protocol names placed in the TLS NextProtos field.
    AlnpProtocols []string
}

// Config groups a client id, a broker URL and TLS material. It is not
// referenced by the rest of the code shown here.
type Config struct {
    ClientId, BrokerUrl string
    TlsCerts            TlsCerts
}

// httpProxy is a proxy.Dialer that reaches its targets through an HTTP proxy.
type httpProxy struct {
    // host is the proxy's own host:port, taken from the proxy URL.
    host string
    // haveAuth reports whether the proxy URL carried user information.
    haveAuth bool
    // username and password are sent as basic auth when haveAuth is true.
    username, password string
    // forward is the dialer used to open the connection to the proxy itself.
    forward proxy.Dialer
}

// parseTlsConfig builds the client TLS configuration for the IoT endpoint.
//
// It returns nil when tlsCerts lacks either the private key or the client
// certificate. The root pool is loaded from tlsCerts.CaCertificatePem when that
// field is set and from AmazonRootCA1Cert otherwise.
//
// It panics when the CA PEM yields no certificate, because an empty root pool
// would make every handshake fail verification later with a less obvious
// error. Panics raised by parseTlsCertificates propagate unchanged.
func parseTlsConfig(tlsCerts TlsCerts) *tls.Config {
    if tlsCerts.IotPrivateKey == "" || tlsCerts.IotCertificatePem == "" {
        return nil
    }

    cert := parseTlsCertificates(tlsCerts)

    caPEM := []byte(AmazonRootCA1Cert)
    if tlsCerts.CaCertificatePem != "" {
        caPEM = []byte(tlsCerts.CaCertificatePem)
    }
    caCertPool := x509.NewCertPool()
    if !caCertPool.AppendCertsFromPEM(caPEM) {
        log.Panic("Failed to parse CA certificate")
    }

    return &tls.Config{
        RootCAs:            caCertPool,
        Certificates:       []tls.Certificate{cert},
        InsecureSkipVerify: false,
        NextProtos:         tlsCerts.AlnpProtocols,
        // The server name is fixed to the broker's custom domain.
        ServerName: "iot.customdomain.io",
    }
}

// parseTlsCertificates turns the PEM-encoded private key and client certificate
// in tlsCerts into a tls.Certificate.
//
// Only the first PEM block of each input is used. The key may be PKCS#1 (RSA),
// PKCS#8 (RSA or ECDSA) or SEC 1 (ECDSA, "EC PRIVATE KEY"). The parsed
// certificate is also stored in Leaf so it does not have to be parsed again
// when the certificate is used.
//
// It panics when either input contains no PEM block, when the key or the
// certificate cannot be parsed, or when a PKCS#8 key is of an unsupported type.
func parseTlsCertificates(
    tlsCerts TlsCerts,
) tls.Certificate {
    keyBlock, _ := pem.Decode([]byte(tlsCerts.IotPrivateKey))
    if keyBlock == nil {
        log.Panic("Failed to parse private key")
    }
    key := parsePrivateKeyDER(keyBlock.Bytes)

    certBlock, _ := pem.Decode([]byte(tlsCerts.IotCertificatePem))
    if certBlock == nil {
        log.Panic("Failed to parse certificate")
    }
    cert, err := x509.ParseCertificate(certBlock.Bytes)
    if err != nil {
        log.Panicf("Failed to parse certificate: %v", err)
    }

    return tls.Certificate{
        PrivateKey:  key,
        Certificate: [][]byte{cert.Raw},
        Leaf:        cert,
    }
}

// parsePrivateKeyDER decodes a DER private key, trying PKCS#1, then PKCS#8,
// then SEC 1 (EC). It panics when none of the encodings match or when a PKCS#8
// key is neither RSA nor ECDSA.
func parsePrivateKeyDER(der []byte) interface{} {
    if k, err := x509.ParsePKCS1PrivateKey(der); err == nil {
        return k
    }

    if k, err := x509.ParsePKCS8PrivateKey(der); err == nil {
        switch k.(type) {
        case *rsa.PrivateKey, *ecdsa.PrivateKey:
            return k
        default:
            log.Panicf("Unsupported private key type: %T", k)
        }
    }

    k, err := x509.ParseECPrivateKey(der)
    if err != nil {
        log.Panicf("Failed to parse private key: %v", err)
    }
    return k
}

// String returns a short description of the dialer that names the proxy host.
func (s httpProxy) String() string {
    const layout = "HTTP proxy dialer for %s"
    return fmt.Sprintf(layout, s.host)
}

// newHTTPProxy creates an httpProxy dialer for the proxy at uri. Connections to
// the proxy are opened with forward. User information in uri, when present, is
// stored for basic authentication. The returned error is always nil.
func newHTTPProxy(uri *url.URL, forward proxy.Dialer) (proxy.Dialer, error) {
    d := &httpProxy{
        host:    uri.Host,
        forward: forward,
    }
    if creds := uri.User; creds != nil {
        d.haveAuth = true
        d.username = creds.Username()
        // The "password was set" flag is not needed; an unset password stays "".
        d.password, _ = creds.Password()
    }

    return d, nil
}

// Dial opens a tunnel to addr through the HTTP proxy and returns a TLS client
// connection layered on top of that tunnel.
//
// It connects to the proxy with the forward dialer, sends an HTTP CONNECT
// request for addr (with basic auth when the proxy URL carried credentials) and
// requires a 200 response. The network argument is ignored. The TLS handshake
// is not performed here; it happens on first use of the returned connection.
//
// An error is returned when no TLS configuration can be built, when the proxy
// cannot be reached, when the CONNECT exchange fails, or when the proxy sends
// data before the TLS handshake has started.
func (s *httpProxy) Dial(_, addr string) (net.Conn, error) {
    // Build the TLS configuration first so that a configuration problem is
    // reported before any socket is opened.
    certs := TlsCerts{
        IotPrivateKey:     IotPrivateKey,
        IotCertificatePem: IotCertificatePem,
        AlnpProtocols:     []string{"mqtt", "x-amzn-mqtt-ca"},
    }
    tlsConfig := parseTlsConfig(certs)
    if tlsConfig == nil {
        return nil, fmt.Errorf("proxy dial %s: no TLS configuration (missing client certificate or private key)", addr)
    }

    reqURL := url.URL{
        Scheme: "https",
        Host:   addr,
    }

    req, err := http.NewRequest("CONNECT", reqURL.String(), nil)
    if err != nil {
        return nil, fmt.Errorf("building CONNECT request for %s: %w", addr, err)
    }
    req.Close = false
    if s.haveAuth {
        req.SetBasicAuth(s.username, s.password)
    }
    req.Header.Set("User-Agent", "paho.mqtt")

    // Dial and create the client connection.
    c, err := s.forward.Dial("tcp", s.host)
    if err != nil {
        return nil, fmt.Errorf("dialing proxy %s: %w", s.host, err)
    }

    if err := req.Write(c); err != nil {
        _ = c.Close() // best effort; the write error is the one worth reporting
        return nil, fmt.Errorf("sending CONNECT to proxy %s: %w", s.host, err)
    }

    br := bufio.NewReader(c)
    resp, err := http.ReadResponse(br, req)
    if err != nil {
        _ = c.Close()
        return nil, fmt.Errorf("reading CONNECT response from proxy %s: %w", s.host, err)
    }
    _ = resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        _ = c.Close()
        return nil, fmt.Errorf("proxied connection returned an error: %v", resp.Status)
    }
    // The buffered reader is discarded below. Anything it already read beyond
    // the response would be lost to the TLS layer, so fail loudly instead of
    // continuing on a corrupted stream.
    if n := br.Buffered(); n > 0 {
        _ = c.Close()
        return nil, fmt.Errorf("proxy %s sent %d unexpected bytes after the CONNECT response", s.host, n)
    }

    // Tunnel the communication over TLS.
    return tls.Client(c, tlsConfig), nil
}

// init registers newHTTPProxy for the "http" and "https" proxy URL schemes so
// that proxy.FromEnvironment can build this dialer.
func init() {
    for _, scheme := range []string{"http", "https"} {
        proxy.RegisterDialerType(scheme, newHTTPProxy)
    }
}

// onMessageReceived prints the topic and then the payload of a received message
// to standard output. The client argument is not used.
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
    topic := message.Topic()
    fmt.Printf("Received message on topic: %s\n", topic)

    body := message.Payload()
    fmt.Printf("Message: %s\n", body)
}

// messagePubHandler prints the topic of a received message and passes the
// payload to ProcessMessage.
var messagePubHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
    topic := msg.Topic()
    fmt.Println("Received message on topic: " + topic)

    payload := msg.Payload()
    ProcessMessage(payload)
}

// ProcessMessage writes payload, interpreted as text, to standard output
// followed by a newline.
func ProcessMessage(payload []byte) {
    line := string(payload)
    fmt.Println(line)
}

// MainFunc connects to the broker through the HTTP proxy at localhost:3128,
// subscribes to a topic and publishes one test message on connect, then
// disconnects after one second or as soon as SIGINT/SIGTERM arrives, whichever
// comes first.
//
// It panics when the proxy environment variable cannot be set or when the
// initial connection fails. Subscribe and publish errors are printed.
func MainFunc() {
    MQTT.DEBUG = log.New(os.Stdout, "", 0)
    MQTT.ERROR = log.New(os.Stderr, "", 0)

    // Signals are captured so an interrupt ends the wait below with a clean
    // Disconnect rather than being silently swallowed.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
    defer signal.Stop(sigs)

    server := "https://iot.customdomain.io:443"
    topic := "right/topic/now"
    qos := 0
    clientid := "my-client-id"

    // ALL_PROXY has to be in place before proxy.FromEnvironment builds the dialer.
    if err := os.Setenv("ALL_PROXY", fmt.Sprintf("http://localhost:%s", "3128")); err != nil {
        panic(err)
    }
    defer os.Unsetenv("ALL_PROXY")

    connOpts := MQTT.NewClientOptions().AddBroker(server).
        SetClientID(clientid).
        SetCleanSession(true).
        SetProtocolVersion(4)

    connOpts.OnConnect = func(cl MQTT.Client) {
        if token := cl.Subscribe(topic, byte(qos), onMessageReceived); token.Wait() && token.Error() != nil {
            fmt.Println(token.Error())
        }

        text := `{"message": "Hello MQTT"}`
        if token := cl.Publish(topic, byte(qos), false, text); token.Wait() && token.Error() != nil {
            fmt.Println(token.Error())
        }
    }

    // The dialer returned for ALL_PROXY performs the CONNECT and TLS setup, so
    // no TLS configuration is set on the client options themselves.
    dialer := proxy.FromEnvironment()
    connOpts.SetCustomOpenConnectionFn(func(uri *url.URL, options MQTT.ClientOptions) (net.Conn, error) {
        fmt.Printf("Custom dialer invoked for %s\n", uri.Host) // Debug log for verification
        return dialer.Dial(uri.Scheme, uri.Host)
    })

    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    fmt.Printf("Connected to %s\n", server)

    // Give the subscription a moment to receive the test message, but stop
    // early when asked to terminate.
    select {
    case <-time.After(1 * time.Second):
    case <-sigs:
    }

    fmt.Println("Disconnecting")
    client.Disconnect(250)
    fmt.Println("Exiting")
}

很多东西都发生了变化,比如删除了

SetUsername
SetPassword
调用,因为我已经通过证书和私钥进行身份验证。

这也被删除了:

// Removed in the working version: for each connection attempt this hook cloned
// the supplied TLS config and set ServerName to the broker's hostname.
connOpts.OnConnectAttempt = func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
        cfg := tlsCfg.Clone()
        cfg.ServerName = broker.Hostname()
        return cfg
    }

显然,之前的代码一团糟,问题不止一个,其中之一正是客户端没有该主题的订阅授权。不过,上面的代码已经能够通过在本地主机上运行的 Tinyproxy 实例连接到 IoT Core。

© www.soinside.com 2019 - 2024. All rights reserved.