我有一张桌子,下面的结构。
trans_count
start_time, end_time, count
00:00:01 00:00:10 1000
00:00:11 00:00:20 800
Spark会监听来自Kafka的事件,并进行10秒钟的分组,并且必须将其插入Phoenix hbase表中。
10秒钟后,我必须首先检查start_time,end_time组合是否在表中。如果存在,我们必须获取现有计数并添加新计数,然后再次向上插入。
UPSERT INTO trans_count(start_time, end_time, count) SELECT start_time, end_time, count? FROM trans_count WHERE start_time = ? AND end_time = ?
如果在上面的语句中没有添加行,则只需添加数据。
在Apache Storm中,我能够在configure方法中创建Phoenix连接对象,并且能够每10秒使用一次与UPSERT的相同连接。
在Spark中,我无法创建连接对象,也无法为RDD中的每个对象使用相同的对象。我从spark的输出将是JavaDStream>,其中start_time,end_time,count是Map中的所有键。
我最终为RDD的每次迭代创建一个连接对象,我觉得这不是正确的方法。我已经读过Phoenix连接是轻量级的,但是为每个RDD创建一个连接对象似乎不是正确的方法。
我读了一些有关相关内容的博客,但无法解决这个问题。请帮助。
注意:该应用程序是在JAVA中内置的。
解决方案:
不是在RDD中为每个对象创建连接,而是在RDD中为每个分区创建连接并将其用于所有对象。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句