在Python中读取LabVIEW TCP数据(Flattened String / Data Cluster)

问题描述 投票:1回答:4

我有一个LabVIEW应用程序,在通过TCP / IP传输到我的python应用程序之前,将双打的集群(数组)展平为一个字符串。这样做是因为TCP / IP只会传输字符串。

问题是python读取字符串作为无意义的ASCII字符的加载,我似乎无法将它们解读回原始的双精度数组。

如何解释在展平数据字符串后LabVIEW发送的字符串数据。谷歌数小时后我唯一的有用信息提示是一个名为pyLFDS的PyPI条目,但它已经被删除了。

python tcp double ascii labview
4个回答
1
投票

我想记录问题和解决方案,以便其他人可以避免浪费我在Google上寻找解决方案的时间。

当LabVIEW平展数据时,在这种情况下是一个双精度集群,它将它们简单地作为一个连续的字符串发送,每个双字节由8个字节表示。这被python解释为每个双8个ASCII字符,显示为nonsense in your console

要返回传输的双精度数,需要依次取每个8字节的部分并将ASCII字符转换为ASCII码,在Python的情况下使用ord()。这将给你一个8字节的十进制代码(例如4.8 = [64 19 51 51 51 51 51 51]

事实证明,LabVIEW可以完成大部分工作,包括TCP / IP传输,Big Endian。除非您正在使用Big Endian,否则您可能需要更改它。例如,上面的例子将成为[51 51 51 51 51 51 19 64]。我将每个双打放入一个列表中,因此能够使用list(reversed())函数来改变endienness。

然后,您可以将其转换回双精度版。示例python代码:

import struct
b = bytearray([51,51,51,51,51,51,19,64]) #this is the number 4.8
value = struct.unpack('d', b)

print(value) #4.8

对于经验丰富的程序员来说,这可能是显而易见的,但它让我感到困惑了好几天。我为使用stackoverflow作为平台来回答我自己的问题而道歉,但希望这篇帖子可以帮助下一个正在挣扎的人。

编辑:注意如果您使用的是早于Python 2.7.5的版本,那么您可能会发现struct.unpack()将失败。使用上面的示例代码替换以下代码为我工作:b = bytes(bytearray([51,51,51,51,51,51,19,64]))


1
投票

详细描述了qengxswpoi中的LabVIEW展平数据格式。该文档没有明确描述如何表示双精度浮点数(DBL类型),但更多的搜索发现here澄清它们存储在this中。

但是,以XML或JSON等标准文本格式发送数据可能更简单,更具前瞻性,LabVIEW中的内置函数和Python中的标准库模块都支持这两种格式。

如果您可以选择,不使用LabVIEW扁平化数据与其他程序交换的另一个原因是扁平字符串不包含您需要将其转换回原始数据类型所需的类型描述符 - 您需要知道什么键入数据是为了解码它。


0
投票

我知道这并没有解决你的问题,因为你提到你没有能力修改LabVIEW代码。但是,我希望在LabVIEW中通过TCP传输字符串数据的常用方法增加一些清晰度。

可以控制通过Write TCP发送的数据字符串的字节顺序。我建议使用IEEE 754 format,因为它使您能够选择在展平数据时要使用的字节顺序; big-endian(默认如果是无线连接),native(使用主机的字节顺序)或little-endian。

我见过的另一种常见技术是使用Flatten To String Function。这样做会将数字转换为big-endian字符串。当你在网络的另一端阅读它时,这当然会让人感到困惑,因为大多数其他东西都是小端的,所以你需要进行一些字节交换。

一般来说,如果您不确定代码是什么样的,假设它来自LabVIEW代码它将是big-endian。

nekomatic的答案很好。在可用时使用标准文本格式始终是一个不错的选择。


0
投票

这段代码适合我。 UDP服务器接受展平的dbl阵列x,将x + 1返回到端口6503.修改LabView UDP客户端以满足您的需要。

Type Cast Function

LabView UDP客户端:import struct import socket import numpy as np def get_ip(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: # doesn't even have to be reachable s.connect(('10.255.255.255', 1)) IP = s.getsockname()[0] except: IP = '127.0.0.1' finally: s.close() return IP #bind_ip = get_ip() print("\n\n[*] Current ip is %s" % (get_ip())) bind_ip = '' bind_port = 6502 server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server.bind((bind_ip,bind_port)) print("[*] Ready to receive UDP on %s:%d" % (bind_ip,bind_port)) while True: data, address = server.recvfrom(1024) print('[*] Received %s bytes from %s:' % (len(data), address)) arrLen = struct.unpack('>i', data[:4])[0] print('[*] Received array of %d doubles:' % (arrLen,)) x = [] elt = struct.iter_unpack('>d', data[4:]) while True: try: x.append(next(elt)[0]) print(x[-1]) except StopIteration: break x = np.array(x) y = x+1 # np.sin(x) msg = data[:4] for item in y: msg += struct.pack('>d', item) print(msg) A = (address[0], 6503) server.sendto(msg, A) break server.close() print('[*] Server closed') print('[*] Done')

© www.soinside.com 2019 - 2024. All rights reserved.