
Solving Race Conditions in EF Core's Outbox Pattern with Pessimistic Locking

TL;DR

The bug: Domain events in my EF Core outbox library were being processed twice - again. Yes, again.

The cause: This time it wasn’t a sneaky double registration. It was a genuine race condition between the SaveChangesInterceptor (which processes events immediately) and a background retry service (which picks up anything that failed). Both were grabbing the same event row with no coordination.

The fix: Database-level pessimistic row locking - SELECT ... FOR UPDATE SKIP LOCKED on PostgreSQL, SELECT ... WITH (UPDLOCK, READPAST, ROWLOCK) on SQL Server.

The gotcha I walked into: The row lock has to live inside an explicit transaction. Without one, the SELECT runs in autocommit mode as its own single-statement transaction, so the lock vanishes the moment the query returns, before you’ve done anything useful. I shipped this bug and had to fix it in a follow-up commit, which was humbling.


Not this again

If you read my last post, you’ll know I spent four days chasing duplicate event processing through EF Core’s internals, only to find the whole thing was caused by a double interceptor registration in my test setup.

So when I saw events duplicating again, my first reaction was a kind of tired disbelief. I actually went back and checked the interceptor registrations. Twice. They were fine.

This was a different problem entirely - and honestly, one I should have anticipated from the start. Two independent processes (the interceptor and the background retry service) were both reading from the same Events table, both looking for rows where DispatchedOn IS NULL, and both happily processing whatever they found. No locking. No coordination. Just vibes.

The consequences range from “mildly annoying” (duplicate emails) to “actively bad” (double-charging a customer through a payment provider). So this needed fixing properly.

How the outbox works (quick refresher)

If you haven’t read the previous post, here’s the short version. Onward implements the Transactional Outbox pattern for EF Core:

  1. During SaveChangesAsync, an interceptor extracts domain events from entities
  2. Those events get written to an Events table in the same transaction as the business data
  3. After the transaction commits, the interceptor tries to process them immediately
  4. A background service polls for anything that wasn’t processed (failures, crashes, etc.)

Steps 3 and 4 are the problem. Two processors, one table, no locks. You can see where this is going.

// The interceptor - processes events right after save
public override async ValueTask<int> SavedChangesAsync(
    SaveChangesCompletedEventData eventData,
    int result, CancellationToken cancellationToken = default)
{
    var baseResult = await base.SavedChangesAsync(eventData, result, cancellationToken);

    while (_added.TryPop(out var addition))
    {
        try
        {
            await _unitOfWork.ProcessEvent(addition, cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Failed to process event {EventId} immediately after save. " +
                "Event will be retried by background service", addition);
        }
    }

    return baseResult;
}

// The background service - retries anything unprocessed
public async Task DoWork(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        _logger.LogInformation("Running retries for Onward events");
        var result = await _onwardRetryManager.RetryOnwardProcessing(stoppingToken);

        if (!result.IsSuccess)
        {
            var failureResult = result as UnsuccessfulRetryResult;
            _logger.LogWarning(failureResult.LastException,
                "Some Onward events could not be processed");
            await Task.Delay(_unsuccessfulRetryPeriod, stoppingToken);
        }
        else
        {
            var successResult = result as SuccessfulRetryResult;
            _logger.LogInformation(
                "Successfully completed retries. {EventCount} events processed",
                successResult.NumberOfEventsProcessed);
            await Task.Delay(_successfulRetryPeriod, stoppingToken);
        }
    }
}

Both of these processes look at DispatchedOn IS NULL to decide what to work on. If the background service wakes up while the interceptor is mid-processing, they both grab the same row and both run the handler. Classic race condition.

First attempt: an application-level flag (don’t do this)

My first instinct was to add an IsProcessing boolean to the Event entity. Before processing, set it to true. Other processors check the flag and skip rows that are already claimed. Looking back, this was never going to work - it’s literally the textbook example of a race condition, and I walked right into it with full confidence. But that’s kind of the point of this blog. I write about the dumb things I do so that when you do them too, you know you’re not alone. We’re all out here reinventing broken locks and calling it engineering.

// The "this seemed reasonable at the time" approach
var @event = await context.Set<Event>()
    .Where(e => e.Id == eventId && !e.DispatchedOn.HasValue && !e.IsProcessing)
    .FirstOrDefaultAsync(stoppingToken);

if (@event == null)
{
    await transaction.CommitAsync(stoppingToken);
    return null;
}

@event.IsProcessing = true;
@event.ProcessingStartedAt = DateTime.UtcNow;
await context.SaveChangesAsync(stoppingToken);
await transaction.CommitAsync(stoppingToken);

This doesn’t work, and the reason is the oldest concurrency bug in the book. There’s a gap between the SELECT and the UPDATE:

Time     Process A (Interceptor)       Process B (Background)
----     ----------------------        ----------------------
 T1      SELECT WHERE IsProcessing=false
 T2      -> Found event, IsProcessing=false
 T3                                    SELECT WHERE IsProcessing=false
 T4                                    -> Found event, IsProcessing=false
 T5      UPDATE SET IsProcessing=true
 T6      SaveChanges() ✓
 T7                                    UPDATE SET IsProcessing=true
 T8                                    SaveChanges() ✓
 T9      Processing event...           Processing event...

Both processes read IsProcessing=false before either one writes true. Both think they’ve claimed the event. Both process it.
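
The interleaving above can be reproduced without a database. What follows is a minimal in-memory sketch, not Onward code: ClaimRaceDemo and its method names are made up for illustration, and a local variable stands in for the IsProcessing column. A Barrier forces both workers to finish their read before either writes, so the bad ordering happens on every run rather than once in a blue moon. The second method is only an analogy for the fix that comes later: the check and the claim happen as one atomic step.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for one Events row; not part of Onward.
public static class ClaimRaceDemo
{
    // Check-then-act: read the flag, then set it in a separate step.
    // Returns how many workers believe they claimed the row.
    public static int ClaimWithFlag()
    {
        var isProcessing = false;
        var claims = 0;
        // Holds both workers after their read so neither writes first,
        // which reproduces the T1-T8 interleaving on every run.
        using var bothHaveRead = new Barrier(2);

        void Worker()
        {
            var sawUnclaimed = !Volatile.Read(ref isProcessing); // the SELECT
            bothHaveRead.SignalAndWait();
            if (!sawUnclaimed) return;
            Volatile.Write(ref isProcessing, true);               // the UPDATE
            Interlocked.Increment(ref claims);
        }

        RunTwo(Worker);
        return claims;
    }

    // Atomic claim: the check and the write are a single operation,
    // so only one worker can ever see the row as free.
    public static int ClaimAtomically()
    {
        var claimed = 0; // 0 = free, 1 = taken
        var claims = 0;

        void Worker()
        {
            if (Interlocked.CompareExchange(ref claimed, 1, 0) == 0)
                Interlocked.Increment(ref claims);
        }

        RunTwo(Worker);
        return claims;
    }

    private static void RunTwo(ThreadStart worker)
    {
        var a = new Thread(worker);
        var b = new Thread(worker);
        a.Start();
        b.Start();
        a.Join();
        b.Join();
    }
}
```

ClaimWithFlag() returns 2 because both workers read the flag before either sets it; ClaimAtomically() returns 1. The database equivalent of that atomic step is the subject of the rest of this post.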

On top of the race condition, if a process crashes while IsProcessing = true, that event is stuck forever unless you build cleanup logic. Which means more code, more edge cases, and more things to get wrong.

I scrapped this approach.

The actual fix: pessimistic row locking

After some research, I found that every serious outbox implementation (NServiceBus, MassTransit, Wolverine) uses database-native pessimistic locking for exactly this reason. The database is already good at this. Let it do its job.

The idea: instead of reading a row and then trying to claim it, you acquire an exclusive lock on the row as part of the SELECT. Any other process that runs the same locking query against that row either blocks (waiting for the lock) or skips it entirely.

EF Core doesn’t support this natively (GitHub issue #26042), so it has to be raw SQL:

private static string GetLockingQuery(string providerName, Guid? eventId = null)
{
    var whereClause = eventId.HasValue
        ? $"WHERE \"Id\" = '{eventId.Value}' AND \"DispatchedOn\" IS NULL"
        : "WHERE \"DispatchedOn\" IS NULL ORDER BY \"CreatedOn\" LIMIT 1";

    return providerName switch
    {
        "Npgsql.EntityFrameworkCore.PostgreSQL" =>
            $@"SELECT * FROM ""Onwrd"".""Events"" {whereClause} FOR UPDATE SKIP LOCKED",

        "Microsoft.EntityFrameworkCore.SqlServer" =>
            eventId.HasValue
                ? $"SELECT * FROM Onwrd.Events WITH (UPDLOCK, READPAST, ROWLOCK) WHERE Id = '{eventId.Value}' AND DispatchedOn IS NULL"
                : "SELECT TOP 1 * FROM Onwrd.Events WITH (UPDLOCK, READPAST, ROWLOCK) WHERE DispatchedOn IS NULL ORDER BY CreatedOn",

        _ => throw new NotSupportedException(
            $"Database provider '{providerName}' is not supported for row locking")
    };
}

The SKIP LOCKED (PostgreSQL) / READPAST (SQL Server) part is important. Without it, the second process would block and wait for the first to finish, which could cause thread pool starvation or deadlocks. With it, the second process just gets back an empty result and moves on. No waiting, no blocking.

public async Task<UnitOfWorkResult> ProcessEvent(
    Guid eventId, CancellationToken cancellationToken)
{
    return await Process(GetAndMarkEvent, cancellationToken);

    async Task<Event> GetAndMarkEvent(TContext context, CancellationToken stoppingToken)
    {
        var providerName = context.Database.ProviderName;
        var lockingQuery = GetLockingQuery(providerName, eventId);

        var @event = await context.Set<Event>()
            .FromSqlRaw(lockingQuery)
            .FirstOrDefaultAsync(stoppingToken);

        return @event;
    }
}

And the IsProcessing / ProcessingStartedAt fields? Gone. The database handles all of it. The Event entity went back to being clean:

internal class Event
{
    public Guid Id { get; set; }
    public DateTime CreatedOn { get; set; }
    public DateTime? DispatchedOn { get; set; }
    public string TypeId { get; set; }
    public string Contents { get; set; }
    public string AssemblyName { get; set; }
}

The bug I shipped: forgetting the transaction

I deployed the pessimistic locking fix and felt good about it for approximately one day. Then I realised the lock wasn’t actually being held.

The problem: EF Core doesn’t wrap queries in a transaction, so by default each statement runs in autocommit mode. A SELECT ... FOR UPDATE acquires a lock, but that lock only lives for the duration of the transaction. Without an explicit transaction, the SELECT is its own single-statement transaction, which ends as soon as the query completes. The lock disappears, and we’re back to square one.

Here’s the broken version I shipped:

// BROKEN - lock released immediately after SELECT
private async Task<UnitOfWorkResult> Process(
    Func<TContext, CancellationToken, Task<Event>> getEvent,
    CancellationToken cancellationToken)
{
    using var scope = _serviceProvider.CreateScope();
    var context = scope.ServiceProvider.GetRequiredService<TContext>();

    // No transaction! Lock vanishes after this line.
    var @event = await getEvent(context, cancellationToken);

    if (@event == null)
        return UnitOfWorkResult.NoEvents;

    var contents = @event.DeserializeContents();
    await _onwardProcessorOrchestrator.Process((@event, contents), scope, cancellationToken);

    @event.DispatchedOn = DateTime.UtcNow;
    await context.SaveChangesAsync(cancellationToken);

    return UnitOfWorkResult.Processed;
}

And the fixed version:

// FIXED - lock held until commit
private async Task<UnitOfWorkResult> Process(
    Func<TContext, CancellationToken, Task<Event>> getEvent,
    CancellationToken cancellationToken)
{
    using var scope = _serviceProvider.CreateScope();
    var context = scope.ServiceProvider.GetRequiredService<TContext>();

    await using var transaction = await context.Database.BeginTransactionAsync(cancellationToken);
    var @event = await getEvent(context, cancellationToken);

    if (@event == null)
    {
        await transaction.RollbackAsync(cancellationToken);
        return UnitOfWorkResult.NoEvents;
    }

    var contents = @event.DeserializeContents();
    await _onwardProcessorOrchestrator.Process((@event, contents), scope, cancellationToken);

    @event.DispatchedOn = DateTime.UtcNow;
    await context.SaveChangesAsync(cancellationToken);
    await transaction.CommitAsync(cancellationToken);

    return UnitOfWorkResult.Processed;
}

The transaction wraps everything: the locking SELECT, the event processing, and the final UPDATE. The lock is held the entire time and released automatically on commit or rollback. No manual cleanup, no orphaned locks if the process crashes.

How it actually behaves now

Time     Process A (Interceptor)       Process B (Background)
----     ----------------------        ----------------------
 T1      BEGIN TRANSACTION
 T2      SELECT ... FOR UPDATE SKIP LOCKED
 T3      -> Found event, LOCK ACQUIRED
 T4                                    BEGIN TRANSACTION
 T5                                    SELECT ... FOR UPDATE SKIP LOCKED
 T6                                    -> Row locked, SKIP LOCKED returns no rows
 T7                                    ROLLBACK (nothing to do)
 T8      Processing event...
 T9      UPDATE DispatchedOn
 T10     COMMIT TRANSACTION
 T11     Lock released

Process B sees the row is locked and skips it immediately. No blocking, no duplicate processing, no flags to manage.
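
To make the semantics in that timeline concrete, here is a small in-memory model of them. It is a sketch under loud assumptions: InMemoryOutbox and Claim are hypothetical types, a list stands in for the Events table, and a HashSet stands in for the database's row locks. It says nothing about how PostgreSQL or SQL Server implement locking internally. It only mirrors the observable behaviour: a claim skips rows someone else holds, and the lock lasts exactly as long as the "transaction" object does.

```csharp
using System;
using System.Collections.Generic;

// In-memory model only: a list stands in for the Events table and a
// HashSet for the row locks. None of these types exist in Onward.
public sealed class InMemoryOutbox
{
    private readonly object _gate = new object();
    private readonly List<Guid> _pending = new List<Guid>();   // oldest first
    private readonly HashSet<Guid> _locked = new HashSet<Guid>();

    public Guid Add()
    {
        var id = Guid.NewGuid();
        lock (_gate) { _pending.Add(id); }
        return id;
    }

    // Models "WHERE DispatchedOn IS NULL ORDER BY CreatedOn ... SKIP LOCKED":
    // returns the oldest pending row nobody holds, or null without waiting.
    public Claim TryClaimNext()
    {
        lock (_gate)
        {
            foreach (var id in _pending)
            {
                // HashSet.Add returns false when the row is already locked.
                if (_locked.Add(id)) return new Claim(this, id);
            }
            return null;
        }
    }

    private void Release(Guid id, bool dispatched)
    {
        lock (_gate)
        {
            _locked.Remove(id);
            if (dispatched) _pending.Remove(id);
        }
    }

    // Plays the role of the open transaction: the lock lives exactly
    // as long as this object does.
    public sealed class Claim : IDisposable
    {
        private readonly InMemoryOutbox _owner;
        private bool _finished;

        internal Claim(InMemoryOutbox owner, Guid eventId)
        {
            _owner = owner;
            EventId = eventId;
        }

        public Guid EventId { get; }

        // COMMIT: mark the event dispatched and release the lock.
        public void Commit() => Finish(dispatched: true);

        // ROLLBACK (or an exception unwinding a using block): release only,
        // so the event stays pending and can be retried.
        public void Dispose() => Finish(dispatched: false);

        private void Finish(bool dispatched)
        {
            if (_finished) return;
            _finished = true;
            _owner.Release(EventId, dispatched);
        }
    }
}
```

With three events added and the first one claimed but not yet committed, two further TryClaimNext() calls hand back the second and third events, and a fourth call returns null. Disposing the first claim without committing makes that event claimable again, which is the same shape as the tests further down.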

Testing the locking behaviour

I wrote concurrency tests using Testcontainers to spin up real PostgreSQL and SQL Server instances. These aren’t unit tests with mocks - they test actual database locking behaviour.

Test 1: only one process can claim an event

One task acquires a lock on an event and holds it. A second task tries to process the same event while the lock is held. The second task should get NoEvents back because SKIP LOCKED skips the row:

[Theory]
[MemberData(nameof(SupportedDatabases.All), MemberType = typeof(SupportedDatabases))]
public async Task ProcessEvent_WhenTwoProcessesTryToClaimSameEvent_OnlyOneSucceeds(
    ISupportedDatabase supportedDatabase)
{
    _database = supportedDatabase;
    await _database.StartAsync();

    var serviceProvider = await DatabaseTestHelper.SetupTestEnvironment(supportedDatabase);
    var context = serviceProvider.GetRequiredService<TestContext>();
    var eventId = await DatabaseTestHelper.InsertEventDirectly(
        context, new TestEvent("ConcurrencyTest"));

    // Task 1: acquire and hold a lock
    using var scope1 = serviceProvider.CreateScope();
    var context1 = scope1.ServiceProvider.GetRequiredService<TestContext>();
    var semaphore = LockingTestHelper.CreateLockSemaphore();
    var lockQuery = DatabaseTestHelper.BuildLockQuery(
        context1.Database.ProviderName, eventId);

    var lockTask = Task.Run(async () =>
    {
        await LockingTestHelper.AcquireAndHoldLock(
            context1, lockQuery,
            TestConstants.LockHoldDuration, semaphore);
    });

    await semaphore.WaitAsync();

    // Task 2: try to process while Task 1 holds the lock
    using var scope2 = serviceProvider.CreateScope();
    var unitOfWork2 = scope2.ServiceProvider
        .GetRequiredService<IOnwardProcessingUnitOfWork<TestContext>>();

    var result = await unitOfWork2.ProcessEvent(eventId, CancellationToken.None);
    await lockTask;

    Assert.Equal(UnitOfWorkResult.NoEvents, result);
}

Test 2: locked events are skipped, unlocked events still process

Three events in the table. Lock the first one, then call ProcessNext twice. It should skip the locked event and process the other two:

[Theory]
[MemberData(nameof(SupportedDatabases.All), MemberType = typeof(SupportedDatabases))]
public async Task ProcessNext_WhenEventIsLockedByTransaction_SkipsToNextAvailableEvent(
    ISupportedDatabase supportedDatabase)
{
    _database = supportedDatabase;
    await _database.StartAsync();

    var serviceProvider = await DatabaseTestHelper.SetupTestEnvironment(supportedDatabase);
    var context = serviceProvider.GetRequiredService<TestContext>();

    await DatabaseTestHelper.InsertEventDirectly(context, new TestEvent("Event1"));
    await DatabaseTestHelper.InsertEventDirectly(context, new TestEvent("Event2"));
    await DatabaseTestHelper.InsertEventDirectly(context, new TestEvent("Event3"));

    var eventIds = await context.Set<Event>()
        .OrderBy(e => e.CreatedOn).Select(e => e.Id).ToListAsync();

    // Lock the first event
    using var scope1 = serviceProvider.CreateScope();
    var context1 = scope1.ServiceProvider.GetRequiredService<TestContext>();
    var semaphore = LockingTestHelper.CreateLockSemaphore();
    var lockQuery = DatabaseTestHelper.BuildLockQueryForFirstEvent(
        context1.Database.ProviderName, eventIds[0]);

    var lockTask = Task.Run(async () =>
    {
        await LockingTestHelper.AcquireAndHoldLock(
            context1, lockQuery,
            TestConstants.ExtendedLockHoldDuration, semaphore);
    });

    await semaphore.WaitAsync();

    // Process next events - should skip the locked one
    using var scope2 = serviceProvider.CreateScope();
    var unitOfWork2 = scope2.ServiceProvider
        .GetRequiredService<IOnwardProcessingUnitOfWork<TestContext>>();

    var event2Result = await unitOfWork2.ProcessNext(CancellationToken.None);
    var event3Result = await unitOfWork2.ProcessNext(CancellationToken.None);
    await lockTask;

    Assert.Equal(UnitOfWorkResult.Processed, event2Result);
    Assert.Equal(UnitOfWorkResult.Processed, event3Result);

    var dispatchedEvents = await context.Set<Event>()
        .Where(e => e.DispatchedOn.HasValue).ToListAsync();
    Assert.Equal(2, dispatchedEvents.Count);
    Assert.DoesNotContain(dispatchedEvents, e => e.Id == eventIds[0]);
}

Test 3: failed processing releases the lock for retry

If processing throws, the transaction rolls back, the lock is released, and the event becomes available again:

[Theory]
[MemberData(nameof(SupportedDatabases.All), MemberType = typeof(SupportedDatabases))]
public async Task ProcessEvent_WhenProcessingFails_EventBecomesAvailableForRetry(
    ISupportedDatabase supportedDatabase)
{
    _database = supportedDatabase;
    await _database.StartAsync();

    var attemptCount = 0;
    var serviceProvider = await DatabaseTestHelper.SetupTestEnvironment(
        supportedDatabase, shouldThrow: () => attemptCount == 0);

    var context = serviceProvider.GetRequiredService<TestContext>();
    var eventId = await DatabaseTestHelper.InsertEventDirectly(
        context, new TestEvent("FailureTest"));

    var unitOfWork = serviceProvider
        .GetRequiredService<IOnwardProcessingUnitOfWork<TestContext>>();

    // First attempt fails
    await Assert.ThrowsAsync<TargetInvocationException>(async () =>
        await unitOfWork.ProcessEvent(eventId, CancellationToken.None));

    var @event = await context.Set<Event>().SingleAsync();
    Assert.Null(@event.DispatchedOn);

    // Second attempt succeeds
    attemptCount++;
    var result = await unitOfWork.ProcessEvent(eventId, CancellationToken.None);

    Assert.Equal(UnitOfWorkResult.Processed, result);
    await context.Entry(@event).ReloadAsync();
    Assert.NotNull(@event.DispatchedOn);
}

What I took away from this

Application-level flags don’t solve concurrency problems. The gap between reading a flag and setting it is exactly where race conditions live. I knew this in theory. Now I know it from experience.

EF Core doesn’t support pessimistic locking natively. You need raw SQL with database-specific hints. GitHub issue #26042 has been open since 2021 if you want to add a thumbs-up.

The lock must live inside an explicit transaction. This one bit me. Without BeginTransactionAsync(), the SELECT runs in autocommit mode and the lock disappears as soon as the query completes. I wrote about this confidently in the code comments and then shipped the exact bug the comments warned about.

SKIP LOCKED is the key to non-blocking behaviour. Without it, the background service would sit there waiting for the interceptor to finish, which could cascade into thread pool starvation. With it, locked rows are just invisible.

This is how the grown-up frameworks do it. NServiceBus, MassTransit, and Wolverine all use FOR UPDATE SKIP LOCKED or UPDLOCK READPAST for their outbox implementations. I could have saved myself some time by reading their source code first, but where’s the learning in that?

Quick reference: the locking SQL

PostgreSQL:

SELECT * FROM "Onwrd"."Events"
WHERE "Id" = 'event-guid' AND "DispatchedOn" IS NULL
FOR UPDATE SKIP LOCKED

SQL Server:

SELECT * FROM Onwrd.Events
WITH (UPDLOCK, READPAST, ROWLOCK)
WHERE Id = 'event-guid' AND DispatchedOn IS NULL
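
The queries above inline the GUID as a literal. That is safe enough when the value is a Guid, but FromSqlRaw also accepts {0}-style placeholders and sends the extra arguments as database parameters. Below is a minimal sketch of a builder that returns the single-event query in that form. LockingSql and ForEvent are hypothetical names, not part of Onward; the provider names, schema, and table are the ones used earlier in this post.

```csharp
using System;

// Hypothetical helper, not part of Onward: returns the single-event locking
// query with a {0} placeholder where the event id goes.
public static class LockingSql
{
    public const string PostgresProvider = "Npgsql.EntityFrameworkCore.PostgreSQL";
    public const string SqlServerProvider = "Microsoft.EntityFrameworkCore.SqlServer";

    public static string ForEvent(string providerName) => providerName switch
    {
        // PostgreSQL: the locking clause goes at the end of the statement.
        PostgresProvider =>
            "SELECT * FROM \"Onwrd\".\"Events\" " +
            "WHERE \"Id\" = {0} AND \"DispatchedOn\" IS NULL " +
            "FOR UPDATE SKIP LOCKED",

        // SQL Server: table hints go right after the table name.
        SqlServerProvider =>
            "SELECT * FROM Onwrd.Events WITH (UPDLOCK, READPAST, ROWLOCK) " +
            "WHERE Id = {0} AND DispatchedOn IS NULL",

        _ => throw new NotSupportedException(
            $"Database provider '{providerName}' is not supported for row locking")
    };
}
```

Assuming the same context and Event entity as above, the call site would look something like context.Set<Event>().FromSqlRaw(LockingSql.ForEvent(providerName), eventId), still inside the explicit transaction.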

Wrapping up

Two blog posts about duplicate event processing in the same library. At this point I’m thinking about making it a series.

The first time, the bug was a double interceptor registration hiding in test infrastructure. This time it was a genuine concurrency issue that needed database-level coordination to fix. Different root causes, same symptom, same sinking feeling when I saw it in the logs.

The final implementation is actually simpler than my first attempt with IsProcessing flags. Fewer fields on the entity, no cleanup logic for stale locks, no edge cases around process crashes. Sometimes the “more sophisticated” solution is also the less complicated one.

If you’re building an outbox pattern and haven’t thought about concurrent processing yet - maybe do that before you end up writing two blog posts about it.


Building something similar or have thoughts on this? Feel free to connect with me on LinkedIn.

