我知道许多其他人也问过有关没有主键的 EF Core 的类似问题,但我没有找到解决以下问题的方法。
如果我使用原始 SQL 命令,它可以工作:
using Confluent.Kafka;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;
using System.Text.Json.Nodes;
// Kafka -> SQL Server bridge (raw SQL variant).
// Consumes JSON documents from a Kafka topic and upserts their "outer*" / "inner*" fields
// into the keyless OuterTable / InnerTable tables using parameterized raw SQL.
string connectionString = "Server=localhost; Database=pubs; User ID=***; Password=***; TrustServerCertificate=True;";
string bootstrapServers = "localhost:9092";
string topicName = "cbsource";

// Parameterized statements: {0} is the key column value, {1} the payload column value.
const string SelectOuterSql = @"SELECT TOP 1 * FROM OuterTable WHERE Outer1 = {0}";
const string UpdateOuterSql = @"UPDATE OuterTable SET Outer2 = {1} WHERE Outer1 = {0}";
const string InsertOuterSql = @"INSERT INTO OuterTable (Outer1, Outer2) VALUES ({0}, {1})";
const string SelectInnerSql = @"SELECT TOP 1 * FROM InnerTable WHERE Inner1 = {0}";
const string UpdateInnerSql = @"UPDATE InnerTable SET Inner2 = {1} WHERE Inner1 = {0}";
const string InsertInnerSql = @"INSERT INTO InnerTable (Inner1, Inner2) VALUES ({0}, {1})";

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = "CB_SQL_consumer_EF_Core",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

var cancellationTokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    // Suppress the default process kill so the consume loop can unwind and release resources.
    e.Cancel = true;
    cancellationTokenSource.Cancel();
};

