저는 Kafka와 함께 일하고 있으며 원하는 모든 주제를 구독하는 단일 소비자를 구현하려고합니다. 3 개의 주제 (A, B, C)가 있다고 가정 해 보겠습니다. 각 주제의 메시지를 동 기적으로 처리하지만 주제간에 병렬로 처리하는 방법입니다. 따라서 A 주제의 경우 메시지를 하나씩 처리해야하지만 동시에 다른 주제의 메시지를 하나씩 처리해야합니다.
각 주제에 대해 별도의 스레드가 필요한 것 같습니다. 어떻게 구현할 수 있는지 조언 해 주시겠습니까? 이에 대한 준비된 해결책이 있습니까? 내 소비자는
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
... processing here
_consumer.Commit(consumeResult);
}
아직 C #에서 asnyc에 대한 경험이 없기 때문에 구현 방법을 모르겠습니다. Reactive Extensions https://gist.github.com/omnibs/6b2cbdba2685693448ee6779736a00c2 와 같은 것을 찾았습니다 .
메시지를 받으면 해당 작업 스레드 / 작업으로 메시지를 리디렉션하는 방법은 무엇입니까?
Confluent.Kafka 1.3.0 패키지를 사용하여 Kafka 작업
var channels = new Dictionary<string, ActionBlock<ConsumeResult<string, string>>>();
foreach (var topic in _consumer.Subscription)
{
channels.Add(topic, new ActionBlock<ConsumeResult<string, string>>(async consumeResult =>
{
... processing here
_consumer.Commit(consumeResult);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }));
}
이 코드는 구독하는 모든 주제에 대해 ActionBlock 을 생성 합니다.
그런 다음 메시지를 받으면 해당 채널로 리디렉션하십시오.
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume();
await channels[consumeResult.Topic].SendAsync(consumeResult);
}
@PauloMorgado 답변에서 기사 읽기
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다