Matt Howlett

Every Blog Should Have a Tagline

Apache Kafka, .NET and Protobuf Serialized Data

2018-05-31

Confluent's Apache Kafka Client for .NET ships with a serializer and deserializer for the Avro serialization format, seamlessly integrated with Confluent Schema Registry. Avro is well matched to scenarios where you would like your schemas to be centrally managed and, as I'll explain in an upcoming blog post, this is often very desirable - especially in more complex scenarios - because it decreases coupling between producer and consumer applications.

In this post, I'll talk a little bit about another popular serialization format - protobuf. Unlike Avro, protobuf-serialized data can be deserialized without the writer schema present - all a reader needs is a compatible message definition compiled into the application. It's therefore possible to use protobuf without any system in place for schema management.

The .NET Kafka client doesn't ship with protobuf support out of the box, but it's straightforward to implement this yourself. Here's a suitable serializer:

public class ProtoSerializer<T> : ISerializer<T> where T : Google.Protobuf.IMessage<T>
{
    // No serializer-specific configuration is needed - pass the config through unchanged.
    public IEnumerable<KeyValuePair<string, object>> 
        Configure(IEnumerable<KeyValuePair<string, object>> config, bool isKey)
            => config;

    // Nothing to clean up.
    public void Dispose() {}

    // Protobuf messages know how to serialize themselves.
    public byte[] Serialize(string topic, T data)
        => data.ToByteArray();
}

That's all! Pass-through implementations are all you need for the Configure and Dispose methods, and to Serialize you just call the ToByteArray method on your protobuf IMessage instance.

This serializer can be used with any type generated using the protoc tool. Check out the official tutorial for more information on using this tool and protobuf with C# in general.

Let's define a simple LogMsg message type:

syntax = "proto3";

package example.proto;

enum Severity {
    None = 0;
    Verbose = 1;
    Info = 2;
    Warning = 3;
    Error = 4;
}

message LogMsg {
    string IP = 1;
    string Message = 2;
    Severity Severity = 3;
}

...and generate the corresponding C# classes (assuming the definition above is saved as message.proto):

protoc message.proto --csharp_out .
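For the generated class and the examples below to compile, your project also needs the Google.Protobuf runtime package and the Kafka client itself. Assuming you're using the dotnet CLI, something like the following should do it (protoc itself can also be obtained via the Google.Protobuf.Tools package):

dotnet add package Google.Protobuf
dotnet add package Confluent.Kafka --version 1.0-experimental-3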

Now you're all set to produce a message to Kafka like so:

var config = new Dictionary<string, object> { { "bootstrap.servers", "localhost:9092" } };

using (var p = new Producer<Null, LogMsg>(config, null, new ProtoSerializer<LogMsg>()))
{
    p.ProduceAsync(
        "log-topic",
        new Message<Null, LogMsg>
        {
            Value = new LogMsg
            {
                IP = "127.0.0.1",
                Message = "Connection established",
                Severity = Severity.Info
            }
        }).Wait();  // block until delivery is acknowledged (or fails)
}

Note that I'm using the new produce API available in the 1.0-experimental-3 version of the Confluent.Kafka NuGet package (which I happen to be unreasonably excited about).

A generic protobuf deserializer is only slightly trickier to implement than the serializer:

public class ProtoDeserializer<T> : IDeserializer<T> 
    where T : Google.Protobuf.IMessage<T>, new()
{
    // MessageParser<T> does the actual parsing. It requires a factory
    // function for creating message instances, hence the new() constraint.
    private MessageParser<T> _parser;

    public ProtoDeserializer() { _parser = new MessageParser<T>(() => new T()); }

    // As with the serializer, no custom configuration is needed.
    public IEnumerable<KeyValuePair<string, object>> 
        Configure(IEnumerable<KeyValuePair<string, object>> config, bool isKey)
            => config;

    public void Dispose() {}

    public T Deserialize(string topic, byte[] data)
        => _parser.ParseFrom(data);
}

In this case, a MessageParser<T> instance is required in order to deserialize the message - we create this once in the constructor and reuse it on every call to Deserialize.
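As a quick sanity check, you can round-trip a message through the serializer and deserializer directly, without involving Kafka at all (the topic parameter is unused by these implementations):

var original = new LogMsg
{
    IP = "127.0.0.1",
    Message = "Connection established",
    Severity = Severity.Info
};

var bytes = new ProtoSerializer<LogMsg>().Serialize("log-topic", original);
var roundTripped = new ProtoDeserializer<LogMsg>().Deserialize("log-topic", bytes);

// Generated protobuf types implement value equality, so this prints True.
Console.WriteLine(original.Equals(roundTripped));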

Finally, you can use the deserializer as follows:

var cconfig = new Dictionary<string, object> 
{ 
    { "bootstrap.servers", "localhost:9092" },
    // A fresh, random group.id means this consumer reads independently of any other.
    { "group.id", Guid.NewGuid().ToString() }
};

using (var c = new Consumer<Null, LogMsg>(cconfig, null, new ProtoDeserializer<LogMsg>()))
{
    c.Subscribe("log-topic");

    while (true)
    {
        // Consume returns false if no message arrived within the timeout.
        if (c.Consume(out ConsumerRecord<Null, LogMsg> msg, TimeSpan.FromSeconds(1)))
        {
            // Generated protobuf types render as JSON via ToString.
            Console.WriteLine(msg.Message.Value);
        }
    }
}

It's worth noting that the type used by the consumer does not need to exactly match the type of the serialized data - it just needs to be compatible. The protobuf compatibility rules are fairly straightforward - you can read more about them in the official docs.
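For example, suppose the producer later starts writing an extended version of LogMsg with a (hypothetical) Hostname field. A consumer still compiled against the original three-field definition will happily deserialize the new messages, simply ignoring the field it doesn't know about:

message LogMsg {
    string IP = 1;
    string Message = 2;
    Severity Severity = 3;
    string Hostname = 4;  // new field, new tag number - invisible to old consumers
}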

That's all for now! Protobuf is a widely used serialization format and as demonstrated in this article, it's super simple to use with Kafka and .NET.

Happy coding!