Python 中的数据流未显示来自 Pubsub 订阅的输出集合

大步兽凯尼

我有一个用 Python 编写的 Dataflow 作业。很简单,只从订阅中读取,应用Fixed Window,然后写入GCS。

问题是从订阅读取后,FixedWindow 不显示任何输出集合。

我一直在尝试任何没有运气的东西。

这是我的代码

import apache_beam as beam
import argparse
import logging
import apache_beam.transforms.window as window
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions

def run(argv=None):

    """Main entry point; defines and runs the wordcount pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        required=True,
                        default='gs://dataflow-samples/shakespeare/kinglear.txt',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        required=True,
                        default='gs://dataflow-samples/',
                        help='Output file to write results to.')
    parser.add_argument('--topic',
                        dest='topic',
                        required=True,
                        help='Topic for message.')
    parser.add_argument('--subscription',
                        dest='subscription',
                        required=True,
                        help='Subscription for message.')
    parser.add_argument('--entity_type',
                        dest='entity_type',
                        required=True,
                        help='Entity Type for message.')
    parser.add_argument('--event_type',
                        dest='event_type',
                        required=True,
                        help='Event Type for message.')                        
    parser.add_argument('--outputFilenamePrefix',
                        dest='outputFilenamePrefix',
                        required=True,
                        help='Output Filename Prefix Type for message.')   
    parser.add_argument('--outputFilenameSuffix',
                        dest='outputFilenameSuffix',
                        required=True,
                        help='Output Filename Suffix Type for message.') 

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming= True
    p = beam.Pipeline(options=pipeline_options)

    if known_args.subscription:
        messages = (p 
                | ReadFromPubSub(subscription=known_args.subscription, with_attributes=True))
    else:
        messages = (p 
                | ReadFromPubSub(subscription=known_args.topic, with_attributes=True))

    (messages 
            | beam.WindowInto(window.FixedWindows(120))
            | beam.io.WriteToText(known_args.output + known_args.outputFilenamePrefix, 
                                    file_name_suffix=known_args.outputFilenameSuffix,
                                    num_shards=1))

    result = p.run()
    result.wait_until_finish()

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

这个想法是将结果保存在提供的存储桶中。我一直在读到,尚不支持无界数据中的某些功能,例如 Window。在这种情况下,唯一的解决方案是使用 Java 来执行此操作。

亚历克斯·阿马托

写入 GCS 时,python 不支持 WriteToText。正如您讨论的那样,这将适用于 Java。或者您可以将记录写入单独的 IO,例如 BigQuery。

支持的 IO 内置

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

部署后使用requirements_file参数后,python数据流作业不接受来自pubsub订阅的消息

来自分类Dev

python pubsub订阅到期

来自分类Dev

python pubsub订阅多个主题

来自分类Dev

python pubsub订阅多个主题

来自分类Dev

在Dataflow Python中从PubSub读取AVRO消息

来自分类Dev

自动缩放中PubSub推送订阅的App Engine请求超时

来自分类Dev

区域丢失后从数据流中恢复PubSub Acked消息

来自分类Dev

Google PubSub Python 客户端中的内存泄漏

来自分类Dev

接收重复消息的 Google PubSub Python 多订阅者客户端

来自分类Dev

输出未显示在 python 中

来自分类Dev

Firebase Cloud pubsub订阅停止监听消息

来自分类Dev

从模板启动时,数据流作业不会从 PubSub 使用

来自分类Dev

如何在Snap中延迟输出数据流

来自分类Dev

在Android中读取python pickle数据流

来自分类Dev

在Python中快速比较数据流图

来自分类Dev

在数据流作业中查找重复项-Python

来自分类Dev

python客户端的PubSub publish()函数中的动态属性数量

来自分类Dev

Python流数据流“ WriteToPubSub”行为

来自分类Dev

通过HTTP的python pubsub /消息队列?

来自分类Dev

Python脚本输出未显示在CMD中

来自分类Dev

云数据流 python3 作业未解决依赖项

来自分类Dev

StackExchange.Redis:获取频道订阅数(即PUBSUB NUMSUB)

来自分类Dev

可以订阅的PubSub主题数量是否有限制?

来自分类Dev

如何在VPC服务控件中使用PubSub Push订阅?

来自分类Dev

使用PUBSUB订阅时,为什么不能PING?

来自分类Dev

Twisted SSE服务器通过pubsub订阅了Redis

来自分类Dev

订阅 PubSub 主题的速率限制/节流谷歌云功能

来自分类Dev

Python-PDSEND数据流

来自分类Dev

故意延迟订阅集合的数据同步

Related 相关文章

热门标签

归档