如何使用 Spring Boot 在 MongoDb 集合中创建/保证唯一的时间戳,以限制按时间戳排序的查询而不进行分页

问题描述 投票:0回答:1

该应用程序通过在线 Mongo DB 集合在设备之间同步数据记录。 多个设备可以随时向服务器 Mongo 集合发送批量的新的或修改的记录。 设备通过请求自上次获取请求以来添加或修改的记录来获取尚未拥有的所有记录更新。

方法 1 - 是在保存到 MongoDb 之前向记录添加一个日期对象字段(称为stored1)。 当设备请求记录时,mongoDb分页用于跳过条目直到当前页面,然后限制为1000。现在数据集很大,每个页面请求都需要很长时间,mongo会遇到内存错误。
https://docs.mongodb.com/manual/reference/limits/#operations

由于某种原因,如我当前配置中发布的代码所示设置allowDiskUse(true)并不能修复内存错误。 如果可以解决这个问题,它仍然不是一个长期的解决方案,因为分页的查询时间已经太长了。

方法2:

使用java在mongodb上分页的最佳方法是什么

https://arpitbhayani.me/blogs/benchmark-and-compare-pagination-approach-in-mongodb

考虑的第二种方法是从 Mongo 分页跳过返回的记录更改为仅询问存储时间 > 最后接收的最大存储时间,直到返回的记录数小于限制。 这要求存储的时间戳在与查询匹配的所有记录之间是唯一的,否则可能会丢失记录或获取重复记录等。 在示例代码中,使用stored2字段,仍然有可能出现重复的时间戳,即使概率很低。

Mongo 有一个 BSON 时间戳,可以保证每个集合的唯一值,但我没有找到将其与文档 save() 一起使用或在 Spring Boot 中查询的方法。 需要在新插入、替换或更新的每条记录上设置它。 https://docs.mongodb.com/manual/reference/bson-types/#timestamps

关于如何做到这一点有什么建议吗?

@Getter
@Setter
public abstract class DataModel {

    private Map<String, Object> data;

    @Id // maps this field name to the database _id field, automatically indexed
    private String uid;

    /** Time this entry is written to the db (new or modified), to support querying for changes since last query */
    private Date stored1; //APPROCAH 1
    private long stored2; //APPROACH 2
}

/** SpringBoot+MongoDb database interface implementation */
@Component
@Scope("prototype")
public class SpringDb implements DbInterface {

    @Autowired
    public MongoTemplate db; // the database

    @Override
    public boolean set(Collection<?> newRecords, Collection<?> updatedRecords) {
        // get current time for this set
        Date date = new Date();
        int randomOffset = ThreadLocalRandom.current().nextInt(0, 500000);
        long startingNanoSeconds = Instant.now().getEpochSecond() * 1000000000L + instant.getNano() + randomOffset;
        int ns = 0;

        if (updatedRecords != null && updatedRecords.size() > 0) {
            for (Object entry : updatedRecords) {
                entry.setStored1(date); //APPROACH 1
                entry.setStored2(startingNs + ns++); //APPROCH 2
                db.save(entry, repoName);
            }
        }

        // for new documents only
        if (newRecords != null && newRecords.size() > 0) {
            for (DataModel entry : newRecords) {
                entry.setStored1(date); //APPROACH 1
                entry.setStored2(startingNs + ns++); // APPROACH 2
            }
    
            //multi record insert
            db.insert(newRecords, repoName);
        }

        return true;
    }

    @Override
    public List<DataModel> get(Map<String, String> params, int maxResults, int page, String sortParameter) {
        // generate query
        Query query = buildQuery(params);

        //APPROACH 1 
        // do a paged query
        Pageable pageable = PageRequest.of(page, maxResults, Direction.ASC, sortParameter);
        List<T> queryResults = db.find(query.allowDiskUse(true).with(pageable), DataModel.class, repoName); //allowDiskUse(true) not working, still get memory error
        // count total results
        Page<T> pageQuery = PageableExecutionUtils.getPage(queryResults, pageable,
        () -> db.count(Query.of(query).limit(-1).skip(-1), clazz, getRepoName(clazz)));
        // return the query results
        queryResults = pageQuery.getContent();

        //APPROACH 2
        List<T> queryResults = db.find(query.allowDiskUse(true), DataModel.class, repoName);

        return queryResults;
    }

    @Override
    public boolean update(Map<String, String> params, Map<String, Object> data) {
        // generate query
        Query query = buildQuery(params);

        //This applies the same changes to every entry
        Update update = new Update();
        for (Map.Entry<String, Object> entry : data.entrySet()) {
            update.set(entry.getKey(), entry.getValue());
        }
        db.updateMulti(query, update, DataModel.class, repoName);

        return true;
    }

    private Query buildQuery(Map<String, String> params) {
        //...
    }
}
mongodb spring-boot pagination spring-data-mongodb
1个回答
0
投票

我最终使用的解决方案是定义另一个名为storedId的字段并为其建立索引,该字段是修改记录storedTime和_id的字符串连接。 这保证了所有这些storedId记录字段都是唯一的,因为_id是唯一的。

以下示例展示了如何对连接的storedTime+_id字段进行索引和查询,而对单独的storedTime和_id字段进行索引和查询失败:

public abstract class DataModel {

    private Map<String, Object> data;

    @Indexed
    private String _id; // Unique id
    @Indexed
    private String storedTime; // Time this entry is written to the db (new or modified)

    @Indexed
    String storedId;    // String concatenation of storedTime and _id field
}

//Querying on separate fields and indexes:
{
//storedTime, _id
     "time1", "id2"
     "time1", "id3"
     "time1", "id4"
     "time2", "id1"
     "time2", "id5"
}

 get (storedTime>"time0", _id>"id0", limit=2) // returns _id's 2,3 (next query needs to check for more at storedTime="time1" but skip _id’s <="id3")
 get (storedTime>="time1", _id>"id3", limit=2) // returns _id's 4,5
//FAILS because this second query MISSES _id 1  (Note any existing _id record can be modified at any time, so the _id fields are not in storedTime order)

//Querying on the combined field and index:
     {
    //storedId
     "time1-id2"
     "time1-id3"
     "time1-id4"
     "time2-id1"
     "time2-id5"
     }

 get (storedId>"time0", limit=2) // returns _id's 2,3 (next query for values greater than the greatest last value returned)
 get (storedId>"time1-id3", limit=2) // returns _id's 4,1 (next query for values greater than the greatest last value returned)
 get (storedId>"time2-id1", limit=2) //: returns _id 5
//WORKS, this doesn't miss or duplicate any records
© www.soinside.com 2019 - 2024. All rights reserved.