Spark Streaming + Spark SQL

单身的

我正在尝试通过Spark Streaming和Spark SQL处理日志。主要思想是将具有“查询”所需的“旧”数据转换为DataFrame的Parquet格式的“压缩”数据集,压缩数据集的加载方式如下:

    SQLContext sqlContext = JavaSQLContextSingleton.getInstance(sc.sc());
    DataFrame compact = null;
    compact = sqlContext.parquetFile("hdfs://auto-ha/tmp/data/logs");

由于未压缩的数据集(我每天都会压缩数据集)由许多文件组成,因此我希望将当天的数据存储在DStream中,以便快速获取这些查询。

我尝试了DataFrame方法,但没有结果。

    DataFrame df = JavaSQLContextSingleton.getInstance(sc.sc()).createDataFrame(lastData, schema);
    df.registerTempTable("lastData");
    JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
        @Override
        public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
            DataFrame df = JavaSQLContextSingleton.getInstance(v1.context()).createDataFrame(v1, schema);
            ......drop old data from lastData table                                
            df.insertInto("lastData");

        }
    });

使用这种方法,例如,如果我在另一个线程中查询临时表,则不会得到任何结果。

我也尝试过使用RDD转换方法,更具体地说,我尝试遵循Spark示例,在其中创建一个空的RDD,然后将DSStream RDD内容与空的RDD合并:

  JavaRDD<Row> lastData = sc.emptyRDD();
  JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
        @Override
        public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
            lastData.union(v1).filter(let only recent data....);
        }
    });

这种方法也行不通,因为我没有在lastData中获取任何内容

我可以为此目的使用Windowed计算或updateStateBy键吗?

有什么建议?

谢谢你的帮助!

单身的

好吧,我终于明白了。

我使用updateState函数,如果这样的时间戳早于24小时,则返回0。

      final static Function2<List<Long>, Optional<Long>, Optional<Long>> RETAIN_RECENT_DATA
        = (List<Long> values, Optional<Long> state) -> {
            Long newSum = state.or(0L);
            for (Long value : values) {
                newSum += value;
            }
            //current milis uses UTC
            if (System.currentTimeMillis() - newSum > 86400000L) {
                return Optional.absent();
            } else {
                return Optional.of(newSum);
            }
        };

然后在每个批次上,我将DataFrame注册为临时表:

finalsum.foreachRDD((JavaRDD<Row> rdd, Time time) -> {
        if (!rdd.isEmpty()) {
            HiveContext sqlContext1 = JavaSQLContextSingleton.getInstance(rdd.context());
            if (sqlContext1.cacheManager().isCached("alarm_recent")) {
                sqlContext1.uncacheTable("alarm_recent");
            }
            DataFrame wordsDataFrame = sqlContext1.createDataFrame(rdd, schema);
            wordsDataFrame.registerTempTable("alarm_recent");

            wordsDataFrame.cache();//    
            wordsDataFrame.first();
        }
        return null;
    });

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Spark Streaming + Spark SQL

来自分类Dev

使用Spark Streaming填充的Cassandra表上的Spark SQL

来自分类Dev

Spark Structured Streaming / Spark SQL 中的条件爆炸

来自分类Dev

Spark Streaming历史状态

来自分类Dev

Spark Streaming累计字数

来自分类Dev

结合Spark Streaming + MLlib

来自分类Dev

Spark Streaming连续作业

来自分类Dev

Spark Streaming Kafka流

来自分类Dev

Spark Streaming StreamingContext错误

来自分类Dev

Spark Streaming textFileStream COPYING

来自分类Dev

在Spark Streaming(Spark 2.0)中使用Kafka

来自分类Dev

Spark Streaming过滤流数据

来自分类Dev

Spark Streaming MapWithState超时延迟?

来自分类Dev

Spark Streaming异常处理策略

来自分类Dev

Spark Streaming中的顺序处理

来自分类Dev

Spark Streaming中的并发操作

来自分类Dev

在Spark Streaming中缓存DStream

来自分类Dev

来自Kafka Consumer的Spark Streaming

来自分类Dev

Spark Streaming StreamingContext活动计数

来自分类Dev

Spark Streaming Standalone:保存日志

来自分类Dev

Spark Streaming不包含KafkaUtils

来自分类Dev

使用 Spark Streaming 分析日志

来自分类Dev

Spark Streaming + Kafka 集成 0.8.2.1

来自分类Dev

结构化 Spark Streaming 抛出 java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.internalCreateDataFrame

来自分类Dev

Spark提交失败,Spark Streaming WorkdCount Python代码

来自分类Dev

如何将Spark Streaming数据转换为Spark DataFrame

来自分类Dev

Spark和Spark Streaming中的时间序列预测

来自分类Dev

Spark Twitter Streaming异常:(org.apache.spark.Logging)classnotfound

来自分类Dev

从Spark-SQL-Streaming写入HDFS和S3时如何避免小文件问题