How to implement CQRS with MediatR - Part 2

In this post we'll be using MediatR with a dotnet WebAPI. If you want to read about its implementation in a console application, check out my other blog post.

For basic cases you can implement the message types and handlers mentioned in the Part 1. Here we are trying a different use case. The use of CancellationToken in MediatR to drop the ongoing request processing. This is very helpful if the request processing is blocked and we want to implement a timeout or if the request is dropped by the initiator.

For the sample case we will create a web api that try to do a task within 5 seconds. The API method will accept time in milliseconds and if the given time is greater than 5 seconds the task will run for 5 seconds and then get cancelled, otherwise task will be executed for the given time.

Prerequisites

  • Install dotnet core
  • Install Visual Studio Code/Visual Studio IDE

Creating a dotnet WebAPI application

Run the following CLI command to create a WebAPI project.

dotnet new webapi MediatRSampleAPI

Configure Serilog for logging (Optional)

I'll be using Serilog and a flat file sink for logging. To configure this we need to install the following dependencies via Nuget.

  • Microsoft.Extensions.Logging
  • Serilog
  • Serilog.AspNetCore
  • Serilog.Sinks.File

Then update the Program.cs file.

public static void Main(string[] args)
{
    Log.Logger = new LoggerConfiguration()
                    .Enrich.FromLogContext()
                    .WriteTo.File("logs/MediatRSample.txt", rollingInterval: RollingInterval.Day)
                    .CreateLogger();
    try
    {
        Log.Information("Starting up");
        CreateHostBuilder(args).Build().Run();
    }
    catch (Exception ex)
    {
        Log.Fatal(ex, "Application start-up failed");
    }
    finally
    {
        Log.CloseAndFlush();
    }
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
        .UseSerilog()
        .ConfigureWebHostDefaults(webBuilder =>
        {
            webBuilder.UseStartup<Startup>();
        });

Install and Configure MediatR

Next we need to install MediatR (version 9.0.0 at the time of writing) via Nuget. To configure MediatR, add the following snippet to the ConfigureServices() method in Startup.cs file.

services.AddMediatR(typeof(Startup));

Create a Notification message and its handler

Our notification message carries time in milliseconds. Handler just accepts the time given by the user and waits for that much time. In the meantime if the request is cancelled, waiting will be stopped and exit.

I've added Stopwatch code to make sure that handler will wait for maximum 5 seconds. When cancellation is triggered and OperationCanceledException is thrown we need to catch it explicitly.

public class DelayNotificationMessage: INotification
{
    public int TimeInMilliSeconds { get; set; }
}

public class Notifier03 : INotificationHandler<DelayNotificationMessage>
{
    private readonly ILogger<Notifier03> _logger;
    public Notifier03(ILogger<Notifier03> logger)
    {
        _logger = logger;
    }
    public async Task Handle(DelayNotificationMessage notification, CancellationToken cancellationToken)
    {
        _logger.LogInformation($"Notifier 03 -> Time In MIlli Seconds: {notification.TimeInMilliSeconds}");
        Stopwatch stopwatch = Stopwatch.StartNew();
        try
        {
            await Task.Delay(notification.TimeInMilliSeconds, cancellationToken);
        }
        catch(OperationCanceledException ex)
        {
            _logger.LogError("5 seconds passed and the task is cancelled");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex.Message);                
        }
        stopwatch.Stop();
        _logger.LogInformation($"Elapsed Time: {stopwatch.ElapsedMilliseconds}");            
    }
}

Create a mediator service

Mediator Service is a class that is used by the initiator to publish messages to the handlers. Here we'll the add the following code in the service class where CancellationToken is an optional parameter.

public void DelayedNotify(int timeInMilliSeconds, CancellationToken cancellationToken = default)
{
    _mediator.Publish(new DelayNotificationMessage { TimeInMilliSeconds = timeInMilliSeconds }, cancellationToken);
        

Add new method in API Controller

Now let's add a controller method that creates a cancellation token. CancellationToken is set to cancel after 5 seconds using the CancelAfter method of CancellationTokenSource

[HttpGet("/dowithin5seconds")]
public async Task<string> DoWithin5Seconds(int timeInMilliSeconds)
{
    CancellationTokenSource source = new CancellationTokenSource();
    CancellationToken token = source.Token;
    source.CancelAfter(5000);
    _mediatorService.DelayedNotify(timeInMilliSeconds, token);
    var message = "Finished within 5 seconds.";

    _logger.LogInformation(message);

    return message;
}

Run the application

To run the project via dotnet cli, run the following command.

dotnet run --project <Path to *.csproj file>

Once the port is open, invoke the DoWithin5Seconds method by entering 'http://localhost:62705/dowithin5seconds?timeInMilliSeconds=15000' in the browser (Your port number may vary. Also for the demo purpose it is better to disable https redirection).

You'll get the response immediately, but the process will run for a maximum of 5 seconds. If you check the log file, you can see that the execution stopped at 5 seconds.

2021-02-27 12:46:57.384 +05:30 [INF] Executing endpoint 'MediatRSampleAPI.Controllers.SlowTestController.DoWithin5Seconds (MediatRSampleAPI)'
2021-02-27 12:46:57.425 +05:30 [INF] Route matched with {action = "DoWithin5Seconds", controller = "SlowTest"}. Executing controller action with signature System.Threading.Tasks.Task`1[System.String] DoWithin5Seconds(Int32) on controller MediatRSampleAPI.Controllers.SlowTestController (MediatRSampleAPI).
2021-02-27 12:46:57.517 +05:30 [INF] Executing action method MediatRSampleAPI.Controllers.SlowTestController.DoWithin5Seconds (MediatRSampleAPI) - Validation state: "Valid"
2021-02-27 12:46:57.524 +05:30 [INF] Notifier 03 -> Time In MIlli Seconds: 15000
2021-02-27 12:46:57.526 +05:30 [INF] Finished within 5 seconds.
2021-02-27 12:47:02.576 +05:30 [ERR] 5 seconds passed and the task is cancelled
2021-02-27 12:47:02.577 +05:30 [INF] Elapsed Time: 5052

Full source code is available in GitHub.

Final Thoughts

If you are more interested in the use of CancellationToken, please checkout the post by Andrew Lock.

One thing I haven't explored so far is exception handling with MediatR. It requires a blog post of its own and I'll be posting it soon.