有迭代器之类的东西,但是具有像流这样的功能?

问题描述 投票:0回答:1

基本上我要做的就是以下内容:

  1. 从数据库加载数据批次
  2. 将数据(查询结果)映射到以可读格式表示数据的类
  3. 写入文件
    重复直到查询不再有结果
  4. i列出了我熟悉的结构似乎适合需求,以及它们为什么不符合我的需求。
  5. ITERATOR→没有打电话的情况下映射和过滤的选项

我需要在不实际拥有数据(类似于流)的情况下定义映射函数,以便我可以将“流”方式传递到呼叫类,而只有在那里呼叫

Object[]
    ,然后呼叫
  • next()
    所有映射的功能都为结果
    •  -stream→所有数据都需要在映射和过滤之前可用
    • 可观察到可用的数据后立即发送数据。我需要同步处理它
  • 要获得更多的感觉,我做了一个小例子:
  • next
  • 我到目前为止有什么:
我试图从迭代器的底部转移,因为它是唯一真正满足我的内存需求的人。然后,我添加了一些方法来映射和循环数据。不过,这并不是一个很强的设计,它将比我想象的要困难,所以我想知道是否已经有任何东西可以做我需要的东西。

// Disclaimer: "Something" is the structure I am not sure of now. // Could be an Iterator or something else that fits (Thats the question) public class Orchestrator { @Inject private DataGetter dataGetter; public void doWork() { FileWriter writer = new FileWriter("filename"); // Write the formatted data to the file dataGetter.getData() .forEach(data -> writer.writeToFile(data)); } } public class FileWriter { public void writeToFile(List<Thing> data) { // Write to file } } public class DataGetter { @Inject private ThingDao thingDao; public Something<List<Thing>> getData() { // Map data to the correct format and return that return thingDao.getThings() .map(partialResult -> /* map to object */); } } public class ThingDao { public Something<List<Object[]>> getThings() { Query q = ...; // Dont know what to return } }

this到目前为止的工作,但是有一些我并不喜欢的作业,而且我也希望能够将一个Quiterator“附加”到另一个Quiterator本身并不难,但也应该拍摄地图在附录之后随之而来。
	
Assume您有一个以分页方式提供数据的DAO,例如通过将

public class QIterator<E> implements Iterator<List<E>> { public static String QUERY_OFFSET = "queryOffset"; public static String QUERY_LIMIT = "queryLimit"; private Query query; private long lastResultIndex = 0; private long batchSize; private Function<List<Object>, List<E>> mapper; public QIterator(Query query, long batchSize) { this.query = query; this.batchSize = batchSize; } public QIterator(Query query, long batchSize, Function<List<Object>, List<E>> mapper) { this(query, batchSize); this.mapper = mapper; } @Override public boolean hasNext() { return lastResultIndex % batchSize == 0; } @Override public List<E> next() { query.setParameter(QueryIterator.QUERY_OFFSET, lastResultIndex); query.setParameter(QueryIterator.QUERY_LIMIT, batchSize); List<Object> result = (List<Object>) query.getResultList(); // unchecked lastResultIndex += result.size(); List<E> mappedResult; if (mapper != null) { mappedResult = mapper.apply(result); } else { mappedResult = (List<E>) result; // unchecked } return mappedResult; } public <R> QIterator<R> map(Function<List<E>, List<R>> appendingMapper) { return new QIterator<>(query, batchSize, (data) -> { if (this.mapper != null) { return appendingMapper.apply(this.mapper.apply(data)); } else { return appendingMapper.apply((List<E>) data); } }); } public void forEach(BiConsumer<List<E>, Integer> consumer) { for (int i = 0; this.hasNext(); i++) { consumer.accept(this.next(), i); } } }

unchecked

子句应用于基础SQL。这样的DAO类将具有将这些值作为参数的方法,即该方法将符合以下函数方法:
LIMIT

e.g。呼叫
OFFSET

将返回前20行(第1页),呼叫
java iterator java-stream
1个回答
2
投票
将在第4页上返回20行。如果该方法返回少于20行,则意味着我们获得了最后一页。在最后一行之后索要数据将返回一个空列表。

对于下面的演示,我们可以嘲笑这样的DAO类:

getData(0, 20)
如果您想从该方法生成一条行,将所有行中的所有行流在一定尺寸的块中流式传输,我们需要一个

getData(60, 20)

创建一个流。
在这里是这样的
public class MockDao {
    private final int rowCount;
    public MockDao(int rowCount) {
        this.rowCount = rowCount;
    }
    public List<SimpleRow> getSimpleRows(int offset, int limit) {
        System.out.println("DEBUG: getData(" + offset + ", " + limit + ")");
        if (offset < 0 || limit <= 0)
            throw new IllegalArgumentException();
        List<SimpleRow> data = new ArrayList<>();
        for (int i = 0, rowNo = offset + 1; i < limit && rowNo <= this.rowCount; i++, rowNo++)
            data.add(new SimpleRow("Row #" + rowNo));
        System.out.println("DEBUG:   data = " + data);
        return data;
    }
}

public class SimpleRow {
    private final String data;
    public SimpleRow(String data) {
        this.data = data;
    }
    @Override
    public String toString() {
        return "Row[data=" + this.data + "]";
    }
}

的实现。

Stream

我们现在可以使用以上模拟DAO进行测试:

Spliterator

输出
StreamSupport.stream(Spliterator<T> spliterator, boolean parallel)
可以看到,我们得到13行数据,从数据库中以5行的块检索。
在需要之前,数据不会从数据库中检索到数据库,从而导致存储空间较低,具体取决于块大小,流操作不会缓存数据。 您可以按以下方式一行进行一行:
Spliterator
当从查询返回第一行并与数据库并行持续之前,直到读取所有行。

该方法一次仅在内存中有一行,并通过仅运行1个查询来最大程度地减少数据库上的负载。

映射来自A

public class PagedDaoSpliterator<T> implements Spliterator<T> {
    private final PagedDao<T> dao;
    private final int blockSize;
    private int nextOffset;
    private List<T> data;
    private int dataIdx;
    public PagedDaoSpliterator(PagedDao<T> dao, int blockSize) {
        if (blockSize <= 0)
            throw new IllegalArgumentException();
        this.dao = Objects.requireNonNull(dao);
        this.blockSize = blockSize;
    }
    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (this.data == null) {
            if (this.nextOffset == -1/*At end*/)
                return false; // Already at end
            this.data = this.dao.getData(this.nextOffset, this.blockSize);
            this.dataIdx = 0;
            if (this.data.size() < this.blockSize)
                this.nextOffset = -1/*At end, after this data*/;
            else
                this.nextOffset += data.size();
            if (this.data.isEmpty()) {
                this.data = null;
                return false; // At end
            }
        }
        action.accept(this.data.get(this.dataIdx++));
        if (this.dataIdx == this.data.size())
            this.data = null;
        return true;
    }
    @Override
    public Spliterator<T> trySplit() {
        return null; // Parallel processing not supported
    }
    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // Unknown
    }
    @Override
    public int characteristics() {
        return ORDERED | NONNULL;
    }
}
比thapping thaps anmage比thage的映射要容易和自然得多,因为您可以通过

name
访问列,并且具有正确键入的值,例如:

MockDao dao = new MockDao(13); Stream<SimpleRow> stream = StreamSupport.stream( new PagedDaoSpliterator<>(dao::getSimpleRows, 5), /*parallel*/false); stream.forEach(System.out::println);

	

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.