Correctly using Postgres as a queue


I am building a central notification dispatch system that is responsible for sending different kinds of notifications to end customers. It relies on multiple third-party APIs for sending the actual email/SMS notifications. A high-level architecture of the system is shown below.

NotificationSender exposes both a REST and a messaging interface for accepting consumer requests. Consumers here are the services that need to send a notification. This is what the notification system does:

  • It accepts requests from upstream services and, after validation, stores them in the Postgres database. The notification event is written to the database in the ENQUEUED state. The service returns HTTP 202 Accepted to the upstream service if the request is valid, else HTTP 400 Bad Request.
  • At a predefined frequency, a poller that is part of the NotificationDispatcher polls the Postgres database for new notification events, i.e. events in the ENQUEUED state. For now, it respects insertion-time order.
  • If enqueued events are found, it processes them and sends the actual notifications using the downstream SMS and Email services.
  • After processing the events, it changes their state to processed or failed.
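The event lifecycle described above can be sketched as a small enum. The exact names for the terminal states are assumptions on my part (the tests later in this post use SUCCESSFULLY_PROCESSED):

```java
// Lifecycle states of a notification event. The terminal state names are
// assumptions; the post's tests later assert on SUCCESSFULLY_PROCESSED.
enum NotificationEventStatus {
    ENQUEUED,               // accepted, validated, and stored; waiting for a poller
    PROCESSING,             // claimed by exactly one dispatcher instance
    SUCCESSFULLY_PROCESSED, // downstream Email/SMS call succeeded
    FAILED                  // downstream call failed
}
```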

I went with the poller-based approach for its simplicity. This design also meets our NFR of sending up to 10 million notifications per day at peak.
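As a rough sanity check on that NFR (back-of-the-envelope numbers, not measurements from the actual system): with one poll per minute, 10 million notifications per day means each poll cycle has to drain roughly 7,000 events across all dispatcher instances.

```java
public class ThroughputEstimate {
    public static void main(String[] args) {
        long perDay = 10_000_000L;   // peak notifications per day (our NFR)
        long pollsPerDay = 24 * 60;  // one poll per minute => 1440 cycles per day
        // Average number of events each poll cycle must drain, across all instances
        long perPoll = (long) Math.ceil((double) perDay / pollsPerDay);
        System.out.println(perPoll); // 6945
    }
}
```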

I will not talk about the actual event processing in this post. I might cover it in a different post. Today, I will only focus on the queue aspect of our design.

One thing we considered from the start is our system’s ability to run multiple instances. When you have multiple dispatchers, you have to ensure that each dispatcher processes a unique set of events and that each enqueued event is processed by exactly one dispatcher. This is where all the complexity of using Postgres as a queue lies.

In this post, I will use Java as the programming language. We are using Java 17, Spring Boot 2.6.2, and Postgres 12.6 (we use AWS Aurora Postgres).

Let’s look at our poller code. It is a simple Spring bean as shown below.

@Component
public class NotificationPoller {

    private static final Logger log = LoggerFactory.getLogger(NotificationPoller.class);

    @Autowired
    private NotificationEventRepository notificationEventRepository;

    @Value("${queue.batch.size}")
    private int batchSize;

    @Transactional
    @Scheduled(fixedDelay = 60_000)
    public void sendNotifications() {
        List<NotificationEvent> events = this.notificationEventRepository
                .fetchAndUpdateEventsStateToProcessing(batchSize);
        if (events.isEmpty()) {
            log.info("No notification events found to process so returning");
            return;
        }
        List<String> eventIds = events.stream()
                .map(NotificationEvent::getId)
                .toList();
        log.info("Processing {} events with ids {}", events.size(), eventIds);
        // sort the events since the returned result set is not sorted
        // process the events
        // change state to processed or failed depending on the situation
    }
}

Every minute our poller will find the enqueued events and process them.
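The comment in the poller says to sort the fetched batch, because the rows coming back from an UPDATE ... RETURNING are not guaranteed to preserve the subquery’s ORDER BY. A minimal sketch of that sort (the Event record here is a stand-in for our entity, which exposes its created_at value via a getter):

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;

// Stand-in for the NotificationEvent entity, reduced to the fields we sort on
record Event(String id, Instant createdAt) {}

class EventSorter {
    // Restore insertion-time order on the fetched batch before processing
    static List<Event> byCreationTime(List<Event> events) {
        return events.stream()
                .sorted(Comparator.comparing(Event::createdAt))
                .toList();
    }
}
```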

The interesting part here is the fetchAndUpdateEventsStateToProcessing method of NotificationEventRepository. We want to achieve the following from it:

  1. Find the enqueued events and change their state to PROCESSING in a single atomic operation
  2. If two pollers query at the same time, they should get disjoint sets of events. For example, if our batch size is 1 and we have two notification events in the ENQUEUED state, each poller should get one event, and the two events should be different. Similarly, if the batch size is 2 and we have two pollers, only one should do the processing and the second should not get any events
  3. This is already covered by the first point, but it is worth repeating: we want the query to return the updated events so that we don’t have to make a separate query to fetch them
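Point 2 is really a claim contract: each event may be claimed by at most one worker. Before looking at how Postgres gives us this, here is an in-memory analogy (not the real implementation), where compareAndSet plays the role the atomic UPDATE on the status column will play in the database:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

class ClaimDemo {
    // Claim up to batchSize unclaimed events. compareAndSet(false, true) either
    // wins the event or loses it to another worker, never both.
    static List<String> claim(Map<String, AtomicBoolean> queue, int batchSize) {
        List<String> claimed = new ArrayList<>();
        for (var entry : queue.entrySet()) {
            if (claimed.size() == batchSize) {
                break;
            }
            if (entry.getValue().compareAndSet(false, true)) {
                claimed.add(entry.getKey());
            }
        }
        return claimed;
    }
}
```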

Let’s try to achieve this in a couple of steps. In the first step we will achieve points 1 and 3.

public interface NotificationEventRepository extends JpaRepository<NotificationEvent, String> {
    @Modifying
    @Query(value = """
            update notification_events
                set notification_event_status='PROCESSING'
                where id IN (
                    select id from notification_events e
                    where notification_event_status = 'ENQUEUED'
                    order by created_at
                    limit ?1)
            RETURNING *""", nativeQuery = true)
    List<NotificationEvent> fetchAndUpdateEventsStateToProcessing(int batchSize);
}

In the code shown above:

  • The subquery selects the enqueued events ordered by their creation timestamp and limits the result to our batch size
  • Next, we update all the events returned by the subquery to PROCESSING
  • And, finally, using RETURNING we return the updated rows. Spring Data JPA maps them to NotificationEvent entities

This might look like it will work. But let’s write a test case to see what happens when multiple threads call it.

We will use a CountDownLatch to coordinate the threads.

@Test
void two_threads_should_not_process_same_event_twice() throws Exception {
    var events = List.of(newEvent(), newEvent(), newEvent());

    this.notificationEventRepository.saveAll(events);

    var numberOfThreads = 2;
    CountDownLatch countDownLatch = new CountDownLatch(numberOfThreads);
    ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
    for (int i = 0; i < numberOfThreads; i++) {
        executorService.submit(() -> {
            this.poller.sendNotifications();
            countDownLatch.countDown();
        });
    }

    boolean reachedZero = countDownLatch.await(5, TimeUnit.SECONDS);
    if (!reachedZero) {
        fail("Test failed because countdown didn't reach zero");
    }

    assertThat(notificationEventRepository.findAll().stream().map(NotificationEvent::getNotificationEventStatus))
            .containsExactly(NotificationEventStatus.SUCCESSFULLY_PROCESSED,
                    NotificationEventStatus.SUCCESSFULLY_PROCESSED,
                    NotificationEventStatus.SUCCESSFULLY_PROCESSED);

    executorService.shutdown();

}

In the test case above we have the following setup:

  • We saved three events in the DB
  • Next, we created two threads and submitted a task to each that calls the sendNotifications method
  • We used a CountDownLatch, which allows one or more threads to wait until a set of operations performed in other threads completes
  • Finally, we asserted that all three events are processed

If we run the test, it fails with the following message:

Expecting actual:
  [ENQUEUED, SUCCESSFULLY_PROCESSED, SUCCESSFULLY_PROCESSED]
to contain exactly (and in same order):
  [SUCCESSFULLY_PROCESSED, SUCCESSFULLY_PROCESSED, SUCCESSFULLY_PROCESSED]

So, it looks like both threads picked the same events, meaning some events would be processed twice. We can verify this by looking at our logs.

Thread 2 logs

2022-01-27 07:21:38.289  INFO [notification-service:pool-4-thread-2]
                c.k.m.n.scheduler.NotificationPoller - Updated state of  2 events to processing. Events updated ids are [382ba261-47b4-405f-8bf9-99f28c19d0ed, 9ad72fe5-272f-43d5-89bc-06950eaa659d]

Thread 1 logs

2022-01-27 07:21:38.301  INFO [notification-service:pool-4-thread-1]
                c.k.m.n.scheduler.NotificationPoller - Updated state of  2 events to processing. Events updated ids are [382ba261-47b4-405f-8bf9-99f28c19d0ed, 9ad72fe5-272f-43d5-89bc-06950eaa659d]

Log lines confirm that threads processed the same events.

This is not right. Let’s see what we can do.

Postgres has SELECT ... FOR UPDATE, which locks all the rows that match the condition. So we can change our query to the following and things should work.

@Modifying
@Query(value = """
        update notification_events
            set notification_event_status='PROCESSING'
            where id IN (
                select id from notification_events e
                where notification_event_status = 'ENQUEUED'
                order by created_at
                limit ?1
                FOR UPDATE)
        RETURNING *""", nativeQuery = true)
List<NotificationEvent> fetchAndUpdateEventsStateToProcessing(int batchSize);

If you run the test cases, they will pass. You can declare victory.

Not so fast. If you look at the logs, you will see that execution is serialized. The two threads are not running in parallel.

Let’s look at the logs

2022-01-27 08:22:25.353  INFO [notification-service:pool-4-thread-1]
                c.k.m.n.scheduler.NotificationPoller - Updated state of  2 events to processing. Events updated ids are [5a11ec88-8861-46bb-b968-21da962c9c5a, 3026301a-7b49-4573-ae3f-13c76f77352e]


2022-01-27 08:22:25.757  INFO [notification-service:pool-4-thread-1]
                c.k.m.n.scheduler.NotificationPoller - Notification poller execution finished

So, execution of thread-1 started at 08:22:25.353 and ended at 08:22:25.757.

For the second thread, execution was

2022-01-27 08:22:25.768  INFO [notification-service:pool-4-thread-2]
                c.k.m.n.scheduler.NotificationPoller - Updated state of  1 events to processing. Events updated ids are [26901e70-4f37-43e3-a265-4394ef6bef15]

2022-01-27 08:22:25.792  INFO [notification-service:pool-4-thread-2]
                c.k.m.n.scheduler.NotificationPoller - Notification poller execution finished

So, execution of thread-2 started at 08:22:25.768 and ended at 08:22:25.792.

This is Postgres database row level locking in action.

There is another issue that I couldn’t reproduce in my tests but read about in a post by the folks at 2ndQuadrant:

If two are started at exactly the same moment, the subSELECTs for each will be processed first. (It’s not that simple, but we can pretend for this purpose. Don’t rely on subqueries executing in any particular order for correctness). Each one scans for a row with is_done = 'f'. Both find the same row and attempt to lock it. One succeeds, one waits on the other one’s lock. Whoops, your “concurrent” queue is serialized. If the first xact to get the lock rolls back the second gets the row and tries the same row.

If the first xact commits, the second actually gets zero rows and the UPDATE returns nothing. PostgreSQL doesn’t re-execute the SELECT part, it just notices that the row was modified by another transaction while locked and re-evaluates the WHERE clause for the row. Since is_done = 't' now, the row no longer matches the condition and is excluded. The subquery returns zero rows, which is null, and no itemid is = NULL because nothing is equal to null, so the UPDATE does nothing.

So it can happen that one thread gets no rows at all, and we process only two of the three events in that cycle. Again, I couldn’t reproduce this in my tests.

The solution, as suggested by the 2ndQuadrant folks, is to use SKIP LOCKED with FOR UPDATE as shown below.

@Modifying
@Query(value = """
        update notification_events
            set notification_event_status='PROCESSING'
            where id IN (
                select id from notification_events e
                where notification_event_status = 'ENQUEUED'
                order by created_at
                limit ?1
                FOR UPDATE SKIP LOCKED)
        RETURNING *""", nativeQuery = true)
List<NotificationEvent> fetchAndUpdateEventsStateToProcessing(int batchSize);

The open transaction now holds a lock on the rows it claimed. Other transactions skip them for as long as this transaction keeps running. If the transaction aborts, the rows become unlocked and the status update is undone by the rollback, so another transaction can grab them. If the transaction commits, the status change is committed along with the other changes in the xact, so other xacts won’t see the events as ENQUEUED anymore.

The logs tell the same story: both threads are doing their work in parallel.

2022-01-27 08:36:23.460  INFO [notification-service:pool-4-thread-1]
                c.k.m.n.scheduler.NotificationPoller - Updated state of  1 events to processing. Events updated ids are [70e25068-44e3-4bd4-ada6-eeece2a8fbe8]
2022-01-27 08:36:23.460  INFO [notification-service:pool-4-thread-2]
                c.k.m.n.scheduler.NotificationPoller - Updated state of  2 events to processing. Events updated ids are [b817c8e9-2083-4c24-952e-140a4a3f854f, f2ca8eb3-2cb9-4441-abc9-c2cd73d60c43]
2022-01-27 08:36:23.812  INFO [notification-service:pool-4-thread-1]
                c.k.m.n.scheduler.NotificationPoller - Notification poller execution finished

2022-01-27 08:36:23.821  INFO [notification-service:pool-4-thread-2]
                c.k.m.n.scheduler.NotificationPoller - Notification poller execution finished

We can rewrite our query using a CTE to return the results sorted by the created_at timestamp.

@Query(value = """
        with updated_notifications as (
            update notification_events
            set notification_event_status='PROCESSING'
            where id IN (
                select id from notification_events e
                where notification_event_status = 'ENQUEUED'
                order by created_at LIMIT ?1 FOR UPDATE SKIP LOCKED)
            RETURNING *)
        select * from updated_notifications order by created_at""",
        nativeQuery = true)
List<NotificationEvent> fetchAndUpdateEventsStateToProcessing(int batchSize);

That’s it for today.
