Matt Howlett

every blog should have a tagline

Sneak Peek: Confluent Schema Registry & Protobuf

2020-01-14

Support for Protobuf and JSON serialization formats in Confluent Schema Registry is nearly here!

Nothing's been released yet, but the PRs are open on the Schema Registry repo, and you can use them to get a working protobuf (or JSON) enabled Schema Registry and start playing with it.

In this blog post I'm going to walk through how to do that and give some related commentary. I'll be using the basic Java client, but you can expect to see broader support across the Confluent Platform. Without giving away any company secrets, the above-mentioned PRs also include Kafka Connect integration, and the KSQL team is working on this right now as well.

Building a Protobuf Enabled Schema Registry

You want the add-protobuf-support branch by Robert Yokota (there's also another branch with JSON support). I have a tmp directory inside my git workspace directory that I use for repos that aren't important / won't be long-lived. Assuming you have the same:

cd ~/git/tmp
git clone git@github.com:rayokota/schema-registry.git

To build:

cd schema-registry
mvn package -DskipTests

To install the packages in the local Maven cache (this allows them to be used by the example application):

mvn install -DskipTests

Assuming you have a Kafka broker running on localhost:9092 (if you don't, check out the Confluent Platform Quick Start):

./bin/schema-registry-start ./config/schema-registry.properties

Now you have a protobuf-enabled schema registry running on port 8081.

Tip: Check for the following text in the schema registry log output: Registering schema provider for PROTOBUF - if you see that, then protobuf support is definitely enabled.

Important note: This PR isn't merged yet, and is still subject to change. It's also probably buggy. Don't rely on it in production!

Java Protobuf Serdes Example

I posted a simple example demonstrating use of the protobuf serializer / deserializer on GitHub. To get started, I used the playbook outlined in a previous blog post and then copied what @gwenshap did in her LogLine Avro example.

Here's Gwen's Avro schema:

{"namespace": "JavaSessionize.avro",
  "type": "record",
  "name": "LogLine",
  "fields": [
    {"name": "ip", "type": "string"},
    {"name": "timestamp",  "type": "long"},
    {"name": "url",  "type": "string"},
    {"name": "referrer",  "type": "string"},
    {"name": "useragent",  "type": "string"},
    {"name": "sessionid",  "type": ["null","int"], "default": null}
  ]
}

And here's a protobuf message declaration for the same information:

syntax = "proto3";

package com.mhowlett;

import "google/protobuf/wrappers.proto";

message LogLine {
  string ip = 1;
  uint64 timestamp = 2;
  string url = 3;
  string referer = 4;
  string useragent = 5;
  google.protobuf.UInt64Value session_id = 6;
}

The UInt64Value type imported from the standard google/protobuf/wrappers.proto definitions is used to effectively allow session_id to be nullable - proto3 doesn't natively allow for nullable scalar values.

Classes corresponding to this message type are generated at compile time using the com.github.os72.protoc-jar-maven-plugin plugin.
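The generated class ends up as com.mhowlett.LogLineOuterClass.LogLine (you can see the full name referenced in the consumer configuration below). As a rough sketch of how the nullable session_id plays out on the generated builder API (the field values here are made up for illustration):

  import com.google.protobuf.UInt64Value;
  import com.mhowlett.LogLineOuterClass.LogLine;

  // session_id is a message-typed field, so it has explicit presence:
  // it's either set to a UInt64Value or left unset ("null").
  LogLine withSession = LogLine.newBuilder()
      .setIp("10.1.2.3")
      .setTimestamp(System.currentTimeMillis())
      .setUrl("/index.html")
      .setReferer("https://example.com")
      .setUseragent("Mozilla/5.0")
      .setSessionId(UInt64Value.newBuilder().setValue(42L).build())
      .build();

  LogLine withoutSession = LogLine.newBuilder()
      .setIp("10.1.2.4")
      .setTimestamp(System.currentTimeMillis())
      .build();

  System.out.println(withSession.hasSessionId());             // true
  System.out.println(withSession.getSessionId().getValue());  // 42
  System.out.println(withoutSession.hasSessionId());          // false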

You then use these in your application code:

  long nEvents = 10;

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer",
    "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
  props.put("schema.registry.url", "http://127.0.0.1:8081");

  String topic = "test-proto";

  Producer<String, LogLine> producer = new KafkaProducer<String, LogLine>(props);

  Random rnd = new Random();

  for (long i = 0; i<nEvents; ++i) {
      LogLine event = EventGenerator.getNext(rnd);
      ProducerRecord<String, LogLine> record
        = new ProducerRecord<String, LogLine>(topic, event.getIp().toString(), event);
      producer.send(record).get();
  }

  producer.close();

This code is very similar to the Avro case - the Avro serializers are just swapped out for the protobuf ones and likewise for the generated classes.
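For reference, the Avro version of this producer differs only in the value serializer class (and, of course, the Avro-generated LogLine):

  // Avro equivalent - only the serializer class changes:
  props.put("value.serializer",
    "io.confluent.kafka.serializers.KafkaAvroSerializer");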

It's interesting to see what gets registered in Schema Registry after running this. There are two subjects:

> curl -X GET http://localhost:8081/subjects
["test-proto-value","google/protobuf/wrappers.proto"]

Unlike Avro, Protobuf allows message types to be imported from other .proto files, and the Protobuf serdes register each referenced schema with Schema Registry separately.
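If you'd rather check what's registered from Java than via curl, the standard SchemaRegistryClient does the job (a minimal sketch, assuming the client in this branch behaves like the released one):

  import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
  import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

  // List registered subjects programmatically (equivalent to the curl above).
  SchemaRegistryClient client =
      new CachedSchemaRegistryClient("http://127.0.0.1:8081", 100);
  System.out.println(client.getAllSubjects());
  // e.g. [google/protobuf/wrappers.proto, test-proto-value]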

The Schema Registry API has been extended to support the new requirements, e.g.:

> curl -X GET http://localhost:8081/subjects/test-proto-value/versions
[1]
> curl -X GET http://localhost:8081/subjects/test-proto-value/versions/1
{
    "subject": "test-proto-value",
    "version": 1,
    "id": 102,
    "schemaType": "PROTOBUF",
    "references": [
        {
            "name": "google/protobuf/wrappers.proto",
            "subject": "google/protobuf/wrappers.proto",
            "version": 1
        }
    ],
    "schema": "syntax = \"proto3\";\npackage com.mhowlett;\n\nimport \"google/protobuf/wrappers.proto\";\n\nmessage LogLine {\n  string ip = 1;\n  uint64 timestamp = 2;\n  string url = 3;\n  string referer = 4;\n  string useragent = 5;\n  google.protobuf.UInt64Value session_id = 6;\n}\n"
}

The new additions here are the schemaType field, which specifies the serialization format, and the references field, which lists the dependencies of the registered schema.

As with Avro, compatibility checks are performed when registering schemas, and schemas that violate the configured compatibility rules are rejected.
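I haven't dug into how compatibility checking is surfaced in the Java client on this branch, but as a hedged sketch (assuming the new ParsedSchema / ProtobufSchema abstractions from the PR are used roughly as they appear), a programmatic check against the latest registered version would look something like:

  import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
  import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
  import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

  // Test a candidate schema for compatibility with the latest version of the
  // subject (class and method names are my assumption, based on the PR).
  SchemaRegistryClient client =
      new CachedSchemaRegistryClient("http://127.0.0.1:8081", 100);
  ProtobufSchema candidate = new ProtobufSchema(
      "syntax = \"proto3\";\n" +
      "package com.mhowlett;\n" +
      "message LogLine {\n" +
      "  string ip = 1;\n" +
      "  uint64 timestamp = 2;\n" +
      "  string url = 3;\n" +
      "}\n");
  boolean compatible = client.testCompatibility("test-proto-value", candidate);
  System.out.println(compatible);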

Consuming

Consuming is equally easy. Here's some code to read in the messages from our producer example:

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", UUID.randomUUID().toString());
  props.put("auto.offset.reset", "earliest");
  props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer",
    "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
  props.put("specific.protobuf.class",
    com.mhowlett.LogLineOuterClass.LogLine.class);
  props.put("schema.registry.url", "http://127.0.0.1:8081");

  String topic = "test-proto";

  Consumer<String, LogLine> consumer = new KafkaConsumer<String, LogLine>(props);

  consumer.subscribe(Arrays.asList(topic));

  while (true) {
      ConsumerRecords<String, LogLine> records = consumer.poll(100);
      for (ConsumerRecord<String, LogLine> record : records) {
          System.out.printf("%s%n", record.value());
      }
  }

Unlike Avro, the protobuf wire format includes enough information for a serialized message to be interpreted without the exact schema used to serialize it being available. This means the protobuf deserializer doesn't strictly need to talk to Schema Registry in order to deserialize a message - if a message is incompatible with the schema used to generate the deserializer's message class, that can be detected without the writer schema present. With that said, I notice that the schema.registry.url property is marked as required - I'll need to investigate that further.
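To illustrate the point at the protobuf layer (setting aside whatever framing the Confluent serializer wraps around the payload on the Kafka record itself), deserializing needs nothing more than the compiled class:

  import com.google.protobuf.InvalidProtocolBufferException;
  import com.mhowlett.LogLineOuterClass.LogLine;

  // Round-trip raw protobuf bytes using only the generated class - no
  // registry lookup. Fields written by a newer writer schema that this
  // class doesn't know about are preserved as unknown fields rather than
  // causing a parse failure.
  byte[] payload = LogLine.newBuilder()
      .setIp("10.1.2.3")
      .setUrl("/index.html")
      .build()
      .toByteArray();

  try {
      LogLine parsed = LogLine.parseFrom(payload);
      System.out.println(parsed.getUrl());
  } catch (InvalidProtocolBufferException e) {
      // Only thrown if the bytes aren't a structurally valid protobuf message.
      e.printStackTrace();
  }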

Summary

Support for Protobuf and JSON is coming to Schema Registry. This means that soon you'll be able to choose whichever serialization format best fits your needs rather than being forced to use Avro - which, although good for some use cases and conceptually well suited to the streaming use case (the most suitable, in fact), is not the best format for a lot of people in many scenarios.

Very much looking forward to this change!

If you enjoyed this post, you might also like the one I did for the Confluent blog, which provides more of a discussion around the benefits of using a Schema Registry.