In a previous blog post I wrote down some unconventional thoughts on the characteristics I find most attractive in a stream processing library¹. To summarize:
The purpose of this blog post is to go into some depth on the first of these capabilities - the only one that brings with it significant implementation complexity.
¹I should note that although I've worked with Kafka quite a lot, I'm still pretty new to stream processing and these opinions may not hold up as I continue to work them through. Definitely take everything I say here with a grain of salt!
There are various factors driving the above preferences. What's behind #1 is a desire to build a diverse set of applications on top of streaming data. I'd like to be able to use the state stores provided by my stream processing library like a database - to treat the streaming platform as a system of record. Unfortunately (to my knowledge) no existing stream processing library supports multi-column uniqueness or referential integrity - two requirements that applications frequently demand.
A bunch of "NewSQL" databases are now available that provide horizontally scalable solutions to these requirements (and more!). These include CockroachDB, Vitess, YugaByte, Fauna, and TiDB. When I talk to people about the idea of building NewSQL-style functionality on top of Kafka, they commonly ask: "why not just use one of these databases? Wouldn't that be better?"
The answer is that none of these databases is tightly integrated with stream processing capabilities, and I think there's likely a lot of value in that integration.
There's an additional benefit worth calling out as well - limiting the number of different systems you need to manage brings significant operational efficiency.
What I've been working on most recently is a proof-of-concept table abstraction, backed by Kafka, that allows multiple unique constraints to be enforced. For the remainder of this blog post, I'll provide some notes on the implementation. If you're just here for the high-level ideas, you can stop reading now :-).
Disclaimer: These are early days, and I'm not an expert at this. There are likely to be holes ... hopefully no fundamental flaws, but maybe. Also this is hard to explain. If this work goes anywhere, you can expect this to be just a first draft...
Fundamentally, we need to maintain a separate partitioning of the data for each of the unique keys.
For each partitioning, we'll have two topics:
Here are the topics required for an example users table that has a unique constraint applied to three columns (id, username and email):
```
      id                 username               email
[ change_log ]        [ change_log ]        [ change_log ]
[ command ]           [ command ]           [ command ]
```
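To make the separate partitionings concrete, here's a minimal sketch (in Java, using the vanilla Kafka producer) of how the same row ends up in all three change logs, keyed differently in each. This is purely illustrative - in the actual design, writes flow through the command topics as described below - and the `users.<column>.change_log` topic names are my assumption here:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class PartitioningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String row = "{\"id\": \"42\", \"username\": \"john\", \"email\": \"john@example.com\"}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The same row is written once per unique column, keyed by that
            // column's value - so each topic partitions the table by a
            // different key.
            producer.send(new ProducerRecord<>("users.id.change_log", "42", row));
            producer.send(new ProducerRecord<>("users.username.change_log", "john", row));
            producer.send(new ProducerRecord<>("users.email.change_log", "john@example.com", row));
        }
    }
}
```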
All topics (change log + command) will have an equal number of partitions.
There will be one process per partition number.
This process is responsible for that partition number across all topics, i.e.:
```
process for           process for           process for
partition 0           partition 1           partition 2
------------------    ------------------    ------------------
id changelog          id changelog          id changelog
id command            id command            id command
username changelog    username changelog    username changelog
username command      username command      username command
email changelog       email changelog       email changelog
email command         email command         email command
```
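Here's a sketch of how such a process might pin itself to one partition number across all six topics, using manual assignment (`consumer.assign`) rather than consumer groups; the topic naming and configuration details are assumptions on my part:

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
import java.util.Properties;

public class PartitionProcess {
    static final String[] COLUMNS = {"id", "username", "email"};
    static final String[] KINDS = {"change_log", "command"};

    public static void main(String[] args) {
        int partition = Integer.parseInt(args[0]); // which partition number this process owns

        for (String column : COLUMNS) {
            for (String kind : KINDS) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");
                props.put("group.id", "users-table"); // used only for offset storage here
                props.put("enable.auto.commit", "false"); // offsets are committed manually
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                // Manual assignment: this process owns exactly this partition
                // of this topic; no consumer-group rebalancing is involved.
                consumer.assign(List.of(new TopicPartition("users." + column + "." + kind, partition)));
                // ... hand the consumer to the controller thread for this topic ...
            }
        }
    }
}
```

Manual assignment (rather than `subscribe()`) matters here because the design depends on partition N of every topic being co-located in the same process.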
Each process will have one consumer loop (i.e. one thread) per topic. This will be called the controller thread for that unique key partition. So in our example, there will be:
None of the columns with a unique constraint is more important than any of the others (there is no primary key).
However, each operation on the table will be with respect to a specific key (from now on, referred to as the active key for the operation). For example:
Each operation / request must be issued to the TablePartition instance that owns the value of the active key of the operation.
This can be determined using the table partitioner. A single partitioner is used for all keys; currently only string values are supported.
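The partitioner itself isn't shown in this post, but a minimal sketch might look like the following - any stable hash works, as long as every producer and request router agrees on it (Kafka's default partitioner uses murmur2; `String.hashCode` is used here just for brevity):

```java
public final class TablePartitioner {
    private final int numPartitions;

    public TablePartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Returns the partition number that owns this key value. The same
    // function is used for every unique column, so ownership of a value
    // is deterministic regardless of which column it appears in.
    public int partitionFor(String keyValue) {
        // mask the sign bit so negative hash codes map to a valid partition
        return (keyValue.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

For example, an operation with active key username=john would be routed to the TablePartition instance owning `partitionFor("john")`.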
Get operations are simple - just look up the active key value in the relevant materialized view. In the initial implementation, this is just an in-memory hash table.
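As a sketch of what that might look like (the nested-map layout and String row payloads are assumptions):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class TablePartition {
    // One materialized view per unique column:
    // column name -> (key value -> row payload)
    private final Map<String, Map<String, String>> views = new ConcurrentHashMap<>();

    // A get is a single lookup in the view for the active key's column.
    public String get(String column, String keyValue) {
        return views.getOrDefault(column, Map.of()).get(keyValue); // null if absent
    }
}
```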
Add, Update, and Delete all work in a similar way:
An additional complication not discussed above is that all logic must be correct under at-least-once message delivery semantics. In particular, there is a fair amount of work in managing when offsets should be committed.
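As a rough sketch of the shape of that consume loop (`apply()` is a hypothetical stand-in for the real per-record logic): records are applied first and offsets committed only afterwards, so a crash in between causes redelivery rather than data loss - which is exactly why the per-record logic must be idempotent.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;

final class ControllerLoop implements Runnable {
    private final KafkaConsumer<String, String> consumer;

    ControllerLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                apply(record); // must tolerate seeing the same record twice
            }
            if (!records.isEmpty()) {
                consumer.commitSync(); // never commit ahead of processing
            }
        }
    }

    private void apply(ConsumerRecord<String, String> record) {
        // hypothetical: update the materialized view / emit follow-up commands
    }
}
```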
I've glossed over a number of other details as well - the implementation turned out to be a fair bit more complex than I anticipated!
Here's a diagrammatic representation of the flow of commands required to update a value (changing username from john -> johns) in a table with three unique columns:
This operation requires 9 writes to Kafka. If each topic is replicated 3 times, that's 27 log appends - quite a lot of overhead.
Also, there are 4 stages that must complete serially, with the latency of each stage determined by the slowest of its sub-operations - so total latency is roughly the sum, across the 4 stages, of each stage's maximum sub-operation latency.
This might seem expensive, however: