迭代器未按预期工作。
这是k路合并算法
我们有一个大输入文件(用于测试 1024 字节),其中包含随机生成的整数,并且该文件被分为 n 个较小的块,其大小为 256 字节(用于测试),块中的数字已经排序。 然后每个文件都被转化为一个
Stream<Integer>
,并且每个流都有自己的迭代器,每个迭代器都有自己的整数值。 Set<IteratorInfo>
由这些迭代器组成,第一个元素作为具有最小值的迭代器。
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
public class MergeFileSorter implements FileSorter {
// number of elements
private int chunkSize;
private String file;
// Number of runs to be merged
private int numberOfRuns;
// FileReader
private FileSplitter fileSplitter = new FileSplitter();
private FileReader fileReader;
public MergeFileSorter(String file, int runs, int chunkSize) {
this.file = file;
this.numberOfRuns = runs;
this.chunkSize = chunkSize;
}
@Override
public void sort(String inputPath, String outPath) {
try {
fileSplitter.readChunks(inputPath, (int) chunkSize);
mergeAndSort(inputPath, outPath, chunkSize);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void mergeAndSort(String inputPath, String outPath, int chunkSize) throws IOException {
Path chunkFolder = fileSplitter.getChunkFolder(inputPath);
List<Path> chunks = Files.walk(chunkFolder).filter(path -> !Files.isDirectory(path)).toList();
Iterator<Integer> treeSetMergedIterator = new TreeSetMergedIterator(chunks);
Path path = Path.of(outPath);
Files.write(path, new byte[0]);
int intCount = 0;
List<Integer> integers = new ArrayList<>();
int index = 0;
while (treeSetMergedIterator.hasNext()){
// System.out.println("Int Count: " + (++intCount));
Integer value = treeSetMergedIterator.next();
integers.add(value);
if (integers.size() >= chunkSize/Integer.BYTES){
Files.write(path, convertIntegersToByteArray(integers), StandardOpenOption.APPEND);
integers.clear();
}
}
if (!integers.isEmpty()){
Files.write(path, convertIntegersToByteArray(integers), StandardOpenOption.APPEND);
}
}
public static List<String> convertIntegersToStrings(List<Integer> integerList) {
return integerList.stream()
.map(Object::toString)
.collect(Collectors.toList());
}
public static List<String> convertIntegersToStrings(int[] integerList) {
return Arrays.stream(integerList) // Stream the array
.mapToObj(Integer::toString) // Convert each integer to a string
.collect(Collectors.toList()); //
}
public static byte[] convertIntegersToByteArray(List<Integer> integers) {
ByteBuffer buffer = ByteBuffer.allocate(integers.size()*Integer.BYTES);
for (Integer integer : integers) {
buffer.putInt(integer);
}
return buffer.array();
}
public static byte[] convertIntegersToByteArray(int[] integers) {
ByteBuffer buffer = ByteBuffer.allocate(integers.length * Integer.BYTES);
for (int integer : integers) {
buffer.putInt(integer);
}
return buffer.array();
}
}
此循环将迭代器的值添加到
Integer
数组,并将其写入带有排序块的输出文件。
问题是输出文件的大小与输入文件的大小不同,并且总是变化(400-500 字节),如果块是 4 字节(较小),则输出文件最多可达 1000 字节。
当我读取 32 字节或更小的文件(块的大小为 32/16/8/4)时,一切正常。读取
treeMergedIterator
中的文件似乎是正确的,因为缓冲区的大小始终已满并且与块文件的大小匹配。
但是
treeSetMergedIterator.hasNext()
总是意外地返回false
并且算法停止工作,但是还剩下数字。
所有将字节转换为 int 或 int 转换为字节的方法似乎都是正确的。
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class TreeSetMergedIterator implements Iterator<Integer> {
private final Set<IteratorInfo> iterators;
public TreeSetMergedIterator(List<Path> paths) {
iterators = paths.stream()
.map(this::streamLines)
.map(Stream::iterator)
.map(IteratorInfo::new)
.collect(Collectors.toCollection(TreeSet::new));
}
private Stream<Integer> streamLines(Path path) {
try {
byte[] bytes = Files.readAllBytes(path);
return bytesToIntList(bytes).stream();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@Override
public boolean hasNext() {
return iterators.stream().anyMatch(iteratorInfo -> {
if (iteratorInfo.getIterator().hasNext()) {
return true;
} else {
return iteratorInfo.value != null;
}
});
}
@Override
public Integer next() {
IteratorInfo info = iterators.iterator().next();
Integer value = info.value;
iterators.remove(info);
if (info.iterator.hasNext()) {
iterators.add(info.next());
}
return value;
}
private List<Integer> bytesToIntList(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
List<Integer> integers = new ArrayList<>();
/* int[] integers = new int[buffer.capacity()/Integer.BYTES];*/
while (buffer.remaining() >= 4) {
integers.add(buffer.getInt());
}
/* while (buffer.hasRemaining()){
integers[buffer.position()/Integer.BYTES] = buffer.getInt();
}*/
return integers;
}
static class IteratorInfo implements Comparable<IteratorInfo> {
private final Iterator<Integer> iterator;
private Integer value;
public IteratorInfo(Iterator<Integer> iterator) {
this.iterator = iterator;
value = iterator.next();
}
public IteratorInfo next() {
value = iterator.next();
return this;
}
@Override
public int compareTo(IteratorInfo info) {
return Integer.compare(value, info.value);
}
public Iterator<Integer> getIterator() {
return iterator;
}
}
}
我还使用迭代器数组而不是集合实现了更简单的实现(
MergedIterator
类),但问题仍然相同。
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
public class MergedIterator implements Iterator<Integer> {
private final List<Iterator<Integer>> iterators;
private final Integer[] currentData;
public MergedIterator(List<Path> paths) {
iterators = paths.stream()
.map(this::streamLines)
.map(Stream::iterator)
.toList();
currentData = iterators.stream()
.map(Iterator::next)
.toArray(Integer[]::new);
}
private Stream<Integer> streamLines(Path path) {
try {
byte[] bytes = Files.readAllBytes(path);
return bytesToIntList(bytes).stream();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@Override
public boolean hasNext() {
return iterators.stream().anyMatch(Iterator::hasNext);
}
@Override
public Integer next() {
int minValue = Integer.MAX_VALUE;
int minIndex = -1;
for (int i = 0; i < currentData.length; i++) {
if (currentData[i] != null && currentData[i] < minValue) {
minValue = currentData[i];
minIndex = i;
}
}
Iterator<Integer> minIterator = iterators.get(minIndex);
currentData[minIndex] = minIterator.hasNext() ? minIterator.next() : null;
return minValue;
}
private List<Integer> bytesToIntList(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
List<Integer> integers = new ArrayList<>();
while (buffer.remaining() >= 4) {
integers.add(buffer.getInt());
}
return integers;
}
}
JVM内存堆为100 Mib
这是主课:(不考虑运行次数)
import java.util.logging.Logger;
public class Main {
public static final long GENERATED_FILE_SIZE = 1024;
public static final int chunkSize = 256;
public static void main(String[] args) {
String filePath = "random_file.dat";
String resultPath = "result.dat";
FileGenerator fileGenerator = new FileGenerator();
fileGenerator.generateRandomFile(filePath, GENERATED_FILE_SIZE, chunkSize);
MergeFileSorter fileSorter = new MergeFileSorter(filePath, 2, chunkSize);
fileSorter.sort(filePath, resultPath);
}
}
目前我只能提供如何跟踪错误的一般建议:
您应该将文件读取、文件写入和排序分成三个独立的部分。如果由于文件大小而这似乎不切实际,您仍然可以有一个根据请求读取下一条输入的源和根据请求写入一段输出的接收器。这样您就可以分别测试这三个部分。
此外,您不应该使用随机数据进行测试,而应该使用明确定义的数据。您可以随机生成一次测试数据文件,但之后您应该重复使用它们。这样您还可以提供预期结果并自动确定测试结果。
最后:您是否使用根本不需要排序的数据测试了您的代码?这样您就可以更好地找出错误发生的位置。我也会写测试
所有三种情况都不应该涉及任何排序(您可以实现虚拟排序,仅传递数据)。