我有一个简单的 Azure 函数(Azure Function):
/// <summary>
/// Forwards every document in the trigger batch to the Event Hub output collector,
/// passing the document's "id" property as the second argument of <c>AddAsync</c>.
/// </summary>
/// <param name="inputs">Documents delivered by the Cosmos DB trigger; null or empty batches are ignored.</param>
/// <param name="outputEvents">Collector that receives one <c>EventData</c> per document.</param>
/// <param name="log">Logger supplied to the function; this body logs through <c>_logger</c> instead.</param>
[FunctionName("TestFunction")]
public async Task Run([CosmosDBTrigger(
    databaseName: "Test",
    containerName: "test",
    CreateLeaseContainerIfNotExists = true,
    MaxItemsPerInvocation = 200,
    FeedPollDelay = 10000,
    Connection = "TestConnection",
    LeaseContainerName = "testLeases")]IReadOnlyList<Document> inputs,
    [EventHub("test", Connection = "TestConnectionString")] IAsyncCollector<EventData> outputEvents,
    ILogger log)
{
    // Nothing to forward when the trigger hands over no documents.
    if (inputs == null || inputs.Count == 0)
    {
        return;
    }

    try
    {
        // dop: 10 is passed straight to ForEachAsync (presumably the degree of parallelism — confirm against the helper).
        await inputs.ForEachAsync(dop: 10, document =>
        {
            var partitionKey = document.GetPropertyValue<string>("id");
            var payload = Encoding.UTF8.GetBytes(document.ToString());
            return outputEvents.AddAsync(new EventData(payload), partitionKey);
        });
    }
    catch (Exception exception)
    {
        // Failures are logged and swallowed; the exception does not propagate to the caller.
        _logger.LogError(exception, $"Error occurred while sending reports feed to datalake: {exception.Message}");
    }
}
我正在尝试模拟 outputEvents,并断言 AddAsync(eventData, id) 已被调用。对于 AddAsync(eventData) 来说这没有问题,但带分区键的重载 AddAsync(eventData, id) 却行不通,因为我收到了来自 EventHubWebJobsExtensions 的异常:
/// <summary>
/// Adds an event together with a partition key. Only collectors that are
/// <c>EventHubAsyncCollector</c> instances are supported.
/// </summary>
/// <param name="instance">Collector the event is added to.</param>
/// <param name="eventData">Event to add.</param>
/// <param name="partitionKey">Partition key forwarded to the Event Hubs collector.</param>
/// <param name="cancellationToken">Token forwarded to the Event Hubs collector.</param>
/// <exception cref="InvalidOperationException">
/// <paramref name="instance"/> is not an <c>EventHubAsyncCollector</c> (for example a mock or proxy).
/// </exception>
public static Task AddAsync(this IAsyncCollector<EventData> instance, EventData eventData, string partitionKey, CancellationToken cancellationToken = default(CancellationToken))
{
    if (instance is EventHubAsyncCollector ehCollector)
    {
        return ehCollector.AddAsync(eventData, partitionKey, cancellationToken);
    }

    // Any other implementation of the interface (including null) is rejected.
    throw new InvalidOperationException("Adding with a partition key is only available when using the Event Hubs extension package.");
}
对于 AddAsync(eventData) 我只需使用建议的方法:
/// <summary>
/// In-memory <c>IAsyncCollector&lt;T&gt;</c> test double that records every item added to it.
/// </summary>
/// <typeparam name="T">Type of the collected items.</typeparam>
public class MockAsyncCollector<T> : IAsyncCollector<T>
{
    /// <summary>Items received through <c>AddAsync</c>, in call order.</summary>
    public readonly List<T> Items = new List<T>();

    /// <summary>Stores <paramref name="item"/> and returns an already-completed task.</summary>
    public Task AddAsync(T item, CancellationToken cancellationToken = default)
    {
        Items.Add(item);
        Task<bool> done = Task.FromResult(true);
        return done;
    }

    /// <summary>No-op flush; returns an already-completed task.</summary>
    public Task FlushAsync(CancellationToken cancellationToken = default) => Task.FromResult(true);
}
这工作正常,但我无法找出 AddAsync(eventData, id) 的解决方案。
目前我的测试如下:
/// <summary>
/// Tests for <c>Test.Run</c> using an NSubstitute proxy as the Event Hub output collector.
/// </summary>
public class Tests
{
    private readonly MockLogger<Test> _logger;
    private readonly Test _processor;

    public Tests()
    {
        _logger = Substitute.For<MockLogger<Test>>();
        _processor = new Test(_logger);
    }

    /// <summary>
    /// Runs the function with one document and expects a single partition-keyed AddAsync call.
    /// </summary>
    [Fact]
    public async Task Run_GivenProperMessage_ShouldNotThrowAndCallAddAsync()
    {
        // Arrange
        var document = new Document();
        document.SetPropertyValue("id", Guid.NewGuid().ToString());
        var documents = new List<Document> { document };
        var collector = Substitute.For<IAsyncCollector<EventData>>();

        // Act
        Func<Task> act = async () => await _processor.Run(documents, collector, _logger);

        // Assert
        await act.Should().NotThrowAsync();
        await collector.Received(1).AddAsync(Arg.Any<EventData>(), Arg.Any<string>());
    }
}
异常的原因似乎是那个 switch 表达式:它期望得到 EventHubAsyncCollector,而实际得到的是 NSubstitute 的 ObjectProxy。
该问题的原因是:测试中的 IAsyncCollector<EventData> 实例是 NSubstitute 创建的替身对象(substitute),而 AddAsync 扩展方法中的 switch 表达式只接受 EventHubAsyncCollector,因此对 AddAsync 的调用无法被 NSubstitute 正确拦截。
解决办法是创建一个 IAsyncCollector<EventData>
的自定义实现,以正确实现具有分区键支持的 AddAsync
方法。
/// <summary>
/// In-memory <c>IAsyncCollector&lt;EventData&gt;</c> test double that records each added event
/// together with the partition key it was added with (null when none was supplied).
/// </summary>
public class MockEventHubAsyncCollector : IAsyncCollector<EventData>
{
    /// <summary>Recorded (event, partition key) pairs, in call order.</summary>
    public readonly List<(EventData, string)> Items = new List<(EventData, string)>();

    /// <summary>
    /// Interface member required by <c>IAsyncCollector&lt;EventData&gt;</c>; without it the class does
    /// not compile. Records the event with a null partition key.
    /// </summary>
    public Task AddAsync(EventData item, CancellationToken cancellationToken = default)
    {
        Items.Add((item, (string)null));
        return Task.CompletedTask;
    }

    /// <summary>
    /// Records the event with its partition key.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this is an ordinary instance method, not an interface member. Code that only
    /// holds an <c>IAsyncCollector&lt;EventData&gt;</c> reference binds a partition-keyed AddAsync call
    /// to the extension method, which throws for anything that is not an EventHubAsyncCollector —
    /// confirm the code under test actually reaches this overload.
    /// </remarks>
    public Task AddAsync(EventData item, string partitionKey, CancellationToken cancellationToken = default)
    {
        Items.Add((item, partitionKey));
        return Task.CompletedTask;
    }

    /// <summary>No-op flush.</summary>
    public Task FlushAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;
}
测试:
// Arrange: one document whose "id" is the partition key the assertions expect.
var id = Guid.NewGuid().ToString();
var inputDocument = new Document();
inputDocument.SetPropertyValue("id", id);
var input = new List<Document> { inputDocument };
var output = new MockEventHubAsyncCollector();

// Act
Func<Task> act = async () => await _processor.Run(input, output, _logger);

// Assert
await act.Should().NotThrowAsync();
output.Items.Should().HaveCount(1);
var (_, recordedPartitionKey) = output.Items[0];
recordedPartitionKey.Should().Be(id); // Check the partition key
这是我用于测试目的的自定义实现:
using System.Collections.Generic;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
namespace FunctionApp23
{
/// <summary>
/// Cosmos DB-triggered function that maps each changed document to a <see cref="MyEventData"/>
/// and adds it to the Event Hub output collector, passing the document Id alongside it.
/// </summary>
public class Function1
{
    private readonly ILogger _logger;

