| | | 1 | | using System.Text; |
| | | 2 | | using System.Text.Json; |
| | | 3 | | using Microsoft.Extensions.Logging; |
| | | 4 | | using RabbitMQ.Client; |
| | | 5 | | |
| | | 6 | | namespace ClientManager.Shared.Messaging; |
| | | 7 | | |
/// <summary>
/// Publishes <see cref="IMessage"/> instances to the message broker.
/// A publish first runs through the <see cref="MessagePublishPipeline"/>; the terminal step
/// wraps the message in a <see cref="MessageEnvelope{T}"/>, serializes it to UTF-8 JSON and
/// hands it to a publish channel obtained from the <see cref="IMessageBrokerFactory"/>.
/// </summary>
public class MessagePublisher(
    IMessageBrokerFactory messageBrokerFactory,
    IRoutingConvention routingConvention,
    IMessageContextAccessor messageContextAccessor,
    MessagePublishPipeline messagePublishPipeline,
    ILogger<MessagePublisher>? logger
) : IMessagePublisher
{
    readonly IMessageBrokerFactory _messageBrokerFactory = messageBrokerFactory;
    readonly IRoutingConvention _routingConvention = routingConvention;
    readonly IMessageContextAccessor _messageContextAccessor = messageContextAccessor;
    readonly MessagePublishPipeline _messagePublishPipeline = messagePublishPipeline;
    readonly ILogger<MessagePublisher>? _logger = logger;

    // Sequence numbers recorded just before each publish.
    // NOTE(review): nothing in this class reads or removes entries, so the list grows by one
    // node per published message for the lifetime of the publisher, and LinkedList<T> is not
    // safe for concurrent AddLast calls. Confirm whether ack/nack handling lives elsewhere;
    // if not, either prune entries on confirmation or drop this tracking.
    readonly LinkedList<ulong> _outstandingConfirms = new();

    /// <summary>
    /// Publishes <paramref name="message"/> by running it through the publish pipeline, whose
    /// final step is <see cref="FinalPublishMiddlewareAsync{T}"/>.
    /// </summary>
    /// <typeparam name="T">The message type; its name is used in log entries.</typeparam>
    /// <param name="message">The message to publish.</param>
    /// <param name="cancellationToken">Token forwarded to the pipeline.</param>
    /// <remarks>
    /// A message context is obtained (or created) before the pipeline runs and is always
    /// cleared afterwards, whether the publish succeeded or failed. Any exception is logged
    /// and rethrown unchanged.
    /// </remarks>
    public async Task PublishAsync<T>(T message, CancellationToken cancellationToken = default)
        where T : IMessage
    {
        var context = _messageContextAccessor.GetOrCreateContext();
        try
        {
            // NOTE(review): the argument list of this call was clipped in the reviewed listing;
            // the two arguments are reconstructed from the template's placeholders - confirm.
            _logger?.LogDebug("Starting publish for message {MessageType} with CorrelationId {CorrelationId}", typeof(T).Name, context.CorrelationId);

            await _messagePublishPipeline.ExecuteAsync(message, FinalPublishMiddlewareAsync, cancellationToken);

            _logger?.LogInformation("Successfully published message {MessageType}", typeof(T).Name);
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error publishing message {MessageType}", typeof(T).Name);
            throw;
        }
        finally
        {
            // Runs on success and on failure so the context never outlives this publish call.
            _messageContextAccessor.ClearContext();

            _logger?.LogDebug("Message context cleared for {MessageType}", typeof(T).Name);
        }
    }

    /// <summary>
    /// Terminal pipeline step: resolves the exchange and routing key for <typeparamref name="T"/>,
    /// wraps the message in an envelope, serializes it to UTF-8 JSON and publishes it with
    /// <c>mandatory: true</c>.
    /// </summary>
    /// <typeparam name="T">The message type; also written to the AMQP <c>Type</c> property.</typeparam>
    /// <param name="message">The message to publish.</param>
    /// <param name="cancellationToken">Token forwarded to the channel operations.</param>
    /// <exception cref="InvalidOperationException">
    /// No current message context is set (see <see cref="PublishAsync{T}"/>, which creates one).
    /// </exception>
    public async Task FinalPublishMiddlewareAsync<T>(T message, CancellationToken cancellationToken = default)
        where T : IMessage
    {
        var (exchange, routingKey) = _routingConvention.ResolveFor(typeof(T));
        var channel = await _messageBrokerFactory.GetPublishChannelAsync(exchange);

        var envelope = CreateEnvelope(message);

        // Serialize straight to UTF-8 bytes; avoids the intermediate string allocation.
        var body = JsonSerializer.SerializeToUtf8Bytes(envelope);

        var props = new BasicProperties
        {
            MessageId = envelope.EnvelopeId.ToString(),
            CorrelationId = envelope.CorrelationId.ToString(),
            Timestamp = new AmqpTimestamp(envelope.CreatedUtc.ToUnixTimeSeconds()),
            Type = typeof(T).Name,
        };

        // Record the sequence number this publish will be assigned before sending it.
        ulong sequenceNumber = await channel.GetNextPublishSequenceNumberAsync(cancellationToken);
        _outstandingConfirms.AddLast(sequenceNumber);

        // NOTE(review): the final argument was clipped in the reviewed listing ("cancellatio...");
        // reconstructed as the cancellation token - confirm.
        await channel.BasicPublishAsync(exchange, routingKey, mandatory: true, basicProperties: props, body, cancellationToken);
    }

    /// <summary>
    /// Builds the envelope for <paramref name="message"/>, copying the correlation id from the
    /// current message context and the causation id when present (<see cref="Guid.Empty"/> otherwise).
    /// </summary>
    /// <exception cref="InvalidOperationException">No current message context is set.</exception>
    MessageEnvelope<T> CreateEnvelope<T>(T message)
    {
        // Read the accessor once so the null check and the use see the same instance.
        var context = _messageContextAccessor.Current
            ?? throw new InvalidOperationException("Current context accessor is not set. Call GetOrCreateContext() first.");

        return new MessageEnvelope<T>
        {
            Message = message,
            CorrelationId = context.CorrelationId,
            CausationId = context.CausationId ?? Guid.Empty
        };
    }
}