Matt Howlett

Librdkafka Developer Notes

2020-06-14

This "blog post" is just a collection of notes I made whilst trying to refine my understanding of librdkafka's architecture. I have ambitions to one day write a higher level overview of all this... but first things first.

The architecture of librdkafka is very analogous to the message-based microservice architectures people are building on top of Kafka. Internally, librdkafka is like a mini distributed system, with communication between components running on different threads happening asynchronously via queues. Much of an application's interaction with librdkafka also happens via queues.

My goal was to document all of librdkafka's queues and threads in a fair amount of detail. I'm now maybe halfway towards this goal? Consider this a work in progress. I'm unlikely to finish it in the near term because I don't have a pressing need, but what is here may be useful to some people doing some very specific Google searches, so I'm making it public anyway.

Queues: Building Blocks

rd_kafka_op_s/t [rdkafka_op.h]

  • An item that can be enqueued on a rd_kafka_q_t.
  • Exposed in the public API in places as rd_kafka_event_t, but for ABI compatibility, internals are not public - all access is via function calls.
  • rd_kafka_op_ts of type ERR, CONSUMER_ERR or FETCH are converted to rd_kafka_message_ts by rd_kafka_message_get [rdkafka_msg.c] when consuming.
  • Selected properties:
    • rko_type - the type of the op (internal). There are many different op types. There are also two flags (bit 29 'CB' and bit 30 'REPLY') used on this field.
    • rko_evtype - For ops exposed to the application, the event type.
    • rko_flags - miscellaneous flags attached to the op.
    • rko_version - allows outdated ops to be filtered out when being served.
    • rko_prio - if set, the enqueued item is placed according to priority.
    • rko_replyq - queue to place a result after processing the op.
    • rko_op_cb - the callback associated with rko_type | CB.
    • rko_u - op specific data (union).
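
The type-plus-flags layout of rko_type can be sketched in a few lines. This is a hypothetical illustration following the bit positions described above, not librdkafka's actual definitions:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical sketch: low bits identify the op type, while two high
 * bits act as flags, mirroring the 'CB' (bit 29) and 'REPLY' (bit 30)
 * flags carried on rko_type. */
#define OP_FLAG_CB    (1u << 29) /* op carries a callback */
#define OP_FLAG_REPLY (1u << 30) /* op is a reply to an earlier op */
#define OP_TYPE_MASK  ((1u << 29) - 1u)

/* Strip the flag bits to recover the base op type. */
static inline uint32_t op_base_type(uint32_t rko_type) {
        return rko_type & OP_TYPE_MASK;
}

static inline int op_has_cb_flag(uint32_t rko_type) {
        return (rko_type & OP_FLAG_CB) != 0;
}
```
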

rd_kafka_q_s/t [rdkafka_queue.h]

  • A queue of rd_kafka_op_ts.
  • Has forwarding capability:
    • If rkq_fwdq is set (also a rd_kafka_q_t), all operations on the queue will use that queue instead (recursively).
    • The one exception is rkq_serve, which provides a mechanism for using an op-handling callback set on the original queue: on calling rd_kafka_q_enq to enqueue an op, rko->rko_serve is set to the first (outermost) non-NULL rkq_serve in the forwarding chain.
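
The forwarding rule can be sketched as follows, using hypothetical simplified types (fwdq plays the role of rkq_fwdq, serve the role of rkq_serve): follow the chain to the final destination queue, but remember the first non-NULL serve callback seen along the way.

```c
#include <assert.h>
#include <stddef.h>

typedef void (*serve_cb_t)(void *rko);

/* Hypothetical simplified queue. */
struct fwd_queue {
        struct fwd_queue *fwdq;  /* if set, operations use this queue */
        serve_cb_t        serve; /* optional op-handling callback */
};

/* Dummy callbacks so a caller can tell which one was picked. */
static void serve_a(void *rko) { (void)rko; }
static void serve_b(void *rko) { (void)rko; }

/* Walk the forwarding chain: return the final destination queue and
 * store the first (outermost) non-NULL serve callback in *serve_out,
 * mirroring how rko->rko_serve is chosen on enqueue. */
static struct fwd_queue *resolve_fwd(struct fwd_queue *q,
                                     serve_cb_t *serve_out) {
        *serve_out = NULL;
        while (q) {
                if (!*serve_out && q->serve)
                        *serve_out = q->serve;
                if (!q->fwdq)
                        return q;
                q = q->fwdq;
        }
        return NULL;
}
```
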

rd_kafka_q_serve [rdkafka_queue.c]

  • A function for 'serving' (processing ops in) a rd_kafka_q_t. If an op cannot be handled, rd_assert fires.
  • If rd_kafka_q_t has a rkq_fwdq, then this is served instead.
  • Waits for up to timeout_ms for (an) op(s), using rkq_cond and rkq_lock.
  • Serves at most max_cnt ops using rd_kafka_op_handle which:
    • Attempts to handle the op using rd_kafka_op_handle_std.
    • If not handled, attempts to handle using rko_serve if set; otherwise attempts the passed-in callback if set.
    • Else rd_assert.
  • cb_type parameter is passed to rd_kafka_op_handle_std and used by it. It's also passed to the callback (and is used by some of them) [enum values].

rd_kafka_q_pop_serve [rdkafka_queue.c]

  • Like rd_kafka_q_serve, but returns if/when the first unhandled op is encountered.
  • Has a version parameter which is used to ignore outdated ops (rd_kafka_q_serve doesn't have this):
    • If version is 0, no filtering is applied.
    • If version is set, ignores the op if rko_version < version.
    • Also ignores the op if rko_rktp is set and rko_version < rktp_version.
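
The version filtering rules above reduce to a small predicate. This is a sketch over a hypothetical flattened struct whose field names mirror the rko_* members described above:

```c
#include <assert.h>

/* Hypothetical flattened view of the fields the filter looks at. */
struct versioned_op {
        int version;      /* rko_version */
        int has_rktp;     /* whether rko_rktp is set */
        int rktp_version; /* the toppar's current version, if any */
};

/* Returns 1 if the op should be ignored as outdated. */
static int op_is_outdated(const struct versioned_op *rko, int version) {
        if (version != 0 && rko->version < version)
                return 1; /* older than the version the caller asked for */
        if (rko->has_rktp && rko->version < rko->rktp_version)
                return 1; /* the partition has since been re-versioned */
        return 0;
}
```
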

rd_kafka_op_handle_std [rdkafka_op.c]

  • rd_kafka_op_handle attempts to handle an op using this method first.
  • Behavior depends on cb_type and rko_type. Handling of ops is attempted as follows (in order):
    • cb_type == FORCE_RETURN - not handled.
    • rko is ctrl message - (calls rd_kafka_op_offset_store) then op is handled.
    • cb_type == EVENT && rko_type & CB - handling is delegated to rd_kafka_op_call, which calls rko_op_cb.
    • rko_type == RD_KAFKA_OP_RECV_BUF - calls rd_kafka_buf_handle_op then op is handled.
    • The destination queue is disabled - op is handled.
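
The dispatch order can be sketched as a chain of checks. The types and return values below are hypothetical (the real function operates on rd_kafka_op_t and friends):

```c
#include <assert.h>

enum cb_type { CB_FORCE_RETURN, CB_EVENT, CB_CALLBACK };

enum handled_how {
        NOT_HANDLED,
        HANDLED_CTRL,     /* ctrl message: offset stored, op consumed */
        HANDLED_OP_CB,    /* delegated to rko_op_cb via rd_kafka_op_call */
        HANDLED_RECV_BUF, /* rd_kafka_buf_handle_op */
        HANDLED_DISABLED  /* destination queue disabled */
};

/* Hypothetical flattened view of the properties the checks look at. */
struct std_op {
        int is_ctrl;
        int has_cb_flag;    /* rko_type & CB */
        int is_recv_buf;    /* rko_type == RD_KAFKA_OP_RECV_BUF */
        int destq_disabled;
};

static enum handled_how handle_std_sketch(enum cb_type cb_type,
                                          const struct std_op *rko) {
        if (cb_type == CB_FORCE_RETURN)
                return NOT_HANDLED; /* caller wants the op returned to it */
        if (rko->is_ctrl)
                return HANDLED_CTRL;
        if (cb_type == CB_EVENT && rko->has_cb_flag)
                return HANDLED_OP_CB;
        if (rko->is_recv_buf)
                return HANDLED_RECV_BUF;
        if (rko->destq_disabled)
                return HANDLED_DISABLED;
        return NOT_HANDLED;
}
```
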

RK Queues

These queues are created in rd_kafka_new [rdkafka.c]:

rk_rep [rdkafka_int.h]

  • Used to send notifications to the application.
  • Events that may be placed on this queue: TODO.
  • rd_kafka_poll_cb is related to this queue and used to handle ops in it (see below). However, it's also used to handle ops on other queues:
    • rk_ops->rkq_serve is set to this.
    • rk_logq->rkq_serve is set to this.
    • called directly in the function rd_kafka_background_queue_serve which is the callback specified when serving rk->rk_background.q.
    • called directly when batch consuming from rktp->rktp_fetchq (using rd_kafka_consume_batch).
    • called directly when consuming from specific toppars (using rd_kafka_consume and rd_kafka_consume_queue) on rktp->rktp_fetchq.
    • It's specified as the callback to use when serving the temporary queue set up to handle watermark offset request responses (in rd_kafka_query_watermark_offsets).
    • Same for rd_kafka_offsets_for_times.
    • Same for rd_kafka_list_groups.
    • In rd_kafka_queue_poll [rdkafka.c] (public API). This serves the passed in queue with rd_kafka_q_serve, where callback == rd_kafka_poll_cb and cb_type == EVENT. Used for handling result of admin operations with user created queue.
    • In rd_kafka_queue_poll_callback. Same as rd_kafka_queue_poll, but cb_type == CALLBACK.
  • To serve rk_rep, the application can:
    • Call rd_kafka_poll [rdkafka.c]. This serves with rd_kafka_q_serve, where callback == rd_kafka_poll_cb and cb_type == CALLBACK (that is important only to rd_kafka_poll, not rd_kafka_op_handle_std).
    • Call rd_kafka_consumer_poll [rdkafka.c] having previously redirected rk_rep to rkcg_q using rd_kafka_poll_set_consumer.
  • In the stats, replyq is set to the number of ops in this queue.
  • Get the length of this queue with rd_kafka_outq_len (useful on shutdown).
  • AFAICT, rkq_serve is never set on rk_rep.
  • There's a RD_KAFKA_DR_MODE_EVENT mode, where delivery reports are not delivered to the application via callbacks. TODO: Trace through how this works.

rk_ops [rdkafka_int.h]

  • Used to receive ops (from anywhere) to be actioned on the librdkafka main thread.
  • The main thread is started in rd_kafka_new. Ops are served in rd_kafka_thread_main [rdkafka.c]. No callback is passed to rd_kafka_q_serve, but rk_ops->rkq_serve is set to rd_kafka_poll_cb in rd_kafka_new.
  • rd_kafka_poll_cb is essentially responsible for handling all ops directly placed on this queue (not forwarded here) - rd_kafka_op_handle_std doesn't really come into play.
  • Events that may be placed on this queue:
    • Calling any of the admin functions, e.g. rd_kafka_DescribeConfigs puts a corresponding op on this queue. These ops get handled by rd_kafka_poll_cb by way of rk_ops->rkq_serve. rd_kafka_poll_cb handles the op by calling rd_kafka_op_call which calls rko_op_cb which has been set to rd_kafka_admin_worker on op creation. This implements some common worker state machine handling. This also results in additional ops being put on rk->rk_ops.
    • Reply queue for FindCoordinator requests.
    • Reply queue for InitProducerId requests (idempotent producer).
    • rktp->rktp_ops queues are forwarded to this.
    • Reply queue for metadata requests.
    • Bunch of stuff related to transactions.
    • Some stuff related to termination / purging.

rk_logq [rdkafka_int.h]

  • If set, log messages are sent to it rather than invoking log_cb on arbitrary threads.
  • Set with rd_kafka_set_log_queue and enabled by setting log.queue to true.
  • rkq_serve is set to rd_kafka_poll_cb. TODO: Why?
  • Application needs to serve the queue itself with rd_kafka_queue_poll or rd_kafka_queue_poll_callback. TODO: Check.

rk_background.q [rdkafka_int.h]

  • Any application exposed queue can be forwarded to this queue to effectively convert the queue based API into a callback based one.
  • Served by a dedicated background thread running rd_kafka_background_thread_main using rd_kafka_q_serve with callback rd_kafka_background_queue_serve. There's no rkq_serve set.
  • The background thread serving this queue is only created (and the queue only served) if the background_event_cb config has been set.
  • rd_kafka_background_queue_serve calls background_event_cb if the op is eventable, rd_kafka_poll_cb otherwise.
  • Typically used with admin functions, but more generally applicable.
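
The dispatch rule in rd_kafka_background_queue_serve reduces to a small decision, sketched below with hypothetical types:

```c
#include <assert.h>

enum served_by { BY_EVENT_CB, BY_POLL_CB };

/* Hypothetical op view: all we need is whether it's eventable. */
struct bg_op { int eventable; };

/* Eventable ops are handed to the application's background_event_cb;
 * everything else falls back to rd_kafka_poll_cb. */
static enum served_by bg_dispatch(const struct bg_op *rko,
                                  int have_event_cb) {
        if (rko->eventable && have_event_cb)
                return BY_EVENT_CB;
        return BY_POLL_CB;
}
```
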

RKCG Queues

rkcg_ops

  • Used as the reply queue for all the consumer group related operations - heartbeat, join group, etc.
  • Forwarded to rk_ops, which is served in rd_kafka_thread_main.
  • rkcg_ops->rkq_serve is set to rd_kafka_cgrp_op_serve, so when rk_ops is served, this is used instead of rd_kafka_poll_cb (which is used for ops sent directly to rk_ops).

rkcg_wait_coord_q

  • rkq_serve is set to rd_kafka_cgrp_op_serve, like rkcg_ops.
  • Usage: similar to rkcg_ops but when coordinator not available.

rkcg_q

  • Served by the application by calling rd_kafka_consumer_poll.
  • Alternatively, the application can use rd_kafka_queue_get_consumer to get the queue. TODO: I'm not exactly sure what it can/should do with it then.
  • Typically, rk_rep will be forwarded here with rd_kafka_poll_set_consumer when consuming, so two poll calls are not necessary.
  • rkq_serve is not set.
  • Event types that are sent directly to this queue: CONSUMER_ERR, REBALANCE.
  • rktp->rktp_fetchq is forwarded to this queue in rd_kafka_toppar_op_fetch_start from rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_partitions_fetch_start). TODO: consider old non-consumer group API, where I believe this is not the case.

RKTP Queues

rktp_fetchq

  • broker threads -> application communication.
  • Forwarded to rkcg_q when fetches are occurring, which in turn is polled by the application for consumed messages and other events.
  • ERR and CONSUMER_ERR events are put on this queue.
  • Op count and size are exposed in tp stats as fetchq_cnt and fetchq_size. Note: size is cached, and updated incrementally.
  • Basic consume API:
    • rd_kafka_consume takes directly from rktp_fetchq for the specified toppar with rd_kafka_q_pop, rd_kafka_poll_cb and rd_kafka_message_get (to turn the op into a rd_kafka_message_t).
    • rd_kafka_consume_callback takes directly from rktp_fetchq for the specified toppar with rd_kafka_q_serve and rd_kafka_consume_cb (which invokes rd_kafka_message_get).
    • rd_kafka_consume_batch takes directly from rktp_fetchq using rd_kafka_q_serve_rkmessages [rdkafka_queue.c], which makes use of rd_kafka_poll_cb.
  • Message ops arrive on rktp_fetchq as follows:
    • rktp_fetchq passed into rd_kafka_msgset_reader_init.
    • rd_kafka_msgset_reader_run is run.
    • This calls rd_kafka_msgset_reader.
    • This calls rd_kafka_msgset_reader_msg_v0_1, or rd_kafka_msgset_reader_v2.
    • This calls rd_kafka_msgset_reader_msg_v2 which creates RD_KAFKA_OP_FETCH ops via rd_kafka_op_new_fetch_msg and rd_kafka_op_new_ctrl_msg.

rktp_ops

  • Forwarded to rk_ops soon after construction in rd_kafka_toppar_new0, meaning it's served by rd_kafka_thread_main.
  • rkq_serve is set to rd_kafka_toppar_op_serve. Ops handled by this:
    • FETCH_START, FETCH_STOP, SEEK, PAUSE, OFFSET_COMMIT, OFFSET_COMMIT | RD_KAFKA_OP_REPLY, OFFSET_FETCH | RD_KAFKA_OP_REPLY.
  • Used as reply queue for:
    • rd_kafka_toppar_offset_fetch in rd_kafka_toppar_offset_request puts an OFFSET_FETCH op on rkcg_ops. In rd_kafka_cgrp_op_serve, rd_kafka_op_handle_OffsetFetch is called with the op, which calls rd_kafka_handle_OffsetFetch.
    • rd_kafka_handle_OffsetFetch via rd_kafka_toppar_offset_request directly.

rktp_msgq_wakeup_q

  • TODO

rktp_msgq (rd_kafka_msgq_t)

  • TODO

rktp_xmit_msgq (rd_kafka_msgq_t)

  • TODO

Threads

Main Thread

  • Unless there is a specific reason why not, internal operations typically take place on this thread.
  • It's also the primary synchronization point - operations that run on other threads typically have their result end up on rk_ops.
  • Served by rd_kafka_thread_main. Three sub tasks:
    • (1) Serve the rd_kafka_timers_t via rd_kafka_timers_next.
      • Timers (rk->rk_timers) are set up all over the codebase using rd_kafka_timer_start[_oneshot], passing an interval and callback. They're all served from here.
      • Side note: librdkafka also has rd_interval_t, which just keeps track of whether an interval has passed or not (and is not registered with a serving loop).
    • (2) Serve the rk_ops queue via rd_kafka_q_serve (with no callback - relies on rkq_serve, which may be different depending on origin queue / forwarding). In particular, rkcg_ops is forwarded here, and has callback rd_kafka_cgrp_op_serve.
      • TODO: any other queue forwarded here?
    • (3) Serve the consumer group state machine via rd_kafka_cgrp_serve.
      • States are enumerated as: RD_KAFKA_CGRP_STATE_*.
      • The _UP state has a sub state machine with states enumerated as: RD_KAFKA_CGRP_JOIN_STATE_*.
  • All consumer group related operations happen on this thread, either via the ops handler or consumer group serve method.
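
The rd_interval_t idea mentioned above (tracking whether an interval has elapsed, without registering with a serving loop) can be sketched like this. The names and layout are illustrative, not librdkafka's:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical interval tracker. */
struct interval {
        int64_t last_us;     /* last time the interval fired */
        int64_t interval_us; /* configured interval length */
};

/* Returns how overdue the interval is: a value >= 0 means it has
 * elapsed (and the interval is restarted from now); a negative value
 * is the time remaining until it next elapses. */
static int64_t interval_check(struct interval *iv, int64_t now_us) {
        int64_t overdue = now_us - (iv->last_us + iv->interval_us);
        if (overdue >= 0)
                iv->last_us = now_us; /* fired: restart the interval */
        return overdue;
}
```

A serve loop would call this each iteration with the current time, doing the periodic work whenever the return value is non-negative.
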

Broker threads

  • There is one thread per broker. Protocol requests to a broker go through that broker's thread / rkb_ops queue.
  • TODO: toppar queues. Other things of significance?
  • Types of broker:
    • CONFIGURED - A broker with address specified via the bootstrap.servers config property, or using rd_kafka_brokers_add.
    • LEARNED - A broker with address learned through broker metadata.
    • INTERNAL - Used for serving unassigned toppar's op queues. Created when rk is created.
    • LOGICAL - "Logical brokers act just like any broker handle, but will not have an initial address set. The address (or nodename as it is called internally) can be set from another broker handle by calling rd_kafka_broker_set_nodename(). This allows maintaining a logical group coordinator broker handle that can ambulate between real broker addresses."
  • Served by rd_kafka_broker_thread_main / rd_kafka_broker_serve, via one of three methods: internal_serve, producer_serve or consumer_serve as appropriate.
  • internal_serve: TODO
  • producer_serve: TODO
  • consumer_serve: calls out to:
    • consumer_toppars_serve: -> consumer_toppar_serve. Sole operation is call to toppar_fetch_decide which decides whether a toppar should be on the fetch list or not (TODO: this is a big function, analyze in detail).
    • broker_fetch_toppars: Builds and sends fetch requests. TODO: more detail.
    • broker_ops_io_serve:
      • There's a lot going on in this method in addition to the below! (TODO)
      • -> broker_ops_serve: -> broker_op_serve. Serves an op on rkb_ops. Op type:
        • XMIT_BUF: calls rd_kafka_broker_buf_enq2 -> rd_kafka_broker_buf_enq0 with the op's rkbuf which is put on rkb_outbufs.rkbq_bufs (rd_kafka_bufq_t).
        • Others: TODO.
      • -> transport_io_serve calls transport_io_event which calls rd_kafka_send which calls rd_kafka_bufq_deq.

rd_kafka_send is where rkb_outbufs is sent to the broker; it's called from rd_kafka_transport_io_event.

All protocol requests to a broker go through that broker's thread: if you're on another thread (typically main), you enqueue an op on rkb_ops and, if you want a response, specify a reply queue on that op.

TODO

Putting It All Together

  • As an example of how all this fits together, consider a call to rd_kafka_cgrp_join:
    • First, this function checks that rkcg_state and rkcg_join_state are as expected.
    • Then it sets rkcg_join_state to WAIT_JOIN before calling rd_kafka_JoinGroupRequest with rkcg_ops specified as the return queue, rd_kafka_cgrp_handle_JoinGroup as the operation callback, and rkcg_coord as the broker.
    • rd_kafka_JoinGroupRequest creates a rd_kafka_buf_t, then uses rd_kafka_broker_buf_enq_replyq to create an XMIT_BUF op, which it enqueues on the group coordinator's rkb_ops.
    • Since this is a consumer, the op is served by rd_kafka_broker_consumer_serve.
    • Since the reply queue is rkcg_ops, which is forwarded to rk_ops, the reply ends up being served in rd_kafka_thread_main by rd_kafka_q_serve via rko_serve (since no callback is specified), which was set from rkcg_ops->rkq_serve on creation of the op, and that in turn was set to rd_kafka_cgrp_op_serve on queue initialization.
    • rd_kafka_cgrp_handle_JoinGroup checks that the join state is WAIT_JOIN. For simplicity, assume this member is not the group leader.
    • The join state is then set to WAIT_SYNC, and the SyncGroup exchange proceeds from there. TODO: trace the rest of this flow.

Other Concepts

Internal Broker

  • TODO

Possible Improvements?

  • rd_kafka_op_handle_std seems a bit inside out?
  • rd_kafka_poll_cb does too much?
  • LOGICAL broker abstraction maybe not the best way to model this?
  • Why RD_KAFKA_CGRP_F_WAIT_UNASSIGN and RD_KAFKA_CGRP_STATE_WAIT_UNASSIGN?
  • rd_kafka_topic_partition_list_t functions are not consistent in which elements they operate on.