我将字节数组写入二进制文件,每个数组都以数字
313
或 331
开头,但是当我读取它时,我有以数字 28164
开头的数组。出了什么问题?代码如下。
问题历史记录
我从串行端口接收数据。
DataReceived
在辅助线程上引发,因此我所有的数据处理都在该线程上运行。我在序列化之前和序列化期间检查数据,一切正常,没有问题,但是当我开始读取此文件时(当没有更多写入时)我读取的不是我写入的数据(您可以阅读问题历史记录)请参阅下文了解更多信息)。我发现,如果每个序列化调用我都会写入不同的文件中,那么没有人会出错,这就是为什么我认为错误的原因是从不同的线程访问 File
/BinaryWriter
。我尝试过使用锁,但没有帮助。我该怎么办?
锁定尝试:
private static object locker = new object();
public void Serialize() {
lock(locker) {
// some logic here
}
}
我有
ConcurrentQueue<byte[]>
作为Buffer
。当我从 SerialPort
收到字节数组时,我检查 CRC,如果没问题,我添加调用下一个函数(进一步简短描述):
public void PassData(byte[] data) {
const int recordLen = 18;
const int unusedBytesLen = 6; // 2 for command, 2 for index, and 2 for CRC
const int addressLen = 2;
const int nonRecordDataLen = unusedBytesLen + addressLen;
int recordDataLen = data.Length - nonRecordDataLen;
byte recordsCount = (byte)(recordDataLen / recordLen);
if(recordDataLen % recordLen != 0) {
return;
}
if(recordDataLen == 0) {
return;
}
int serializeLen = data.Length - unusedBytesLen + 1; // plus one for record count
totalCount += serializeLen;
byte[] temp = new byte[serializeLen];
Array.Copy(data, 0, temp, 0, addressLen); // copy address
temp[addressLen] = recordsCount; // record count after address
Array.Copy(data, 6, temp, 3, recordDataLen); // copy records
Buffer.Enqueue(temp);
int addr = BitConverter.ToUInt16(data, 0);
if(addr!=331 && addr != 313) {
throw new Exception("Incorrect data");
}
}
所以,正如你所看到的,我没有添加一个数组来缓冲区 if
data.Length-6
不能被18整除)data.Length==6
)。数组由地址、记录数和记录(18 字节组)组成。
如果前两个字节的 ushort 不是我的地址之一,我
throw Exception
。
达到一定条件后,我开始连载。
public void Serialize() {
if(Buffer.Count == 0)
return;
using(var stream = File.Open(fileName, FileMode.OpenOrCreate)) {
using(var writer = new BinaryWriter(stream)) {
while(Buffer.Count > 0) {
byte[] bOut;
bool success = Buffer.TryDequeue(out bOut);
if(bOut[2] == 0) {
// will be never thrown cause of the second reason
throw new Exception("Incorrect data");
}
if(bOut.Length != bOut[2] * 18 + 3) {
// will be never thrown cause of the first reason
throw new Exception("Incorrect data");
}
if(success) {
writer.Write(bOut);
}
else {
return;
}
}
}
}
}
但是当我尝试读取这个文件(在写入之后)时,我有一些不正确的记录。前几个没问题,但后来我收到了像 12751 这样的
address
(这是不可能的,因为如果我收到的不是我的地址,我会抛出异常)。为什么会出现这种情况?阅读下面的功能:
using(var stream = File.Open(fileName, FileMode.Open)) {
using(var reader = new BinaryReader(stream)) {
int length = (int)reader.BaseStream.Length;
while(reader.BaseStream.Position != length) {
uint readerAddr = reader.ReadUInt16();
byte count = reader.ReadByte();
for(int i = 0; i < count; i++) {
uint markAddr = reader.ReadUInt16();
byte cmp0 = reader.ReadByte();
byte cmp1 = reader.ReadByte();
byte cmp2 = reader.ReadByte();
byte cmp3 = reader.ReadByte();
byte cmp4 = reader.ReadByte();
byte cmp5 = reader.ReadByte();
byte cmp6 = reader.ReadByte();
byte cmp7 = reader.ReadByte();
byte cmp8 = reader.ReadByte();
char fec = reader.ReadChar();
uint tim = reader.ReadUInt32();
byte freq = reader.ReadByte();
byte rssi = reader.ReadByte();
sb.Append($"{readerAddr} {markAddr} {cmp0} {cmp1} {cmp2} {cmp3} {cmp4} {cmp5} {cmp6} {cmp7} {cmp8} {fec} {tim} {freq} {rssi}\n");
}
Console.WriteLine(sb.ToString());
sb.Length = 0;
Console.ReadLine();
}
}
}
附注如果我的问题既麻烦又不清楚,我非常抱歉,我一整天都在与这个错误作斗争。请留下评论,我会尽力提供一些说明。
编辑/澄清:
我为什么使用
ConcurrentQueue<byte>
?这是我修复此错误的尝试。
我通过以下方式从 SerialPort 接收数据:我有一个
SerialPort.DataReceived
事件处理程序,可以启用 Timer
15 毫秒。如果在这个时间间隔内触发事件,我会重新启动Timer
(只是timer.Stop(); timer.Start();
)。在 Timer.Elapsed
事件处理程序上,我检查 CRC,如果一切正常则传递数据。
所以,我认为错误的原因是
SerialPort.DataReceived
在辅助线程上引发,所以我在并发方面遇到了一些问题。
在我的第一次尝试中,我使用
List<byte>
作为 Buffer
,这不是线程安全的。
UPD:我尝试将每组数据写入单独的文件中,问题就消失了。所以也许从不同线程调用
Serialize()
存在一些问题?..我尝试过 lock
处理文件,但没有帮助。
你(我)应该使用
FileMode.Append
而不是 FileMode.OpenOrCreate
。 BinaryWriter
开始重写文件而不是在末尾添加新数据。锁是多余的(就我而言),因为我在两次序列化迭代之间有显着的时间延迟。
从流中读取消息时,可以随时读取任意数量的字节。然后,您必须保留这些字节,直到可以检测到整个消息。
只是因为您认为另一个端点正在写入整个消息,并且它们之间有很长的延迟。并不意味着这些字节将从操作系统以方便的连续块到达。
例如,您的主要阅读循环可能如下所示;
var buffer = new byte[maxSize];
var offset = 0;
var processed = 0;
while (true)
{
var len = await stream.ReadAsync(buffer, offset, buffer.Length - offset);
if (len == 0)
break; // EOF
offset += len;
// TODO scan through the buffer looking whole messages to process
while (processed < offset)
processed++;
// shuffle unprocessed bytes to the start (if any)
if (processed < offset && processed >0)
Array.Copy(buffer, processed, buffer, 0, offset - processed);
offset -= processed;
processed = 0;
}