Kafka Elixir consumer keeps crashing

rafaelBackx

I have two Mix projects. One, called server, publishes numbers to Kafka, and the other is a consumer that is supposed to compute their factorials. When I start the consumer, it keeps crashing.

server.exs

defmodule Server do
 alias KafkaEx.Protocol.Produce.Request
 alias KafkaEx.Protocol.CreateTopics.TopicRequest

 def create_topic() do
   KafkaEx.create_topics([%TopicRequest{topic: "factorials-to-be-calculated", num_partitions: 1, replication_factor: 1}])
 end

 def delete_topic() do
   # KafkaEx.delete_topics/1 expects a list of topic names
   KafkaEx.delete_topics(["factorials-to-be-calculated"])
 end

 def generate_number(max, min \\ 0) do
   number = :rand.uniform(max - min) + min
   message = %KafkaEx.Protocol.Produce.Message{value: Integer.to_string(number)}
   IO.puts(number)
   request = %{%Request{topic: "factorials-to-be-calculated", required_acks: 1} | messages: [message]}
   # The offset is not used afterwards, so it is prefixed with an underscore
   {:ok, _offset} = KafkaEx.produce(request)
 end
end

factorial_consumer.exs

defmodule Consumer.FactorialConsumer do
  use KafkaEx.GenConsumer
  require Logger
  alias KafkaEx.Protocol.Fetch.Message
  alias KafkaEx.Protocol.Produce.Request

  def handle_message_set(message_set, state) do
    for %Message{value: message} <- message_set do
      Logger.debug(fn -> "message: " <> inspect(message) end)
    end
    {:async_commit, state}
  end

  def factorial(0), do: 1
  def factorial(n), do: n * factorial(n-1)

end
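
In the consumer above, handle_message_set/2 only logs each payload and factorial/1 is never called. A minimal sketch of how a payload could be turned into a result, assuming the values are the decimal strings that the server writes with Integer.to_string/1. FactorialSketch and solve/1 are hypothetical names used for illustration and are not part of KafkaEx:

```elixir
defmodule FactorialSketch do
  # Parses a payload such as "5" and returns {:ok, factorial} or :error.
  # Anything that is not a non-negative decimal integer is rejected.
  def solve(value) when is_binary(value) do
    case Integer.parse(value) do
      {n, ""} when n >= 0 -> {:ok, factorial(n)}
      _ -> :error
    end
  end

  # The guard on the recursive clause keeps a negative input from
  # recursing forever, which the unguarded factorial/1 above would do.
  def factorial(0), do: 1
  def factorial(n) when is_integer(n) and n > 0, do: n * factorial(n - 1)
end
```

Inside the for comprehension in handle_message_set/2, the logged value could then be FactorialSketch.solve(message) instead of the raw payload.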

application.exs (consumer)

defmodule Consumer.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application
  import Supervisor.Spec
  @impl true
  def start(_type, _args) do

    gen_consumer_impl = Consumer.FactorialConsumer
    consumer_group_name = "Factorials"
    topic_names = ["factorials-to-be-calculated"]
    consumer_group_opts = []

    children = [
      supervisor(
        KafkaEx.ConsumerGroup,
        [gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts]
      )
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Consumer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

This is the error that I get when running iex -S mix run.

Thank you for any help you can offer me.

Edit: the library I am using is KafkaEx: https://hexdocs.pm/kafka_ex/KafkaEx.html

Stack trace in plain text:

17:07:13.790 [error] GenServer #PID<0.220.0> terminating
** (CaseClauseError) no case clause matching: {:error, {{:EXIT, {{:case_clause, {:error, {:undef, [{Consumer.FactorialConsumer, :init, ["factorials-to-be-calculated", 0, nil], []}, {KafkaEx.GenConsumer, :init, 1, [file: 'lib/kafka_ex/gen_consumer.ex', line: 545]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 417]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 385]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}]}}}, [{KafkaEx.GenConsumer.Supervisor, :"-start_workers/3-fun-0-", 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 100]}, {Enum, :"-each/2-lists^foreach/1-0-", 2, [file: 'lib/enum.ex', line: 786]}, {KafkaEx.GenConsumer.Supervisor, :start_workers, 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 99]}, {KafkaEx.GenConsumer.Supervisor, :start_link, 4, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 57]}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 385]}, {:supervisor, :do_start_child, 2, [file: 'supervisor.erl', line: 371]}, {:supervisor, :handle_start_child, 2, [file: 'supervisor.erl', line: 677]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 426]}]}}, {:child, :undefined, :consumer, {KafkaEx.GenConsumer.Supervisor, :start_link, [{KafkaEx.GenConsumer, Consumer.FactorialConsumer}, "Factorials", [{"factorials-to-be-calculated", 0}], [commit_interval: 1000, generation_id: 224, member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd"]]}, :permanent, :infinity, :supervisor, [KafkaEx.GenConsumer.Supervisor]}}}
    (kafka_ex 0.11.0) lib/kafka_ex/consumer_group.ex:340: KafkaEx.ConsumerGroup.start_consumer/5
    (kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:479: KafkaEx.ConsumerGroup.Manager.start_consumer/2
    (kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:204: KafkaEx.ConsumerGroup.Manager.handle_info/2
    (stdlib 3.13.2) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13.2) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: {:EXIT, #PID<0.224.0>, {:shutdown, :rebalance}}
State: %KafkaEx.ConsumerGroup.Manager.State{assignments: [], consumer_module: Consumer.FactorialConsumer, consumer_opts: [commit_interval: 1000], consumer_supervisor_pid: #PID<0.225.0>, gen_consumer_module: KafkaEx.GenConsumer, generation_id: 223, group_name: "Factorials", heartbeat_interval: 1000, heartbeat_timer: #PID<0.224.0>, leader_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", members: nil, partition_assignment_callback: &KafkaEx.ConsumerGroup.PartitionAssignment.round_robin/2, session_timeout: 30000, session_timeout_padding: 10000, supervisor_pid: #PID<0.219.0>, topics: ["factorials-to-be-calculated"], worker_name: #PID<0.221.0>}

rafaelBackx

I just fixed it by creating a new Mix project.
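
For anyone hitting the same trace: the :undef entry names Consumer.FactorialConsumer.init with three arguments, which means the VM could not find that function at runtime. Either the module was not loaded at all, or it was loaded without an init/3. The thread does not say which one applied here. One possible cause, stated as an assumption, is that the source files carried the .exs extension shown above, since Mix compiles only .ex files under lib/ by default. A stale build is another possibility. The sketch below is a small diagnostic that tells the two cases apart. CallbackCheck is a hypothetical helper, not part of KafkaEx, and uses only Code.ensure_loaded?/1 and function_exported?/3 from the standard library:

```elixir
defmodule CallbackCheck do
  # Hypothetical helper (not part of KafkaEx): reports whether a module
  # can be loaded and whether it exports the given function and arity.
  # Returns :ok, {:error, :module_not_loaded},
  # or {:error, :function_not_exported}.
  def check(module, function, arity) do
    cond do
      # The module is not compiled into the project or not on the code path.
      not Code.ensure_loaded?(module) ->
        {:error, :module_not_loaded}

      # The module exists but does not define function/arity.
      not function_exported?(module, function, arity) ->
        {:error, :function_not_exported}

      true ->
        :ok
    end
  end
end
```

In the consumer project's iex -S mix session, CallbackCheck.check(Consumer.FactorialConsumer, :init, 3) would show which case applies. A result of {:error, :module_not_loaded} points at the file not being compiled, while {:error, :function_not_exported} points at the module being built without the callback that this KafkaEx version calls.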
