我一直在努力通过在
AWS IoT Core MQTT broker
上运行的 Tinyproxy 代理实例,使用运行 Paho 库 的 Golang 应用程序连接到 localhost:3128
,但没有成功。
我能够使用 aws-iot-device-sdk-python-v2 在 Python 中管理这种情况,但不幸的是 aws-sdk-v2 for Golang 尚不支持 IoT Core,所以我一直在尝试与 Paho 一起解决这个问题。
这是我的主要功能
import (
MQTT "github.com/eclipse/paho.mqtt.golang"
)
func MainFunc() {
MQTT.DEBUG = log.New(os.Stdout, "", 0)
MQTT.ERROR = log.New(os.Stderr, "", 0)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
basePassword := `mypassword`
server := flag.String("server", "ssl://iot.customdomain.io:443", "The full URL of the MQTT server to connect to")
topic := flag.String("topic", "test/foo", "Topic to subscribe to")
qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
clientid := flag.String("clientid", "my-gw", "A clientid for the connection")
username := flag.String("username", "CertificateAuth&my-user?x-amz-customauthorizer-name=iot-custom-authorizer", "A username to authenticate to the MQTT server")
password := flag.String("password", basePassword, "Password to match username")
token := flag.String("token", "", "An optional token credential to authenticate with")
// skipVerify := flag.Bool("skipVerify", false, "Controls whether TLS certificate is verified")
flag.Parse()
os.Setenv("ALL_PROXY", fmt.Sprintf("http://localhost:%s", "3128"))
defer os.Unsetenv("ALL_PROXY")
connOpts := MQTT.NewClientOptions().AddBroker(*server).
SetClientID(*clientid).
SetCleanSession(true).
SetProtocolVersion(4)
if *username != "" {
connOpts.SetUsername(*username)
if *password != "" {
connOpts.SetPassword(*password)
}
} else if *token != "" {
connOpts.SetCredentialsProvider(func() (string, string) {
return "unused", *token
})
}
connOpts.SetHTTPHeaders(http.Header{
"x-amz-customauthorizer-name": {"iot-custom-authorizer"},
"Proxy-Connection": {"keep-alive"},
})
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(AmazonRootCA1Cert))
TlsCerts := TlsCerts{
IotPrivateKey: IotPrivateKey,
IotCertificatePem: IotCertificatePem,
AlnpProtocols: []string{"mqtt", "x-amzn-mqtt-ca"},
}
tlsConfig := parseTlsConfig(TlsCerts)
connOpts.SetTLSConfig(tlsConfig)
connOpts.OnConnect = func(c MQTT.Client) {
if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
}
}
// Illustrates customized TLS configuration prior to connection attempt
connOpts.OnConnectAttempt = func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
cfg := tlsCfg.Clone()
cfg.ServerName = broker.Hostname()
return cfg
}
dialer := proxy.FromEnvironment()
connOpts.SetCustomOpenConnectionFn(func(uri *url.URL, options MQTT.ClientOptions) (net.Conn, error) {
fmt.Printf("Custom dialer invoked for %s\n", uri.Host) // Debug log for verification
address := uri.Host
return dialer.Dial(uri.Scheme, address)
})
client := MQTT.NewClient(connOpts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
} else {
fmt.Printf("Connected to %s\n", *server)
}
<-c
}
用户名和密码将传递给在 IoT Core MQTT 代理前面运行的自定义授权者
lambda function
。
正如 paho lib 实现与 http 代理所述,这是我的
Dial
实现 dialer
以及相关的 mqtt 内容。
type httpProxy struct {
host string
haveAuth bool
username string
password string
forward proxy.Dialer
}
func (s httpProxy) String() string {
return fmt.Sprintf("HTTP proxy dialer for %s", s.host)
}
func newHTTPProxy(uri *url.URL, forward proxy.Dialer) (proxy.Dialer, error) {
s := new(httpProxy)
s.host = uri.Host
s.forward = forward
if uri.User != nil {
s.haveAuth = true
s.username = uri.User.Username()
s.password, _ = uri.User.Password()
}
return s, nil
}
func (s *httpProxy) Dial(_, addr string) (net.Conn, error) {
reqURL := url.URL{
Scheme: "https",
Host: addr,
}
req, err := http.NewRequest("CONNECT", reqURL.String(), nil)
if err != nil {
return nil, err
}
req.Close = false
if s.haveAuth {
req.SetBasicAuth(s.username, s.password)
}
req.Header.Set("User-Agent", "paho.mqtt")
// Dial and create the client connection.
c, err := s.forward.Dial("tcp", s.host)
if err != nil {
return nil, err
}
err = req.Write(c)
if err != nil {
_ = c.Close()
return nil, err
}
resp, err := http.ReadResponse(bufio.NewReader(c), req)
if err != nil {
_ = c.Close()
return nil, err
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusOK {
_ = c.Close()
return nil, fmt.Errorf("proxied connection returned an error: %v", resp.Status)
}
TlsCerts := TlsCerts{
IotPrivateKey: IotPrivateKey,
IotCertificatePem: IotCertificatePem,
AlnpProtocols: []string{"mqtt", "x-amzn-mqtt-ca"},
}
tlsConfig := parseTlsConfig(TlsCerts)
// Tunnel the communication over TLS
tlsConn := tls.Client(c, tlsConfig)
return tlsConn, nil
}
func init() {
// Pre-register custom HTTP proxy dialers for use with proxy.FromEnvironment
proxy.RegisterDialerType("http", newHTTPProxy)
proxy.RegisterDialerType("https", newHTTPProxy)
}
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
fmt.Printf("Received message on topic: %s\n", message.Topic())
fmt.Printf("Message: %s\n", message.Payload())
}
var messagePubHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
fmt.Println("Received message on topic: " + msg.Topic())
ProcessMessage(msg.Payload())
}
func ProcessMessage(payload []byte) {
fmt.Println(string(payload))
}
在启用日志的情况下运行上面的代码,我看到标题的错误,其中连接在订阅步骤完成之前丢失。
[client] Connect()
[store] memorystore initialized
[client] about to write new connect msg
[client] using custom onConnectAttempt handler...
Custom dialer invoked for iot.customdomain.io:443
[client] socket connected to broker
[client] Using MQTT 3.1.1 protocol
[net] connect started
[net] received connack
[client] startCommsWorkers called
[client] client is connected/reconnected
[net] incoming started
[net] startIncomingComms started
[net] outgoing started
[net] startComms started
[client] enter Subscribe
[client] startCommsWorkers done
[client] exit startClient
[pinger] keepalive starting
Connected to ssl://iot.customdomain.io:443
[net] outgoing waiting for an outbound message
[net] logic waiting for msg on ibound
[net] startIncomingComms: inboundFromStore complete
[net] logic waiting for msg on ibound
[client] SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [test/foo]
[client] sending subscribe message, topic: test/foo
[client] exit Subscribe
[net] obound priority msg to write, type *packets.SubscribePacket
[net] outgoing waiting for an outbound message
[net] incoming complete
[net] startIncomingComms: got msg on ibound
[net] logic waiting for msg on ibound
[net] startIncomingComms: ibound complete
[net] startIncomingComms goroutine complete
[net] outgoing waiting for an outbound message
[client] Connect comms goroutine - error triggered EOF
[client] internalConnLost called
[client] stopCommsWorkers called
[pinger] keepalive stopped
[router] matchAndDispatch exiting
[client] startCommsWorkers output redirector finished
[net] outgoing waiting for an outbound message
[net] outgoing waiting for an outbound message
[net] outgoing comms stopping
[client] stopCommsWorkers waiting for workers
[client] stopCommsWorkers waiting for comms
[client] internalConnLost waiting on workers
[net] startComms closing outError
[client] incoming comms goroutine done
[client] stopCommsWorkers done
[client] internalConnLost workers stopped
[msgids] cleaned up subs
[client] internalConnLost complete
connection lost before Subscribe completed
[client] enter reconnect
[client] about to write new connect msg
[client] using custom onConnectAttempt handler...
Custom dialer invoked for iot.customdomain.io:443
Connection lost: EOF
[client] socket connected to broker
[client] Using MQTT 3.1.1 protocol
[net] connect started
[net] received connack
[client] startCommsWorkers called
[client] client is connected/reconnected
[net] incoming started
[net] startIncomingComms started
[net] outgoing started
[net] startComms started
[pinger] keepalive starting
[client] startCommsWorkers done
[store] enter Resume
[store] exit resume
[net] outgoing waiting for an outbound message
[net] logic waiting for msg on ibound
[net] startIncomingComms: inboundFromStore complete
[net] logic waiting for msg on ibound
[client] enter Subscribe
[client] SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 2 topics: [test/foo]
[client] sending subscribe message, topic: test/foo
[client] exit Subscribe
[net] obound priority msg to write, type *packets.SubscribePacket
[net] outgoing waiting for an outbound message
[net] incoming complete
[net] startIncomingComms: got msg on ibound
[net] logic waiting for msg on ibound
[net] startIncomingComms: ibound complete
[net] startIncomingComms goroutine complete
[net] outgoing waiting for an outbound message
[client] Connect comms goroutine - error triggered EOF
[client] internalConnLost called
[client] stopCommsWorkers called
[router] matchAndDispatch exiting
[client] startCommsWorkers output redirector finished
[net] outgoing waiting for an outbound message
[net] outgoing waiting for an outbound message
[net] outgoing comms stopping
[net] startComms closing outError
[client] incoming comms goroutine done
[pinger] keepalive stopped
[client] stopCommsWorkers waiting for workers
[client] stopCommsWorkers waiting for comms
[client] stopCommsWorkers done
[client] internalConnLost waiting on workers
[client] internalConnLost workers stopped
[msgids] cleaned up subs
[client] internalConnLost complete
Connection lost: EOF
connection lost before Subscribe completed
[client] enter reconnect
[client] Detect continual connection lost after reconnect, slept for 1 seconds
[client] about to write new connect msg
...
[client] internalConnLost complete
Connection lost: EOF
connection lost before Subscribe completed
[client] enter reconnect
panic: test timed out after 30s
running tests:
TestClientConnectWithTlsToIoTCoreCloudBrokerThroughProxy (30s)
错误继续循环,直到
TestClientConnectWithTlsToIoTCoreCloudBrokerThroughProxy
测试因超时错误而失败。
我还可以看到建立连接和关闭连接的tinyproxy日志
CONNECT Nov 10 16:52:53.777 [45157]: Connect (file descriptor 4): 127.0.0.1
CONNECT Nov 10 16:52:53.777 [45157]: Request (file descriptor 4): CONNECT iot.customdomain.io:443 HTTP/1.1
INFO Nov 10 16:52:53.778 [45157]: No upstream proxy for iot.customdomain.io
INFO Nov 10 16:52:53.778 [45157]: opensock: opening connection to iot.customdomain.io:443
INFO Nov 10 16:52:53.920 [45157]: opensock: getaddrinfo returned for iot.customdomain.io:443
CONNECT Nov 10 16:52:54.098 [45157]: Established connection to host "iot.customdomain.io" using file descriptor 8.
INFO Nov 10 16:52:54.098 [45157]: Not sending client headers to remote machine
INFO Nov 10 16:52:54.841 [45157]: Closed connection between local client (fd:4) and remote client (fd:8)
因此,我无法通过代理使用 Golang Paho 与 IoT Core 建立工作连接,并且我不知道发生了什么。
@hardillb 回答后,我意识到客户端确实无权订阅该主题,但发生了另一个连接问题。经过几次尝试使代码正常工作后,我最终想出了一个解决方案,该解决方案能够连接到 IoT Core MQTT 代理实例、发布和订阅该主题以及接收消息。这是工作代码。
import (
"bufio"
"crypto/ecdsa"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"log"
"net"
"net/http"
"net/url"
"time"
"golang.org/x/net/proxy"
"os"
"os/signal"
"syscall"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type TlsCerts struct {
IotPrivateKey string
IotCertificatePem string
CaCertificatePem string
AlnpProtocols []string
}
type Config struct {
ClientId string
BrokerUrl string
TlsCerts TlsCerts
}
type httpProxy struct {
host string
haveAuth bool
username string
password string
forward proxy.Dialer
}
func parseTlsConfig(tlsCerts TlsCerts) *tls.Config {
if tlsCerts.IotPrivateKey == "" || tlsCerts.IotCertificatePem == "" {
return nil
}
cert := parseTlsCertificates(tlsCerts)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(AmazonRootCA1Cert))
return &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: false,
NextProtos: tlsCerts.AlnpProtocols,
ServerName: "iot.customdomain.io",
}
}
func parseTlsCertificates(
tlsCerts TlsCerts,
) tls.Certificate {
block, _ := pem.Decode([]byte(tlsCerts.IotPrivateKey))
if block == nil {
log.Panic("Failed to parse private key")
}
var key interface{}
var err error
key, err = x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
key, err = x509.ParsePKCS8PrivateKey(block.Bytes)
if err != nil {
log.Panicf("Failed to parse private key: %v", err)
}
switch k := key.(type) {
case *rsa.PrivateKey:
key = k
case *ecdsa.PrivateKey:
key = k
default:
log.Panicf("Unsupported private key type: %T", key)
}
}
block, _ = pem.Decode([]byte(tlsCerts.IotCertificatePem))
if block == nil {
log.Panic("Failed to parse certificate")
}
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
log.Panicf("Failed to parse certificate: %v", err)
}
return tls.Certificate{
PrivateKey: key,
Certificate: [][]byte{cert.Raw},
}
}
func (s httpProxy) String() string {
return fmt.Sprintf("HTTP proxy dialer for %s", s.host)
}
func newHTTPProxy(uri *url.URL, forward proxy.Dialer) (proxy.Dialer, error) {
s := new(httpProxy)
s.host = uri.Host
s.forward = forward
if uri.User != nil {
s.haveAuth = true
s.username = uri.User.Username()
s.password, _ = uri.User.Password()
}
return s, nil
}
func (s *httpProxy) Dial(_, addr string) (net.Conn, error) {
reqURL := url.URL{
Scheme: "https",
Host: addr,
}
req, err := http.NewRequest("CONNECT", reqURL.String(), nil)
if err != nil {
return nil, err
}
req.Close = false
if s.haveAuth {
req.SetBasicAuth(s.username, s.password)
}
req.Header.Set("User-Agent", "paho.mqtt")
// Dial and create the client connection.
c, err := s.forward.Dial("tcp", s.host)
if err != nil {
return nil, err
}
err = req.Write(c)
if err != nil {
_ = c.Close()
return nil, err
}
resp, err := http.ReadResponse(bufio.NewReader(c), req)
if err != nil {
_ = c.Close()
return nil, err
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusOK {
_ = c.Close()
return nil, fmt.Errorf("proxied connection returned an error: %v", resp.Status)
}
TlsCerts := TlsCerts{
IotPrivateKey: IotPrivateKey,
IotCertificatePem: IotCertificatePem,
AlnpProtocols: []string{"mqtt", "x-amzn-mqtt-ca"},
}
tlsConfig := parseTlsConfig(TlsCerts)
tlsConn := tls.Client(c, tlsConfig)
return tlsConn, nil
}
func init() {
// Pre-register custom HTTP proxy dialers for use with proxy.FromEnvironment
proxy.RegisterDialerType("http", newHTTPProxy)
proxy.RegisterDialerType("https", newHTTPProxy)
}
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
fmt.Printf("Received message on topic: %s\n", message.Topic())
fmt.Printf("Message: %s\n", message.Payload())
}
var messagePubHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
fmt.Println("Received message on topic: " + msg.Topic())
ProcessMessage(msg.Payload())
}
func ProcessMessage(payload []byte) {
fmt.Println(string(payload))
}
func MainFunc() {
MQTT.DEBUG = log.New(os.Stdout, "", 0)
MQTT.ERROR = log.New(os.Stderr, "", 0)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
server := "https://iot.customdomain.io:443"
topic := "right/topic/now"
qos := 0
clientid := "my-client-id"
os.Setenv("ALL_PROXY", fmt.Sprintf("http://localhost:%s", "3128"))
defer os.Unsetenv("ALL_PROXY")
connOpts := MQTT.NewClientOptions().AddBroker(server).
SetClientID(clientid).
SetCleanSession(true).
SetProtocolVersion(4)
connOpts.OnConnect = func(c MQTT.Client) {
if token := c.Subscribe(topic, byte(qos), onMessageReceived); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
}
text := `{"message": "Hello MQTT"}`
token := c.Publish(topic, byte(qos), false, text)
token.Wait()
}
dialer := proxy.FromEnvironment()
connOpts.SetCustomOpenConnectionFn(func(uri *url.URL, options MQTT.ClientOptions) (net.Conn, error) {
fmt.Printf("Custom dialer invoked for %s\n", uri.Host) // Debug log for verification
address := uri.Host
return dialer.Dial(uri.Scheme, address)
})
client := MQTT.NewClient(connOpts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
fmt.Printf("Connected to %s\n", server)
time.Sleep(1 * time.Second)
fmt.Println("Disconnecting")
client.Disconnect(250)
fmt.Println("Exiting")
}
很多东西都发生了变化,比如删除了
SetUsername
和 SetPassword
调用,因为我已经通过证书和私钥进行身份验证。
这也被删除了:
connOpts.OnConnectAttempt = func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
cfg := tlsCfg.Clone()
cfg.ServerName = broker.Hostname()
return cfg
}
显然,之前的代码一团糟,所以有很多问题,不仅仅是一个问题,而且其中一个问题正是缺乏对该特定主题的授权的问题。但上面的代码能够通过本地主机上运行的 Tinyproxy 实例连接到 IoT Core。