Matt Howlett

every blog should have a tagline

NewSQL + Streaming == NewStreams?


In a previous blog post I wrote down some unconventional thoughts on the characteristics I most want from a stream processing library¹. To summarize:

  1. NewSQL-like state store capability.
  2. Lots of disparate processor types & no need for a DSL linking them together.
  3. Static partition assignments.

The purpose of this blog post is to go into some depth on the first of these capabilities - the only one that brings with it significant implementation complexity.

¹I should note that although I've worked with Kafka quite a lot, I'm still pretty new to stream processing and these opinions may not hold up as I continue to work them through. Definitely take everything I say here with a grain of salt!

Two Elephants in the Room

There are various factors driving the above preferences. What's behind #1 is a desire to build a diverse set of applications on top of streaming data. I'd like to be able to use the state stores provided by my stream processing library like a database - utilize the streaming platform as a system-of-record. Unfortunately (to my knowledge) there is no support for multi-column uniqueness or referential integrity in any existing stream processing library - two requirements frequently demanded by applications.

A bunch of "NewSQL" databases are now available that provide horizontally scalable solutions to these requirements (and more!). These include CockroachDB, Vitess, YugaByte, Fauna and TiDB. When I talk to people about the idea of building NewSQL-style functionality on top of Kafka, they commonly ask "why not just use one of these databases? Wouldn't it be better?"

The answer is that none of these databases is tightly integrated with stream processing, and I think there's likely a lot of value in that integration.

  • A NewSQL database is useful.
  • A stream processing library is useful.
  • A hybrid system would be extra useful - I think greater than the sum of the parts useful.

There's an additional benefit worth calling out as well - limiting the number of different systems you need to manage brings significant operational efficiency.

Multiple Unique Constraints - Implementation Notes

What I've been working on most recently is a proof-of-concept table abstraction, backed by Kafka, that allows multiple unique constraints to be enforced. For the remainder of this blog post, I'll provide some notes on the implementation. If you're just here for high level ideas, you can stop reading now :-).

Disclaimer: These are early days, and I'm not an expert at this. There are likely to be holes ... hopefully no fundamental flaws, but maybe. Also this is hard to explain. If this work goes anywhere, you can expect this to be just a first draft...

Let the Fun Begin

We fundamentally need to work with separate partitionings of the data by each of the unique keys.

For each partitioning, we'll have two topics:

  1. A change log topic (compacted), which will back a materialized view of each partitioning. These topics will only be read from when a materialized view needs to be regenerated.
  2. A 'command' topic, which will be used to validate commands, update the materialized view in response to new commands and provide a fine-grained locking mechanism.

Here are the topics required for an example users table that has a unique constraint applied to three columns (id, username and email):

      id             username           email
[ change_log ]    [ change_log ]    [ change_log ]
[  command   ]    [  command   ]    [  command   ]

All topics (change log + command) will have an equal number of partitions.

There will be one process per partition number.

This process is responsible for that partition number for all topics. i.e.:

     process for                 process for                 process for
     partition 0                 partition 1                 partition 2
  ------------------           -----------------           -----------------
  id changelog                 id changelog                 id changelog
  id command                   id command                   id command
  username changelog           username changelog           username changelog
  username command             username command             username command
  email changelog              email changelog              email changelog
  email command                email command                email command

Each process will have one consume loop (i.e. thread) per topic. This will be called the controller thread for that unique key partition. So in our example, there will be:

  • 3 processes,
  • each with 6 controller threads (2 for each unique column),
  • for a total of 18 controller threads, each reading from a single partition.
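The arithmetic above can be checked with a short sketch. This is Python written purely for illustration, not code from the proof-of-concept, and the function and constant names are made up:

```python
from itertools import product

NUM_PARTITIONS = 3
UNIQUE_COLUMNS = ["id", "username", "email"]
TOPIC_KINDS = ["change_log", "command"]

def controller_assignments(num_partitions, columns, kinds):
    """Map each process (one per partition number) to the topics it reads.

    Every process handles the same partition number of every topic, so each
    one ends up with len(columns) * len(kinds) controller threads.
    """
    return {
        p: [(col, kind) for col, kind in product(columns, kinds)]
        for p in range(num_partitions)
    }

assignments = controller_assignments(NUM_PARTITIONS, UNIQUE_COLUMNS, TOPIC_KINDS)
threads_per_process = len(assignments[0])
total_threads = sum(len(topics) for topics in assignments.values())
print(threads_per_process, total_threads)  # prints "6 18"
```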

None of the columns with a unique constraint is more important than any of the others (there is no primary key).

However, each operation on the table will be with respect to a specific key (from now on, referred to as the active key for the operation). For example:

  • update the user with username "matt" (active key is username).
  • get the user with id "42" (active key is id).

Each operation / request must be issued to the TablePartition instance that owns the value of the active key of the operation.

This can be determined using the table partitioner. There is a single partitioner used for all keys. There is currently only support for string values.
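As a rough illustration of what such a partitioner might look like, here is a minimal Python sketch. The hash function is an assumption (the choice of MD5 here is only because it gives the same answer in every process), and `partition_for` is a hypothetical name:

```python
import hashlib

def partition_for(value: str, num_partitions: int) -> int:
    """Map a string key value to a partition number, deterministically.

    Assumption: MD5 is used only because it is stable across processes
    (Python's built-in hash() is randomized per process for strings).
    """
    digest = hashlib.md5(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

NUM_PARTITIONS = 3
# The same function is used for every unique column: a username and an id
# are each routed by hashing their own value.
print(partition_for("matt", NUM_PARTITIONS), partition_for("42", NUM_PARTITIONS))
```

Because one function serves all keys, any process can work out which partition owns a given value without knowing which column it belongs to.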

Get operations are simple - just look up the active key value in the relevant materialized view. In the initial implementation, this is just an in-memory hash table.
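A materialized view of this kind can be sketched in a few lines of Python. The class and method names are hypothetical, and treating a `None` row as a delete is an assumption borrowed from how tombstones work on compacted Kafka topics:

```python
class MaterializedView:
    """In-memory view for one (unique column, partition) pair."""

    def __init__(self):
        self._rows = {}  # key value -> row (a plain dict here)

    def apply(self, key, row):
        """Apply one change; row=None removes the key (assumed tombstone)."""
        if row is None:
            self._rows.pop(key, None)
        else:
            self._rows[key] = row

    def get(self, key):
        """Return the row for key, or None if it is not present."""
        return self._rows.get(key)

view = MaterializedView()
view.apply("42", {"id": "42", "username": "matt", "email": "matt@example.com"})
print(view.get("42"))
view.apply("42", None)
print(view.get("42"))  # prints "None"
```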

Add, Update, and Delete all work in a similar way:

  1. A Command_Change event is placed in the command topic of the active key.
  2. A TaskCompletionSource is created and an associated Task (a future) returned immediately to the application. The Task will be completed when the operation is complete and data across all partitions has been committed or aborted.
  3. A Command_Enter event is placed in the command topic of all other unique columns relevant to the operation. For add operations, events will be placed in partitions corresponding to the value of each of the other unique columns. For update commands, an additional Command_Enter event is needed for each unique column value that has changed (to remove the old value).
  4. The Command_Enter event instructs the relevant partition controller to lock the target value (assuming a uniqueness constraint isn't violated).
  5. Corresponding to each Command_Enter event, a Command_Verify event is sent back to the active key topic. This event indicates whether the operation is valid (i.e. does not violate a unique constraint) or not.
  6. When all Command_Verify events have been received by the active key partition controller, a corresponding Command_Exit event is sent back to the other command partitions that are currently locked.
  7. If all of the Command_Verify operations were successful, the Command_Exit events will result in materialized state being updated. If not, they will simply unlock the value, and no changes will be made.
  8. Corresponding to each Command_Exit event, if the operation is not aborted, a Command_Ack event will be sent to the active key value partition.
  9. When all acks have been received by the active key controller, the application will be notified by completing the future. In the event of an aborted operation, this occurs earlier (after all verify events have been received).
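To make the steps above more concrete, here is a toy, single-process Python simulation of the flow for add operations only. It is a sketch under several assumptions, not the actual implementation: one deque stands in for all command topics, each column has a single partition, delivery is exactly-once and in order, the active key controller also locks its own value when it sees the change event, and all names (`Table`, `on_enter`, the tuple layout of events) are invented for illustration.

```python
from collections import deque

COLUMNS = ["id", "username", "email"]

class Table:
    def __init__(self):
        self.view = {c: {} for c in COLUMNS}       # committed rows per column
        self.locked = {c: set() for c in COLUMNS}  # values held by in-flight commands
        self.queue = deque()                       # stands in for all command topics
        self.pending = {}                          # command id -> state at the active key
        self.results = {}                          # command id -> True/False (the "future")

    def free(self, col, value):
        return value not in self.view[col] and value not in self.locked[col]

    def add(self, cid, active, row):
        # Step 1: a change event goes to the command topic of the active key.
        self.queue.append(("change", active, cid, active, row, None))

    def run(self):
        while self.queue:
            kind, col, cid, active, row, info = self.queue.popleft()
            getattr(self, "on_" + kind)(col, cid, active, row, info)

    def on_change(self, col, cid, active, row, _):
        if not self.free(col, row[col]):           # assumed: active key checked first
            self.results[cid] = False
            return
        self.locked[col].add(row[col])
        others = [c for c in COLUMNS if c != col]
        self.pending[cid] = {"waiting": set(others), "held": [], "ok": True}
        for c in others:                           # step 3: enter events to other columns
            self.queue.append(("enter", c, cid, active, row, None))

    def on_enter(self, col, cid, active, row, _):
        ok = self.free(col, row[col])              # step 4: lock unless it would collide
        if ok:
            self.locked[col].add(row[col])
        self.queue.append(("verify", active, cid, active, row, (col, ok)))  # step 5

    def on_verify(self, col, cid, active, row, info):
        src, ok = info
        p = self.pending[cid]
        p["waiting"].discard(src)
        if ok:
            p["held"].append(src)
        else:
            p["ok"] = False
        if p["waiting"]:
            return
        for c in p["held"]:                        # step 6: exit only where a lock is held
            self.queue.append(("exit", c, cid, active, row, p["ok"]))
        if p["ok"]:
            p["acks"] = set(p["held"])
        else:                                      # aborted: complete the future early
            self.locked[col].discard(row[col])
            self.results[cid] = False
            del self.pending[cid]

    def on_exit(self, col, cid, active, row, commit):
        self.locked[col].discard(row[col])         # step 7: always unlock
        if commit:
            self.view[col][row[col]] = row
            self.queue.append(("ack", active, cid, active, row, col))  # step 8

    def on_ack(self, col, cid, active, row, src):
        p = self.pending[cid]
        p["acks"].discard(src)
        if not p["acks"]:                          # step 9: all acks in, complete
            self.locked[col].discard(row[col])
            self.view[col][row[col]] = row
            self.results[cid] = True
            del self.pending[cid]

t = Table()
t.add(1, "id", {"id": "1", "username": "john", "email": "john@example.com"})
t.add(2, "id", {"id": "2", "username": "john", "email": "other@example.com"})
t.run()
print(t.results)
```

In this run the first add commits and the second is aborted, because the username "john" is already locked when its enter event arrives. The aborted command still takes a lock on its email value along the way, which the exit event then releases without changing any state.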

An additional complication not discussed above is that all logic must be correct under the assumption of at-least-once message delivery semantics. In particular, there is a fair amount of work in managing when offsets should be committed.
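One common way to cope with at-least-once delivery is to make event handling idempotent and to commit an offset only after its effect has been applied. The Python sketch below shows that general idea only. It is not how the proof-of-concept handles it, the names are hypothetical, and a real version would need the set of seen events to survive a restart:

```python
class IdempotentController:
    """Applies each logical command event once, even if delivered twice."""

    def __init__(self):
        self.seen = set()           # (command_id, kind, source) already applied
        self.applied = []           # stand-in for real side effects
        self.committed_offset = -1  # highest offset that is safe to commit

    def handle(self, offset, command_id, kind, source):
        key = (command_id, kind, source)
        if key not in self.seen:
            self.applied.append(key)  # the real state change would happen here
            self.seen.add(key)
        # Advance the offset only after the effect is in place, so a crash
        # before this point causes a redelivery rather than a lost event.
        self.committed_offset = max(self.committed_offset, offset)

c = IdempotentController()
c.handle(0, 7, "enter", "id")
c.handle(1, 7, "enter", "id")  # duplicate of the same logical event
print(len(c.applied), c.committed_offset)  # prints "1 1"
```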

I've glossed over a number of other details as well - the implementation turned out to be a fair bit more complex than I anticipated!

Here's a diagrammatic representation of the flow of commands required to update a value (change username from john -> johns) in a table with three unique columns:

[Diagram: command flow for the username update]

This operation requires 9 writes to Kafka. If each topic is replicated 3 times, that's 27 log appends - quite a lot of overhead.

Also, there are 4 stages which must complete serially, the latency of each stage being determined by the maximum latency of more than one sub-operation.

This might seem expensive, however:

  • This is a fundamentally difficult problem - any approach is going to be expensive.
  • The approach is linearly scalable.
  • The approach can be optimized. For example, it would be possible to avoid having a replication factor of 3 on some topics, though this would make the implementation substantially more difficult.
  • Kafka has amazing throughput - I suspect throughput of this approach is going to be relatively good despite the 27x write amplification. Latency is going to be fairly poor though - I'd estimate best case is going to be ~50ms average write latency on a fairly under-utilized cluster. Read latency will be very fast - determined by the KV store.
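The cost figures above follow a simple model, sketched here in Python. The per-message latencies are made-up numbers chosen only to show the calculation, and the function names are hypothetical. The idea is that log appends are writes times replication factor, and that end-to-end latency is the sum over serial stages of the slowest message in each stage:

```python
def log_appends(kafka_writes, replication_factor):
    """Total log appends for one operation across all replicas."""
    return kafka_writes * replication_factor

def operation_latency(stages):
    """stages: one list per serial stage, holding the latency in ms of each
    message sent in parallel during that stage. A stage finishes only when
    its slowest message does."""
    return sum(max(stage) for stage in stages)

print(log_appends(9, 3))  # prints "27"

# Illustrative only: 4 serial stages with 2 parallel messages each.
example_stages = [[5, 9], [6, 4], [8, 5], [3, 7]]
print(operation_latency(example_stages))  # 9 + 6 + 8 + 7, prints "30"
```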

The Plan Moving Forward

  • Get the implementation to a point where it's useful.
  • Put a lot of effort into understanding correctness / failure scenarios. Talk to experts.
  • Move on to implementing foreign key constraints.