外部K路合并。具有整数流的迭代器

问题描述 投票:0回答:1

迭代器未按预期工作。

这是k路合并算法

我们有一个大输入文件(用于测试 1024 字节),其中包含随机生成的整数,并且该文件被分为 n 个较小的块,其大小为 256 字节(用于测试),块中的数字已经排序。 然后每个文件都被转化为一个

Stream<Integer>
,并且每个流都有自己的迭代器,每个迭代器都有自己的整数值。
Set<IteratorInfo>
由这些迭代器组成,第一个元素作为具有最小值的迭代器。

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

public class MergeFileSorter implements FileSorter {

    // number of elements
    private int chunkSize;
    private String file;

    // Number of runs to be merged
    private int numberOfRuns;

    // FileReader
    private FileSplitter fileSplitter = new FileSplitter();
    private FileReader fileReader;

    public MergeFileSorter(String file, int runs, int chunkSize) {
        this.file = file;
        this.numberOfRuns = runs;
        this.chunkSize = chunkSize;
    }

    @Override
    public void sort(String inputPath, String outPath) {
        try {
            fileSplitter.readChunks(inputPath, (int) chunkSize);
            mergeAndSort(inputPath, outPath, chunkSize);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void mergeAndSort(String inputPath, String outPath, int chunkSize) throws IOException {
        Path chunkFolder = fileSplitter.getChunkFolder(inputPath);

        List<Path> chunks = Files.walk(chunkFolder).filter(path -> !Files.isDirectory(path)).toList();

        Iterator<Integer> treeSetMergedIterator = new TreeSetMergedIterator(chunks);

        Path path = Path.of(outPath);
        Files.write(path, new byte[0]);
        int intCount = 0;

        List<Integer> integers = new ArrayList<>();

        int index = 0;
        while (treeSetMergedIterator.hasNext()){
           // System.out.println("Int Count: " + (++intCount));
            Integer value = treeSetMergedIterator.next();
            integers.add(value);

            if (integers.size() >= chunkSize/Integer.BYTES){
                Files.write(path, convertIntegersToByteArray(integers), StandardOpenOption.APPEND);
                integers.clear();
            }
        }
        if (!integers.isEmpty()){
            Files.write(path, convertIntegersToByteArray(integers), StandardOpenOption.APPEND);
        }
    }

    public static List<String> convertIntegersToStrings(List<Integer> integerList) {
        return integerList.stream()
                .map(Object::toString)
                .collect(Collectors.toList());
    }

    public static List<String> convertIntegersToStrings(int[] integerList) {
        return Arrays.stream(integerList)  // Stream the array
                .mapToObj(Integer::toString)  // Convert each integer to a string
                .collect(Collectors.toList());  //
    }

    public static byte[] convertIntegersToByteArray(List<Integer> integers) {
        ByteBuffer buffer = ByteBuffer.allocate(integers.size()*Integer.BYTES);
        for (Integer integer : integers) {
            buffer.putInt(integer);
        }
        return buffer.array();
    }

    public static byte[] convertIntegersToByteArray(int[] integers) {
        ByteBuffer buffer = ByteBuffer.allocate(integers.length * Integer.BYTES);
        for (int integer : integers) {
            buffer.putInt(integer);
        }
        return buffer.array();
    }
}

此循环将迭代器的值添加到

Integer
数组,并将其写入带有排序块的输出文件。

问题是输出文件的大小与输入文件的大小不同,并且总是变化(400-500 字节),如果块是 4 字节(较小),则输出文件最多可达 1000 字节。

当我读取 32 字节或更小的文件(块的大小为 32/16/8/4)时,一切正常。读取

treeMergedIterator
中的文件似乎是正确的,因为缓冲区的大小始终已满并且与块文件的大小匹配。

但是

treeSetMergedIterator.hasNext()
总是意外地返回
false
并且算法停止工作,但是还剩下数字。

所有将字节转换为 int 或 int 转换为字节的方法似乎都是正确的。

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TreeSetMergedIterator implements Iterator<Integer> {

    private final Set<IteratorInfo> iterators;

    public TreeSetMergedIterator(List<Path> paths) {
        iterators = paths.stream()
                .map(this::streamLines)
                .map(Stream::iterator)
                .map(IteratorInfo::new)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    private Stream<Integer> streamLines(Path path) {
        try {
            byte[] bytes = Files.readAllBytes(path);
            return bytesToIntList(bytes).stream();
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public boolean hasNext() {
        return iterators.stream().anyMatch(iteratorInfo -> {
            if (iteratorInfo.getIterator().hasNext()) {
                return true;
            } else {
                return iteratorInfo.value != null;
            }
        });
    }

    @Override
    public Integer next() {
        IteratorInfo info = iterators.iterator().next();
        Integer value = info.value;
        iterators.remove(info);
        if (info.iterator.hasNext()) {
            iterators.add(info.next());
        }

        return value;
    }

    private List<Integer> bytesToIntList(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        List<Integer> integers = new ArrayList<>();
        /* int[] integers = new int[buffer.capacity()/Integer.BYTES];*/
        while (buffer.remaining() >= 4) {
            integers.add(buffer.getInt());
        }
        /* while (buffer.hasRemaining()){
            integers[buffer.position()/Integer.BYTES] = buffer.getInt();
        }*/
        return integers;
    }

    static class IteratorInfo implements Comparable<IteratorInfo> {

        private final Iterator<Integer> iterator;

        private Integer value;

        public IteratorInfo(Iterator<Integer> iterator) {
            this.iterator = iterator;
            value = iterator.next();
        }

        public IteratorInfo next() {
            value = iterator.next();
            return this;
        }

        @Override
        public int compareTo(IteratorInfo info) {
            return Integer.compare(value, info.value);
        }

        public Iterator<Integer> getIterator() {
            return iterator;
        }
    }
}

我还使用迭代器数组而不是集合实现了更简单的实现(

MergedIterator
类),但问题仍然相同。

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class MergedIterator implements Iterator<Integer> {
    private final List<Iterator<Integer>> iterators;

    private final Integer[] currentData;

    public MergedIterator(List<Path> paths) {
        iterators = paths.stream()
                .map(this::streamLines)
                .map(Stream::iterator)
                .toList();
        currentData = iterators.stream()
                .map(Iterator::next)
                .toArray(Integer[]::new);
    }

    private Stream<Integer> streamLines(Path path) {
        try {
            byte[] bytes = Files.readAllBytes(path);
            return bytesToIntList(bytes).stream();
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public boolean hasNext() {
        return iterators.stream().anyMatch(Iterator::hasNext);
    }

    @Override
    public Integer next() {
        int minValue = Integer.MAX_VALUE;
        int minIndex = -1;

        for (int i = 0; i < currentData.length; i++) {
            if (currentData[i] != null && currentData[i] < minValue) {
                minValue = currentData[i];
                minIndex = i;
            }
        }

        Iterator<Integer> minIterator = iterators.get(minIndex);
        currentData[minIndex] = minIterator.hasNext() ? minIterator.next() : null;

        return minValue;
    }

    private List<Integer> bytesToIntList(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        List<Integer> integers = new ArrayList<>();
      
        while (buffer.remaining() >= 4) {
            integers.add(buffer.getInt());
        }
        
        return integers;
    }
}

JVM内存堆为100 Mib

这是主课:(不考虑运行次数)

import java.util.logging.Logger;

public class Main {

    public static final long GENERATED_FILE_SIZE = 1024;

    public static final int chunkSize = 256;

    public static void main(String[] args) {
        String filePath = "random_file.dat";
        String resultPath = "result.dat";

        FileGenerator fileGenerator = new FileGenerator();
        fileGenerator.generateRandomFile(filePath, GENERATED_FILE_SIZE, chunkSize);

        MergeFileSorter fileSorter = new MergeFileSorter(filePath, 2,  chunkSize);
        fileSorter.sort(filePath, resultPath);
    }
}
java file stream iterator external
1个回答
0
投票

目前我只能提供如何跟踪错误的一般建议:

您应该将文件读取、文件写入和排序分成三个独立的部分。如果由于文件大小而这似乎不切实际,您仍然可以有一个根据请求读取下一条输入的源和根据请求写入一段输出的接收器。这样您就可以分别测试这三个部分。

此外,您不应该使用随机数据进行测试,而应该使用明确定义的数据。您可以随机生成一次测试数据文件,但之后您应该重复使用它们。这样您还可以提供预期结果并自动确定测试结果。

最后:您是否使用根本不需要排序的数据测试了您的代码?这样您就可以更好地找出错误发生的位置。我也会写测试

  1. 读取已知文件并将其与硬编码的预期数据进行比较
  2. 写入硬编码数据并将其与众所周知的文件进行比较
  3. 读取已知文件并将其与众所周知的预期文件进行比较

所有三种情况都不应该涉及任何排序(您可以实现虚拟排序,仅传递数据)。

© www.soinside.com 2019 - 2024. All rights reserved.