-
Notifications
You must be signed in to change notification settings - Fork 516
Expand file tree
/
Copy pathEventBusExtensions.cs
More file actions
59 lines (52 loc) · 2.21 KB
/
EventBusExtensions.cs
File metadata and controls
59 lines (52 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
namespace EvolutionaryArchitecture.Fitnet.Common.IntegrationTestsToolbox.TestEngine.EventBus;
using MassTransit;
using MassTransit.Testing;
using Microsoft.AspNetCore.TestHost;
using Microsoft.Extensions.DependencyInjection;
/// <summary>
/// Integration-test helpers for wiring the MassTransit test harness into a
/// <see cref="WebApplicationFactory{TEntryPoint}"/> and for waiting on consumed messages.
/// </summary>
public static class EventBusExtensions
{
    /// <summary>
    /// Resolves the <see cref="ITestHarness"/> registered in the factory's service provider.
    /// </summary>
    /// <typeparam name="T">Entry-point type of the application under test.</typeparam>
    /// <param name="webApplicationFactory">Factory whose services are queried.</param>
    /// <returns>The registered <see cref="ITestHarness"/>.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown by <c>GetRequiredService</c> when no <see cref="ITestHarness"/> is registered.
    /// </exception>
    public static ITestHarness GetTestExternalEventBus<T>(
        this WebApplicationFactory<T> webApplicationFactory) where T : class =>
        webApplicationFactory.Services.GetRequiredService<ITestHarness>();

    /// <summary>
    /// Returns a derived factory whose test services include the MassTransit test harness
    /// with the given consumer types added.
    /// </summary>
    /// <typeparam name="T">Entry-point type of the application under test.</typeparam>
    /// <param name="webApplicationFactory">Factory to derive from.</param>
    /// <param name="consumerTypes">Consumer types to add to the test harness.</param>
    /// <returns>The factory produced by <c>WithWebHostBuilder</c>.</returns>
    public static WebApplicationFactory<T> WithTestEventBus<T>(
        this WebApplicationFactory<T> webApplicationFactory,
        params Type[] consumerTypes)
        where T : class =>
        webApplicationFactory.WithWebHostBuilder(webHostBuilder =>
            webHostBuilder.ConfigureTestServices(services =>
            {
                services.AddMassTransitTestHarness(configurator =>
                {
                    foreach (var consumerType in consumerTypes)
                    {
                        configurator.AddConsumer(consumerType);
                    }
                });
            }));

    /// <summary>
    /// Waits for a single consumed message of type <typeparamref name="TMessage"/>.
    /// </summary>
    /// <typeparam name="TMessage">Message type to look for.</typeparam>
    /// <param name="testHarness">Harness whose consumed messages are inspected.</param>
    /// <param name="cancellationToken">Token that cancels the wait.</param>
    /// <returns>
    /// The consumed message, or <see langword="null"/> when none was observed within the
    /// default wait of <see cref="WaitToConsumeMessagesAllAsync{TMessage}"/>.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown by <c>SingleOrDefault</c> when more than one matching message was consumed.
    /// </exception>
    public static async Task<IReceivedMessage<TMessage>?> WaitToConsumeMessageAsync<TMessage>(
        this ITestHarness testHarness,
        CancellationToken cancellationToken = default)
        where TMessage : class
    {
        var result = await testHarness.WaitToConsumeMessagesAllAsync<TMessage>(1, cancellationToken: cancellationToken);

        return result.SingleOrDefault();
    }

    /// <summary>
    /// Polls the harness until at least <paramref name="messageCount"/> messages of type
    /// <typeparamref name="TMessage"/> have been consumed, or until the polling budget runs out.
    /// </summary>
    /// <typeparam name="TMessage">Message type to look for.</typeparam>
    /// <param name="testHarness">Harness whose consumed messages are inspected.</param>
    /// <param name="messageCount">Number of consumed messages to wait for.</param>
    /// <param name="maxDelaySeconds">Maximum number of one-second delays between polls.</param>
    /// <param name="cancellationToken">Token that cancels the queries and the delays.</param>
    /// <returns>
    /// The consumed messages of type <typeparamref name="TMessage"/> seen at the moment the
    /// expected count was reached, or whatever has been consumed once the budget is exhausted.
    /// </returns>
    public static async Task<IEnumerable<IReceivedMessage<TMessage>>> WaitToConsumeMessagesAllAsync<TMessage>(
        this ITestHarness testHarness,
        int messageCount,
        int maxDelaySeconds = 15,
        CancellationToken cancellationToken = default)
        where TMessage : class
    {
        for (var attempt = 0; attempt < maxDelaySeconds; attempt++)
        {
            // Re-query on every iteration: a snapshot taken once before the loop never
            // changes, which would force the full delay even after the messages arrive.
            var consumedMessages = testHarness.Consumed!.Select<TMessage>(cancellationToken).ToList();
            if (consumedMessages.Count >= messageCount)
            {
                return consumedMessages;
            }

            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }

        // Budget exhausted: one last look so messages consumed during the final delay are included.
        return [.. testHarness.Consumed!.Select<TMessage>(cancellationToken)];
    }
}