try
{
    using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
    // A single context serves the whole loop; the using declaration disposes it on exit.
    using var dbContext = new PubDbContext(connectionString);
    consumer.Subscribe(topicName);

    try
    {
        while (!cancellationTokenSource.Token.IsCancellationRequested)
        {
            try
            {
                var consumeResult = consumer.Consume(cancellationTokenSource.Token);
                string? payload = consumeResult.Message.Value;
                if (payload == null)
                {
                    continue;
                }

                Console.WriteLine($"Consumed message: {payload}");

                JsonObject? jsonObject = JsonSerializer.Deserialize<JsonObject>(payload);
                if (jsonObject == null)
                {
                    continue;
                }

                // A missing or non-numeric key is stored as NULL.
                int? outerTableColumn1Value =
                    int.TryParse(jsonObject["outer1"]?.ToString(), out int outer1Parsed) ? outer1Parsed : null;
                string? outerTableColumn2Value = jsonObject["outer2"]?.ToString();

                string? inner1Value = jsonObject["inner"]?["inner1"]?.ToString();
                string? inner2Value = jsonObject["inner"]?["inner2"]?.ToString();

                // The inner part counts as present when at least one of its fields is non-empty;
                // otherwise both inner columns stay NULL.
                bool hasInner = !string.IsNullOrEmpty(inner1Value) || !string.IsNullOrEmpty(inner2Value);
                int? innerTableColumn1Value =
                    hasInner && int.TryParse(inner1Value, out int inner1Parsed) ? inner1Parsed : null;
                string? innerTableColumn2Value = hasInner ? inner2Value : null;
                Console.WriteLine(hasInner ? "Inner table exists." : "Inner table does not exist.");

                // Disposed at the end of this iteration, whether or not the commit succeeded.
                await using var transaction = await dbContext.Database.BeginTransactionAsync();
                try
                {
                    // NOTE(review): when the key is NULL, "= NULL" presumably never matches under
                    // SQL Server's default ANSI_NULLS setting, so such messages always take the
                    // INSERT branch — confirm this is the intended behaviour.
                    var outerEntity = await dbContext.OuterTable
                        .FromSqlRaw(SelectOuterSql, outerTableColumn1Value)
                        .FirstOrDefaultAsync();

                    if (outerEntity != null)
                    {
#pragma warning disable CS8604
                        await dbContext.Database.ExecuteSqlRawAsync(UpdateOuterSql,
                            outerTableColumn1Value, outerTableColumn2Value);
#pragma warning restore CS8604
                        Console.WriteLine("Outer table row updated.");
                    }
                    else
                    {
#pragma warning disable CS8604
                        await dbContext.Database.ExecuteSqlRawAsync(InsertOuterSql,
                            outerTableColumn1Value, outerTableColumn2Value);
#pragma warning restore CS8604
                        Console.WriteLine("Outer table row inserted.");
                    }

                    var innerEntity = await dbContext.InnerTable
                        .FromSqlRaw(SelectInnerSql, innerTableColumn1Value)
                        .FirstOrDefaultAsync();

                    if (innerEntity != null)
                    {
#pragma warning disable CS8604
                        await dbContext.Database.ExecuteSqlRawAsync(UpdateInnerSql,
                            innerTableColumn1Value, innerTableColumn2Value);
#pragma warning restore CS8604
                        Console.WriteLine("Inner table row updated.");
                    }
                    else
                    {
#pragma warning disable CS8604
                        await dbContext.Database.ExecuteSqlRawAsync(InsertInnerSql,
                            innerTableColumn1Value, innerTableColumn2Value);
#pragma warning restore CS8604
                        Console.WriteLine("Inner table row inserted.");
                    }

                    await transaction.CommitAsync();
                }
                catch (Exception ex)
                {
                    // Keep the outer and inner writes atomic: undo both if either fails.
                    await transaction.RollbackAsync();
                    Console.WriteLine($"Error: {ex.Message}. Transaction rolled back.");
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error during consume: {e.Error.Reason}");
            }
            catch (JsonException e)
            {
                // A single malformed payload must not take the whole consumer down.
                Console.WriteLine($"Skipping message with invalid JSON: {e.Message}");
            }
        }
    }
    finally
    {
        // Leave the consumer group cleanly before the consumer is disposed.
        consumer.Close();
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Consumer has been cancelled.");
}
/// <summary>
/// EF Core context over the SQL Server database identified by the supplied connection string.
/// Both entity types are mapped as keyless.
/// </summary>
public class PubDbContext : DbContext
{
    private readonly string _sqlServerConnectionString;

    /// <summary>Creates a context that connects with <paramref name="connectionString"/>.</summary>
    /// <param name="connectionString">SQL Server connection string handed to the provider.</param>
    public PubDbContext(string connectionString) => _sqlServerConnectionString = connectionString;

    /// <summary>Set for the <see cref="global::OuterTable"/> entity type.</summary>
    public DbSet<OuterTable> OuterTable { get; set; }

    /// <summary>Set for the <see cref="global::InnerTable"/> entity type.</summary>
    public DbSet<InnerTable> InnerTable { get; set; }

    /// <summary>Selects the SQL Server provider using the stored connection string.</summary>
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        => optionsBuilder.UseSqlServer(_sqlServerConnectionString);

    /// <summary>Declares both entity types as having no key.</summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<OuterTable>(outer => outer.HasNoKey());
        modelBuilder.Entity<InnerTable>(inner => inner.HasNoKey());
    }
}
/// <summary>Entity with two nullable properties, <c>Outer1</c> and <c>Outer2</c>.</summary>
public class OuterTable
{
    /// <summary>Integer value; <c>null</c> when absent.</summary>
    public Nullable<int> Outer1 { get; set; }

    /// <summary>Text value; <c>null</c> when absent.</summary>
    public string? Outer2 { get; set; }
}
/// <summary>Entity with two nullable properties, <c>Inner1</c> and <c>Inner2</c>.</summary>
public class InnerTable
{
    /// <summary>Integer value; <c>null</c> when absent.</summary>
    public Nullable<int> Inner1 { get; set; }

    /// <summary>Text value; <c>null</c> when absent.</summary>
    public string? Inner2 { get; set; }
}
数据会写入 SQL Server 中的 OuterTable 和 InnerTable 两张表。实际上,这两张表已经事先在 SQL Server 中创建好(初始为空表),并且都没有主键。
数据来源是 Couchbase 的 JSON 文档(通过 Kafka 接收消息)。文档顶层包含 outer1、outer2 字段,其中一些文档还带有嵌套的 inner 对象(含 inner1、inner2 字段),所以我处理的是嵌套的 JSON 文档。
不过,有人告诉我,上面的代码违背了 ORM 的初衷,因为应当避免在代码中直接执行原始 SQL 命令。于是我把它改写成了下面的版本。
using Confluent.Kafka;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;
using System.Text.Json.Nodes;
// Kafka -> SQL Server bridge (ORM variant).
// Consumes JSON documents from a Kafka topic and upserts their "outer*" / "inner*" fields
// into OuterTable / InnerTable through EF Core change tracking.
string connectionString = "Server=localhost; Database=pubs; User ID=***; Password=***; TrustServerCertificate=True;";
string bootstrapServers = "localhost:9092";
string topicName = "cbsource";

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = "CB_SQL_consumer_ORM",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

var cancellationTokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    // Suppress the default process kill so the consume loop can unwind and release resources.
    e.Cancel = true;
    cancellationTokenSource.Cancel();
};

