Best way to batch-write real-time data to a Postgres table [closed]


I'm working on a C# program that subscribes to options contract data and receives updates on multiple threads. I want to run some analytics roughly once a minute, so my plan is to store the trades in a ConcurrentQueue<PriceSub>. My current implementation is below: a function that attempts to batch-write the queued trades whenever I call it. Is this sufficient, or is there a better way to do it? It works fine at low volume right now, but I'm not sure how it will hold up under a heavy stream of updates. Thanks!

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Npgsql;

public struct PriceSub
{
    public DateTime _timestamp;
    public string _name;
    public string _term;
    public DateTime? _expiration;
    public string _type;
    public string _product;
    public decimal? _strike;
    public ulong _instrumentID;

    public decimal _price;
    public decimal _userTV;
    public decimal _midImpliedTV;
    public string _theoPrice;

    public decimal _userVol;
    public decimal _midImpliedVol;

    public decimal _quantity;
    public decimal _totalQuantity;
    public decimal _openInterest;

    public long _thread;
}

public class TradeWriter
{
    private ConcurrentQueue<PriceSub> _trades;
    private string _connectionString;

    public TradeWriter(ConcurrentQueue<PriceSub> trades, string connectionString)
    {
        _trades = trades;
        _connectionString = connectionString;
    }

    public async Task WriteTradesToDbAsync(int batchSize)
    {
        List<PriceSub> batch = new List<PriceSub>();

        // Dequeue trades in batches
        while (batch.Count < batchSize && _trades.TryDequeue(out var trade))
        {
            batch.Add(trade);
        }

        // If there are trades to write, proceed with batch insertion
        if (batch.Count > 0)
        {
            await InsertTradesBatchAsync(batch);
        }
    }

    private async Task InsertTradesBatchAsync(List<PriceSub> tradesBatch)
    {
        // SQL query for batch insert
        string sql = @"INSERT INTO es_options_daily 
            (_timestamp, _name, _term, _expiration, _type, _product, _strike, _instrumentID,
            _price, _userTV, _midImpliedTV, _theoPrice, _userVol, _midImpliedVol, 
            _quantity, _totalQuantity, _openInterest, _thread) 
            VALUES 
            (@timestamp, @name, @term, @expiration, @type, @product, @strike, @instrumentID,
            @price, @userTV, @midImpliedTV, @theoPrice, @userVol, @midImpliedVol,
            @quantity, @totalQuantity, @openInterest, @thread)";

        using (var connection = new NpgsqlConnection(_connectionString))
        {
            await connection.OpenAsync();

            using (var transaction = await connection.BeginTransactionAsync())
            {
                using (var command = new NpgsqlCommand(sql, connection, transaction))
                {
                    foreach (var trade in tradesBatch)
                    {
                        // Add parameters for each trade in the batch
                        command.Parameters.AddWithValue("@timestamp", trade._timestamp);
                        command.Parameters.AddWithValue("@name", trade._name);
                        command.Parameters.AddWithValue("@term", trade._term);
                        command.Parameters.AddWithValue("@expiration", trade._expiration.HasValue ? (object)trade._expiration.Value : DBNull.Value);
                        command.Parameters.AddWithValue("@type", trade._type);
                        command.Parameters.AddWithValue("@product", trade._product);
                        command.Parameters.AddWithValue("@strike", trade._strike.HasValue ? (object)trade._strike.Value : DBNull.Value);
                        command.Parameters.AddWithValue("@instrumentID", trade._instrumentID);
                        command.Parameters.AddWithValue("@price", trade._price);
                        command.Parameters.AddWithValue("@userTV", trade._userTV);
                        command.Parameters.AddWithValue("@midImpliedTV", trade._midImpliedTV);
                        command.Parameters.AddWithValue("@theoPrice", trade._theoPrice);
                        command.Parameters.AddWithValue("@userVol", trade._userVol);
                        command.Parameters.AddWithValue("@midImpliedVol", trade._midImpliedVol);
                        command.Parameters.AddWithValue("@quantity", trade._quantity);
                        command.Parameters.AddWithValue("@totalQuantity", trade._totalQuanity);
                        command.Parameters.AddWithValue("@openInterest", trade._openInterest);
                        command.Parameters.AddWithValue("@thread", trade._thread);

                        // Execute insert for the current trade in the batch
                        await command.ExecuteNonQueryAsync();

                        // Clear parameters for the next trade
                        command.Parameters.Clear();
                    }
                }

                // Commit the transaction after batch insertion
                await transaction.CommitAsync();
            }
        }
    }
}
c# postgresql multithreading
1 Answer
0 votes

You have a producer-consumer scenario, and the right collections for implementing it are the BlockingCollection<T> (synchronous) or the Channel<T> (asynchronous). The problem with the current implementation is that the WriteTradesToDbAsync method exits as soon as the _trades collection happens to be empty. Both of the collections above handle this by making the consumer wait while the collection is empty, either until a new item is added or until the producer signals that it has finished adding items (BlockingCollection<T> has a CompleteAdding method, and the Channel<T> writer has a Complete method).

Here is a simplified usage example, where _trades is a BlockingCollection<PriceSub>:

public async Task WriteTradesToDbAsync(int batchSize)
{
    foreach (PriceSub trade in _trades.GetConsumingEnumerable())
    {
        await InsertTradeAsync(trade);
    }
}
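
For context, the producer side under this design could be a sketch along these lines; TradeProducer, OnTrade and Shutdown are hypothetical names standing in for your subscription handler and your application's shutdown path:

using System.Collections.Concurrent;

public class TradeProducer
{
    private readonly BlockingCollection<PriceSub> _trades = new BlockingCollection<PriceSub>();

    // Exposed so the consumer (e.g. TradeWriter) can read from the same collection.
    public BlockingCollection<PriceSub> Trades => _trades;

    // Called by the market-data subscription on any thread.
    public void OnTrade(PriceSub trade)
    {
        _trades.Add(trade); // thread-safe; never blocks unless a bounded capacity is set
    }

    // Called once at shutdown; GetConsumingEnumerable then drains what is left and completes.
    public void Shutdown()
    {
        _trades.CompleteAdding();
    }
}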

Since you want to insert the trades in batches, you can find in this answer a GetConsumingEnumerableBatch extension method for the BlockingCollection<T>, which you can use like this:

public async Task WriteTradesToDbAsync(int batchSize)
{
    foreach (PriceSub[] trades in _trades.GetConsumingEnumerableBatch())
    {
        await InsertTradesBatchAsync(trades);
    }
}

Using a Channel<T> instead of a BlockingCollection<T> should be conceptually better, because it waits asynchronously while the collection is empty, so it makes more efficient use of threads. You can find a similar ReadAllBatches extension method in this question. Before adopting this solution, you should become familiar with asynchronous sequences and with the ChannelReader<T> API.
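
As a rough illustration of the idea (this is not the ReadAllBatches extension from the linked question, just a minimal sketch; ChannelTradeWriter, OnTrade and Complete are hypothetical names, and InsertTradesBatchAsync is a placeholder for the batch insert shown in the question), a Channel<PriceSub>-based consumer could accumulate items into batches and write each batch asynchronously:

using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public class ChannelTradeWriter
{
    // Unbounded channel: the subscription threads write, a single consumer task reads.
    private readonly Channel<PriceSub> _channel = Channel.CreateUnbounded<PriceSub>();

    // Producer side: called from the market-data callbacks on any thread.
    public void OnTrade(PriceSub trade) => _channel.Writer.TryWrite(trade);

    // Called once at shutdown, so the consumer loop below can finish.
    public void Complete() => _channel.Writer.Complete();

    // Consumer loop: waits asynchronously while the channel is empty, then drains
    // whatever is available into a batch of at most batchSize items and writes it.
    public async Task WriteTradesToDbAsync(int batchSize)
    {
        var reader = _channel.Reader;
        while (await reader.WaitToReadAsync())
        {
            var batch = new List<PriceSub>(batchSize);
            while (batch.Count < batchSize && reader.TryRead(out var trade))
            {
                batch.Add(trade);
            }
            if (batch.Count > 0)
            {
                await InsertTradesBatchAsync(batch);
            }
        }
    }

    private Task InsertTradesBatchAsync(List<PriceSub> batch)
    {
        // Placeholder for the parameterized batch insert from the question.
        return Task.CompletedTask;
    }
}

Note that this sketch flushes a batch as soon as any items are available rather than waiting for the batch to fill; combining the size limit with a periodic timer is a common refinement if you only want one write per interval.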
    
