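I'm working on a C# program that subscribes to options contract data and receives updates on multiple threads. I want to run an analysis every minute or so, so I plan to store the trades in a ConcurrentQueue<PriceSub> named trades. My current implementation is below: a function that, whenever I call it, tries to write the queued trades to the database in a batch. Is this sufficient, or is there a better way to do this? It works at low volume for now, but I'm not sure how it will hold up under a large number of updates. Thanks!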
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Npgsql;

public struct PriceSub
{
    public DateTime _timestamp;
    public string _name;
    public string _term;
    public DateTime? _expiration;
    public string _type;
    public string _product;
    public decimal? _strike;
    public ulong _instrumentID;
    public decimal _price;
    public decimal _userTV;
    public decimal _midImpliedTV;
    public string _theoPrice;
    public decimal _userVol;
    public decimal _midImpliedVol;
    public decimal _quantity;
    public decimal _totalQuantity;
    public decimal _openInterest;
    public long _thread;
}

public class TradeWriter
{
    private readonly ConcurrentQueue<PriceSub> _trades;
    private readonly string _connectionString;

    public TradeWriter(ConcurrentQueue<PriceSub> trades, string connectionString)
    {
        _trades = trades;
        _connectionString = connectionString;
    }

    public async Task WriteTradesToDbAsync(int batchSize)
    {
        List<PriceSub> batch = new List<PriceSub>();

        // Dequeue up to batchSize trades that are currently in the queue
        while (batch.Count < batchSize && _trades.TryDequeue(out var trade))
        {
            batch.Add(trade);
        }

        // If there are trades to write, proceed with batch insertion
        if (batch.Count > 0)
        {
            await InsertTradesBatchAsync(batch);
        }
    }

    private async Task InsertTradesBatchAsync(List<PriceSub> tradesBatch)
    {
        // SQL query for batch insert
        string sql = @"INSERT INTO es_options_daily
            (_timestamp, _name, _term, _expiration, _type, _product, _strike, _instrumentID,
             _price, _userTV, _midImpliedTV, _theoPrice, _userVol, _midImpliedVol,
             _quantity, _totalQuantity, _openInterest, _thread)
            VALUES
            (@timestamp, @name, @term, @expiration, @type, @product, @strike, @instrumentID,
             @price, @userTV, @midImpliedTV, @theoPrice, @userVol, @midImpliedVol,
             @quantity, @totalQuantity, @openInterest, @thread)";

        using (var connection = new NpgsqlConnection(_connectionString))
        {
            await connection.OpenAsync();
            using (var transaction = await connection.BeginTransactionAsync())
            {
                using (var command = new NpgsqlCommand(sql, connection, transaction))
                {
                    foreach (var trade in tradesBatch)
                    {
                        // Add parameters for each trade in the batch
                        command.Parameters.AddWithValue("@timestamp", trade._timestamp);
                        command.Parameters.AddWithValue("@name", trade._name);
                        command.Parameters.AddWithValue("@term", trade._term);
                        command.Parameters.AddWithValue("@expiration", trade._expiration.HasValue ? (object)trade._expiration.Value : DBNull.Value);
                        command.Parameters.AddWithValue("@type", trade._type);
                        command.Parameters.AddWithValue("@product", trade._product);
                        command.Parameters.AddWithValue("@strike", trade._strike.HasValue ? (object)trade._strike.Value : DBNull.Value);
                        command.Parameters.AddWithValue("@instrumentID", trade._instrumentID);
                        command.Parameters.AddWithValue("@price", trade._price);
                        command.Parameters.AddWithValue("@userTV", trade._userTV);
                        command.Parameters.AddWithValue("@midImpliedTV", trade._midImpliedTV);
                        command.Parameters.AddWithValue("@theoPrice", trade._theoPrice);
                        command.Parameters.AddWithValue("@userVol", trade._userVol);
                        command.Parameters.AddWithValue("@midImpliedVol", trade._midImpliedVol);
                        command.Parameters.AddWithValue("@quantity", trade._quantity);
                        command.Parameters.AddWithValue("@totalQuantity", trade._totalQuantity);
                        command.Parameters.AddWithValue("@openInterest", trade._openInterest);
                        command.Parameters.AddWithValue("@thread", trade._thread);

                        // Execute insert for the current trade in the batch
                        await command.ExecuteNonQueryAsync();

                        // Clear parameters for the next trade
                        command.Parameters.Clear();
                    }
                }

                // Commit the transaction after batch insertion
                await transaction.CommitAsync();
            }
        }
    }
}
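You have a producer-consumer scenario, and the right collections for implementing it are BlockingCollection<T> (synchronous) or Channel<T> (asynchronous). The problem with your current implementation is that the WriteTradesToDbAsync method exits as soon as the _trades collection is temporarily empty. Both of these collections handle that case by making the consumer wait while the collection is empty, until either a new item is added or the producer signals that it has finished adding items (BlockingCollection<T>.CompleteAdding and ChannelWriter<T>.Complete, respectively).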
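To make the waiting behavior concrete, here is a minimal sketch with one producer and one consumer sharing a BlockingCollection<int>. The int items stand in for PriceSub, and the ProducerConsumerDemo class and its Run method are illustrative names only. The producer pauses between items, so the collection is empty at times, yet the consumer keeps waiting instead of exiting, and it stops only after CompleteAdding has been called and the remaining items have been taken.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Illustrative sketch: int items stand in for PriceSub.
public static class ProducerConsumerDemo
{
    public static List<int> Run(int itemCount)
    {
        var queue = new BlockingCollection<int>();
        var consumed = new List<int>(); // touched only by the consumer until it finishes

        // Consumer: GetConsumingEnumerable blocks while the collection is empty
        // and ends only after CompleteAdding has been called and all items are taken.
        Task consumer = Task.Run(() =>
        {
            foreach (int item in queue.GetConsumingEnumerable())
            {
                consumed.Add(item);
            }
        });

        // Producer: adds items with short pauses, so the collection is empty at times.
        for (int i = 0; i < itemCount; i++)
        {
            queue.Add(i);
            Thread.Sleep(10);
        }

        // Tell the consumer that no more items will arrive.
        queue.CompleteAdding();

        consumer.Wait();
        return consumed;
    }
}
```

In your program the subscription threads would play the producer role, and a single long-running consumer would replace the repeated calls to WriteTradesToDbAsync.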
Here is a simplified usage example, where _trades is a BlockingCollection<PriceSub>:
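public async Task WriteTradesToDbAsync(int batchSize)
{
    // Blocks while _trades is empty; the loop ends after CompleteAdding is called
    foreach (PriceSub trade in _trades.GetConsumingEnumerable())
    {
        // InsertTradeAsync: a single-trade insert (not shown)
        await InsertTradeAsync(trade);
    }
}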
Since you are inserting the trades in batches, you can use the GetConsumingEnumerableBatch extension method for BlockingCollection<T> from this answer, like this:
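public async Task WriteTradesToDbAsync(int batchSize)
{
    foreach (PriceSub[] trades in _trades.GetConsumingEnumerableBatch())
    {
        // InsertTradesBatchAsync in the question takes a List<PriceSub>, so copy the array
        await InsertTradesBatchAsync(new List<PriceSub>(trades));
    }
}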
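The linked method is not reproduced here. As a rough idea of what such a helper can look like, here is a minimal sketch of a hypothetical TakeBatches extension method (the name and signature are assumptions, not the linked answer's code). It blocks until at least one item is available, then takes whatever else is already queued, up to batchSize, so a batch is emitted without waiting for it to fill up.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingCollectionBatchExtensions
{
    // Hypothetical helper, not the GetConsumingEnumerableBatch from the linked answer.
    public static IEnumerable<T[]> TakeBatches<T>(
        this BlockingCollection<T> source,
        int batchSize,
        CancellationToken cancellationToken = default)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batch = new List<T>(batchSize);

        // GetConsumingEnumerable waits while the collection is empty and completes
        // once CompleteAdding has been called and every item has been taken.
        foreach (T first in source.GetConsumingEnumerable(cancellationToken))
        {
            batch.Add(first);

            // Drain items that are already available, without waiting for more.
            while (batch.Count < batchSize && source.TryTake(out T next))
            {
                batch.Add(next);
            }

            yield return batch.ToArray();
            batch.Clear();
        }
    }
}
```

With this sketch, the loop above would read foreach (PriceSub[] trades in _trades.TakeBatches(batchSize)), which also puts the batchSize parameter to use. Because the enumeration blocks the calling thread while the collection is empty, this approach occupies a thread for the whole lifetime of the consumer.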
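Using Channel<T> instead of BlockingCollection<T> should be better in principle, because a Channel<T> waits asynchronously while it is empty instead of blocking a thread, so it makes more efficient use of threads. You can find a similar ReadAllBatches extension method in this question. Before using that solution, you should be familiar with asynchronous sequences (IAsyncEnumerable<T>) and with ChannelReader<T>.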
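For comparison with the blocking version, here is a minimal sketch of the same batching idea on top of ChannelReader<T>, using a hypothetical ReadBatchesAsync extension method (the name is an assumption and it is not the ReadAllBatches from the linked question). It assumes .NET Core 3.0 or later, where System.Threading.Channels and IAsyncEnumerable<T> are available. WaitToReadAsync waits asynchronously until at least one item is buffered, and TryRead then drains up to batchSize items without waiting for more.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

public static class ChannelBatchExtensions
{
    // Hypothetical helper, not the ReadAllBatches from the linked question.
    public static async IAsyncEnumerable<T[]> ReadBatchesAsync<T>(
        this ChannelReader<T> source,
        int batchSize,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batch = new List<T>(batchSize);

        // WaitToReadAsync returns false once the writer has called Complete()
        // and all buffered items have been read.
        while (await source.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            // Take what is already buffered, up to batchSize items.
            while (batch.Count < batchSize && source.TryRead(out T item))
            {
                batch.Add(item);
            }

            // Another consumer may have taken the items first, so skip empty batches.
            if (batch.Count > 0)
            {
                yield return batch.ToArray();
                batch.Clear();
            }
        }
    }
}
```

In your program, the channel could be created once with Channel.CreateUnbounded<PriceSub>(), the subscription threads would call channel.Writer.TryWrite(sub), and the consumer would run await foreach (PriceSub[] batch in channel.Reader.ReadBatchesAsync(batchSize)) and pass each batch to InsertTradesBatchAsync. No thread is held while the channel is empty.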