
I am building a consumer application that reads messages from a queue, runs basic sanitization and deduplication, and persists them to an RDBMS.

The functional requirements are:

  • Messages must not be lost.
  • Validate and sanitize the arriving messages before persisting them.
  • Invalid messages go to the database in an error table with an error code.
  • Valid messages go to the database in the main table.

Non-functional requirements:

  • The database must have every single record that arrived from the queue (valid or invalid).
  • The system must process 1-5 million records per hour or more.

Tech stack used:

  • Spring Boot Cloud Stream (consumer stack)
  • Solace PubSub+ for the message queue
  • SQL Server as the database

Points to note:

  • My deduplication queries the database for every message - something I don't like.
  • The application uses two UUIDs from the message payload as the primary key, so no query is wasted fetching the next primary key from the database. The combination of these two UUIDs will almost never collide.
  • I use 10 concurrent consumers to speed up message consumption.

What optimizations can I make to the above system to make the writes fast and still consistent?

Sujal Mandal
  • sounds like 99% of the work is done by the db. you might have to rethink what "duplicate" really means – Ewan Jan 21 '23 at 19:46

1 Answer


Hmmm, my first design choice likely would have been Kafka. It scales, it has a terrific story for exactly-once delivery with causal ordering, and the log replay comes in very handy, both in dev and during exceptional events in prod.

messages must not be lost

Good. Looks like you'll have to use Solace's at-least-once delivery. And then de-dup becomes interesting.

5 million records per hour

So ~1,400 rows/second, more during peak bursts. The backend is SQL Server; I recommend you adopt Postgres with partitioned tables. Bench it to verify throughput in your environment. Then bench end-to-end, with Solace plus the backend.

It's unclear why you chose ten workers. Typically we measure first, then size appropriately. Let's assume each worker must handle 140 rows/second. Sounds reasonable.

The most important thing to work out at this stage of the design is your dedup approach. It sounds like the source data can produce duplicates. And Solace can produce dups, for example around the time that servers reboot. At the time it is originally created, there is something about a record that makes it "unique" at the app level, some subset of its fields. And we can hash those fields. Ideally you would route on the hash. Failing that, you can identify ten or more values of field f1, or of some (f1, f2, f3) tuple, that can be assigned across the ten workers for somewhat even loading. It sounds like your uuid1 field fits the bill. I'm not sure what's going on with your uuid2 field; perhaps you feel 128 random bits doesn't offer collision safety and you need the concatenated 256.
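A minimal sketch of that hash routing, assuming each message carries the uuid1 field; the class and method names here are mine, not from the question:

```java
import java.util.UUID;

// Route a message to one of N workers so that potential duplicates
// (same uuid1) always land on the same worker.
public final class DupRouter {
    private final int workerCount;

    public DupRouter(int workerCount) {
        this.workerCount = workerCount;
    }

    /** Returns a worker index in [0, workerCount) for the routing key. */
    public int workerFor(UUID uuid1) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(uuid1.hashCode(), workerCount);
    }
}
```

If routing ever has to be stable across languages or JVM versions, hash the UUID's canonical bytes rather than relying on hashCode(); for a single-JVM sketch this suffices.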

DB tables will have a compound primary key of (uuid1, uuid2). Or perhaps you decide that uuid1 suffices, and uuid2 is simply stored as an attribute. We store UUIDs on-disk in efficient binary form.
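A UUID packs into exactly 16 bytes, so (uuid1, uuid2) fits a pair of BINARY(16) columns. A sketch of the conversion (the helper class is illustrative):

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Pack a UUID into 16 bytes for a BINARY(16) key column, and unpack it on read.
public final class UuidBytes {
    public static byte[] toBytes(UUID u) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(u.getMostSignificantBits());
        buf.putLong(u.getLeastSignificantBits());
        return buf.array();
    }

    public static UUID fromBytes(byte[] b) {
        ByteBuffer buf = ByteBuffer.wrap(b);
        return new UUID(buf.getLong(), buf.getLong());
    }
}
```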

We have many messages flowing through the system, and there will be (m1, m2) duplicate pairs among them. The key is to ensure that if m1 and m2 could potentially be dups, they are routed to the same worker. That lets each worker maintain an in-memory priority queue of recently seen messages. Write down a specification for the max delay a dup message might experience; that will let you size the pqueue appropriately. When workers come and go, duplicates can still slip through, so the backend's unique PK index remains the final dup arbiter. Be prepared to see an INSERT / COMMIT occasionally fail with "unique constraint violated", and fall back to carefully inserting rows one-by-one, or at least send them to a side table where another worker can spend time carefully inserting them.
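As a sketch, a per-worker recently-seen filter could look like this. It assumes messages reach each worker in roughly increasing arrival time, so a deque stands in for the priority queue; all names are mine:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Per-worker "recently seen" filter. Entries older than the max dup delay are
// evicted; anything that slips past still hits the DB's unique PK constraint.
public final class RecentDupFilter {
    private record Seen(String key, long seenAtMillis) {}

    private final Deque<Seen> byAge = new ArrayDeque<>(); // oldest first
    private final Set<String> keys = new HashSet<>();
    private final long maxDelayMillis;

    public RecentDupFilter(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Returns true if this key (e.g. uuid1+uuid2) was seen within the window. */
    public boolean isDuplicate(String key, long nowMillis) {
        evictOlderThan(nowMillis - maxDelayMillis);
        if (!keys.add(key)) {
            return true; // window is measured from the first sighting
        }
        byAge.addLast(new Seen(key, nowMillis));
        return false;
    }

    private void evictOlderThan(long cutoffMillis) {
        while (!byAge.isEmpty() && byAge.peekFirst().seenAtMillis() < cutoffMillis) {
            keys.remove(byAge.removeFirst().key());
        }
    }
}
```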

Which brings us to batch size.

There is no way you can COMMIT single rows at a time; you just won't be able to keep up with the offered load. As a rule of thumb, it is usually desirable for bulk inserts to send around 500 msec of work to the DB per COMMIT, and no more than 5 seconds' worth. More than that isn't bad, it just won't see any throughput gains, since there's no more network and I/O latency for us to amortize over.

So your minimum batch size is likely to be ~70 rows inserted per COMMIT. Benchmark this early in the project, and verify your ten workers all get along with one another. This will inform your table partitioning strategy. Likely you will want backend sharding to match how you're routing messages to workers, based on the same field(s).
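Here's a plain-JDBC sketch of that shape: one batch per COMMIT, with the fall-back path when a duplicate trips the unique constraint. Table and column names are invented for illustration:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Insert one batch (~70 rows, roughly 500 msec of work at 140 rows/second)
// in a single transaction; on failure, retry the rows one at a time.
public final class BatchWriter {
    private static final String SQL =
            "INSERT INTO messages (uuid1, uuid2, payload) VALUES (?, ?, ?)";

    public void writeBatch(Connection conn, List<Row> rows) throws SQLException {
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(SQL)) {
            for (Row r : rows) {
                ps.setBytes(1, r.uuid1Bytes());
                ps.setBytes(2, r.uuid2Bytes());
                ps.setString(3, r.payload());
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();
        } catch (SQLException e) {
            conn.rollback();
            // Likely a unique-constraint violation from a dup that slipped past
            // the in-memory filter; carefully insert rows one-by-one instead.
            insertOneByOne(conn, rows);
        }
    }

    private void insertOneByOne(Connection conn, List<Row> rows) throws SQLException {
        for (Row r : rows) {
            try (PreparedStatement ps = conn.prepareStatement(SQL)) {
                ps.setBytes(1, r.uuid1Bytes());
                ps.setBytes(2, r.uuid2Bytes());
                ps.setString(3, r.payload());
                ps.executeUpdate();
                conn.commit();
            } catch (SQLException dup) {
                conn.rollback(); // assume PK violation: this row is the dup
            }
        }
    }

    public record Row(byte[] uuid1Bytes, byte[] uuid2Bytes, String payload) {}
}
```

The ~70-row figure falls straight out of 140 rows/second × 500 msec; tune it against your own benchmarks rather than taking it on faith.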

Let's talk about timestamps.

Inevitably you will want to do reporting on recent rows, and at some point you will discard ancient rows. Hopefully the origin generates timestamps using NTP-synchronized clocks. Write down the max permitted skew. Part of your validation process will be to sanity-check the stamps and declare an error for a bad timestamp.

Consider putting some form of timestamp, maybe with daily or hourly granularity, at the front of your compound PK, to keep recent rows together. It will make a huge performance difference for anything that reports on recently arrived rows. It will also make it easier to quickly prune ancient rows. If your partitioning scheme exposes such timestamps, then a DROP is the ideal way to discard year-old rows.
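A sketch of deriving such a leading key component, assuming you go with day granularity and an integer bucket column (names are illustrative):

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Derive a coarse day bucket from the origin timestamp to lead the compound
// primary key, e.g. (day_bucket, uuid1, uuid2). Pruning a year-old day of
// rows then maps onto dropping a single partition.
public final class DayBucket {
    /** Days since the Unix epoch, suitable for an INT partition-key column. */
    public static int of(Instant originTimestamp) {
        return (int) LocalDate.ofInstant(originTimestamp, ZoneOffset.UTC).toEpochDay();
    }
}
```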

It has been said that the man who has two wristwatches never knows what time it is. There may be some temptation to tack on a timestamp as messages encounter one layer or another. Resist that temptation. If you do succumb, and add a received_at column, be sure to write down the semantics. That way maintainers a year or two down the road will know whether they should be examining a date range based on this stamp or that one.

J_H
  • Thank you for the response! A lot of information there. I can already identify some nice tricks that I can use. Do add to it if you have any other ideas. – Sujal Mandal Jan 22 '23 at 18:51