var optionsBuilder = new DbContextOptionsBuilder<ApplicationDbContext>();
optionsBuilder.UseSqlServer(connectionString);

try
{
    using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
    consumer.Subscribe(topicName);

    // Sources of sentinel keys for documents without a usable outer1 / inner1 value:
    // each such document receives the next negative integer.
    // NOTE(review): both counters restart at 0 on every process start, so the sentinel keys
    // -1, -2, ... repeat across runs and will match rows written by an earlier run.
    int i = 0;
    int j = 0;

    try
    {
        while (!cancellationTokenSource.Token.IsCancellationRequested)
        {
            try
            {
                var consumeResult = consumer.Consume(cancellationTokenSource.Token);
                string? payload = consumeResult.Message.Value;
                if (payload == null)
                {
                    continue;
                }

                Console.WriteLine($"Consumed message: {payload}");

                JsonObject? jsonObject = JsonSerializer.Deserialize<JsonObject>(payload);
                if (jsonObject == null)
                {
                    continue;
                }

                int outerTableColumn1Value =
                    int.TryParse(jsonObject["outer1"]?.ToString(), out int outer1Parsed) ? outer1Parsed : --i;
                string? outerTableColumn2Value = jsonObject["outer2"]?.ToString();

                string? inner1Value = jsonObject["inner"]?["inner1"]?.ToString();
                string? inner2Value = jsonObject["inner"]?["inner2"]?.ToString();
                int innerTableColumn1Value =
                    int.TryParse(inner1Value, out int inner1Parsed) ? inner1Parsed : --j;
                string? innerTableColumn2Value = inner2Value;

                // A fresh context per message keeps the change tracker small; both the context
                // and the transaction are disposed at the end of this iteration.
                await using var context = new ApplicationDbContext(optionsBuilder.Options);
                await using var transaction = await context.Database.BeginTransactionAsync();
                try
                {
                    var outerRow = await context.OuterTable
                        .FirstOrDefaultAsync(o => o.Outer1 == outerTableColumn1Value);
                    if (outerRow != null)
                    {
                        // The entity is already tracked, so SaveChangesAsync detects this
                        // assignment without an explicit Update() call.
                        outerRow.Outer2 = outerTableColumn2Value;
                        Console.WriteLine("Outer table row updated.");
                    }
                    else
                    {
                        // Add (not AddAsync): no async value generator is involved here.
                        context.OuterTable.Add(new OuterTable
                        {
                            Outer1 = outerTableColumn1Value,
                            Outer2 = outerTableColumn2Value
                        });
                        Console.WriteLine("Outer table row inserted.");
                    }

                    var innerRow = await context.InnerTable
                        .FirstOrDefaultAsync(o => o.Inner1 == innerTableColumn1Value);
                    if (innerRow != null)
                    {
                        innerRow.Inner2 = innerTableColumn2Value;
                        Console.WriteLine("Inner table row updated.");
                    }
                    else
                    {
                        context.InnerTable.Add(new InnerTable
                        {
                            Inner1 = innerTableColumn1Value,
                            Inner2 = innerTableColumn2Value
                        });
                        Console.WriteLine("Inner table row inserted.");
                    }

                    await context.SaveChangesAsync();
                    await transaction.CommitAsync();
                }
                catch (Exception ex)
                {
                    // Keep the outer and inner writes atomic: undo both if either fails.
                    await transaction.RollbackAsync();
                    Console.WriteLine($"Error: {ex.Message}. Transaction rolled back.");
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error during consume: {e.Error.Reason}");
            }
            catch (JsonException e)
            {
                // A single malformed payload must not take the whole consumer down.
                Console.WriteLine($"Skipping message with invalid JSON: {e.Message}");
            }
        }
    }
    finally
    {
        // Leave the consumer group cleanly before the consumer is disposed.
        consumer.Close();
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Consumer has been cancelled.");
}
/// <summary>
/// EF Core context configured from externally supplied options.
/// <c>Outer1</c> and <c>Inner1</c> are declared as the primary keys of their entity types.
/// </summary>
public class ApplicationDbContext : DbContext
{
    /// <summary>Creates the context and forwards <paramref name="options"/> to the base class.</summary>
    /// <param name="options">Options for this context type.</param>
    public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options)
        : base(options)
    {
    }

    /// <summary>Set for the <see cref="global::OuterTable"/> entity type.</summary>
    public DbSet<OuterTable> OuterTable { get; set; }

    /// <summary>Set for the <see cref="global::InnerTable"/> entity type.</summary>
    public DbSet<InnerTable> InnerTable { get; set; }

    /// <summary>Registers the key property of each entity type.</summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<OuterTable>(outer => outer.HasKey(row => row.Outer1));
        modelBuilder.Entity<InnerTable>(inner => inner.HasKey(row => row.Inner1));
    }
}
/// <summary>Entity with two nullable properties, <c>Outer1</c> and <c>Outer2</c>.</summary>
public class OuterTable
{
    /// <summary>Integer value, declared as nullable.</summary>
    public Nullable<int> Outer1 { get; set; }

