Mocking EventHubAsyncCollector in an Azure Function with NSubstitute


I have a simple Azure Function:

[FunctionName("TestFunction")]
public async Task Run([CosmosDBTrigger(
    databaseName: "Test",
    containerName: "test",
    CreateLeaseContainerIfNotExists = true,
    MaxItemsPerInvocation = 200,
    FeedPollDelay = 10000,
    Connection = "TestConnection",            
    LeaseContainerName = "testLeases")]IReadOnlyList<Document> inputs,
    [EventHub("test", Connection = "TestConnectionString")] IAsyncCollector<EventData> outputEvents,
    ILogger log)
{
    if (inputs != null && inputs.Count > 0)
    {
        try
        {
            await inputs.ForEachAsync(dop: 10, input =>
            {
                var id = input.GetPropertyValue<string>("id");
                var eventData = new EventData(Encoding.UTF8.GetBytes(input.ToString()));
                return outputEvents.AddAsync(eventData, id);
            });
        }
        catch (Exception exception)
        {
            _logger.LogError(exception, $"Error occurred while sending reports feed to datalake: {exception.Message}");
        }
    }
}

I am trying to mock outputEvents and assert that AddAsync(eventData, id) received a call. This works without trouble for AddAsync(eventData), but the partition-key call is a problem because I get an exception from EventHubWebJobsExtensions:

public static Task AddAsync(this IAsyncCollector<EventData> instance, EventData eventData, string partitionKey, CancellationToken cancellationToken = default(CancellationToken)) =>
    instance switch
    {
        EventHubAsyncCollector ehCollector => ehCollector.AddAsync(eventData, partitionKey, cancellationToken),
        _ => throw new InvalidOperationException("Adding with a partition key is only available when using the Event Hubs extension package.")
    };

For AddAsync(eventData) I simply use the suggested approach:

    public class MockAsyncCollector<T> : IAsyncCollector<T>
    {
        public readonly List<T> Items = new List<T>();

        public Task AddAsync(T item, CancellationToken cancellationToken = default)
        {
            Items.Add(item);
            return Task.FromResult(true);
        }

        public Task FlushAsync(CancellationToken cancellationToken = default)
        {
            return Task.FromResult(true);
        }
    }

This works fine, but I cannot find a solution for AddAsync(eventData, id).

Currently my test looks like this:

public class Tests
{
    private readonly MockLogger<Test> _logger;
    private readonly Test _processor;

    public Tests()
    {
        _logger = Substitute.For<MockLogger<Test>>();
        _processor = new Test(_logger);
    }
    
    [Fact]
    public async Task Run_GivenProperMessage_ShouldNotThrowAndCallAddAsync()
    {
        // Arrange
        var input = new List<Document>();
        var inputDocument = new Document();
        inputDocument.SetPropertyValue("id", Guid.NewGuid().ToString());
        input.Add(inputDocument);

        var output = Substitute.For<IAsyncCollector<EventData>>();
        
        // Act
        var act = async () => await _processor.Run(input, output, _logger);

        // Assert
        await act.Should().NotThrowAsync();
        await output.Received(1).AddAsync(Arg.Any<EventData>(), Arg.Any<string>());
    }
}

The exception seems to be caused by the switch, which expects an EventHubAsyncCollector and instead gets an NSubstitute ObjectProxy.
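The diagnosis above can be reproduced without the Azure SDK. The following is a minimal sketch using stand-in types (ICollector, RealCollector, FakeCollector, and SwitchDemo are all hypothetical names, not SDK types) that mirrors the shape of the extension method quoted in the question: a static method that pattern-matches on one concrete class and throws for everything else, including any proxy or hand-written fake.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in for IAsyncCollector<T>; illustration only, not the SDK interface.
public interface ICollector<T>
{
    Task AddAsync(T item);
}

// Stand-in for EventHubAsyncCollector: the only type the extension accepts.
public sealed class RealCollector : ICollector<string>
{
    public Task AddAsync(string item) => Task.CompletedTask;
    public Task AddAsync(string item, string partitionKey) => Task.CompletedTask;
}

// Stand-in for a mock or substitute: implements the interface, nothing more.
public sealed class FakeCollector : ICollector<string>
{
    public Task AddAsync(string item) => Task.CompletedTask;
}

public static class CollectorExtensions
{
    // Same pattern as the quoted extension method: a type switch on the instance.
    public static Task AddAsync(this ICollector<string> instance, string item, string partitionKey) =>
        instance switch
        {
            RealCollector real => real.AddAsync(item, partitionKey),
            _ => throw new InvalidOperationException("Partition keys need the real collector.")
        };
}

public static class SwitchDemo
{
    // Returns true when the extension method rejects the given collector.
    public static bool Rejects(ICollector<string> collector)
    {
        try
        {
            // Two arguments: the interface has no such member, so this is the extension method.
            collector.AddAsync("payload", "key").GetAwaiter().GetResult();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

Because the extension method is static, a substitute has nothing to intercept; the same exception would be expected from the Received(1).AddAsync(...) line in the test, since that line also calls the extension method.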

Tags: c# .net azure-functions azure-eventhub nsubstitute

1 answer

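The problem is that the IAsyncCollector<EventData> instance in the test is an NSubstitute substitute. AddAsync with a partition key is an extension method, so NSubstitute cannot intercept it, and the switch expression inside it throws for any instance that is not an EventHubAsyncCollector.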

  • Create a custom implementation of IAsyncCollector<EventData> that implements AddAsync with partition-key support:
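public class MockEventHubAsyncCollector : IAsyncCollector<EventData>
{
    // Each entry is (event, partition key); the key is null when none was supplied.
    public readonly List<(EventData, string)> Items = new List<(EventData, string)>();

    // Required by IAsyncCollector<EventData>; the class does not compile without it.
    public Task AddAsync(EventData item, CancellationToken cancellationToken = default)
    {
        Items.Add((item, (string)null));
        return Task.CompletedTask;
    }

    // Partition-key overload. It is only reached when the caller holds the concrete type;
    // a call made through IAsyncCollector<EventData> binds to the extension method instead.
    public Task AddAsync(EventData item, string partitionKey, CancellationToken cancellationToken = default)
    {
        Items.Add((item, partitionKey));
        return Task.CompletedTask;
    }

    public Task FlushAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;
}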

Test:

// Arrange
var input = new List<Document>();
var inputDocument = new Document();
var id = Guid.NewGuid().ToString();
inputDocument.SetPropertyValue("id", id);
input.Add(inputDocument);

var output = new MockEventHubAsyncCollector();
    
// Act
var act = async () => await _processor.Run(input, output, _logger);

// Assert
await act.Should().NotThrowAsync();
output.Items.Should().HaveCount(1);
output.Items[0].Item2.Should().Be(id); // Check the partition key
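One thing worth checking before relying on this test: C# picks between an instance method and an extension method at compile time, based on the static type of the receiver. The sketch below uses stand-in types (ISink, MockSink, and BindingDemo are hypothetical, not SDK types) to show that an extra overload on a mock class is only called when the caller holds the concrete type. In the function from the question the parameter is typed as IAsyncCollector<EventData>, so the same rule would apply there.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for IAsyncCollector<T>; illustration only, not the SDK interface.
public interface ISink<T>
{
    Task AddAsync(T item);
}

public static class SinkExtensions
{
    // Stand-in for the partition-key extension method; here it simply does nothing.
    public static Task AddAsync(this ISink<string> sink, string item, string partitionKey) =>
        Task.CompletedTask;
}

public sealed class MockSink : ISink<string>
{
    public readonly List<(string Item, string Key)> Items = new List<(string, string)>();

    public Task AddAsync(string item) => Task.CompletedTask;

    // Extra overload that is not part of the interface.
    public Task AddAsync(string item, string partitionKey)
    {
        Items.Add((item, partitionKey));
        return Task.CompletedTask;
    }
}

public static class BindingDemo
{
    // Returns how many items the mock recorded after each kind of call.
    public static (int ViaInterface, int ViaConcrete) Run()
    {
        var mock = new MockSink();
        ISink<string> asInterface = mock;

        // Static type is the interface: binds to the extension method, mock records nothing.
        asInterface.AddAsync("a", "k1");
        int viaInterface = mock.Items.Count;

        // Static type is the concrete class: binds to the instance overload.
        mock.AddAsync("b", "k2");
        int viaConcrete = mock.Items.Count;

        return (viaInterface, viaConcrete);
    }
}
```

If the mock's Items list stays empty in your test, one option is to hide the partition-key send behind a small interface of your own and substitute that interface instead of the collector.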

This is the custom implementation I used for testing:

using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace FunctionApp23
{
    public class Function1
    {
        private readonly ILogger _logger;

        public Function1(ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.CreateLogger<Function1>();
        }

        [Function("Function1")]
        public async Task Run(
            [CosmosDBTrigger(
                databaseName: "databaseName",
                collectionName: "collectionName",
                ConnectionStringSetting = "CONN_STRING",
                LeaseCollectionName = "leases")] IReadOnlyList<MyDocument> input,
            [EventHub("test", Connection = "EventHubConnectionString")] IAsyncCollector<MyEventData> outputEvents)
        {
            if (input != null && input.Count > 0)
            {
                _logger.LogInformation("Documents modified: " + input.Count);
                _logger.LogInformation("First document Id: " + input[0].Id);

                // Create a custom mock for IAsyncCollector<MyEventData>
                var mockOutput = new MockEventHubAsyncCollector();

                // Process the documents and send to Event Hub
                foreach (var document in input)
                {
                    var eventData = new MyEventData
                    {
                        Id = document.Id,
                        Text = document.Text,
                        Number = document.Number,
                        Boolean = document.Boolean
                    };
                    await outputEvents.AddAsync(eventData, document.Id);
                }

                // Use the mock to assert expectations
                // For example: Assert that AddAsync was called with the correct parameters
                // mockOutput.Items should contain the expected values
            }
        }
    }

    // MyDocument (with Id, Text, Number and Boolean properties) is assumed to be defined elsewhere.
    public class MyEventData
    {
        public string Id { get; set; }
        public string Text { get; set; }
        public int Number { get; set; }
        public bool Boolean { get; set; }
    }

    public class MockEventHubAsyncCollector : IAsyncCollector<MyEventData>
    {
        public readonly List<(MyEventData, string)> Items = new List<(MyEventData, string)>();

        // Required by IAsyncCollector<MyEventData>; records the item with no partition key.
        public Task AddAsync(MyEventData item, System.Threading.CancellationToken cancellationToken = default)
        {
            Items.Add((item, (string)null));
            return Task.CompletedTask;
        }

        public Task AddAsync(MyEventData item, string partitionKey, System.Threading.CancellationToken cancellationToken = default)
        {
            Items.Add((item, partitionKey));
            return Task.CompletedTask;
        }

        public Task FlushAsync(System.Threading.CancellationToken cancellationToken = default) => Task.CompletedTask;
    }
}

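Sample data: (screenshot not preserved)

The set of events (MyEventData instances) represents the transformed data from the Cosmos DB trigger, with each event associated with the correct partition key. The specifics of the expected result depend on the input documents given to the Azure Function.

Produced events: (screenshot not preserved)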
