KafkaトピックパーティションとSparkエグゼキューターマッピング

ファイサル・アーメド・シディキ

kafkaトピックでsparkストリーミングを使用しています。トピックは5つのパーティションで作成されます。私のすべてのメッセージは、テーブル名をキーとして使用してkafkaトピックに公開されます。これを考えると、そのテーブルのすべてのメッセージは同じパーティションに移動する必要があると思います。しかし、同じテーブルのスパークログメッセージで、エグゼキュータのノード1に送信されることもあれば、エグゼキュータのノード2に送信されることもあります。

次のコマンドを使用して、yarn-clusterモードでコードを実行しています。

spark-submit --name DataProcessor --master yarn-cluster --files /opt/ETL_JAR/executor-log4j-spark.xml,/opt/ETL_JAR/driver-log4j-spark.xml,/opt/ETL_JAR/application.properties --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=driver-log4j-spark.xml" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=executor-log4j-spark.xml" --class com.test.DataProcessor /opt/ETL_JAR/etl-all-1.0.jar

この送信により、ノード1とノード2の2つのエグゼキュータに1つのドライバが作成されます。

ノード1とノード2のエグゼキュータに同じパーティションを読み取らせたくありません。しかし、これは起こっています

また、次の構成を試してコンシューマーグループを指定しましたが、違いはありませんでした。

kafkaParams.put("group.id", "app1");

これは、createDirectStreamメソッドを使用してストリームを作成する方法です。* zookeeperを使用しないでください。

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);
    kafkaParams.put("auto.offset.reset", "largest");
    kafkaParams.put("group.id", "app1");

        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc, 
                String.class, 
                String.class,
                StringDecoder.class, 
                StringDecoder.class, 
                kafkaParams, 
                topicsSet
        );

完全なコード:

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class DataProcessor2 implements Serializable {
    private static final long serialVersionUID = 3071125481526170241L;

    private static Logger log = LoggerFactory.getLogger("DataProcessor");

    public static void main(String[] args) {
        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);
        DataProcessorContextFactory3 factory = new DataProcessorContextFactory3();
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(sparkCheckPointDir, factory);

        // Start the process
        jssc.start();
        jssc.awaitTermination();
    }

}

class DataProcessorContextFactory3 implements JavaStreamingContextFactory, Serializable {
    private static final long serialVersionUID = 6070911284191531450L;

    private static Logger logger = LoggerFactory.getLogger(DataProcessorContextFactory.class);

    DataProcessorContextFactory3() {
    }

    @Override
    public JavaStreamingContext create() {
        logger.debug("creating new context..!");

        final String brokers = ApplicationProperties.getProperty(Consts.KAFKA_BROKERS_NAME);
        final String topic = ApplicationProperties.getProperty(Consts.KAFKA_TOPIC_NAME);
        final String app = "app1";
        final String offset = ApplicationProperties.getProperty(Consts.KAFKA_CONSUMER_OFFSET, "largest");

        logger.debug("Data processing configuration. brokers={}, topic={}, app={}, offset={}", brokers, topic, app,
                offset);
        if (StringUtils.isBlank(brokers) || StringUtils.isBlank(topic) || StringUtils.isBlank(app)) {
            System.err.println("Usage: DataProcessor <brokers> <topic>\n" + Consts.KAFKA_BROKERS_NAME
                    + " is a list of one or more Kafka brokers separated by comma\n" + Consts.KAFKA_TOPIC_NAME
                    + " is a kafka topic to consume from \n\n\n");
            System.exit(1);
        }
        final String majorVersion = "1.0";
        final String minorVersion = "3";
        final String version = majorVersion + "." + minorVersion;
        final String applicationName = "DataProcessor-" + topic + "-" + version;
        // for dev environment
         SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(applicationName);
        // for cluster environment
        //SparkConf sparkConf = new SparkConf().setAppName(applicationName);
        final long sparkBatchDuration = Long
                .valueOf(ApplicationProperties.getProperty(Consts.SPARK_BATCH_DURATION, "10"));

        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchDuration));
        logger.debug("setting checkpoint directory={}", sparkCheckPointDir);
        jssc.checkpoint(sparkCheckPointDir);

        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("auto.offset.reset", offset);
        kafkaParams.put("group.id", "app1");