    /// <summary>Text value; <c>null</c> when absent.</summary>
    public string? Outer2 { get; set; }
}
/// <summary>Entity with two nullable properties, <c>Inner1</c> and <c>Inner2</c>.</summary>
public class InnerTable
{
    /// <summary>Integer value, declared as nullable.</summary>
    public Nullable<int> Inner1 { get; set; }

    /// <summary>Text value; <c>null</c> when absent.</summary>
    public string? Inner2 { get; set; }
}
正如您在第二段代码中看到的,我写了:
modelBuilder.Entity<OuterTable>().HasKey(o => o.Outer1);
modelBuilder.Entity<InnerTable>().HasKey(i => i.Inner1);
但这其实并非我的本意,因为这些表本来就没有主键。然而我需要使用 Update 和 AddAsync,这就迫使我必须定义主键,否则 EF Core 会报错。因此,我不得不为那些本应为 NULL 的值填入某个值,我只是简单地填入了负整数。
我看到有帖子建议使用复合键,但问题在于,有些 JSON 文档根本没有外层/内层结构,而我仍然希望为它们写入 NULL 值。所以复合键在我的场景下似乎行不通。
这里我还附上了第一段代码在 SQL Server 中生成的表,这也正是我真正想要的结果。
在 EF Core 中,您无法通过 Add/Update 来写入没有任何主键的表,因为主键用于标识对象。键不一定是 Int32 值,也可以使用 GUID 或 String。无论如何,您必须让每条记录对应一个可以识别它的键值(类似字典中的键),才能对其进行控制。