    /// <summary>Creates the function and its logger from the supplied factory.</summary>
    public Function1(ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger<Function1>();
    }

    /// <summary>
    /// Converts every document in <paramref name="input"/> to a <see cref="MyEventData"/> and adds it
    /// to <paramref name="outputEvents"/>. Null or empty batches are ignored.
    /// </summary>
    /// <param name="input">Changed documents delivered by the trigger.</param>
    /// <param name="outputEvents">Collector receiving one event per document.</param>
    [Function("Function1")]
    public async Task Run(
        [CosmosDBTrigger(
            databaseName: "databaseName",
            collectionName: "collectionName",
            ConnectionStringSetting = "CONN_STRING",
            LeaseCollectionName = "leases")] IReadOnlyList<MyDocument> input,
        [EventHub("test", Connection = "EventHubConnectionString")] IAsyncCollector<MyEventData> outputEvents)
    {
        if (input == null || input.Count == 0)
        {
            return;
        }

        // Message templates keep document data out of the format string, so an Id containing
        // '{' or '}' cannot be misread as a placeholder.
        _logger.LogInformation("Documents modified: {Count}", input.Count);
        _logger.LogInformation("First document Id: {Id}", input[0].Id);

        // Process the documents and send to Event Hub.
        // Test doubles (e.g. MockEventHubAsyncCollector) are injected through outputEvents by the
        // test; the function itself must not create one.
        foreach (var document in input)
        {
            var eventData = new MyEventData
            {
                Id = document.Id,
                Text = document.Text,
                Number = document.Number,
                Boolean = document.Boolean
            };

            // NOTE(review): relies on a partition-keyed AddAsync being available for
            // IAsyncCollector<MyEventData>; no such overload is declared in this file — confirm.
            await outputEvents.AddAsync(eventData, document.Id);
        }
    }
}
/// <summary>
/// Event payload carrying an Id, a text value, a number and a boolean flag.
/// </summary>
public class MyEventData
{
/// <summary>Identifier of the event.</summary>
public string Id { get; set; }
/// <summary>Text value carried by the event.</summary>
public string Text { get; set; }
/// <summary>Numeric value carried by the event.</summary>
public int Number { get; set; }
/// <summary>Boolean value carried by the event.</summary>
public bool Boolean { get; set; }
}
/// <summary>
/// In-memory <c>IAsyncCollector&lt;MyEventData&gt;</c> test double that records each added event
/// together with the partition key it was added with (null when none was supplied).
/// </summary>
public class MockEventHubAsyncCollector : IAsyncCollector<MyEventData>
{
    /// <summary>Recorded (event, partition key) pairs, in call order.</summary>
    public readonly List<(MyEventData, string)> Items = new List<(MyEventData, string)>();

    /// <summary>
    /// Interface member required by <c>IAsyncCollector&lt;MyEventData&gt;</c>; without it the class
    /// does not compile. Records the event with a null partition key.
    /// </summary>
    public Task AddAsync(MyEventData item, System.Threading.CancellationToken cancellationToken = default)
    {
        Items.Add((item, (string)null));
        return Task.CompletedTask;
    }

    /// <summary>
    /// Records the event with its partition key.
    /// NOTE(review): this overload is not part of the interface, so it is only reachable through a
    /// reference typed as this class — confirm how the code under test invokes it.
    /// </summary>
    public Task AddAsync(MyEventData item, string partitionKey, System.Threading.CancellationToken cancellationToken = default)
    {
        Items.Add((item, partitionKey));
        return Task.CompletedTask;
    }

    /// <summary>No-op flush.</summary>
    public Task FlushAsync(System.Threading.CancellationToken cancellationToken = default) => Task.CompletedTask;
}
}
样本数据:
发送到 Event Hub 的事件(MyEventData
实例)表示来自 Cosmos DB 触发器的转换数据,每个事件都与正确的分区键关联。预期结果的具体细节取决于 Azure Function 中给出的输入文档。
制作活动: