I am building a central notification dispatch system that is responsible for sending different kinds of notifications to end customers. It relies on multiple third-party APIs for sending the actual email/SMS notifications. A high-level architecture of the system is shown below.

NotificationSender exposes both REST and messaging interfaces for accepting consumer requests. Consumers here refer to the services that need to send notifications. This is what the notification system does:
- It accepts requests from upstream services and stores them in the Postgres database after validation. The notification event is written to the Postgres database in the ENQUEUED state. It returns HTTP 202 Accepted to the upstream service if the request is valid, else it returns HTTP 400 Bad Request. A minimal sketch of the event entity follows this list.
- At a predefined frequency, a poller that is part of the NotificationDispatcher polls the Postgres database for new notification events, i.e. events in the ENQUEUED state. For now, it respects insertion-time order.
- If enqueued events are found, it processes them and sends the actual notifications using the downstream SMS and Email services.
- After processing the events, it changes their state to processed.
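The rest of the post works against a notification_events table. Its exact schema is not important here; a minimal sketch of the entity, assuming only the columns used by the queries below (id, notification_event_status, created_at) plus a hypothetical payload field, could look like this (imports and getters/setters omitted, as in the other snippets):

@Entity
@Table(name = "notification_events")
public class NotificationEvent {

    @Id
    private String id; // UUID stored as text

    @Enumerated(EnumType.STRING)
    @Column(name = "notification_event_status")
    private NotificationEventStatus notificationEventStatus; // ENQUEUED, PROCESSING, SUCCESSFULLY_PROCESSED, ...

    @Column(name = "created_at")
    private Instant createdAt;

    // assumption: the actual notification body (recipient, channel, message, ...)
    private String payload;
}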
I went with the poller-based approach for its simplicity. Also, this design meets our NFR of sending at peak 10 million notifications per day.
I will not talk about the actual event processing in this post. I might cover it in a different post. Today, I will only focus on the queue aspect of our design.
One thing we considered from the start is our system’s ability to run multiple instances. When you have multiple dispatchers, you have to ensure that each enqueued event is processed by exactly one dispatcher. This is where all the complexity lies in using Postgres as the queue.
In this post, I will use Java as the programming language. We are using Java 17, Spring Boot 2.6.2, and Postgres 12.6 (we use AWS Aurora Postgres).
Let’s look at our poller code. It is a simple Spring bean as shown below.
@Component
public class NotificationPoller {

    @Autowired
    private NotificationEventRepository notificationEventRepository;

    @Value("${queue.batch.size}")
    private int batchSize;

    @Transactional
    @Scheduled(fixedDelay = 60_000)
    public void sendNotifications() {
        List<NotificationEvent> events = this.notificationEventRepository
                .fetchAndUpdateEventsStateToProcessing(batchSize);
        if (events.isEmpty()) {
            log.info("No notification events found to process so returning");
            return;
        }
        List<String> eventIds = events.stream().map(NotificationEvent::getId).toList();
        log.info("Processing {} events with ids {}", events.size(), eventIds);
        // sort the events since the returned result set is not sorted
        // process the events
        // change state to processed or failed depending on the situation
    }
}
Every minute our poller will find the enqueued events and process them.
The interesting part here is the fetchAndUpdateEventsStateToProcessing method of NotificationEventRepository. We want to achieve the following from it:
- Find the enqueued events and change their state to PROCESSING in a single atomic operation
- If two pollers query at the same time, they should each get a different set of events. For example, if our batch size is 1 and we have two notification events in the enqueued state, each poller should get one event, and the two events should be different. Similarly, if the batch size is 2 and we have two pollers, only one should do the processing and the second should not get any events
- This follows from the first point but is worth repeating: we want the updated events returned in the query result so that we don’t have to make a separate query to fetch them
Let’s try to achieve this in a couple of steps. In the first step we will achieve points 1 and 3.
public interface NotificationEventRepository extends JpaRepository<NotificationEvent, String> {

    @Modifying
    @Query(value = """
            update notification_events
            set notification_event_status='PROCESSING'
            where id IN (
                select id from notification_events e
                where notification_event_status = 'ENQUEUED'
                order by created_at
                limit ?1)
            RETURNING *""", nativeQuery = true)
    List<NotificationEvent> fetchAndUpdateEventsStateToProcessing(int batchSize);
}
In the code shown above:
- The subquery selects the enqueued events ordered by their timestamp and limits them to our batch size
- Next, we update all the events returned by the subquery to PROCESSING
- And finally, using RETURNING, we return the result set. Spring Data JPA converts it to NotificationEvent
This might look like it will work. But let’s write a test case to see how it behaves when multiple threads call it.
We will use CountDownLatch to test it.
@Test
void two_threads_should_not_process_same_event_twice() throws Exception {
    var events = List.of(newEvent(), newEvent(), newEvent());
    this.notificationEventRepository.saveAll(events);

    var numberOfThreads = 2;
    CountDownLatch countDownLatch = new CountDownLatch(numberOfThreads);
    ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
    for (int i = 0; i < numberOfThreads; i++) {
        executorService.submit(() -> {
            this.poller.sendNotifications();
            countDownLatch.countDown();
        });
    }

    boolean reachedZero = countDownLatch.await(5, TimeUnit.SECONDS);
    if (!reachedZero) {
        fail("Test failed because countdown didn't reach zero");
    }

    assertThat(notificationEventRepository.findAll().stream().map(NotificationEvent::getNotificationEventStatus))
            .containsExactly(NotificationEventStatus.SUCCESSFULLY_PROCESSED,
                    NotificationEventStatus.SUCCESSFULLY_PROCESSED,
                    NotificationEventStatus.SUCCESSFULLY_PROCESSED);
    executorService.shutdown();
}
In the test case above we have the following setup:
- We saved three events in the db
- Next, we created two threads and submitted to them a task that calls the sendNotifications method
- We used a CountDownLatch, which allows one or more threads to wait until a set of operations being performed in other threads completes
- Finally, we asserted that all three events are processed
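The newEvent() helper is not shown here; a minimal sketch, assuming the entity fields described earlier, could be:

private NotificationEvent newEvent() {
    var event = new NotificationEvent();
    event.setId(UUID.randomUUID().toString());
    event.setNotificationEventStatus(NotificationEventStatus.ENQUEUED);
    event.setCreatedAt(Instant.now());
    return event;
}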
If we run the test, it will fail with the following failure message:
Expecting actual:
[ENQUEUED, SUCCESSFULLY_PROCESSED, SUCCESSFULLY_PROCESSED]
to contain exactly (and in same order):
[SUCCESSFULLY_PROCESSED, SUCCESSFULLY_PROCESSED, SUCCESSFULLY_PROCESSED]
So, it looks like both threads picked the same events. We can verify this by looking at our logs.
Thread 2 logs
2022-01-27 07:21:38.289 INFO [notification-service:pool-4-thread-2]
c.k.m.n.scheduler.NotificationPoller - Updated state of 2 events to processing. Events updated ids are [382ba261-47b4-405f-8bf9-99f28c19d0ed, 9ad72fe5-272f-43d5-89bc-06950eaa659d]
Thread 1 logs
2022-01-27 07:21:38.301 INFO [notification-service:pool-4-thread-1]
c.k.m.n.scheduler.NotificationPoller - Updated state of 2 events to processing. Events updated ids are [382ba261-47b4-405f-8bf9-99f28c19d0ed, 9ad72fe5-272f-43d5-89bc-06950eaa659d]
Log lines confirm that threads processed the same events.
This is not right. Let’s see what we can do.
Postgres has SELECT ... FOR UPDATE, which locks all the rows that match the condition. So, we can change our query to the following and things should work.
@Modifying
@Query(value = """
        update notification_events
        set notification_event_status='PROCESSING'
        where id IN (
            select id from notification_events e
            where notification_event_status = 'ENQUEUED'
            order by created_at
            FOR UPDATE
            limit ?1)
        RETURNING *""", nativeQuery = true)
List<NotificationEvent> fetchAndUpdateEventsStateToProcessing(int batchSize);
If you run the test cases they will pass. You can declare victory.
Not so fast. If you look at the logs, you will see that execution is serialized: the two threads are not running in parallel.
Let’s look at the logs
2022-01-27 08:22:25.353 INFO [notification-service:pool-4-thread-1]
c.k.m.n.scheduler.NotificationPoller - Updated state of 2 events to processing. Events updated ids are [5a11ec88-8861-46bb-b968-21da962c9c5a, 3026301a-7b49-4573-ae3f-13c76f77352e]
2022-01-27 08:22:25.757 INFO [notification-service:pool-4-thread-1]
c.k.m.n.scheduler.NotificationPoller - Notification poller execution finished
So, execution of thread-1 started at 08:22:25.353 and ended at 08:22:25.757.
For the second thread, the execution was:
2022-01-27 08:22:25.768 INFO [notification-service:pool-4-thread-2]
c.k.m.n.scheduler.NotificationPoller - Updated state of 1 events to processing. Events updated ids are [26901e70-4f37-43e3-a265-4394ef6bef15]
2022-01-27 08:22:25.792 INFO [notification-service:pool-4-thread-2]
c.k.m.n.scheduler.NotificationPoller - Notification poller execution finished
So, execution of thread-2 started at 08:22:25.768 and ended at 08:22:25.792.
This is Postgres row-level locking in action: the second thread’s query had to wait until the first thread’s transaction committed and released its row locks.
There is another issue that I couldn’t reproduce in my tests but read about in a post by the folks at 2ndQuadrant:
If two are started at exactly the same moment, the sub-SELECTs for each will be processed first. (It’s not that simple, but we can pretend for this purpose. Don’t rely on subqueries executing in any particular order for correctness). Each one scans for a row with is_done = 'f'. Both find the same row and attempt to lock it. One succeeds, one waits on the other one’s lock. Whoops, your “concurrent” queue is serialized. If the first xact to get the lock rolls back, the second gets the row and tries the same row. If the first xact commits, the second actually gets zero rows from the UPDATE and returns nothing. PostgreSQL doesn’t re-execute the SELECT part, it just notices that the row was modified by another transaction while locked and re-evaluates the WHERE clause for the row. Since is_done = 't' now, the row no longer matches the condition and is excluded. The subquery returns zero rows, which is null, and no itemid is = NULL because nothing is equal to null, so the UPDATE does nothing.
So it could happen that one thread does not get any rows and we end up processing only two events in that cycle. I couldn’t reproduce it, though.
The solution, as suggested by the 2ndQuadrant folks, is to use SKIP LOCKED with FOR UPDATE as shown below.
@Modifying
@Query(value = """
        update notification_events
        set notification_event_status='PROCESSING'
        where id IN (
            select id from notification_events e
            where notification_event_status = 'ENQUEUED'
            order by created_at
            FOR UPDATE SKIP LOCKED
            limit ?1)
        RETURNING *""", nativeQuery = true)
List<NotificationEvent> fetchAndUpdateEventsStateToProcessing(int batchSize);
The open transaction now holds a lock on the rows it selected. Other transactions will skip those rows for as long as this transaction keeps running. If this transaction aborts, the rows become unlocked and the state change is undone by the rollback, so another transaction can grab them. If this transaction commits, the state change is committed along with the rest of the transaction, so other pollers will no longer see those events as ENQUEUED.
Logs also tell the same story. Both threads are doing their work in parallel.
2022-01-27 08:36:23.460 INFO [notification-service:pool-4-thread-1]
c.k.m.n.scheduler.NotificationPoller - Updated state of 1 events to processing. Events updated ids are [70e25068-44e3-4bd4-ada6-eeece2a8fbe8]
2022-01-27 08:36:23.460 INFO [notification-service:pool-4-thread-2]
c.k.m.n.scheduler.NotificationPoller - Updated state of 2 events to processing. Events updated ids are [b817c8e9-2083-4c24-952e-140a4a3f854f, f2ca8eb3-2cb9-4441-abc9-c2cd73d60c43]
2022-01-27 08:36:23.812 INFO [notification-service:pool-4-thread-1]
c.k.m.n.scheduler.NotificationPoller - Notification poller execution finished
2022-01-27 08:36:23.821 INFO [notification-service:pool-4-thread-2]
c.k.m.n.scheduler.NotificationPoller - Notification poller execution finished
We can rewrite our query using a CTE to return the results sorted by the created_at timestamp, which removes the need to sort the events in the poller.
@Query(value = """
        with updated_notifications as (
            update notification_events
            set notification_event_status='PROCESSING'
            where id IN (
                select id from notification_events e
                where notification_event_status = 'ENQUEUED'
                order by created_at FOR UPDATE SKIP LOCKED LIMIT ?1)
            RETURNING *)
        select * from updated_notifications order by created_at""",
        nativeQuery = true)
List<NotificationEvent> fetchAndUpdateEventsStateToProcessing(int batchSize);
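Putting it together, the poller no longer needs to sort the events itself because the CTE returns them ordered by created_at. Below is a minimal sketch of the processing loop; sendViaDownstreamService is a hypothetical helper that calls the downstream Email/SMS services, and real code would also handle the failed state as described earlier.

@Transactional
@Scheduled(fixedDelay = 60_000)
public void sendNotifications() {
    // rows come back already ordered by created_at thanks to the CTE query
    List<NotificationEvent> events = this.notificationEventRepository
            .fetchAndUpdateEventsStateToProcessing(batchSize);
    for (NotificationEvent event : events) {
        // if this throws, the whole transaction rolls back and the events
        // return to ENQUEUED, so a later poll can pick them up again
        sendViaDownstreamService(event);
        event.setNotificationEventStatus(NotificationEventStatus.SUCCESSFULLY_PROCESSED);
    }
    this.notificationEventRepository.saveAll(events);
}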
That’s it for today.