librdkafka:rd_kafka_assignment返回分配的分区的偏移量-1001

凯文

当我向消费者查询分配的主题分区列表时,结果中的所有分区的偏移量均为-1001。如果我打印出接收到的消息的偏移量,则偏移量将设置为正确的值。

这是我用来消耗消息的代码:

static void print_partition_list(FILE* fp,
    const rd_kafka_topic_partition_list_t
    * partitions) {
    int i;
    for (i = 0; i < partitions->cnt; i++) {
        fprintf(fp, "%s %s [%d] offset %lld",
            i > 0 ? "," : "",
            partitions->elems[i].topic,
            partitions->elems[i].partition,
            partitions->elems[i].offset);
    }
    fprintf(fp, "\n");

}

static void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque) {
    fprintf(stderr, "%% Consumer group rebalanced: ");
    switch (err) {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
        fprintf(stderr, "assigned:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, partitions);
        break;
    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        fprintf(stderr, "revoked:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, NULL);
        break;
    default:
        fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err));
        rd_kafka_assign(rk, NULL);
        break;
    }
}

int main()
{

    rd_kafka_t* rk;
    rd_kafka_conf_t* conf;
    rd_kafka_resp_err_t err;

    char errstr[512];
    const char* brokers{ "localhost:9092" };
    const char* groupid{ "OffsetTest" };
    const char* topics[] = { "OffsetTesting" };

    rd_kafka_topic_partition_list_t* subscription;

    conf = rd_kafka_conf_new();

    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "group.id", groupid,
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "enable.auto.commit", "false",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "offset.store.method", "broker",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
        return 1;
    }


    conf = NULL;

    rd_kafka_poll_set_consumer(rk);

    subscription = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(subscription, topics[0], RD_KAFKA_PARTITION_UA);

    err = rd_kafka_subscribe(rk, subscription);
    if (err) {
        fprintf(stderr,
            "%% Failed to subscribe to %d topics: %s\n",
            subscription->cnt, rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(subscription);
        rd_kafka_destroy(rk);
        return 1;
    }

    fprintf(stderr,
        "%% Subscribed to %d topic(s), "
        "waiting for rebalance and messages...\n",
        subscription->cnt);

    rd_kafka_topic_partition_list_destroy(subscription);

    int runningCounter = 0;

    while (runningCounter != 10) {
        rd_kafka_message_t* rkm;

        rkm = rd_kafka_consumer_poll(rk, 100);
        if (!rkm) {
            Sleep(2000);
            runningCounter++;
            continue;
        }
        if (rkm->err) {
            fprintf(stderr,
                "%% Consumer error: %s\n",
                rd_kafka_message_errstr(rkm));
            rd_kafka_message_destroy(rkm);
            continue;
        }

        rd_kafka_topic_partition_list_t* list;
        err = rd_kafka_assignment(rk, &list);

        if (err) {
            fprintf(stderr,
                "%% Failed to subscribe to %d topics: %s\n",
                subscription->cnt, rd_kafka_err2str(err));
            rd_kafka_topic_partition_list_destroy(subscription);
            return 1;
        }

        print_partition_list(stderr, list);

        rd_kafka_topic_partition_list_destroy(list);

        printf("Message on %s [%d] at offset %lld:\n",
            rd_kafka_topic_name(rkm->rkt), rkm->partition,
            rkm->offset);

        if (rkm->key)
            printf(" Key: %.*s\n",
            (int)rkm->key_len, (const char*)rkm->key);
        else if (rkm->key)
            printf(" Key: (%d bytes)\n", (int)rkm->key_len);

        if (rkm->payload)
            printf(" Value: %.*s\n",
            (int)rkm->len, (const char*)rkm->payload);
        else if (rkm->key)
            printf(" Value: (%d bytes)\n", (int)rkm->len);

        rd_kafka_commit_message(rk, rkm, 0);

        rd_kafka_message_destroy(rkm);

        runningCounter++;
    }

    fprintf(stderr, "%% Closing consumer\n");
    rd_kafka_consumer_close(rk);

    rd_kafka_destroy(rk);

    return 0;

}

我知道这里有一个类似问题的答案LibRdKafka:commited_offset始终为-1001,但这无济于事。我将主题分区列表分配给中的使用者rebalance_cb

更新:

这是例如2条消息的输出:

> %4|1580198390.566|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property offset.store.method is deprecated: Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker).

> % Subscribed to 1 topic(s), waiting for rebalance and messages...

> % Consumer group rebalanced: assigned:
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> 
> Message on NewTestingTopic [0] at offset 25:
> Key: 0
> Value: ExampleMessage 0
> 
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> 
> Message on NewTestingTopic [3] at offset 41:
> Key: 1
> Value: ExampleMessage 1
米凯尔之家

我相信这可能是设计使然。

rd_kafka_assignment()方法返回通过提供的分配rd_kafka_assign()当为消费者分配组中的分区时,分配只是分区列表,没有偏移量。

