我正在尝试通过Spark Streaming和Spark SQL处理日志。主要思想是将具有“查询”所需的“旧”数据转换为DataFrame的Parquet格式的“压缩”数据集,压缩数据集的加载方式如下:
SQLContext sqlContext = JavaSQLContextSingleton.getInstance(sc.sc());
DataFrame compact = null;
compact = sqlContext.parquetFile("hdfs://auto-ha/tmp/data/logs");
由于未压缩的数据集(我每天都会压缩数据集)由许多文件组成,因此我希望将当天的数据存储在DStream中,以便快速获取这些查询。
我尝试了DataFrame方法,但没有结果。
DataFrame df = JavaSQLContextSingleton.getInstance(sc.sc()).createDataFrame(lastData, schema);
df.registerTempTable("lastData");
JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
@Override
public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
DataFrame df = JavaSQLContextSingleton.getInstance(v1.context()).createDataFrame(v1, schema);
......drop old data from lastData table
df.insertInto("lastData");
}
});
使用这种方法,例如,如果我在另一个线程中查询临时表,则不会得到任何结果。
我也尝试过使用RDD转换方法,更具体地说,我尝试遵循Spark示例,在其中创建一个空的RDD,然后将DSStream RDD内容与空的RDD合并:
JavaRDD<Row> lastData = sc.emptyRDD();
JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
@Override
public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
lastData.union(v1).filter(let only recent data....);
}
});
这种方法也行不通,因为我没有在lastData中获取任何内容
我可以为此目的使用Windowed计算或updateStateBy键吗?
有什么建议?
谢谢你的帮助!
好吧,我终于明白了。
我使用updateState函数,如果这样的时间戳早于24小时,则返回0。
final static Function2<List<Long>, Optional<Long>, Optional<Long>> RETAIN_RECENT_DATA
= (List<Long> values, Optional<Long> state) -> {
Long newSum = state.or(0L);
for (Long value : values) {
newSum += value;
}
//current milis uses UTC
if (System.currentTimeMillis() - newSum > 86400000L) {
return Optional.absent();
} else {
return Optional.of(newSum);
}
};
然后在每个批次上,我将DataFrame注册为临时表:
finalsum.foreachRDD((JavaRDD<Row> rdd, Time time) -> {
if (!rdd.isEmpty()) {
HiveContext sqlContext1 = JavaSQLContextSingleton.getInstance(rdd.context());
if (sqlContext1.cacheManager().isCached("alarm_recent")) {
sqlContext1.uncacheTable("alarm_recent");
}
DataFrame wordsDataFrame = sqlContext1.createDataFrame(rdd, schema);
wordsDataFrame.registerTempTable("alarm_recent");
wordsDataFrame.cache();//
wordsDataFrame.first();
}
return null;
});
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句