RxJava 从流中定期剥离字节,然后根据消息字节的长度指示符进一步提取消息

问题描述 投票:0回答:1

我有一个 Java 输入流,每 n 个字节跳过 2 个字节。现在,在每 n 个字节去除 2 个分隔符后,输出是干净的字节。

此输出是一系列字节,其中前 4 个字节表示长度,因此我需要计算它们的长度 int 并提取长度 + 附加长度字节并将它们写入文件。

我使用缓冲区来剥离 2 个字节,但不知道如何提取长度和长度字节。基本上,我需要收集/累积长度字节以及表示消息的附加长度字节。 如有任何帮助,我们将不胜感激。

Observable<Integer> byteObservable = Observable.create(emitter -> {
    try {
        while (true) {
            int b = fis.read();
            if (b == -1) {
                // No more bytes to read
                emitter.onComplete();
            } else {
                emitter.onNext(b);
            }
        }
    } catch (IOException e) {
        emitter.onError(e);
    }
});

byteObservable
    .buffer(100, 102) // Buffer 100, skip 101/102

作为一个例子,考虑流是

05AB@@CDE0@@7123@@4567@@10AB@@CDEF@@GHIJ

每 4 个字节删除

@@

05ABCDE07123456710ABCDEFGHIJ

这是由子消息组成的

05 ABCDE
07 1234567
10 ABCDEFGHIJ

获取每条消息并将其写入文件。

java stream rx-java2
1个回答
0
投票

这是使用简单示例解析消息的一种方法。 这是我的测试结果。

 5 ABCDE
 7 1234567
10 ABCDEFGHIJ

这是完整的可运行代码。

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ParseMessages {

    public static void main(String[] args) {
        String exampleString = "05AB@@CDE0@@7123@@4567@@10AB@@CDEF@@GHIJ";
        
        InputStream is = new ByteArrayInputStream(exampleString.getBytes(StandardCharsets.UTF_8));
        processMessages(is, 4, 2);
        
        try {
            is.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    private static void processMessages(InputStream is, int count, int skip) {
        // Assuming all message lengths are two bytes
        int length = 0;
        int lengthMaximum = 2;
        byte[] text;
        String lengthString = "";
        String message = "";
        
        try {
            do {
                text = is.readNBytes(count);
                is.readNBytes(skip);
                for (int textIndex = 0; textIndex < text.length; textIndex++) {
                    if (length == 0) {
                        lengthString += Character.toString(text[textIndex]);
                        if (lengthString.length() >= lengthMaximum) {
                            length = Integer.valueOf(lengthString);
                            lengthString = "";
                        }   
                    } else {
                        message += Character.toString(text[textIndex]);
                        if (message.length() >= length) {
                            // Here's where you write the individual messages
                            // to an output.
                            System.out.println(String.format("%2d", length) + " " + message);
                            message = "";
                            length = 0;
                        }
                    }
                }
                
                
            } while (text.length > 0);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
}
© www.soinside.com 2019 - 2024. All rights reserved.