同样在Java库中,assignment()return Set<TopicPartition>,这里也没有偏移量。在librdkafka中,rd_kafka_assignment()给出一个rd_kafka_topic_partition_list_t与相似的Set<TopicPartition>主要区别在于,它重用了rd_kafka_topic_partition_t类型,类型具有一些额外的字段,例如offset

rd_kafka_topic_partition_t类型在很多地方都使用过,并且它的所有字段在所有情况下都没有意义。分配上下文就是这种情况,因此某些字段设置为“空白”值,这是-1001针对偏移量的。

如果要获取分配的当前偏移量,则需要使用rd_kafka_position()同样,在Java中,您将使用position()

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

返回-1而不是1001或1002或1003的HttpsURLConnection.getResponseCode()

来自分类Dev

Swift URLSession.shared.dataTask GET 请求 -1001 返回超时

来自分类Dev

Kafka分区偏移量分布不均

来自分类Dev

Google OAuth错误-1001

来自分类Dev

NSURLConnection 以错误结束 - 1001

来自分类Dev

给定偏移量列表,从特定的Kafka分区读取偏移量的最快方法是什么?

来自分类Dev

如何获取kafka主题分区的最后/结束偏移量?

来自分类Dev

如何获取kafka主题分区的最新偏移量?

来自分类Dev

在kafka分区中压缩的偏移量会怎样?

来自分类Dev

NSURLErrorDomain代码= -1001请求超时

来自分类Dev

错误代码崩溃:-1001错误:NSURLErrorTimedOut

来自分类Dev

安装程序错误:进程必须退出1001

来自分类Dev

RestKit-错误域代码1001

来自分类Dev

RestKit-错误域代码1001

来自分类Dev

iOS应用提交-超时(-1001,2)

来自分类Dev

kafka主题的每个分区中的提交数和偏移量

来自分类Dev

我可以检索Kafka分区的最新可用偏移量而不检索所有消息吗?

来自分类Dev

在偏移量为0的分区0中找不到消息Kafka kafdrop

来自分类Dev

如何在Python中以编程方式获取每个Kafka主题分区的最新偏移量

来自分类Dev

清理分区后,Kafka是否将偏移量更改为0?

来自分类Dev

kafka 如何使用主题/分区/偏移量实现恰好一次的消息传递逻辑

来自分类Dev

Kafka 1.0 重置偏移量不会将所有分区重置为 0

来自分类Dev

在 Kafka 中,消费者在哪个 __consumer_offsets 分区上提交偏移量?

来自分类Dev

分区 0 的偏移量非常接近其余分区的偏移量之和

来自分类Dev

Kafka Connect偏移量。获取/设置?

来自分类Dev

Kafka-偏移量和logSize

来自分类Dev

Apache Kafka偏移量是如何生成的?

来自分类Dev

jQuery偏移量返回错误的值

来自分类Dev

jQuery偏移量返回负值

Related 相关文章

  1. 1

    返回-1而不是1001或1002或1003的HttpsURLConnection.getResponseCode()

  2. 2

    Swift URLSession.shared.dataTask GET 请求 -1001 返回超时

  3. 3

    Kafka分区偏移量分布不均

  4. 4

    Google OAuth错误-1001

  5. 5

    NSURLConnection 以错误结束 - 1001

  6. 6

    给定偏移量列表,从特定的Kafka分区读取偏移量的最快方法是什么?

  7. 7

    如何获取kafka主题分区的最后/结束偏移量?

  8. 8

    如何获取kafka主题分区的最新偏移量?

  9. 9

    在kafka分区中压缩的偏移量会怎样?

  10. 10

    NSURLErrorDomain代码= -1001请求超时

  11. 11

    错误代码崩溃:-1001错误:NSURLErrorTimedOut

  12. 12

    安装程序错误:进程必须退出1001

  13. 13

    RestKit-错误域代码1001

  14. 14

    RestKit-错误域代码1001

  15. 15

    iOS应用提交-超时(-1001,2)

  16. 16

    kafka主题的每个分区中的提交数和偏移量

  17. 17

    我可以检索Kafka分区的最新可用偏移量而不检索所有消息吗?

  18. 18

    在偏移量为0的分区0中找不到消息Kafka kafdrop

  19. 19

    如何在Python中以编程方式获取每个Kafka主题分区的最新偏移量

  20. 20

    清理分区后,Kafka是否将偏移量更改为0?

  21. 21

    kafka 如何使用主题/分区/偏移量实现恰好一次的消息传递逻辑

  22. 22

    Kafka 1.0 重置偏移量不会将所有分区重置为 0

  23. 23

    在 Kafka 中,消费者在哪个 __consumer_offsets 分区上提交偏移量?

  24. 24

    分区 0 的偏移量非常接近其余分区的偏移量之和

  25. 25

    Kafka Connect偏移量。获取/设置?

  26. 26

    Kafka-偏移量和logSize

  27. 27

    Apache Kafka偏移量是如何生成的?

  28. 28

    jQuery偏移量返回错误的值

  29. 29

    jQuery偏移量返回负值

热门标签

归档