//          @formatter:off
            JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                    jssc, 
                    String.class, 
                    String.class,
                    StringDecoder.class, 
                    StringDecoder.class, 
                    kafkaParams, 
                    topicsSet
            );
//          @formatter:on
        processRDD(messages, app);
        return jssc;
    }

    private void processRDD(JavaPairInputDStream<String, String> messages, final String app) {
        JavaDStream<MsgStruct> rdd = messages.map(new MessageProcessFunction());

        rdd.foreachRDD(new Function<JavaRDD<MsgStruct>, Void>() {

            private static final long serialVersionUID = 250647626267731218L;

            @Override
            public Void call(JavaRDD<MsgStruct> currentRdd) throws Exception {
                if (!currentRdd.isEmpty()) {
                    logger.debug("Receive RDD. Create JobDispatcherFunction at HOST={}", FunctionUtil.getHostName());
                    currentRdd.foreachPartition(new VoidFunction<Iterator<MsgStruct>>() {

                        @Override
                        public void call(Iterator<MsgStruct> arg0) throws Exception {
                            while(arg0.hasNext()){
                                System.out.println(arg0.next().toString());
                            }
                        }
                    });
                } else {
                    logger.debug("Current RDD is empty.");
                }
                return null;
            }
        });
    }
    public static class MessageProcessFunction implements Function<Tuple2<String, String>, MsgStruct> {
        @Override
        public MsgStruct call(Tuple2<String, String> data) throws Exception {
            String message = data._2();
            System.out.println("message:"+message);
            return MsgStruct.parse(message);
        }

    }
    public static class MsgStruct implements Serializable{
        private String message;
        public static MsgStruct parse(String msg){
            MsgStruct m = new MsgStruct();
            m.message = msg;
            return m;
        }
        public String toString(){
            return "content inside="+message;
        }
    }

}
maasg

このDirectStreamアプローチを使用すると、Kafkaパーティションに送信されたメッセージが同じSparkパーティションに到達するというのは正しい仮定です。

想定できないのは、各Sparkパーティションが毎回同じSparkワーカーによって処理されるということです。バッチ間隔ごとOffsetRangeに、パーティションごとにSparkタスクが作成され、処理のためにクラスターに送信され、使用可能なワーカーに到達します。

パーティションの局所性を探しているもの。直接kafkaコンシューマーがサポートする唯一のパーティションローカリティは、SparkとKafkaのデプロイが同じ場所にある場合に処理されるオフセット範囲を含むkafkaホストです。しかし、それは私があまり見ない展開トポロジーです。

要件は、ホストの局所性を持つ必要性を決定した場合には、あなたがになっているはずですアパッチSamzaカフカストリーム

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Spark + Kafka統合-KafkaパーティションのRDDパーティションへのマッピング

分類Dev

特定の Spark executor への kafka パーティションのマッピング

分類Dev

Kafkaを使用したデータモデリング?トピックとパーティション

分類Dev

KafkaトピックパーティションをSparkストリーミングに

分類Dev

Kafkaコンシューマー-特定のkafkaトピックパーティションからのイベントのポーリングを一時停止して、それを遅延キューとして使用します

分類Dev

Kafkaトピックとパーティショントピック

分類Dev

Kafkaログ圧縮トピックパーティションとディスクスペース

分類Dev

Kafka:単一のコンシューマーグループ、パーティションなし、複数のトピック

分類Dev

Kafkaトピック内の特定のパーティションからSparkを使用してデータをストリーミングする

分類Dev

ジャクソンとの非対称の名前/プロパティマッピング、よりエレガントなソリューション?

分類Dev

Sparkストリーミング-Kafkaトピックの特定のパーティションを消費することは可能ですか?

分類Dev

