< Summary

Information
Class: ClientManager.Shared.Messaging.MessageBrokerFactory
Assembly: ClientManager.Shared
File(s): /home/runner/work/ClientManagerDemo/ClientManagerDemo/src/ClientManager/ClientManager.Shared/Messaging/MessageBrokerFactory.cs
Line coverage
68%
Covered lines: 66
Uncovered lines: 31
Coverable lines: 97
Total lines: 180
Line coverage: 68%
Branch coverage
53%
Covered branches: 30
Total branches: 56
Branch coverage: 53.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

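| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 100% | 1 | 1 | 100% |
| GetConnectionAsync() | 100% | 4 | 4 | 100% |
| GetOrCreateChannelAsync() | 100% | 2 | 2 | 100% |
| GetPublishChannelAsync() | 87.5% | 8 | 8 | 100% |
| GetConsumeChannelAsync() | 0% | 72 | 8 | 0% |
| GetConnectionFactory() | 100% | 2 | 1 | 0% |
| OnAck() | 100% | 1 | 1 | 100% |
| OnNack() | 100% | 2 | 1 | 0% |
| OnBasicReturn(...) | 0% | 110 | 10 | 0% |
| CleanOutstandingConfirms() | 60% | 13 | 10 | 69.23% |
| DeclareAndBindQueue() | 75% | 8 | 8 | 100% |
| DisposeAsync() | 83.33% | 6 | 6 | 85.71% |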

File(s)

/home/runner/work/ClientManagerDemo/ClientManagerDemo/src/ClientManager/ClientManager.Shared/Messaging/MessageBrokerFactory.cs

# (hits) | Line | Line coverage
 1using System.Buffers.Binary;
 2using System.Collections.Concurrent;
 3using ClientManager.Shared.Configuration;
 4using RabbitMQ.Client;
 5using RabbitMQ.Client.Events;
 6
 7namespace ClientManager.Shared.Messaging;
 8
 19public class MessageBrokerFactory(RabbitMQConnectionConfiguration rabbitMQConnectionConfiguration) : IMessageBrokerFacto
 10{
 11    IConnection? _connection;
 112    readonly ConcurrentDictionary<string, IChannel> _channels = [];
 13    const string exchangeName = "client-manager";
 114    readonly LinkedList<ulong> outstandingConfirms = new();
 115    readonly SemaphoreSlim semaphore = new(1, 1);
 116    readonly TaskCompletionSource<bool> allMessagesConfirmedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously
 17    const int MESSAGE_COUNT = 10000;
 18    int confirmedCount = 0;
 19
 120    readonly ConnectionFactory _connectionFactory =
 121        new()
 122        {
 123            Uri = new Uri(rabbitMQConnectionConfiguration.Url),
 124            Port = rabbitMQConnectionConfiguration.AmqpPort,
 125            VirtualHost = rabbitMQConnectionConfiguration.VirtualHost,
 126            UserName = rabbitMQConnectionConfiguration.Username,
 127            Password = rabbitMQConnectionConfiguration.Password
 128        };
 29
 30    public async ValueTask<IConnection> GetConnectionAsync()
 31    {
 232        if (_connection != null && _connection.IsOpen)
 133            return _connection;
 34
 135        _connection = await _connectionFactory.CreateConnectionAsync();
 136        return _connection;
 237    }
 38
 39    public async ValueTask<IChannel> GetOrCreateChannelAsync(string channelName, CreateChannelOptions? options = null)
 40    {
 241        _connection = await GetConnectionAsync();
 42
 243        if (!_channels.TryGetValue(channelName, out var channel))
 44        {
 245            channel = await _connection.CreateChannelAsync(options);
 246            _channels.AddOrUpdate(channelName, channel, (key, oldValue) => channel);
 47        }
 48
 249        return channel;
 250    }
 51
 52    public async ValueTask<IChannel> GetPublishChannelAsync(string exchange, CreateChannelOptions? options = null)
 53    {
 454        exchange = string.IsNullOrWhiteSpace(exchange) ? exchangeName : exchange;
 55
 456        if (!_channels.TryGetValue(exchange, out IChannel? existing) || existing is null)
 57        {
 158            options ??= new CreateChannelOptions(
 159                publisherConfirmationsEnabled: true,
 160                publisherConfirmationTrackingEnabled: true,
 161                outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(1000)
 162            );
 163            var channel = await GetOrCreateChannelAsync(exchange, options);
 164            await channel.ExchangeDeclareAsync(exchange, "direct", durable: true, autoDelete: false, arguments: null);
 165            channel.BasicAcksAsync += OnAck;
 166            channel.BasicNacksAsync += OnNack;
 167            channel.BasicReturnAsync += OnBasicReturn;
 168            return channel;
 69        }
 70
 371        return existing;
 472    }
 73
 74    public async ValueTask<IChannel> GetConsumeChannelAsync(string queueName, string exchange = "", string routingKey = 
 75    {
 076        exchange = string.IsNullOrWhiteSpace(exchange) ? exchangeName : exchange;
 077        routingKey = string.IsNullOrWhiteSpace(routingKey) ? queueName : routingKey;
 78
 079        if (!_channels.TryGetValue(queueName, out IChannel? existing) || existing is null)
 80        {
 081            var channel = await GetOrCreateChannelAsync(queueName, options);
 082            await channel.ExchangeDeclareAsync(exchange, "direct", durable: true, autoDelete: false, arguments: null);
 083            await channel.QueueDeclareAsync(queueName, durable: true, exclusive: false, autoDelete: false, arguments: nu
 084            await channel.QueueBindAsync(queueName, exchange, routingKey);
 085            return channel;
 86        }
 87
 088        return existing;
 089    }
 90
 091    public ConnectionFactory GetConnectionFactory() => _connectionFactory;
 92
 493    async Task OnAck(object sender, BasicAckEventArgs args) => await CleanOutstandingConfirms(args.DeliveryTag, args.Mul
 94
 095    async Task OnNack(object sender, BasicNackEventArgs args) => await CleanOutstandingConfirms(args.DeliveryTag, args.M
 96
 97    Task OnBasicReturn(object sender, BasicReturnEventArgs args)
 98    {
 099        var props = args.BasicProperties;
 0100        ulong sequenceNumber = 0;
 101
 0102        if (props.Headers != null && props.Headers.TryGetValue(Constants.PublishSequenceNumberHeader, out var headerValu
 103        {
 0104            sequenceNumber = headerValue switch
 0105            {
 0106                byte[] bytes => BinaryPrimitives.ReadUInt64BigEndian(bytes),
 0107                long longValue => (ulong)longValue,
 0108                int intValue => (ulong)intValue,
 0109                _ => 0
 0110            };
 111        }
 112
 0113        Console.WriteLine(
 0114            $"{DateTime.Now} [WARNING] message has been basic.return-ed: Exchange={args.Exchange}, RoutingKey={args.Rout
 0115        );
 0116        return Task.CompletedTask;
 117    }
 118
 119    async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
 120    {
 4121        await semaphore.WaitAsync();
 122        try
 123        {
 4124            if (multiple)
 125            {
 0126                while (outstandingConfirms.First is { } node && node.Value <= deliveryTag)
 127                {
 0128                    outstandingConfirms.RemoveFirst();
 0129                    confirmedCount++;
 0130                }
 131            }
 132            else
 133            {
 4134                outstandingConfirms.Remove(deliveryTag);
 4135                confirmedCount++;
 136            }
 4137        }
 138        finally
 139        {
 4140            semaphore.Release();
 141        }
 142
 4143        if (outstandingConfirms.Count == 0 || confirmedCount == MESSAGE_COUNT)
 144        {
 4145            allMessagesConfirmedTcs.TrySetResult(true);
 146        }
 4147    }
 148
 149    public async ValueTask<IChannel> DeclareAndBindQueue(string queueName, string exchange = exchangeName, string routin
 150    {
 2151        exchange = string.IsNullOrWhiteSpace(exchange) ? exchangeName : exchange;
 2152        routingKey = string.IsNullOrWhiteSpace(routingKey) ? queueName : routingKey;
 153
 2154        if (!_channels.TryGetValue(queueName, out IChannel? existing) || existing is null)
 155        {
 1156            var channel = await GetOrCreateChannelAsync(queueName, options);
 1157            await channel.ExchangeDeclareAsync(exchange, "direct", durable: true, autoDelete: false, arguments: null);
 1158            await channel.QueueDeclareAsync(queueName, durable: true, exclusive: false, autoDelete: false, arguments: nu
 1159            await channel.QueueBindAsync(queueName, exchange, routingKey);
 1160            return channel;
 161        }
 162
 1163        return existing;
 2164    }
 165
 166    public async ValueTask DisposeAsync()
 167    {
 6168        foreach (var channel in _channels.Values)
 169        {
 2170            await channel.CloseAsync();
 171        }
 1172        _channels.Clear();
 1173        if (_connection != null && _connection.IsOpen)
 174        {
 0175            await _connection.CloseAsync();
 176        }
 177
 1178        GC.SuppressFinalize(this);
 1179    }
 180}