This "blog post" is just a collection of notes I made whilst trying to refine my understanding of librdkafka's architecture. I have ambitions to one day write a higher level overview of all this... but first things first.
The architecture of librdkafka is very analogous to the message based microservice architectures people are building on top of Kafka. Internally, librdkafka is like a mini distributed system, with communication between components running on different threads happening asynchronously via queues. A lot of the interaction between applications and librdkafka also happens via queues.
My goal was to document all of librdkafka's queues and threads in a fair amount of detail. I'm now maybe halfway towards this goal? Consider this a work in progress. I'm unlikely to finish it in the near term because I don't have a pressing need, but what is here may be useful to people doing some very specific Google searches, so I'm making it public anyway.
Ops exposed to the application are typedef'd as rd_kafka_event_t, but for ABI compatibility, internals are not public - all access is via function calls.
rd_kafka_op_ts of type FETCH are converted to rd_kafka_message_ts via rd_kafka_message_get [rdkafka_msg.c] when consuming.
rko_type - the type of the op (internal). There are many different op types. Two flag bits (bit 29 'CB' and bit 30 'REPLY') are also used on this field.
rko_evtype - for ops exposed to the application, the event type.
rko_flags - miscellaneous flags attached to the op.
rko_version - allows outdated ops to be filtered out when being served.
rko_prio - if set, the enqueued item is placed according to priority.
rko_replyq - queue on which to place a result after processing the op.
rko_op_cb - the callback associated with rko_type | CB ops.
rko_u - op specific data (union).
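To keep these fields straight, here is a simplified, illustrative sketch of an op. This is not librdkafka's actual rd_kafka_op_s definition (see rdkafka_op.h for that); types are approximated and most fields are omitted.

```c
/* A simplified, illustrative sketch of an op, based on the fields listed
 * above. NOT librdkafka's actual rd_kafka_op_s (see rdkafka_op.h);
 * types are approximated and most fields are omitted. */
#include <stdint.h>

struct op_sketch {
        int32_t rko_type;     /* op type; also carries the CB (bit 29) and REPLY (bit 30) flags */
        int32_t rko_evtype;   /* event type, for ops exposed to the application */
        int32_t rko_flags;    /* miscellaneous flags */
        int32_t rko_version;  /* used to filter out outdated ops when serving */
        int32_t rko_prio;     /* if set, the op is enqueued according to priority */
        void   *rko_replyq;   /* queue on which to place the result of the op */
        void   *rko_op_cb;    /* callback associated with rko_type | CB ops */
        union {
                int64_t offset;  /* ... op specific data lives here (rko_u) ... */
        } rko_u;
};
```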
If rkq_fwdq is set (also a rd_kafka_q_t), all operations on the queue will use that queue instead (recursively).
rkq_serve provides a mechanism for using an op handling callback set on the original queue. On calling rd_kafka_q_enq to enqueue an op, rko->rko_serve will be set to the first (outermost) non-NULL rkq_serve in the forwarding chain.
rd_kafka_q_t. If an op cannot be handled and there is an rkq_fwdq, then this is served instead.
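From the application side, forwarding can be set up with the public API. The following is a minimal, untested sketch: rd_kafka_queue_forward makes ops enqueued on the source queue appear on the destination queue (internally, the rkq_fwdq mechanism described above). Forwarding the main queue into a user-created queue is just an illustration; "rk" is assumed to be an existing handle.

```c
#include <librdkafka/rdkafka.h>

static void poll_combined(rd_kafka_t *rk) {
        rd_kafka_queue_t *combined = rd_kafka_queue_new(rk);
        rd_kafka_queue_t *main_q   = rd_kafka_queue_get_main(rk);

        /* Ops placed on the main queue are now delivered to 'combined'. */
        rd_kafka_queue_forward(main_q, combined);

        rd_kafka_event_t *ev = rd_kafka_queue_poll(combined, 1000);
        if (ev) {
                /* ... handle the event ... */
                rd_kafka_event_destroy(ev);
        }

        /* Undo the forwarding and release both handles when done. */
        rd_kafka_queue_forward(main_q, NULL);
        rd_kafka_queue_destroy(main_q);
        rd_kafka_queue_destroy(combined);
}
```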
Waits up to timeout_ms for (an) op(s), using rko_serve if set, else attempts to handle with the passed in callback. The cb_type parameter is passed to rd_kafka_op_handle_std and used by it. It's also passed to the callback (and is used by some of them) [enum values].
Like rd_kafka_q_serve, but returns if/when the first unhandled op is encountered.
It also takes a version parameter which is used to ignore outdated ops (rd_kafka_q_serve doesn't have this): if version is 0, no filtering is applied; if version is set, the op is ignored if rko_rktp is set and the op's version is outdated.
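As a conceptual illustration only (not librdkafka's actual code), the version-based filtering described above might look something like this; the struct is a stand-in, and the exact predicate in librdkafka may differ.

```c
#include <stdint.h>
#include <stddef.h>

struct versioned_op {
        int32_t rko_version;  /* op version, 0 = unversioned */
        void   *rko_rktp;     /* toppar the op is bound to, or NULL */
};

static int op_is_outdated(const struct versioned_op *rko, int32_t version) {
        if (version == 0)
                return 0;  /* caller requested no filtering */
        if (rko->rko_rktp == NULL)
                return 0;  /* only toppar-bound ops are filtered */
        return rko->rko_version != 0 && rko->rko_version < version;
}
```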
rd_kafka_op_handle attempts to handle an op using this method first.
rko_type. Handling of ops is attempted as follows (in order):
cb_type == FORCE_RETURN - not handled.
rko is a ctrl message - calls rd_kafka_op_offset_store, then the op is handled.
cb_type == EVENT && rko_type & CB - handling is delegated to rd_kafka_op_call, which calls the op's rko_op_cb.
rko_type == RD_KAFKA_OP_RECV_BUF - calls rd_kafka_buf_handle_op, then the op is handled.
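As a conceptual illustration only, here is the ordered dispatch above collapsed into a single self-contained function. None of the types, constants or helpers below are librdkafka's real definitions; they are stand-ins named after the steps in the list.

```c
#include <stdbool.h>

enum cb_type { CB_TYPE_CALLBACK, CB_TYPE_EVENT, CB_TYPE_FORCE_RETURN };

#define OP_FLAG_CB  (1 << 29)  /* the 'CB' bit carried on rko_type */
#define OP_RECV_BUF 42         /* arbitrary stand-in for RD_KAFKA_OP_RECV_BUF */

struct op { int type; bool is_ctrl_msg; };

static void store_ctrl_msg_offset(struct op *o) { (void)o; }              /* ~ rd_kafka_op_offset_store */
static bool call_op_cb(struct op *o)            { (void)o; return true; } /* ~ rd_kafka_op_call */
static void handle_recv_buf(struct op *o)       { (void)o; }              /* ~ rd_kafka_buf_handle_op */

/* Returns true if the op was handled here, false if the caller keeps it. */
static bool handle_std_sketch(struct op *o, enum cb_type cb_type) {
        if (cb_type == CB_TYPE_FORCE_RETURN)
                return false;
        if (o->is_ctrl_msg) {
                store_ctrl_msg_offset(o);
                return true;
        }
        if (cb_type == CB_TYPE_EVENT && (o->type & OP_FLAG_CB))
                return call_op_cb(o);
        if (o->type == OP_RECV_BUF) {
                handle_recv_buf(o);
                return true;
        }
        return false;
}
```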
These queues are created in rd_kafka_new. rd_kafka_poll_cb is related to this queue and used to handle ops in it (see below). However, it's also used to handle ops on other queues:
rk_ops->rkq_serve is set to this.
rk_logq->rkq_serve is set to this.
It is also called from rd_kafka_background_queue_serve, which is the callback specified when serving the background queue.
rd_kafka_queue_poll [rdkafka.c] (public API). This serves the passed in queue with callback == rd_kafka_poll_cb and cb_type == EVENT. Used for handling the results of admin operations with a user-created queue.
rd_kafka_queue_poll_callback. Same as above, but with cb_type == CALLBACK.
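The admin-result-on-a-user-queue pattern mentioned above looks roughly like this. A minimal, untested sketch: error handling is mostly omitted and "rk" is assumed to be an existing handle.

```c
#include <librdkafka/rdkafka.h>

static void describe_topic_config(rd_kafka_t *rk, const char *topic) {
        rd_kafka_queue_t *rkqu = rd_kafka_queue_new(rk);   /* user-created queue */
        rd_kafka_ConfigResource_t *res =
                rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topic);

        /* The result op will eventually be enqueued on rkqu. */
        rd_kafka_DescribeConfigs(rk, &res, 1, NULL, rkqu);

        /* Serving the queue ends up in rd_kafka_q_serve with
         * callback == rd_kafka_poll_cb and cb_type == EVENT. */
        rd_kafka_event_t *ev = rd_kafka_queue_poll(rkqu, 10 * 1000);
        if (ev) {
                if (rd_kafka_event_type(ev) ==
                    RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT) {
                        /* ... inspect rd_kafka_event_DescribeConfigs_result(ev) ... */
                }
                rd_kafka_event_destroy(ev);
        }

        rd_kafka_ConfigResource_destroy(res);
        rd_kafka_queue_destroy(rkqu);
}
```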
To serve rk_rep, the application can:
call rd_kafka_poll [rdkafka.c]. This serves with callback == rd_kafka_poll_cb and cb_type == CALLBACK (that is important only to some of the callbacks), or
call rd_kafka_consumer_poll [rdkafka.c], having previously redirected rk_rep (see rd_kafka_poll_set_consumer below).
replyq is set to the number of ops in this queue.
Check rd_kafka_outq_len (useful on shutdown).
rkq_serve is never set on this queue. In RD_KAFKA_DR_MODE_EVENT mode, delivery reports are not delivered to the application via CBs. TODO: Trace through how this works.
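A minimal, untested sketch of the usual application-side pattern for serving rk_rep and draining it on shutdown, using rd_kafka_poll and rd_kafka_outq_len as mentioned above. "rk" is assumed to be an existing producer handle.

```c
#include <librdkafka/rdkafka.h>

static void drain_and_destroy(rd_kafka_t *rk) {
        /* Each rd_kafka_poll call serves rk_rep with
         * callback == rd_kafka_poll_cb and cb_type == CALLBACK,
         * firing delivery report / error / stats callbacks. */
        while (rd_kafka_outq_len(rk) > 0)
                rd_kafka_poll(rk, 100);

        rd_kafka_destroy(rk);
}
```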
Created in rd_kafka_new. Ops are served in rd_kafka_thread_main [rdkafka.c]. No callback is passed to the serve call; rk_ops->rkq_serve is set to rd_kafka_poll_cb, which is essentially responsible for handling all ops directly placed on this queue (not forwarded here) - rd_kafka_op_handle_std doesn't really come into play.
rd_kafka_DescribeConfigs puts a corresponding op on this queue. These ops get handled by rd_kafka_poll_cb, which handles the op by calling rko_op_cb, set to rd_kafka_admin_worker on op creation. This implements some common worker state machine handling, and also results in additional ops being put on rk_ops.
InitProducerId requests (idempotent producer).
rktp->rktp_ops queues are forwarded to this.
By default, log messages are delivered via log_cb on arbitrary threads.
A log queue can be set with rd_kafka_set_log_queue and enabled by setting the log.queue configuration property.
rkq_serve is set to rd_kafka_poll_cb. TODO: Why?
Can be served with rd_kafka_queue_poll_callback. TODO: Check.
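A minimal, untested sketch of routing logs to a queue so they can be served from a thread of the application's choosing rather than arbitrary librdkafka threads. Error handling is elided; whether rd_kafka_queue_poll_callback is the right serve call is the open TODO above.

```c
#include <librdkafka/rdkafka.h>

static rd_kafka_t *create_with_log_queue(rd_kafka_conf_t *conf,
                                         rd_kafka_queue_t **logqp) {
        char errstr[512];

        /* log.queue must be enabled for rd_kafka_set_log_queue to take effect. */
        rd_kafka_conf_set(conf, "log.queue", "true", errstr, sizeof(errstr));

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                      errstr, sizeof(errstr));

        rd_kafka_queue_t *logq = rd_kafka_queue_new(rk);
        rd_kafka_set_log_queue(rk, logq);   /* forward rk_logq to our queue */
        *logqp = logq;
        return rk;
}

/* Somewhere in the application's own logging thread: */
static void serve_logs(rd_kafka_queue_t *logq) {
        /* Should fire the configured log_cb for any queued log ops. */
        rd_kafka_queue_poll_callback(logq, 100);
}
```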
Served with the callback rd_kafka_background_queue_serve. There's no background thread unless the background_event_cb config has been set.
Ops are passed to background_event_cb if the op is eventable.
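A minimal, untested sketch of enabling the background queue: register a background_event_cb on the conf and librdkafka's background thread will invoke it for eventable ops. Event ownership/destruction rules should be checked against rdkafka.h.

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

static void my_background_event_cb(rd_kafka_t *rk, rd_kafka_event_t *rkev,
                                   void *opaque) {
        /* Called from librdkafka's background thread - keep it quick. */
        fprintf(stderr, "background event: %s\n", rd_kafka_event_name(rkev));
        rd_kafka_event_destroy(rkev);  /* assumed: application releases the event */
        (void)rk; (void)opaque;
}

static rd_kafka_t *create_with_background_cb(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        rd_kafka_conf_set_background_event_cb(conf, my_background_event_cb);
        return rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
}
```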
Forwarded to rk_ops, which is served in the main thread.
rkcg_ops->rkq_serve is set to rd_kafka_cgrp_op_serve, so when rk_ops is served, this is used instead of rd_kafka_poll_cb (which is used for ops sent directly to rk_ops).
rkq_serve is set as for rkcg_ops, but used when the coordinator is not available.
The application can call rd_kafka_queue_get_consumer to get the queue. TODO: I'm not exactly sure what it can/should do with it then.
rk_rep will be forwarded here with rd_kafka_poll_set_consumer when consuming, so two poll calls are not necessary.
rkq_serve is not set.
rktp->rktp_fetchq is forwarded to this queue in rd_kafka_cgrp_partitions_fetch_start. TODO: consider the old non-consumer-group API, where I believe this is not the case.
Forwarded to rkcg_q when fetches are occurring, which in turn is polled by the application for consumed messages and other events.
CONSUMER_ERR events are put on this queue.
fetchq_size. Note: size is cached, and updated incrementally.
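Putting the consumer-side pieces above together, the single-poll pattern enabled by rd_kafka_poll_set_consumer looks roughly like this. A minimal, untested sketch; config, subscription contents and error handling are elided.

```c
#include <librdkafka/rdkafka.h>

static void consume_loop(rd_kafka_t *rk,
                         rd_kafka_topic_partition_list_t *topics,
                         volatile int *run) {
        rd_kafka_poll_set_consumer(rk);  /* forward rk_rep to the consumer queue */
        rd_kafka_subscribe(rk, topics);

        while (*run) {
                rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 100);
                if (!msg)
                        continue;
                if (msg->err) {
                        /* errors are delivered as messages on this queue too */
                } else {
                        /* process msg->payload / msg->len here */
                }
                rd_kafka_message_destroy(msg);
        }

        rd_kafka_consumer_close(rk);
}
```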
rd_kafka_consume takes directly from rktp_fetchq for the specified toppar, with rd_kafka_message_get used to turn the op into a rd_kafka_message_t.
rd_kafka_consume_callback takes directly from rktp_fetchq for the specified toppar in the same way.
rd_kafka_consume_batch takes directly from rktp_fetchq via rd_kafka_q_serve_rkmessages [rdkafka_queue.c], which also makes use of rd_kafka_message_get.
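A minimal, untested sketch of the legacy (non consumer-group) consume path described above, which reads straight from the toppar's fetch queue. Partition 0 and the timeout are arbitrary; error handling is elided.

```c
#include <librdkafka/rdkafka.h>

static void legacy_consume(rd_kafka_t *rk, const char *topic_name) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL);

        /* Start fetching partition 0 from the beginning. */
        rd_kafka_consume_start(rkt, 0, RD_KAFKA_OFFSET_BEGINNING);

        /* Pops an op from the partition's fetch queue and converts it to an
         * rd_kafka_message_t (rd_kafka_message_get internally). */
        rd_kafka_message_t *msg = rd_kafka_consume(rkt, 0, 1000);
        if (msg) {
                /* process msg ... */
                rd_kafka_message_destroy(msg);
        }

        rd_kafka_consume_stop(rkt, 0);
        rd_kafka_topic_destroy(rkt);
}
```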
Forwarded to rk_ops soon after construction in rd_kafka_toppar_new0, meaning it's served by the main thread.
rkq_serve is set to rd_kafka_toppar_op_serve. Ops handled by this:
OFFSET_COMMIT | RD_KAFKA_OP_REPLY,
OFFSET_FETCH | RD_KAFKA_OP_REPLY.
For the OFFSET_FETCH reply, rd_kafka_op_handle_OffsetFetch is called with the op.
The main thread runs rd_kafka_thread_main. Three sub tasks:
Timers (rk->rk_timers) are set up all over the codebase using rd_kafka_timer_start[_oneshot], passing an interval and callback. They're all served from here.
Not to be confused with rd_interval_t, which just keeps track of whether an interval has passed or not (and is not registered with a serving loop).
Serving rk_ops (no callback - relies on rkq_serve, which may be different depending on origin queue / forwarding). In particular, rkcg_ops is forwarded here, and has callback rd_kafka_cgrp_op_serve.
_UP state has a sub state machine with states enumerated as:
CONFIGURED - A broker with address specified via the bootstrap.servers config property, or added with rd_kafka_brokers_add.
LEARNED - A broker with address learned through broker metadata.
INTERNAL - Used for serving unassigned toppars' op queues. Created when rk is created.
LOGICAL - "Logical brokers act just like any broker handle, but will not have an initial address set. The address (or nodename as it is called internally) can be set from another broker handle by calling rd_kafka_broker_set_nodename(). This allows maintaining a logical group coordinator broker handle that can ambulate between real broker addresses."
rd_kafka_broker_serve, via one of three methods:
consumer_serve: calls out to:
consumer_toppar_serve. Sole operation is a call to toppar_fetch_decide, which decides whether a toppar should be on the fetch list or not (TODO: this is a big function, analyze in detail).
broker_fetch_toppars: Builds and sends fetch requests. TODO: more detail.
broker_op_serve. Serves an op on rkb_ops. Op type XMIT_BUF: calls rd_kafka_broker_buf_enq0 with the op's rkbuf, which is put on rkb_outbufs.
rd_kafka_send is where rkb_outbufs is sent to the broker, called in the broker thread. All protocol requests to a broker go through that broker's thread: if you're on another thread (typically main), you enqueue an op for the broker and specify a reply queue on which you want the response.
rkcg_join_state transitions are as expected.
The JoinGroup request is made with rkcg_ops specified as the return queue, rd_kafka_cgrp_handle_JoinGroup as the operation callback, and rkcg_coord as the broker.
A rd_kafka_buf_t is built, then rd_kafka_broker_buf_enq_replyq is used to create an XMIT_BUF op, which is enqueued on the group coordinator's rkb_ops (see broker_op_serve above). The reply is placed on rkcg_ops, which is forwarded to rk_ops, so it ends up getting served via rko_serve (since no callback is specified), which was set to rkcg_ops->rkq_serve on creation of the op, which in turn was set to rd_kafka_cgrp_op_serve on queue initialization.
rd_kafka_cgrp_handle_JoinGroup checks the join state is WAIT_JOIN. For simplicity assume this is not the leader.
rd_kafka_op_handle_std seems a bit inside out?
rd_kafka_poll_cb does too much?
LOGICAL broker abstraction maybe not the best way to model this?
rd_kafka_topic_partition_list_t functions are not consistent in which elements they operate on.