< Summary

Information
Class: ClientManager.Worker.Messaging.RabbitMQMessageConsumer
Assembly: ClientManager.Worker
File(s): /home/runner/work/ClientManagerDemo/ClientManagerDemo/src/ClientManager/ClientManager.Worker/Messaging/RabbitMQMessageConsumer.cs
Line coverage
0%
Covered lines: 0
Uncovered lines: 61
Coverable lines: 61
Total lines: 111
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 30
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

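| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 100% | 2 | 1 | 0% |
| StartAsync() | 0% | 6 | 2 | 0% |
| SubscribeAsync() | 100% | 2 | 1 | 0% |
| <SubscribeAsync() | 100% | 2 | 1 | 0% |
| StopAsync() | 0% | 6 | 2 | 0% |
| DiscoverMessageTypes() | 0% | 20 | 4 | 0% |
| DispatchToHandlers() | 0% | 506 | 22 | 0% |
| GetMessageContext(...) | 100% | 2 | 1 | 0% |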

File(s)

/home/runner/work/ClientManagerDemo/ClientManagerDemo/src/ClientManager/ClientManager.Worker/Messaging/RabbitMQMessageConsumer.cs

# | Line | Line coverage
 1using System.Collections.Concurrent;
 2using System.Text;
 3using System.Text.Json;
 4using ClientManager.Shared.Messaging;
 5using RabbitMQ.Client;
 6using RabbitMQ.Client.Events;
 7
 8namespace ClientManager.Worker.Messaging;
 9
 010public class RabbitMQMessageConsumer(IServiceScopeFactory serviceScopeFactory, IMessageBrokerFactory messageBrokerFactor
 11    : IMessageConsumer
 12{
 013    readonly IServiceScopeFactory _scopeFactory = serviceScopeFactory;
 014    readonly IMessageBrokerFactory _messageBrokerFactory = messageBrokerFactory;
 015    readonly ILogger<RabbitMQMessageConsumer> _logger = logger;
 016    readonly ConcurrentDictionary<string, IChannel> _channels = new();
 017    readonly IReadOnlyDictionary<string, Type> _messageTypeCache = DiscoverMessageTypes().ToDictionary(t => t.Name, t =>
 18
 19    public async Task StartAsync(CancellationToken cancellationToken)
 20    {
 021        foreach (var messageType in _messageTypeCache.Values)
 22        {
 023            await SubscribeAsync(messageType.Name, cancellationToken);
 24        }
 025    }
 26
 27    async Task SubscribeAsync(string queueName, CancellationToken cancellationToken)
 28    {
 029        var channel = await _messageBrokerFactory.GetConsumeChannelAsync(queueName);
 030        _channels.AddOrUpdate(queueName, channel, (key, oldValue) => channel);
 31
 032        var consumer = new AsyncEventingBasicConsumer(channel);
 33
 034        consumer.ReceivedAsync += async (_, ea) =>
 035        {
 036            var json = Encoding.UTF8.GetString(ea.Body.ToArray());
 037
 038            try
 039            {
 040                await DispatchToHandlers(queueName, json, cancellationToken);
 041                await channel.BasicAckAsync(ea.DeliveryTag, multiple: false, cancellationToken);
 042            }
 043            catch (Exception ex)
 044            {
 045                _logger.LogError(ex, "Failed to process message from {queue}", queueName);
 046                await channel.BasicNackAsync(ea.DeliveryTag, false, requeue: true);
 047            }
 048        };
 49
 050        await channel.BasicConsumeAsync(queueName, autoAck: false, consumer);
 051        _logger.LogInformation("Listening on queue: {queue}", queueName);
 052    }
 53
 54    public async Task StopAsync(CancellationToken cancellationToken)
 55    {
 056        foreach (var (queue, channel) in _channels)
 57        {
 058            await channel.BasicCancelAsync(queue, cancellationToken: cancellationToken);
 059            await channel.CloseAsync(cancellationToken);
 060        }
 61
 062        _logger.LogInformation("All channels closed.");
 063    }
 64
 65    static IEnumerable<Type> DiscoverMessageTypes() =>
 066        AppDomain
 067            .CurrentDomain.GetAssemblies()
 068            .SelectMany(assembly => assembly.GetTypes())
 069            .Where(type => typeof(ICommand).IsAssignableFrom(type) && !type.IsInterface && !type.IsAbstract)
 070            .ToList();
 71
 72    async Task DispatchToHandlers(string queueName, string json, CancellationToken cancellationToken)
 73    {
 74        // Need to create a scope inside the singleton for every message
 075        await using var scope = _scopeFactory.CreateAsyncScope();
 076        var messageContextAccessor = scope.ServiceProvider.GetRequiredService<IMessageContextAccessor>();
 77
 078        if (!_messageTypeCache.TryGetValue(queueName, out var messageType))
 79        {
 080            _logger.LogWarning("No message type found for queue: {queue}", queueName);
 081            return;
 82        }
 83
 084        var envelope = JsonSerializer.Deserialize<MessageEnvelope<object>>(json);
 085        ArgumentNullException.ThrowIfNull(envelope);
 86
 087        var jsonElement = (JsonElement)envelope.Message;
 088        var message = jsonElement.Deserialize(messageType);
 89
 090        var messageContext = GetMessageContext(envelope);
 091        messageContextAccessor.SetCurrentContext(messageContext);
 92
 093        _logger.LogInformation("Received message from {queue}: \n\t[correlationId: {correlationId}]\n\t{@Message}", queu
 94
 95        // Find handler registered for that message type
 096        var handlerType = typeof(IHandleMessage<>).MakeGenericType(messageType);
 097        var handlers = scope.ServiceProvider.GetServices(handlerType);
 98
 099        foreach (var handler in handlers)
 100        {
 0101            if (handler is not null && message is not null)
 102            {
 0103                dynamic dynamicHandler = handler;
 0104                await dynamicHandler.HandleAsync((dynamic)message, messageContext, cancellationToken);
 105            }
 106        }
 0107    }
 108
 109    static MessageContext GetMessageContext(MessageEnvelope<object> envelope) =>
 0110        new MessageContext(CorrelationId: envelope.CorrelationId, CausationId: envelope.CausationId, Timestamp: envelope
 111}