| | | 1 | | using System.Collections.Concurrent; |
| | | 2 | | using System.Text; |
| | | 3 | | using System.Text.Json; |
| | | 4 | | using ClientManager.Shared.Messaging; |
| | | 5 | | using RabbitMQ.Client; |
| | | 6 | | using RabbitMQ.Client.Events; |
| | | 7 | | |
| | | 8 | | namespace ClientManager.Worker.Messaging; |
| | | 9 | | |
| | 0 | 10 | | public class RabbitMQMessageConsumer(IServiceScopeFactory serviceScopeFactory, IMessageBrokerFactory messageBrokerFactor |
| | | 11 | | : IMessageConsumer |
| | | 12 | | { |
| | 0 | 13 | | readonly IServiceScopeFactory _scopeFactory = serviceScopeFactory; |
| | 0 | 14 | | readonly IMessageBrokerFactory _messageBrokerFactory = messageBrokerFactory; |
| | 0 | 15 | | readonly ILogger<RabbitMQMessageConsumer> _logger = logger; |
| | 0 | 16 | | readonly ConcurrentDictionary<string, IChannel> _channels = new(); |
| | 0 | 17 | | readonly IReadOnlyDictionary<string, Type> _messageTypeCache = DiscoverMessageTypes().ToDictionary(t => t.Name, t => |
| | | 18 | | |
| | | 19 | | public async Task StartAsync(CancellationToken cancellationToken) |
| | | 20 | | { |
| | 0 | 21 | | foreach (var messageType in _messageTypeCache.Values) |
| | | 22 | | { |
| | 0 | 23 | | await SubscribeAsync(messageType.Name, cancellationToken); |
| | | 24 | | } |
| | 0 | 25 | | } |
| | | 26 | | |
| | | 27 | | async Task SubscribeAsync(string queueName, CancellationToken cancellationToken) |
| | | 28 | | { |
| | 0 | 29 | | var channel = await _messageBrokerFactory.GetConsumeChannelAsync(queueName); |
| | 0 | 30 | | _channels.AddOrUpdate(queueName, channel, (key, oldValue) => channel); |
| | | 31 | | |
| | 0 | 32 | | var consumer = new AsyncEventingBasicConsumer(channel); |
| | | 33 | | |
| | 0 | 34 | | consumer.ReceivedAsync += async (_, ea) => |
| | 0 | 35 | | { |
| | 0 | 36 | | var json = Encoding.UTF8.GetString(ea.Body.ToArray()); |
| | 0 | 37 | | |
| | 0 | 38 | | try |
| | 0 | 39 | | { |
| | 0 | 40 | | await DispatchToHandlers(queueName, json, cancellationToken); |
| | 0 | 41 | | await channel.BasicAckAsync(ea.DeliveryTag, multiple: false, cancellationToken); |
| | 0 | 42 | | } |
| | 0 | 43 | | catch (Exception ex) |
| | 0 | 44 | | { |
| | 0 | 45 | | _logger.LogError(ex, "Failed to process message from {queue}", queueName); |
| | 0 | 46 | | await channel.BasicNackAsync(ea.DeliveryTag, false, requeue: true); |
| | 0 | 47 | | } |
| | 0 | 48 | | }; |
| | | 49 | | |
| | 0 | 50 | | await channel.BasicConsumeAsync(queueName, autoAck: false, consumer); |
| | 0 | 51 | | _logger.LogInformation("Listening on queue: {queue}", queueName); |
| | 0 | 52 | | } |
| | | 53 | | |
| | | 54 | | public async Task StopAsync(CancellationToken cancellationToken) |
| | | 55 | | { |
| | 0 | 56 | | foreach (var (queue, channel) in _channels) |
| | | 57 | | { |
| | 0 | 58 | | await channel.BasicCancelAsync(queue, cancellationToken: cancellationToken); |
| | 0 | 59 | | await channel.CloseAsync(cancellationToken); |
| | 0 | 60 | | } |
| | | 61 | | |
| | 0 | 62 | | _logger.LogInformation("All channels closed."); |
| | 0 | 63 | | } |
| | | 64 | | |
| | | 65 | | static IEnumerable<Type> DiscoverMessageTypes() => |
| | 0 | 66 | | AppDomain |
| | 0 | 67 | | .CurrentDomain.GetAssemblies() |
| | 0 | 68 | | .SelectMany(assembly => assembly.GetTypes()) |
| | 0 | 69 | | .Where(type => typeof(ICommand).IsAssignableFrom(type) && !type.IsInterface && !type.IsAbstract) |
| | 0 | 70 | | .ToList(); |
| | | 71 | | |
| | | 72 | | async Task DispatchToHandlers(string queueName, string json, CancellationToken cancellationToken) |
| | | 73 | | { |
| | | 74 | | // Need to create a scope inside the singleton for every message |
| | 0 | 75 | | await using var scope = _scopeFactory.CreateAsyncScope(); |
| | 0 | 76 | | var messageContextAccessor = scope.ServiceProvider.GetRequiredService<IMessageContextAccessor>(); |
| | | 77 | | |
| | 0 | 78 | | if (!_messageTypeCache.TryGetValue(queueName, out var messageType)) |
| | | 79 | | { |
| | 0 | 80 | | _logger.LogWarning("No message type found for queue: {queue}", queueName); |
| | 0 | 81 | | return; |
| | | 82 | | } |
| | | 83 | | |
| | 0 | 84 | | var envelope = JsonSerializer.Deserialize<MessageEnvelope<object>>(json); |
| | 0 | 85 | | ArgumentNullException.ThrowIfNull(envelope); |
| | | 86 | | |
| | 0 | 87 | | var jsonElement = (JsonElement)envelope.Message; |
| | 0 | 88 | | var message = jsonElement.Deserialize(messageType); |
| | | 89 | | |
| | 0 | 90 | | var messageContext = GetMessageContext(envelope); |
| | 0 | 91 | | messageContextAccessor.SetCurrentContext(messageContext); |
| | | 92 | | |
| | 0 | 93 | | _logger.LogInformation("Received message from {queue}: \n\t[correlationId: {correlationId}]\n\t{@Message}", queu |
| | | 94 | | |
| | | 95 | | // Find handler registered for that message type |
| | 0 | 96 | | var handlerType = typeof(IHandleMessage<>).MakeGenericType(messageType); |
| | 0 | 97 | | var handlers = scope.ServiceProvider.GetServices(handlerType); |
| | | 98 | | |
| | 0 | 99 | | foreach (var handler in handlers) |
| | | 100 | | { |
| | 0 | 101 | | if (handler is not null && message is not null) |
| | | 102 | | { |
| | 0 | 103 | | dynamic dynamicHandler = handler; |
| | 0 | 104 | | await dynamicHandler.HandleAsync((dynamic)message, messageContext, cancellationToken); |
| | | 105 | | } |
| | | 106 | | } |
| | 0 | 107 | | } |
| | | 108 | | |
| | | 109 | | static MessageContext GetMessageContext(MessageEnvelope<object> envelope) => |
| | 0 | 110 | | new MessageContext(CorrelationId: envelope.CorrelationId, CausationId: envelope.CausationId, Timestamp: envelope |
| | | 111 | | } |