如何使用Java将数据从Spark Streaming保存到Cassandra?

Arsinux

我从linux终端中的流中获取了一些条目,将其分配为lines,分成words但是我不想将它们打印出来,而是将它们保存到Cassandra中。我有一个名为的Keyspace ks,其中有一个名为的表record我知道有些代码CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra();必须完成这项工作,但我想我做错了什么。有人可以帮忙吗?

这是我的Cassandra ks.record模式(我通过CQLSH添加了这些数据)

id | birth_date                       | name
----+---------------------------------+-----------
10 | 1987-12-01 23:00:00.000000+0000  | Catherine
11 | 2004-09-07 22:00:00.000000+0000  |   Isadora
1  | 2016-05-10 13:00:04.452000+0000  |      John
2  | 2016-05-10 13:00:04.452000+0000  |      Troy
12 | 1970-10-01 23:00:00.000000+0000  |      Anna
3  | 2016-05-10 13:00:04.452000+0000  |    Andrew

这是我的Java代码:

import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*;


public class CassandraStreaming2 {
    public static void main(String[] args) {

        // Create a local StreamingContext with two working thread and batch interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CassandraStreaming");
        JavaStreamingContext sc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Create a DStream that will connect to hostname:port, like localhost:9999
        JavaReceiverInputDStream<String> lines = sc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(
                (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" "))
        );

        words.print();
        //CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra();

        sc.start();              // Start the computation
        sc.awaitTermination();   // Wait for the computation to terminate

    }
}
拉斯

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md#saving-data-to-cassandra

根据文档,您还需要传递RowWriter工厂。最常见的方法是使用mapToRow(Class)api,这是描述的缺少参数。

但是您还有另一个问题,您的代码尚未以可以写入C *的方式指定数据。您的JavaDStream只有StringString对于给定的架构,不能将单个文件放入Cassandra行中。

基本上你是在告诉连接器

Write "hello" to CassandraTable (id, birthday, value)

不告诉它hello去向(id应该是什么?生日应该是什么?)

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何使用Java将数据从Spark Streaming保存到Cassandra?

来自分类Dev

无法将Java日期保存到Cassandra

来自分类Dev

无法将Java日期保存到Cassandra

来自分类Dev

使用TTL将火花保存到Cassandra

来自分类Dev

将数据从Spark保存到Cassandra会导致java.lang.ClassCastException

来自分类Dev

如何使用Hadoop将CQL集合对象保存到Cassandra?

来自分类Dev

Spark Streaming:如何有效地将foreachRDD数据保存到Mysql数据库中?

来自分类Dev

如何通过更改架构和添加其他属性将DataFrame从Spark保存到Cassandra表

来自分类Dev

使用Java将rdd保存到mongo数据库

来自分类Dev

如何使用Java将生成的PDF文件保存到MySQL数据库?

来自分类Dev

如何使用Java将生成的PDF文件保存到MySQL数据库?

来自分类Dev

如何使用 JAVA 将图像从 HTML 保存到数据库

来自分类Dev

如何使用VBScript将网站数据保存到文件?

来自分类Dev

如何使用React Native将数据保存到Firebase?

来自分类Dev

如何使用for循环将多行保存到数据库

来自分类Dev

用Java将数据保存到文件

来自分类Dev

spark-streaming:如何将流数据输出到cassandra

来自分类Dev

spark-streaming:如何将流数据输出到cassandra

来自分类Dev

使用REST将数据保存到Firebase

来自分类Dev

使用nodejs将数据保存到mongodb

来自分类Dev

如何使用Android Java将浮动内容保存到文件?

来自分类Dev

Spark Streaming-Java-将Kafka中的JSON插入Cassandra

来自分类Dev

使用Spark将数据写入Cassandra

来自分类Dev

将数据保存到socketio

来自分类Dev

将 Spark ML 管道保存到数据库

来自分类Dev

Spark流:使用PairRDD.saveAsNewHadoopDataset函数将数据保存到HBase

来自分类Dev

如何使用spark保存cassandra表的输出

来自分类Dev

通过Spark Streaming将原子写入Cassandra

来自分类Dev

如何将Spark Streaming数据转换为Spark DataFrame

Related 相关文章

  1. 1

    如何使用Java将数据从Spark Streaming保存到Cassandra?

  2. 2

    无法将Java日期保存到Cassandra

  3. 3

    无法将Java日期保存到Cassandra

  4. 4

    使用TTL将火花保存到Cassandra

  5. 5

    将数据从Spark保存到Cassandra会导致java.lang.ClassCastException

  6. 6

    如何使用Hadoop将CQL集合对象保存到Cassandra?

  7. 7

    Spark Streaming:如何有效地将foreachRDD数据保存到Mysql数据库中?

  8. 8

    如何通过更改架构和添加其他属性将DataFrame从Spark保存到Cassandra表

  9. 9

    使用Java将rdd保存到mongo数据库

  10. 10

    如何使用Java将生成的PDF文件保存到MySQL数据库?

  11. 11

    如何使用Java将生成的PDF文件保存到MySQL数据库?

  12. 12

    如何使用 JAVA 将图像从 HTML 保存到数据库

  13. 13

    如何使用VBScript将网站数据保存到文件?

  14. 14

    如何使用React Native将数据保存到Firebase?

  15. 15

    如何使用for循环将多行保存到数据库

  16. 16

    用Java将数据保存到文件

  17. 17

    spark-streaming:如何将流数据输出到cassandra

  18. 18

    spark-streaming:如何将流数据输出到cassandra

  19. 19

    使用REST将数据保存到Firebase

  20. 20

    使用nodejs将数据保存到mongodb

  21. 21

    如何使用Android Java将浮动内容保存到文件?

  22. 22

    Spark Streaming-Java-将Kafka中的JSON插入Cassandra

  23. 23

    使用Spark将数据写入Cassandra

  24. 24

    将数据保存到socketio

  25. 25

    将 Spark ML 管道保存到数据库

  26. 26

    Spark流:使用PairRDD.saveAsNewHadoopDataset函数将数据保存到HBase

  27. 27

    如何使用spark保存cassandra表的输出

  28. 28

    通过Spark Streaming将原子写入Cassandra

  29. 29

    如何将Spark Streaming数据转换为Spark DataFrame

热门标签

归档