BigQuery writeTableRows 始终写入缓冲区

坎佩

我们正在尝试使用 Apache Beam 和 avro 写入 Big Query。

以下似乎工作正常:-

p.apply("Input", AvroIO.read(DataStructure.class).from("AvroSampleFile.avro"))
            .apply("Transform", ParDo.of(new CustomTransformFunction()))
            .apply("Load", BigQueryIO.writeTableRows().to(table).withSchema(schema));

然后我们尝试通过以下方式使用它从 Google Pub/Sub 获取数据

p.begin()
            .apply("Input", PubsubIO.readAvros(DataStructure.class).fromTopic("topicName"))
            .apply("Transform", ParDo.of(new CustomTransformFunction()))
            .apply("Write", BigQueryIO.writeTableRows()
                    .to(table)
                    .withSchema(schema)
                    .withTimePartitioning(timePartitioning)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        p.run().waitUntilFinish();

当我们这样做时,它总是将其推送到缓冲区,而 Big Query 似乎需要很长时间才能从缓冲区读取。谁能告诉我为什么上面不会将记录直接写入 Big Query 表?

更新:- 看起来我需要添加以下设置,但这会引发 java.lang.IllegalArgumentException。

.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
坎佩

答案是您需要像这样包含“withNumFileShards”(可以是 1 到 1000)。

        p.begin()
            .apply("Input", PubsubIO.readAvros(DataStructure.class).fromTopic("topicName"))
            .apply("Transform", ParDo.of(new CustomTransformFunction()))
            .apply("Write", BigQueryIO.writeTableRows()
                    .to(table)
                    .withSchema(schema)
                    .withTimePartitioning(timePartitioning)
            .withMethod(Method.FILE_LOADS)
            .withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
            .withNumFileShards(1000)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        p.run().waitUntilFinish();

我在任何地方都找不到这个文档来说明 withNumFileShards 是强制性的,但是我在修复后找到了一个 Jira 票证。

https://issues.apache.org/jira/browse/BEAM-3198

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

流式缓冲区 - Google BigQuery

来自分类Dev

Google BigQuery错误:缓冲区空间不足(错误代码:无效)

来自分类Dev

在bigquery中使用标准sql查询缓冲区/未分区数据

来自分类Dev

使用bigquery api时表被截断(缓冲区大小问题?)

来自分类Dev

Flatbuffer缓冲区始终为空

来自分类Dev

FileStream将缓冲区写入文件

来自分类Dev

写入新的Python缓冲区接口

来自分类Dev

内置追加与字节。缓冲区写入

来自分类Dev

将文件写入缓冲区

来自分类Dev

Bash命令写入临时缓冲区

来自分类Dev

如果接收到的数据小于缓冲区的长度,则recv()写入缓冲区的内容

来自分类Dev

Ubuntu文件系统缓冲区/缓存始终为空

来自分类Dev

缓冲区两次写入文件

来自分类Dev

模具传递是否写入颜色缓冲区?

来自分类Dev

无法将大缓冲区写入EEPROM

来自分类Dev

直接写入std :: string的char *缓冲区

来自分类Dev

将opengl缓冲区写入视频的工件

来自分类Dev

两次写入同一OpenCL缓冲区

来自分类Dev

MongoDB RangeError:尝试在缓冲区边界之外写入

来自分类Dev

将byte []写入OutputStream时添加缓冲区

来自分类Dev

OpenCL内核随机写入__global缓冲区是否安全?

来自分类Dev

每次写入缓冲区时都应调用vkMapMemory吗?

来自分类Dev

只能将Char *写入LPVOID缓冲区吗?

来自分类Dev

“写入缓存缓冲区刷新”是什么意思

来自分类Dev

直接写入std :: strings缓冲区是否安全?

来自分类Dev

无法将大缓冲区写入EEPROM

来自分类Dev

无法写入C中的共享内存缓冲区

来自分类Dev

是否在缓冲区填充或“实时”时写入日志文件?

来自分类Dev

写入键盘缓冲区的快捷方式

Related 相关文章

热门标签

归档