< Summary

Information
Class: ClientManager.Shared.Messaging.MessagePublisher
Assembly: ClientManager.Shared
File(s): /home/runner/work/ClientManagerDemo/ClientManagerDemo/src/ClientManager/ClientManager.Shared/Messaging/MessagePublisher.cs
Line coverage
92%
Covered lines: 47
Uncovered lines: 4
Coverable lines: 51
Total lines: 88
Line coverage: 92.1%
Branch coverage
41%
Covered branches: 5
Total branches: 12
Branch coverage: 41.6%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 100% | 1 | 1 | 100% |
| PublishAsync() | 37.5% | 9 | 8 | 72.72% |
| FinalPublishMiddlewareAsync() | 100% | 1 | 1 | 100% |
| CreateEnvelope(...) | 50% | 4 | 4 | 90.9% |

File(s)

/home/runner/work/ClientManagerDemo/ClientManagerDemo/src/ClientManager/ClientManager.Shared/Messaging/MessagePublisher.cs

 #  Line  Line coverage
       1  using System.Text;
       2  using System.Text.Json;
       3  using Microsoft.Extensions.Logging;
       4  using RabbitMQ.Client;
       5
       6  namespace ClientManager.Shared.Messaging;
       7
 1     8  public class MessagePublisher(
 1     9      IMessageBrokerFactory messageBrokerFactory,
 1    10      IRoutingConvention routingConvention,
 1    11      IMessageContextAccessor messageContextAccessor,
 1    12      MessagePublishPipeline messagePublishPipeline,
 1    13      ILogger<MessagePublisher>? logger
 1    14  ) : IMessagePublisher
      15  {
 1    16      readonly IMessageBrokerFactory _messageBrokerFactory = messageBrokerFactory;
 1    17      readonly IRoutingConvention _routingConvention = routingConvention;
 1    18      readonly IMessageContextAccessor _messageContextAccessor = messageContextAccessor;
 1    19      readonly MessagePublishPipeline _messagePublishPipeline = messagePublishPipeline;
 1    20      readonly ILogger<MessagePublisher>? _logger = logger;
 1    21      readonly LinkedList<ulong> outstandingConfirms = new();
      22
      23      public async Task PublishAsync<T>(T message, CancellationToken cancellationToken = default)
      24          where T : IMessage
      25      {
 4    26          var context = _messageContextAccessor.GetOrCreateContext();
      27          try
      28          {
 4    29              _logger?.LogDebug("Starting publish for message {MessageType} with CorrelationId {CorrelationId}", typeof(T)…
      30
 4    31              await _messagePublishPipeline.ExecuteAsync(message, FinalPublishMiddlewareAsync, cancellationToken);
      32
 4    33              _logger?.LogInformation("Successfully published message {MessageType}", typeof(T).Name);
 4    34          }
 0    35          catch (Exception ex)
      36          {
 0    37              _logger?.LogError(ex, "Error publishing message {MessageType}", typeof(T).Name);
 0    38              throw;
      39          }
      40          finally
      41          {
 4    42              _messageContextAccessor.ClearContext();
      43
 4    44              _logger?.LogDebug("Message context cleared for {MessageType}", typeof(T).Name);
      45          }
 4    46      }
      47
      48      public async Task FinalPublishMiddlewareAsync<T>(T message, CancellationToken cancellationToken = default)
      49          where T : IMessage
      50      {
 4    51          var (exchange, routingKey) = _routingConvention.ResolveFor(typeof(T));
 4    52          var channel = await _messageBrokerFactory.GetPublishChannelAsync(exchange);
      53
 4    54          var envelope = CreateEnvelope(message);
      55
 4    56          var serializedMessage = JsonSerializer.Serialize(envelope);
 4    57          var body = Encoding.UTF8.GetBytes(serializedMessage);
      58
 4    59          var props = new BasicProperties
 4    60          {
 4    61              MessageId = envelope.EnvelopeId.ToString(),
 4    62              CorrelationId = envelope.CorrelationId.ToString(),
 4    63              Timestamp = new AmqpTimestamp(envelope.CreatedUtc.ToUnixTimeSeconds()),
 4    64              Type = typeof(T).Name,
 4    65          };
      66
 4    67          ulong sequenceNumber = await channel.GetNextPublishSequenceNumberAsync();
 4    68          outstandingConfirms.AddLast(sequenceNumber);
      69
 4    70          await channel.BasicPublishAsync(exchange, routingKey, mandatory: true, basicProperties: props, body, cancellatio…
 4    71      }
      72
      73      MessageEnvelope<T> CreateEnvelope<T>(T message)
      74      {
 4    75          if (_messageContextAccessor.Current is null)
 0    76              throw new NullReferenceException("Current context accessor is not set.  Call GetOrCreateContext() first.");
      77
 4    78          var context = _messageContextAccessor.Current;
 4    79          var correlationId = context!.CorrelationId;
 4    80          var causationId = context.CausationId ?? Guid.Empty;
 4    81          return new MessageEnvelope<T>
 4    82          {
 4    83              Message = message,
 4    84              CorrelationId = correlationId,
 4    85              CausationId = causationId
 4    86          };
      87      }
      88  }