Librdkafka Source Code Analysis: topic and partition data structures

  • topic-partition is the essence of Kafka's distributed design,
    and the smallest unit that producing to or consuming from Kafka
    operates on;
  • In this article we introduce the related data structures;
  • The contents are:
    1. rd_kafka_topic_partition_t
    2. rd_kafka_topic_partition_list_t
    3. rd_kafka_toppar_s
  • Since every partition must belong to a topic, we also analyze
    librdkafka's wrapper around the topic itself, rd_kafka_itopic_s.


rd_kafka_topic_partition_t
  • Source file: src/rdkafka.h
  • Defines the data structure describing a single partition; a simple
    definition that mostly acts as a plain container
  • Definition:

typedef struct rd_kafka_topic_partition_s {
        char        *topic;             /**< Topic name */
        int32_t      partition;         /**< Partition */
        int64_t      offset;            /**< Offset */
        void        *metadata;          /**< Metadata */ // per-partition metadata string, e.g. as carried with committed offsets
        size_t       metadata_size;     /**< Metadata size */
        void        *opaque;            /**< Application opaque */
        rd_kafka_resp_err_t err;        /**< Error code, depending on use. */
        void       *_private;           /**< INTERNAL USE ONLY,
                                         *   INITIALIZE TO ZERO, DO NOT TOUCH */
} rd_kafka_topic_partition_t;
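  • As a quick aside, applications normally do not fill this struct by hand;
    elements are obtained through the rd_kafka_topic_partition_list_* API
    described later. A minimal sketch (the topic name "test-topic" is only an
    example):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main (void) {
        rd_kafka_topic_partition_list_t *list =
                rd_kafka_topic_partition_list_new(1);

        /* _list_add() returns a pointer to the new element; the application
         * then fills in the fields it cares about, typically offset. */
        rd_kafka_topic_partition_t *tp =
                rd_kafka_topic_partition_list_add(list, "test-topic", 0);
        tp->offset = RD_KAFKA_OFFSET_BEGINNING; /* e.g. to seed rd_kafka_assign() */
        tp->opaque = NULL;                      /* application-owned pointer */

        /* err is filled in per element by the library, e.g. after a commit. */
        printf("%s [%d] offset %lld: %s\n",
               tp->topic, (int)tp->partition, (long long)tp->offset,
               rd_kafka_err2str(tp->err));

        rd_kafka_topic_partition_list_destroy(list);
        return 0;
}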
rd_kafka_itopic_s
  • Source file: src/rdkafka_topic.h
  • There is also a type rd_kafka_topic_t, defined as typedef struct
    rd_kafka_topic_s rd_kafka_topic_t;. It is an opaque declaration with
    no body; underneath it is really rd_kafka_itopic_s. rd_kafka_topic_t
    is the type exposed to librdkafka users (the sdk calls it the app
    topic) and it has its own reference count. Internally librdkafka uses
    rd_kafka_itopic_t, which also has its own reference count, which is
    admittedly a bit verbose.
  • Definition:

struct rd_kafka_itopic_s {
        // linkage for the rk->rk_topics TAILQ
    TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link;

        // reference count
    rd_refcnt_t        rkt_refcnt;

    rwlock_t           rkt_lock;

        // name of this topic
    rd_kafkap_str_t   *rkt_topic;

        // represents the unassigned (UA) partition
    shptr_rd_kafka_toppar_t  *rkt_ua;  /* unassigned partition */

        // array of partitions owned by this topic
    shptr_rd_kafka_toppar_t **rkt_p;
    int32_t            rkt_partition_cnt;

        // desired partitions whose information has not yet been obtained from the brokers
        rd_list_t          rkt_desp;              /* Desired partitions
                                                   * that are not yet seen
                                                   * in the cluster. */
        // timestamp of the last metadata update
    rd_ts_t            rkt_ts_metadata; /* Timestamp of last metadata
                         * update for this topic. */

        mtx_t              rkt_app_lock;    /* Protects rkt_app_* */
        // at the application layer an rd_kafka_itopic_s is exposed as an object of type rd_kafka_topic_t, which has its own reference count
        rd_kafka_topic_t *rkt_app_rkt;      /* A shared topic pointer
                                             * to be used for callbacks
                                             * to the application. */
    int               rkt_app_refcnt;   /* Number of active rkt's new()ed
                         * by application. */
        // the three topic states: unknown, exists, does not exist
    enum {
        RD_KAFKA_TOPIC_S_UNKNOWN,   /* No cluster information yet */
        RD_KAFKA_TOPIC_S_EXISTS,    /* Topic exists in cluster */
        RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */
    } rkt_state;

        int               rkt_flags;
#define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL   0x1 /* Leader lost/unavailable
                                               * for at least one partition. */
        // the rd_kafka_t instance this topic belongs to
    rd_kafka_t       *rkt_rk;

        shptr_rd_kafka_itopic_t *rkt_shptr_app; /* Application's topic_new() */

    rd_kafka_topic_conf_t rkt_conf;
};
  • Creating an rd_kafka_itopic_s object: rd_kafka_topic_new0,
    an internal helper function

shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
                                              const char *topic,
                                              rd_kafka_topic_conf_t *conf,
                                              int *existing,
                                              int do_lock) {
    rd_kafka_itopic_t *rkt;
        shptr_rd_kafka_itopic_t *s_rkt;
        const struct rd_kafka_metadata_cache_entry *rkmce;

       // validate the topic name: its length must not exceed 512
    if (!topic || strlen(topic) > 512) {
        if (conf)
            rd_kafka_topic_conf_destroy(conf);
        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
                    EINVAL);
        return NULL;
    }

    if (do_lock)
                rd_kafka_wrlock(rk);
        // every rd_kafka_itopic object created is added to rk->rk_topics; look it up there first and reuse it if found
    if ((s_rkt = rd_kafka_topic_find(rk, topic, 0/*no lock*/))) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
        if (conf)
            rd_kafka_topic_conf_destroy(conf);
                if (existing)
                        *existing = 1;
        return s_rkt;
        }

        if (existing)
                *existing = 0;

        // allocate the object and initialize its fields
    rkt = rd_calloc(1, sizeof(*rkt));

    rkt->rkt_topic     = rd_kafkap_str_new(topic, -1);
    rkt->rkt_rk        = rk;

    if (!conf) {
                if (rk->rk_conf.topic_conf)
                        conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
                else
                        conf = rd_kafka_topic_conf_new();
        }
    rkt->rkt_conf = *conf;
    rd_free(conf); /* explicitly not rd_kafka_topic_destroy()
                        * since we dont want to rd_free internal members,
                        * just the placeholder. The internal members
                        * were copied on the line above. */

    /* Default partitioner: consistent_random */
    if (!rkt->rkt_conf.partitioner)
        rkt->rkt_conf.partitioner = rd_kafka_msg_partitioner_consistent_random;

    if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT)
        rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec;

        rd_list_init(&rkt->rkt_desp, 16, NULL);
        rd_refcnt_init(&rkt->rkt_refcnt, 0);

        s_rkt = rd_kafka_topic_keep(rkt);

    rwlock_init(&rkt->rkt_lock);
        mtx_init(&rkt->rkt_app_lock, mtx_plain);

    /* Create unassigned partition */
    rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA);

        // add it to the rd_kafka_t's topic list
    TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link);
    rk->rk_topic_cnt++;

        /* Populate from metadata cache. */
        // if the metadata cache already has an entry for this topic, update the topic from it
        if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1/*valid*/))) {
                if (existing)
                        *existing = 1;

                rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic,
                                               rkmce->rkmce_ts_insert);
        }

        if (do_lock)
                rd_kafka_wrunlock(rk);

    return s_rkt;
}
  • Creating an rd_kafka_topic_t object: the public interface rd_kafka_topic_new

rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
                                      rd_kafka_topic_conf_t *conf) {
        shptr_rd_kafka_itopic_t *s_rkt;
        rd_kafka_itopic_t *rkt;
        rd_kafka_topic_t *app_rkt;
        int existing;

       // create a `shptr_rd_kafka_itopic_t` object
        s_rkt = rd_kafka_topic_new0(rk, topic, conf, &existing, 1/*lock*/);
        if (!s_rkt)
                return NULL;

        // pointer conversion from `shptr_rd_kafka_itopic_t` to `rd_kafka_itopic_t`; the refcount is unchanged
        rkt = rd_kafka_topic_s2i(s_rkt);

        /* Save a shared pointer to be used in callbacks. */
        // increase the refcount and convert the pointer to a `rd_kafka_topic_t`;
        // the corresponding app-level refcount is also increased
    app_rkt = rd_kafka_topic_keep_app(rkt);

        /* Query for the topic leader (async) */
        if (!existing)
               // send a metadata request to obtain the leader and related information
                rd_kafka_topic_leader_query(rk, rkt);

        /* Drop our reference since there is already/now a rkt_app_rkt */
        rd_kafka_topic_destroy0(s_rkt);

        return app_rkt;
}
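  • For reference, the application-side path that ends up in
    rd_kafka_topic_new0() looks roughly like the sketch below (a producer is
    assumed; the broker address and topic name are placeholders):

#include <stdio.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

int main (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                return 1;
        }

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                      errstr, sizeof(errstr));
        if (!rk)
                return 1;

        /* rd_kafka_topic_new() calls rd_kafka_topic_new0() shown above:
         * it finds or creates the internal rd_kafka_itopic_t and hands the
         * application an rd_kafka_topic_t with its own refcount. */
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "test-topic", NULL);

        const char *msg = "hello";
        /* RD_KAFKA_PARTITION_UA lets the partitioner pick the partition. */
        rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         (void *)msg, strlen(msg), NULL, 0, NULL);

        rd_kafka_flush(rk, 5000);
        rd_kafka_topic_destroy(rkt);  /* drops the app refcount (rkt_app_refcnt) */
        rd_kafka_destroy(rk);
        return 0;
}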
  • Get the names of all topics owned by the current rd_kafka_t object,
    stored into an rd_list

void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics) {
        rd_kafka_itopic_t *rkt;

        rd_kafka_rdlock(rk);
        rd_list_grow(topics, rk->rk_topic_cnt);
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link)
                rd_list_add(topics, rd_strdup(rkt->rkt_topic->str));
        rd_kafka_rdunlock(rk);
}
  • Determine whether a partition is available, i.e. whether its leader is valid

  int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt,
                    int32_t partition) {
    int avail;
    shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
        rd_kafka_broker_t *rkb;

    s_rktp = rd_kafka_toppar_get(rd_kafka_topic_a2i(app_rkt),
                                     partition, 0/*no ua-on-miss*/);
    if (unlikely(!s_rktp))
        return 0;

        rktp = rd_kafka_toppar_s2i(s_rktp);
        rkb = rd_kafka_toppar_leader(rktp, 1/*proper broker*/);
        avail = rkb ? 1 : 0;
        if (rkb)
                rd_kafka_broker_destroy(rkb);
    rd_kafka_toppar_destroy(s_rktp);
    return avail;
}
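  • rd_kafka_topic_partition_available() is part of the public API and is
    meant to be called from a partitioner callback. A sketch of a custom
    partitioner that skips partitions without a usable leader and otherwise
    falls back to the built-in consistent_random partitioner (the hash itself
    is only illustrative):

#include <stdint.h>
#include <librdkafka/rdkafka.h>

/* Register with rd_kafka_topic_conf_set_partitioner_cb(). */
static int32_t my_partitioner (const rd_kafka_topic_t *rkt,
                               const void *keydata, size_t keylen,
                               int32_t partition_cnt,
                               void *rkt_opaque, void *msg_opaque) {
        if (keylen > 0) {
                /* Simple FNV-1a style hash of the key (illustrative only). */
                uint32_t h = 2166136261u;
                int32_t p;
                size_t i;

                for (i = 0; i < keylen; i++)
                        h = (h ^ ((const unsigned char *)keydata)[i]) * 16777619u;

                p = (int32_t)(h % (uint32_t)partition_cnt);
                /* Checks that the partition's leader broker is known and up,
                 * exactly the logic shown above. */
                if (rd_kafka_topic_partition_available(rkt, p))
                        return p;
        }
        return rd_kafka_msg_partitioner_consistent_random(
                rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
}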
  • Scan the partitions of all topics (rd_kafka_topic_scan_all):
    1. pick out expired kafka messages and deliver them back to the
       application layer (see the delivery-report sketch after the function)
    2. find topics whose metadata needs refreshing and send metadata requests

int rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now) {
    rd_kafka_itopic_t *rkt;
    rd_kafka_toppar_t *rktp;
        shptr_rd_kafka_toppar_t *s_rktp;
    int totcnt = 0;
        rd_list_t query_topics;

        rd_list_init(&query_topics, 0, rd_free);

    rd_kafka_rdlock(rk);
    TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
        int p;
                int cnt = 0, tpcnt = 0;
                rd_kafka_msgq_t timedout;
                int query_this = 0;

                rd_kafka_msgq_init(&timedout);

        rd_kafka_topic_wrlock(rkt);

                /* Check if metadata information has timed out. */
               // not present in the metadata cache: the metadata needs to be queried
                if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN &&
                    !rd_kafka_metadata_cache_topic_get(
                            rk, rkt->rkt_topic->str, 1/*only valid*/)) {
                        rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN);

                        query_this = 1;
                }

                /* Just need a read-lock from here on. */
                rd_kafka_topic_wrunlock(rkt);
                rd_kafka_topic_rdlock(rkt);

                if (rkt->rkt_partition_cnt == 0) {
                        query_this = 1;
                }

        for (p = RD_KAFKA_PARTITION_UA ;
             p < rkt->rkt_partition_cnt ; p++) {
            int did_tmout = 0;

            if (!(s_rktp = rd_kafka_toppar_get(rkt, p, 0)))
                continue;

                        rktp = rd_kafka_toppar_s2i(s_rktp);
            rd_kafka_toppar_lock(rktp);

                        /* Check that partition has a leader that is up,
                         * else add topic to query list. */
                       // when the partition leader is invalid, metadata must be requested
                        if (p != RD_KAFKA_PARTITION_UA &&
                            (!rktp->rktp_leader ||
                             rktp->rktp_leader->rkb_source ==
                             RD_KAFKA_INTERNAL ||
                             rd_kafka_broker_get_state(rktp->rktp_leader) <
                             RD_KAFKA_BROKER_STATE_UP)) {
                                query_this = 1;
                        }

            /* Scan toppar's message queues for timeouts */
            if (rd_kafka_msgq_age_scan(&rktp->rktp_xmit_msgq,
                           &timedout, now) > 0)
                did_tmout = 1;

            if (rd_kafka_msgq_age_scan(&rktp->rktp_msgq,
                           &timedout, now) > 0)
                did_tmout = 1;

            tpcnt += did_tmout;

            rd_kafka_toppar_unlock(rktp);
            rd_kafka_toppar_destroy(s_rktp);
        }

                rd_kafka_topic_rdunlock(rkt);

                if ((cnt = rd_atomic32_get(&timedout.rkmq_msg_cnt)) > 0) {
                        totcnt += cnt;
                        // expired kafka messages must be delivered back to the application layer
                        rd_kafka_dr_msgq(rkt, &timedout,
                                         RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
                }

                /* Need to re-query this topic's leader. */
                if (query_this &&
                    !rd_list_find(&query_topics, rkt->rkt_topic->str,
                                  (void *)strcmp))
                        rd_list_add(&query_topics,
                                    rd_strdup(rkt->rkt_topic->str));

        }
        rd_kafka_rdunlock(rk);

        if (!rd_list_empty(&query_topics))
                // send the metadata request
                rd_kafka_metadata_refresh_topics(rk, NULL, &query_topics,
                                                 1/*force even if cached
                                                    * info exists*/,
                                                 "refresh unavailable topics");
        rd_list_destroy(&query_topics);

        return totcnt;
}
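  • The messages that rd_kafka_topic_scan_all() times out come back to the
    application through the delivery report callback with err ==
    RD_KAFKA_RESP_ERR__MSG_TIMED_OUT. A minimal sketch of hooking that up:
    the callback is registered with rd_kafka_conf_set_dr_msg_cb() on the
    global configuration before rd_kafka_new(), and the topic configuration
    below is passed to rd_kafka_topic_new() (the 30000 ms value is just an
    example):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Delivery report callback: timed-out messages surface here via
 * rd_kafka_dr_msgq(); remember to call rd_kafka_poll() regularly so the
 * callback actually gets served. */
static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT)
                fprintf(stderr, "message to %s [%d] timed out: %s\n",
                        rd_kafka_topic_name(rkmessage->rkt),
                        (int)rkmessage->partition,
                        rd_kafka_err2str(rkmessage->err));
}

static rd_kafka_topic_conf_t *make_topic_conf (void) {
        char errstr[512];
        rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
        /* Bound how long a message may wait in the partition queues
         * before the scan above expires it. */
        rd_kafka_topic_conf_set(tconf, "message.timeout.ms", "30000",
                                errstr, sizeof(errstr));
        return tconf;
}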
  • Update the topic's partition count with
    rd_kafka_topic_partition_cnt_update; the count may grow or shrink.
    In short:

    1. newly added partitions are created;
    2. removed partitions are deleted;

static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt,
                        int32_t partition_cnt) {
    rd_kafka_t *rk = rkt->rkt_rk;
    shptr_rd_kafka_toppar_t **rktps;
    shptr_rd_kafka_toppar_t *rktp_ua;
        shptr_rd_kafka_toppar_t *s_rktp;
    rd_kafka_toppar_t *rktp;
    rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
    int32_t i;

        // if the partition count is unchanged, do nothing
    if (likely(rkt->rkt_partition_cnt == partition_cnt))
        return 0; /* No change in partition count */

    /* Create and assign new partition list */
       // allocate memory for the new partition list
    if (partition_cnt > 0)
        rktps = rd_calloc(partition_cnt, sizeof(*rktps));
    else
        rktps = NULL;

        // if the new count is larger than the old one, the extra indexes are new partitions
    for (i = 0 ; i < partition_cnt ; i++) {
                // indexes beyond the old count are newly added partitions
        if (i >= rkt->rkt_partition_cnt) {
            /* New partition. Check if its in the list of
             * desired partitions first. */
                        // check whether it is already on the desired partition list
                        s_rktp = rd_kafka_toppar_desired_get(rkt, i);

                        rktp = s_rktp ? rd_kafka_toppar_s2i(s_rktp) : NULL;
                        if (rktp) {
                                // it is on the desired list, so unlink it from there
                rd_kafka_toppar_lock(rktp);
                                rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_UNKNOWN;

                                /* Remove from desp list since the
                                 * partition is now known. */
                                rd_kafka_toppar_desired_unlink(rktp);
                                rd_kafka_toppar_unlock(rktp);
            } else
                s_rktp = rd_kafka_toppar_new(rkt, i);
                        // store it into rktps[i]
            rktps[i] = s_rktp;
        } else {
                        // existing partition: put it into rktps[i] and adjust the reference counts
            /* Existing partition, grab our own reference. */
            rktps[i] = rd_kafka_toppar_keep(
                rd_kafka_toppar_s2i(rkt->rkt_p[i]));
            /* Loose previous ref */
            rd_kafka_toppar_destroy(rkt->rkt_p[i]);
        }
    }

    rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);

        /* Propagate notexist errors for desired partitions */
        // whatever is still left on the desired list has no owner and does not exist in the cluster; propagate an error for each
        RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) {
                rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp),
                                          RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
        }

    /* Remove excessive partitions */
        // handle the case where the new count is smaller than the old one: the excess partitions must be removed
    for (i = partition_cnt ; i < rkt->rkt_partition_cnt ; i++) {
        s_rktp = rkt->rkt_p[i];
                rktp = rd_kafka_toppar_s2i(s_rktp);
        rd_kafka_toppar_lock(rktp);

        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) {
                        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
                                     "Topic %s [%"PRId32"] is desired "
                                     "but no longer known: "
                                     "moving back on desired list",
                                     rkt->rkt_topic->str, rktp->rktp_partition);
                        // if it is in the DESIRED state, put it back on the desired list
            rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
                        rd_kafka_toppar_desired_link(rktp);

                        if (!rd_kafka_terminating(rkt->rkt_rk))
                                rd_kafka_toppar_enq_error(
                                        rktp,
                                        RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
                        // detach it from its broker; in practice it is delegated to the internal broker
            rd_kafka_toppar_broker_delegate(rktp, NULL, 0);

        } else {
            /* Tell handling broker to let go of the toppar */
            rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
            rd_kafka_toppar_broker_leave_for_remove(rktp);
        }

        rd_kafka_toppar_unlock(rktp);

        rd_kafka_toppar_destroy(s_rktp);
    }


    if (rkt->rkt_p)
        rd_free(rkt->rkt_p);

    rkt->rkt_p = rktps;
    rkt->rkt_partition_cnt = partition_cnt;

    return 1;
}
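  • The partition count fed into rd_kafka_topic_partition_cnt_update() comes
    from metadata responses. From the application side, the broker-reported
    count can be observed with the public metadata API; a small sketch (rk
    and rkt are assumed to have been created elsewhere):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Print the current broker-reported partition count for one topic. */
static void print_partition_cnt (rd_kafka_t *rk, rd_kafka_topic_t *rkt) {
        const struct rd_kafka_metadata *md;
        rd_kafka_resp_err_t err;

        err = rd_kafka_metadata(rk, 0 /* only this topic */, rkt, &md, 5000);
        if (err) {
                fprintf(stderr, "metadata failed: %s\n", rd_kafka_err2str(err));
                return;
        }

        if (md->topic_cnt == 1)
                printf("topic %s has %d partition(s)\n",
                       md->topics[0].topic, md->topics[0].partition_cnt);

        rd_kafka_metadata_destroy(md);
}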
  • Reassign kafka messages queued on the UA partition to valid
    partitions: rd_kafka_topic_assign_uas:

static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt,
                                       rd_kafka_resp_err_t err) {
    rd_kafka_t *rk = rkt->rkt_rk;
    shptr_rd_kafka_toppar_t *s_rktp_ua;
        rd_kafka_toppar_t *rktp_ua;
    rd_kafka_msg_t *rkm, *tmp;
    rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas);
    rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed);
    int cnt;

    if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER)
        return;

        // no UA partition, return immediately
    s_rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
    if (unlikely(!s_rktp_ua)) {
        return;
    }

        rktp_ua = rd_kafka_toppar_s2i(s_rktp_ua);

        // move the messages on the UA partition onto a temporary queue
    rd_kafka_toppar_lock(rktp_ua);
    rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq);
    cnt = rd_atomic32_get(&uas.rkmq_msg_cnt);
    rd_kafka_toppar_unlock(rktp_ua);

    TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
        /* Fast-path for failing messages with forced partition */
               // invalid messages go onto the failed queue
        if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
            rkm->rkm_partition >= rkt->rkt_partition_cnt &&
            rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN) {
            rd_kafka_msgq_enq(&failed, rkm);
            continue;
        }

               // re-partition the kafka message; on failure put it onto the failed queue
        if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) {
            /* Desired partition not available */
            rd_kafka_msgq_enq(&failed, rkm);
        }
    }

        // all failed messages are delivered back to the application layer
    if (rd_atomic32_get(&failed.rkmq_msg_cnt) > 0) {
        /* Fail the messages */
        rd_kafka_dr_msgq(rkt, &failed,
                 rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS ?
                 err :
                 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
    }

    rd_kafka_toppar_destroy(s_rktp_ua); /* from get() */
}
  • The metadata-related operations will be analyzed when we cover metadata

rd_kafka_topic_partition_list_t
  • Source file: src/rdkafka.h
  • A dynamically growable array used to store rd_kafka_topic_partition_t
    elements
  • Definition:

typedef struct rd_kafka_topic_partition_list_s {
        int cnt;               /**< Current number of elements */   // number of elements currently stored
        int size;              /**< Current allocated size */       // current capacity of the array
        rd_kafka_topic_partition_t *elems; /**< Element array[] */  // pointer to the dynamic array
} rd_kafka_topic_partition_list_t;
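  • Applications manipulate this structure only through the public helpers.
    A small sketch showing how cnt and size evolve; the third add below
    exceeds the pre-allocated size and triggers
    rd_kafka_topic_partition_list_grow() internally (topic names are
    placeholders):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main (void) {
        /* Pre-allocate room for 2 elements (size == 2, cnt == 0). */
        rd_kafka_topic_partition_list_t *list =
                rd_kafka_topic_partition_list_new(2);

        rd_kafka_topic_partition_list_add(list, "topic-a", 0);
        rd_kafka_topic_partition_list_add(list, "topic-a", 1);
        rd_kafka_topic_partition_list_add(list, "topic-b", 0); /* grows here */

        printf("cnt=%d size=%d\n", list->cnt, list->size);

        rd_kafka_topic_partition_t *p =
                rd_kafka_topic_partition_list_find(list, "topic-a", 1);
        if (p)
                printf("found %s [%d]\n", p->topic, (int)p->partition);

        /* Delete by topic+partition; internally this resolves the index and
         * uses the del_by_idx path shown below. */
        rd_kafka_topic_partition_list_del(list, "topic-a", 0);

        rd_kafka_topic_partition_list_destroy(list);
        return 0;
}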
  • Growing the capacity: rd_kafka_topic_partition_list_grow:

static void
rd_kafka_topic_partition_list_grow (rd_kafka_topic_partition_list_t *rktparlist,
                                    int add_size) {
        if (add_size < rktparlist->size)
                add_size = RD_MAX(rktparlist->size, 32);

        rktparlist->size += add_size;
        // reallocate the memory with realloc
        rktparlist->elems = rd_realloc(rktparlist->elems,
                                       sizeof(*rktparlist->elems) *
                                       rktparlist->size);

}
  • Creation: rd_kafka_topic_partition_list_new:

rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) {
        rd_kafka_topic_partition_list_t *rktparlist;
        rktparlist = rd_calloc(1, sizeof(*rktparlist));
        rktparlist->size = size;
        rktparlist->cnt = 0;
        if (size > 0)
                rd_kafka_topic_partition_list_grow(rktparlist, size);
        return rktparlist;
}
  • Lookup: rd_kafka_topic_partition_list_find:
    an element matches only when both topic and partition are equal

rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find (rd_kafka_topic_partition_list_t *rktparlist,
                     const char *topic, int32_t partition) {
    int i = rd_kafka_topic_partition_list_find0(rktparlist,
                            topic, partition);
    if (i == -1)
        return NULL;
    else
        return &rktparlist->elems[i];
}
  • Delete by index: rd_kafka_topic_partition_list_del_by_idx

int
rd_kafka_topic_partition_list_del_by_idx (rd_kafka_topic_partition_list_t *rktparlist,
                      int idx) {
    if (unlikely(idx < 0 || idx >= rktparlist->cnt))
        return 0;

        // decrease the element count by 1
    rktparlist->cnt--;
        // destroy the removed element
    rd_kafka_topic_partition_destroy0(&rktparlist->elems[idx], 0);

        // shift the remaining elements down; the memory itself is not shrunk
    memmove(&rktparlist->elems[idx], &rktparlist->elems[idx+1],
        (rktparlist->cnt - idx) * sizeof(rktparlist->elems[idx]));

    return 1;
}
  • Sorting: rd_kafka_topic_partition_list_sort_by_topic sorts by topic
    name first, and by partition when the topic names are equal

void rd_kafka_topic_partition_list_sort_by_topic (
        rd_kafka_topic_partition_list_t *rktparlist) {
        rd_kafka_topic_partition_list_sort(rktparlist,
                                           rd_kafka_topic_partition_cmp, NULL);
}


rd_kafka_toppar_s
  • Source file: src/rdkafka_partition.h
  • A heavyweight data structure: topic, partition, leader, producing,
    consuming and all kinds of timers live in here
  • Definition (this struct is huge):

struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
    TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rklink;  /* rd_kafka_t link */
    TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rkblink; /* rd_kafka_broker_t link*/
        CIRCLEQ_ENTRY(rd_kafka_toppar_s) rktp_fetchlink; /* rkb_fetch_toppars */
    TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rktlink; /* rd_kafka_itopic_t link*/
        TAILQ_ENTRY(rd_kafka_toppar_s) rktp_cgrplink;/* rd_kafka_cgrp_t link */
        rd_kafka_itopic_t       *rktp_rkt;
        shptr_rd_kafka_itopic_t *rktp_s_rkt;  /* shared pointer for rktp_rkt */
    int32_t            rktp_partition;
        //LOCK: toppar_lock() + topic_wrlock()
        //LOCK: .. in partition_available()
        int32_t            rktp_leader_id;   /**< Current leader broker id.
                                              *   This is updated directly
                                              *   from metadata. */
    rd_kafka_broker_t *rktp_leader;      /**< Current leader broker
                                              *   This updated asynchronously
                                              *   by issuing JOIN op to
                                              *   broker thread, so be careful
                                              *   in using this since it
                                              *   may lag. */
        rd_kafka_broker_t *rktp_next_leader; /**< Next leader broker after
                                              *   async migration op. */
    rd_refcnt_t        rktp_refcnt;
    mtx_t              rktp_lock;
    rd_atomic32_t      rktp_version;         /* Latest op version.
                                              * Authoritative (app thread)*/
    int32_t            rktp_op_version;      /* Op version of curr command
                          * state from.
                          * (broker thread) */
        int32_t            rktp_fetch_version;   /* Op version of curr fetch.
                                                    (broker thread) */

    enum {
        RD_KAFKA_TOPPAR_FETCH_NONE = 0,
                RD_KAFKA_TOPPAR_FETCH_STOPPING,
                RD_KAFKA_TOPPAR_FETCH_STOPPED,
        RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
        RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
        RD_KAFKA_TOPPAR_FETCH_ACTIVE,
    } rktp_fetch_state;

    int32_t            rktp_fetch_msg_max_bytes; /* Max number of bytes to
                                                      * fetch.
                                                      * Locality: broker thread
                                                      */

        rd_ts_t            rktp_ts_fetch_backoff; /* Back off fetcher for
                                                   * this partition until this
                                                   * absolute timestamp
                                                   * expires. */

    int64_t            rktp_query_offset;    /* Offset to query broker for*/
    int64_t            rktp_next_offset;     /* Next offset to start
                                                  * fetching from.
                                                  * Locality: toppar thread */
    int64_t            rktp_last_next_offset; /* Last next_offset handled
                           * by fetch_decide().
                           * Locality: broker thread */
    int64_t            rktp_app_offset;      /* Last offset delivered to
                          * application + 1 */
    int64_t            rktp_stored_offset;   /* Last stored offset, but
                          * maybe not committed yet. */
        int64_t            rktp_committing_offset; /* Offset currently being
                                                    * committed */
    int64_t            rktp_committed_offset; /* Last committed offset */
    rd_ts_t            rktp_ts_committed_offset; /* Timestamp of last
                                                      * commit */

        struct offset_stats rktp_offsets; /* Current offsets.
                                           * Locality: broker thread*/
        struct offset_stats rktp_offsets_fin; /* Finalized offset for stats.
                                               * Updated periodically
                                               * by broker thread.
                                               * Locks: toppar_lock */

    int64_t rktp_hi_offset;              /* Current high offset.
                          * Locks: toppar_lock */
        int64_t rktp_lo_offset;
        rd_ts_t            rktp_ts_offset_lag;

    char              *rktp_offset_path;     /* Path to offset file */
    FILE              *rktp_offset_fp;       /* Offset file pointer */
        rd_kafka_cgrp_t   *rktp_cgrp;            /* Belongs to this cgrp */

        int                rktp_assigned;   /* Partition in cgrp assignment */

        rd_kafka_replyq_t  rktp_replyq; /* Current replyq+version
                     * for propagating
                     * major operations, e.g.,
                     * FETCH_STOP. */
    int                rktp_flags;

        shptr_rd_kafka_toppar_t *rktp_s_for_desp; /* Shared pointer for
                                                   * rkt_desp list */
        shptr_rd_kafka_toppar_t *rktp_s_for_cgrp; /* Shared pointer for
                                                   * rkcg_toppars list */
        shptr_rd_kafka_toppar_t *rktp_s_for_rkb;  /* Shared pointer for
                                                   * rkb_toppars list */

    /*
     * Timers
     */
    rd_kafka_timer_t rktp_offset_query_tmr;  /* Offset query timer */
    rd_kafka_timer_t rktp_offset_commit_tmr; /* Offset commit timer */
    rd_kafka_timer_t rktp_offset_sync_tmr;   /* Offset file sync timer */
        rd_kafka_timer_t rktp_consumer_lag_tmr;  /* Consumer lag monitoring
                          * timer */

        int rktp_wait_consumer_lag_resp;         /* Waiting for consumer lag
                                                  * response. */

    struct {
        rd_atomic64_t tx_msgs;
        rd_atomic64_t tx_bytes;
                rd_atomic64_t msgs;
                rd_atomic64_t rx_ver_drops;
    } rktp_c;
};
  • Creating an rd_kafka_toppar_t object: rd_kafka_toppar_new0:

shptr_rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_itopic_t *rkt,
                           int32_t partition,
                           const char *func, int line) {
    rd_kafka_toppar_t *rktp;

       // allocate memory
    rktp = rd_calloc(1, sizeof(*rktp));

        // initialize the fields
    rktp->rktp_partition = partition;

        // the topic this partition belongs to
    rktp->rktp_rkt = rkt;

        rktp->rktp_leader_id = -1;
    rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
        rktp->rktp_fetch_msg_max_bytes
            = rkt->rkt_rk->rk_conf.fetch_msg_max_bytes;
    rktp->rktp_offset_fp = NULL;
        rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
        rd_kafka_offset_stats_reset(&rktp->rktp_offsets_fin);
        rktp->rktp_hi_offset = RD_KAFKA_OFFSET_INVALID;
    rktp->rktp_lo_offset = RD_KAFKA_OFFSET_INVALID;
    rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
    rd_kafka_msgq_init(&rktp->rktp_msgq);
        rktp->rktp_msgq_wakeup_fd = -1;
    rd_kafka_msgq_init(&rktp->rktp_xmit_msgq);
    mtx_init(&rktp->rktp_lock, mtx_plain);

        rd_refcnt_init(&rktp->rktp_refcnt, 0);
    rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
        rktp->rktp_ops    = rd_kafka_q_new(rkt->rkt_rk);
        rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
        rktp->rktp_ops->rkq_opaque = rktp;
        rd_atomic32_init(&rktp->rktp_version, 1);
    rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);

        // start a timer that periodically collects consumer lag statistics; currently every `rd_kafka_toppar_t` object gets its own timer, which seems like a lot; a timing wheel could drive the timers of all partitions
        if (rktp->rktp_rkt->rkt_rk->rk_conf.stats_interval_ms > 0 &&
            rkt->rkt_rk->rk_type == RD_KAFKA_CONSUMER &&
            rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
                int intvl = rkt->rkt_rk->rk_conf.stats_interval_ms;
                if (intvl < 10 * 1000 /* 10s */)
                        intvl = 10 * 1000;
        rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
                     &rktp->rktp_consumer_lag_tmr,
                                     intvl * 1000ll,
                     rd_kafka_toppar_consumer_lag_tmr_cb,
                     rktp);
        }

        rktp->rktp_s_rkt = rd_kafka_topic_keep(rkt);

        // forward its op queue to rd_kafka_t's rk_ops queue, so the ops queue this rd_kafka_toppar_t object uses is effectively the rd_kafka_t's
    rd_kafka_q_fwd_set(rktp->rktp_ops, rkt->rkt_rk->rk_ops);
    rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARNEW", "NEW %s [%"PRId32"] %p (at %s:%d)",
             rkt->rkt_topic->str, rktp->rktp_partition, rktp,
             func, line);

    return rd_kafka_toppar_keep_src(func, line, rktp);
}
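  • The consumer-lag timer started above only runs when
    statistics.interval.ms is non-zero; the figures it collects are reported
    through the statistics callback as JSON (per-partition consumer_lag
    fields). A sketch of enabling it on a consumer configuration (the
    interval value is just an example):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Statistics callback: return 0 so librdkafka frees the json buffer itself. */
static int stats_cb (rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
        printf("stats: %.*s\n", (int)json_len, json);
        return 0;
}

static rd_kafka_conf_t *make_consumer_conf (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        /* statistics.interval.ms > 0 is what arms the rktp_consumer_lag_tmr
         * timer in rd_kafka_toppar_new0() for consumers. */
        rd_kafka_conf_set(conf, "statistics.interval.ms", "60000",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set_stats_cb(conf, stats_cb);
        return conf;
}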
  • Destroying an rd_kafka_toppar_t object: rd_kafka_toppar_destroy_final

void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp) {
        // stop the associated timers and purge the ops queue
        rd_kafka_toppar_remove(rktp);

        // deliver the kafka messages still in msgq back to the app layer, then clear it
    rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
             RD_KAFKA_RESP_ERR__DESTROY);
    rd_kafka_q_destroy_owner(rktp->rktp_fetchq);
        rd_kafka_q_destroy_owner(rktp->rktp_ops);

    rd_kafka_replyq_destroy(&rktp->rktp_replyq);

    rd_kafka_topic_destroy0(rktp->rktp_s_rkt);

    mtx_destroy(&rktp->rktp_lock);

        rd_refcnt_destroy(&rktp->rktp_refcnt);

    rd_free(rktp);
}
  • Get a given partition from an rd_kafka_itopic_t (introduced earlier in
    this article; it represents a topic and holds the list of partitions
    that belong to it):

shptr_rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
                                               const rd_kafka_itopic_t *rkt,
                                               int32_t partition,
                                               int ua_on_miss) {
        shptr_rd_kafka_toppar_t *s_rktp;

        // index directly into the partition array
    if (partition >= 0 && partition < rkt->rkt_partition_cnt)
        s_rktp = rkt->rkt_p[partition];
    else if (partition == RD_KAFKA_PARTITION_UA || ua_on_miss)
        s_rktp = rkt->rkt_ua;
    else
        return NULL;

    if (s_rktp)
               // increase the refcount
                return rd_kafka_toppar_keep_src(func,line,
                                                rd_kafka_toppar_s2i(s_rktp));

    return NULL;
}
  • Get an rd_kafka_toppar_t object by topic name and partition;
    if the topic is not found, the rd_kafka_itopic_t object is created first

shptr_rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
                                               const char *topic,
                                               int32_t partition,
                                               int ua_on_miss,
                                               int create_on_miss) {
    shptr_rd_kafka_itopic_t *s_rkt;
        rd_kafka_itopic_t *rkt;
        shptr_rd_kafka_toppar_t *s_rktp;

        rd_kafka_wrlock(rk);

        /* Find or create topic */
        // all rd_kafka_itopic_t objects live on the rd_kafka_t's rk_topics TAILQ; look it up there first
    if (unlikely(!(s_rkt = rd_kafka_topic_find(rk, topic, 0/*no-lock*/)))) {
                if (!create_on_miss) {
                        rd_kafka_wrunlock(rk);
                        return NULL;
                }
                // not found: create the rd_kafka_itopic_t object first
                s_rkt = rd_kafka_topic_new0(rk, topic, NULL,
                        NULL, 0/*no-lock*/);
                if (!s_rkt) {
                        rd_kafka_wrunlock(rk);
                        rd_kafka_log(rk, LOG_ERR, "TOPIC",
                                     "Failed to create local topic \"%s\": %s",
                                     topic, rd_strerror(errno));
                        return NULL;
                }
        }

        rd_kafka_wrunlock(rk);

        rkt = rd_kafka_topic_s2i(s_rkt);

    rd_kafka_topic_wrlock(rkt);
    s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
    rd_kafka_topic_wrunlock(rkt);

        rd_kafka_topic_destroy0(s_rkt);

    return s_rktp;
}
  • desired partition: a partition in the "desired" state;
    the explanation in the source reads:

The desired partition list is the list of partitions that are desired
(e.g., by the consumer) but not yet seen on a broker.
As soon as the partition is seen on a broker the toppar is moved from
the desired list and onto the normal rkt_p array.
When the partition on the broker goes away a desired partition is put
back on the desired list.

In short, the application wants a particular partition, but that partition's
concrete information has not yet been obtained from the broker; such a
partition is a desired partition. rd_kafka_itopic_t keeps the rkt_desp list
specifically for these partitions, and the following operations on it are all
fairly simple (a consumer-side sketch follows the list):

rd_kafka_toppar_desired_get
rd_kafka_toppar_desired_link
rd_kafka_toppar_desired_unlink
rd_kafka_toppar_desired_add0
rd_kafka_toppar_desired_add
rd_kafka_toppar_desired_del
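  • On the consumer side, partitions handed to rd_kafka_assign() end up on
    this desired list (via rd_kafka_toppar_desired_add) until metadata for
    them is seen. A minimal sketch (topic name and offsets are examples):

#include <librdkafka/rdkafka.h>

/* Assign two explicit partitions to a consumer.  Until their metadata is
 * known they live on the topic's rkt_desp (desired) list; once seen in a
 * metadata response they are moved onto the normal rkt_p array. */
static void assign_two_partitions (rd_kafka_t *rk) {
        rd_kafka_topic_partition_list_t *parts =
                rd_kafka_topic_partition_list_new(2);

        rd_kafka_topic_partition_list_add(parts, "test-topic", 0)->offset =
                RD_KAFKA_OFFSET_STORED;     /* resume from the committed offset */
        rd_kafka_topic_partition_list_add(parts, "test-topic", 1)->offset =
                RD_KAFKA_OFFSET_BEGINNING;

        rd_kafka_assign(rk, parts);
        rd_kafka_topic_partition_list_destroy(parts);
}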
  • Migrating a partition between brokers: rd_kafka_toppar_broker_migrate:

static void rd_kafka_toppar_broker_migrate (rd_kafka_toppar_t *rktp,
                                            rd_kafka_broker_t *old_rkb,
                                            rd_kafka_broker_t *new_rkb) {
        rd_kafka_op_t *rko;
        rd_kafka_broker_t *dest_rkb;
        int had_next_leader = rktp->rktp_next_leader ? 1 : 0;

        /* Update next leader */
        if (new_rkb)
                rd_kafka_broker_keep(new_rkb);
        if (rktp->rktp_next_leader)
                rd_kafka_broker_destroy(rktp->rktp_next_leader);
        rktp->rktp_next_leader = new_rkb;

        // another migration may be requested before the previous one finishes; does this need locking?
        if (had_next_leader)
                return;

    if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) {
        rd_kafka_toppar_set_fetch_state(
            rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
        rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
                     &rktp->rktp_offset_query_tmr,
                     500*1000,
                     rd_kafka_offset_query_tmr_cb,
                     rktp);
    }

        // send a LEAVE op to the old broker before migrating
        if (old_rkb) {
                rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
                dest_rkb = old_rkb;
        } else {
                /* No existing broker, send join op directly to new leader. */
                rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_JOIN);
                dest_rkb = new_rkb;
        }

        rko->rko_rktp = rd_kafka_toppar_keep(rktp);

        rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
}
  • The broker delegate operation:

void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
                      rd_kafka_broker_t *rkb,
                      int for_removal) {
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
        int internal_fallback = 0;

        /* Delegate toppars with no leader to the
         * internal broker for bookkeeping. */
        // if the destination broker is NULL, fall back to the internal broker -> rkb
        if (!rkb && !for_removal && !rd_kafka_terminating(rk)) {
                rkb = rd_kafka_broker_internal(rk);
                internal_fallback = 1;
        }

    if (rktp->rktp_leader == rkb && !rktp->rktp_next_leader) {
                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
                 "%.*s [%"PRId32"]: not updating broker: "
                             "already on correct broker %s",
                 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                 rktp->rktp_partition,
                             rkb ? rd_kafka_broker_name(rkb) : "(none)");

                if (internal_fallback)
                        rd_kafka_broker_destroy(rkb);
        return;
        }

        // the actual migration
        if (rktp->rktp_leader || rkb)
                rd_kafka_toppar_broker_migrate(rktp, rktp->rktp_leader, rkb);

        if (internal_fallback)
                rd_kafka_broker_destroy(rkb);
}
  • Committing an offset to the broker: rd_kafka_toppar_offset_commit

void rd_kafka_toppar_offset_commit (rd_kafka_toppar_t *rktp, int64_t offset,
                    const char *metadata) {
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_topic_partition_t *rktpar;

        // build an rd_kafka_topic_partition_list, add the current topic-partition to it together with the offset to commit
        offsets = rd_kafka_topic_partition_list_new(1);
        rktpar = rd_kafka_topic_partition_list_add(
                offsets, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
        rktpar->offset = offset;
        if (metadata) {
                rktpar->metadata = rd_strdup(metadata);
                rktpar->metadata_size = strlen(metadata);
        }

        // update rktp_committing_offset on the rd_kafka_toppar_t object, i.e. the offset currently being committed
        rktp->rktp_committing_offset = offset;

       // commit the offset asynchronously; this is analyzed in detail when we cover the kafka consumer
        rd_kafka_commit(rktp->rktp_rkt->rkt_rk, offsets, 1/*async*/);

        rd_kafka_topic_partition_list_destroy(offsets);
}
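  • The rd_kafka_commit() used internally here is the same public API an
    application can call. Two hedged sketches: committing directly from a
    consumed message, and building the offset list explicitly the way the
    code above does:

#include <librdkafka/rdkafka.h>

/* Commit the position of a consumed message, asynchronously.
 * rd_kafka_commit_message() commits rkmessage->offset + 1, i.e. the next
 * offset to consume. */
static void commit_after_message (rd_kafka_t *rk,
                                  const rd_kafka_message_t *rkmessage) {
        rd_kafka_commit_message(rk, rkmessage, 1 /* async */);
}

/* Or build the offset list explicitly. */
static void commit_explicit (rd_kafka_t *rk, const char *topic,
                             int32_t partition, int64_t next_offset) {
        rd_kafka_topic_partition_list_t *offsets =
                rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, topic, partition)->offset =
                next_offset;
        rd_kafka_commit(rk, offsets, 1 /* async */);
        rd_kafka_topic_partition_list_destroy(offsets);
}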
  • Set the offset at which the next fetch starts, i.e. rd_kafka_toppar_t's
    rktp_next_offset, in rd_kafka_toppar_next_offset_handle

void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
                                         int64_t Offset) {
        // if Offset is a logical offset (BEGINNING, END), issue an offset request to resolve it from the broker;
        // if Offset is RD_KAFKA_OFFSET_INVALID, an error op is enqueued and the fetch state is set to RD_KAFKA_TOPPAR_FETCH_NONE
        if (RD_KAFKA_OFFSET_IS_LOGICAL(Offset)) {
                /* Offset storage returned logical offset (e.g. "end"),
                 * look it up. */
                rd_kafka_offset_reset(rktp, Offset, RD_KAFKA_RESP_ERR_NO_ERROR,
                                      "update");
                return;
        }

        /* Adjust by TAIL count if, if wanted */
        // compute the position cnt offsets back from the tail
        if (rktp->rktp_query_offset <=
            RD_KAFKA_OFFSET_TAIL_BASE) {
                int64_t orig_Offset = Offset;
                int64_t tail_cnt =
                        llabs(rktp->rktp_query_offset -
                              RD_KAFKA_OFFSET_TAIL_BASE);

                if (tail_cnt > Offset)
                        Offset = 0;
                else
                        Offset -= tail_cnt;
        }

        // set rktp_next_offset
        rktp->rktp_next_offset = Offset;

        rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);

        /* Wake-up broker thread which might be idling on IO */
        if (rktp->rktp_leader)
                rd_kafka_broker_wakeup(rktp->rktp_leader);

}
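  • The TAIL arithmetic above corresponds to the public
    RD_KAFKA_OFFSET_TAIL(cnt) macro (RD_KAFKA_OFFSET_TAIL_BASE - cnt). A
    sketch with the legacy simple-consumer API, asking for roughly the last
    100 messages of partition 0 (the count is just an example):

#include <librdkafka/rdkafka.h>

static void consume_tail (rd_kafka_topic_t *rkt) {
        /* The broker returns the end offset; the handler above then
         * subtracts the TAIL count, so fetching starts ~100 messages back. */
        rd_kafka_consume_start(rkt, 0, RD_KAFKA_OFFSET_TAIL(100));

        rd_kafka_message_t *msg = rd_kafka_consume(rkt, 0, 1000 /* ms */);
        if (msg) {
                /* ... inspect msg->err / msg->payload ... */
                rd_kafka_message_destroy(msg);
        }

        rd_kafka_consume_stop(rkt, 0);
}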
  • Fetch the committed offset from the coordinator (OffsetFetchRequest):
    rd_kafka_toppar_offset_fetch:

void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
                                   rd_kafka_replyq_t replyq) {
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
        rd_kafka_topic_partition_list_t *part;
        rd_kafka_op_t *rko;

        part = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add0(part,
                                           rktp->rktp_rkt->rkt_topic->str,
                                           rktp->rktp_partition,
                       rd_kafka_toppar_keep(rktp));

        // build the OffsetFetch op
        rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
    rko->rko_rktp = rd_kafka_toppar_keep(rktp);
    rko->rko_replyq = replyq;

    rko->rko_u.offset_fetch.partitions = part;
    rko->rko_u.offset_fetch.do_free = 1;

        // the OffsetFetch request is consumer-related, so enqueue it on the cgrp's op queue
        rd_kafka_q_enq(rktp->rktp_cgrp->rkcg_ops, rko);
}
  • Obtain a valid offset to start consuming from (rd_kafka_toppar_offset_request):

void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
                     int64_t query_offset, int backoff_ms) {
    rd_kafka_broker_t *rkb;
        rkb = rktp->rktp_leader;

         // if rkb is invalid, a timer is needed to retry the query later
        if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL))
                backoff_ms = 500;

        if (backoff_ms) {
                rd_kafka_toppar_set_fetch_state(
                        rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
                // start the timer; when it fires, the rd_kafka_offset_query_tmr_cb callback runs, which calls back into this same function
        rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
                     &rktp->rktp_offset_query_tmr,
                     backoff_ms*1000ll,
                     rd_kafka_offset_query_tmr_cb, rktp);
        return;
        }

        // stop the retry timer
        rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
                            &rktp->rktp_offset_query_tmr, 1/*lock*/);

        // fetch the offset to consume from the coordinator
    if (query_offset == RD_KAFKA_OFFSET_STORED &&
            rktp->rktp_rkt->rkt_conf.offset_store_method ==
            RD_KAFKA_OFFSET_METHOD_BROKER) {
                /*
                 * Get stored offset from broker based storage:
                 * ask cgrp manager for offsets
                 */
                rd_kafka_toppar_offset_fetch(
            rktp,
            RD_KAFKA_REPLYQ(rktp->rktp_ops,
                    rktp->rktp_op_version));

    } else {
                shptr_rd_kafka_toppar_t *s_rktp;
                rd_kafka_topic_partition_list_t *offsets;

                /*
                 * Look up logical offset (end,beginning,tail,..)
                 */
                s_rktp = rd_kafka_toppar_keep(rktp);

        if (query_offset <= RD_KAFKA_OFFSET_TAIL_BASE)
            query_offset = RD_KAFKA_OFFSET_END;

                offsets = rd_kafka_topic_partition_list_new(1);
                rd_kafka_topic_partition_list_add(
                        offsets,
                        rktp->rktp_rkt->rkt_topic->str,
                        rktp->rktp_partition)->offset = query_offset;

                // mostly used to reset the offset: query the oldest or latest offset of this partition
                rd_kafka_OffsetRequest(rkb, offsets, 0,
                                       RD_KAFKA_REPLYQ(rktp->rktp_ops,
                                                       rktp->rktp_op_version),
                                       rd_kafka_toppar_handle_Offset,
                                       s_rktp);

                rd_kafka_topic_partition_list_destroy(offsets);
        }

        rd_kafka_toppar_set_fetch_state(rktp,
                RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT);
}