Azure ServiceBusトピックのパーティショニング

分類Dev

Kafkaトピックパーティションでのキーと値のavroメッセージの配布

分類Dev

C#オートマッパーマッピングディクショナリプロパティ

分類Dev

セッションとリクエストリピーターのFlaskセキュリティの問題

分類Dev

トピック内の特定のパーティションからのストリーミング(Kafka Streams)

分類Dev

Apache Kafka-トピック/パーティションのKafkaStream

分類Dev

Kafkaトピックパーティション

分類Dev

Kafkaプロデューサーはトピックとパーティションを作成できますか?

分類Dev

マレットでのトピックモデリング; ドキュメンテーション

分類Dev

ADFマッピングデータフロー-シンクトランスフォームダイナミックパーティション数

分類Dev

トピック、パーティション、キー

分類Dev

トピック、パーティション、キー

分類Dev

トピック、パーティション、キー

分類Dev

kafka-複数のトピックと複数のパーティション

分類Dev

AutoMapperナビゲーションプロパティをマッピングする際のタイプのマッピングエラー

分類Dev

ApacheIgniteコンピューティンググリッドとSparkのパフォーマンスの違い

分類Dev

複数のワーカー(同じ数のパーティション)を使用して、同じトピックでkafkaコンシューマーアプリケーションをスケーリングする方法

分類Dev

単一のプロデューサー、トピック、ブローカーのためのKafkaパーティショニング

Related 関連記事

  1. 1

    Spark + Kafka統合-KafkaパーティションのRDDパーティションへのマッピング

  2. 2

    特定の Spark executor への kafka パーティションのマッピング

  3. 3

    Kafkaを使用したデータモデリング?トピックとパーティション

  4. 4

    KafkaトピックパーティションをSparkストリーミングに

  5. 5

    Kafkaコンシューマー-特定のkafkaトピックパーティションからのイベントのポーリングを一時停止して、それを遅延キューとして使用します

  6. 6

    Kafkaトピックとパーティショントピック

  7. 7

    Kafkaログ圧縮トピックパーティションとディスクスペース

  8. 8

    Kafka:単一のコンシューマーグループ、パーティションなし、複数のトピック

  9. 9

    Kafkaトピック内の特定のパーティションからSparkを使用してデータをストリーミングする

  10. 10

    ジャクソンとの非対称の名前/プロパティマッピング、よりエレガントなソリューション?

  11. 11

    Sparkストリーミング-Kafkaトピックの特定のパーティションを消費することは可能ですか?

  12. 12

    Azure ServiceBusトピックのパーティショニング

  13. 13

    Kafkaトピックパーティションでのキーと値のavroメッセージの配布

  14. 14

    C#オートマッパーマッピングディクショナリプロパティ

  15. 15

    セッションとリクエストリピーターのFlaskセキュリティの問題

  16. 16

    トピック内の特定のパーティションからのストリーミング(Kafka Streams)

  17. 17

    Apache Kafka-トピック/パーティションのKafkaStream

  18. 18

    Kafkaトピックパーティション

  19. 19

    Kafkaプロデューサーはトピックとパーティションを作成できますか?

  20. 20

    マレットでのトピックモデリング; ドキュメンテーション

  21. 21

    ADFマッピングデータフロー-シンクトランスフォームダイナミックパーティション数

  22. 22

    トピック、パーティション、キー

  23. 23

    トピック、パーティション、キー

  24. 24

    トピック、パーティション、キー

  25. 25

    kafka-複数のトピックと複数のパーティション

  26. 26

    AutoMapperナビゲーションプロパティをマッピングする際のタイプのマッピングエラー

  27. 27

    ApacheIgniteコンピューティンググリッドとSparkのパフォーマンスの違い

  28. 28

    複数のワーカー(同じ数のパーティション)を使用して、同じトピックでkafkaコンシューマーアプリケーションをスケーリングする方法

  29. 29

    単一のプロデューサー、トピック、ブローカーのためのKafkaパーティショニング

ホットタグ

アーカイブ