Writing to Google Cloud Storage from PubSub using Cloud Dataflow with a DoFn

Frank Wilson

I am trying to write Google PubSub messages to Google Cloud Storage using Google Cloud Dataflow. I know that TextIO/AvroIO do not support streaming pipelines. However, I read in a comment by the author of [1] that it is possible to write to GCS from a ParDo/DoFn in a streaming pipeline, and I constructed a pipeline by following their article as closely as I could.

I was aiming for this behaviour:

  • Messages written out in batches of up to 100, to objects in GCS (one per window pane), under a path that corresponds to the time the message was published: dataflow-requests/[isodate-time]/[paneIndex]

Instead, I get different results:

  • There is only a single pane in every hourly window, so I only get one file per hourly "bucket" (really an object path prefix in GCS). Reducing MAX_EVENTS_IN_FILE to 10 made no difference; there is still only one pane/file.
  • Only a single message is written out to each GCS object.
  • The pipeline occasionally raises a CRC error when writing to GCS.

How do I fix these problems and get the behaviour I'm expecting?

Sample log output:

21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.773 successfully wrote pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.846 successfully wrote pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.847 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0

Here's my code:

package com.example.dataflow;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobInfo;
import com.google.gcloud.storage.Storage;
import com.google.gcloud.storage.StorageOptions;
import org.joda.time.Duration;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class PubSubGcsSSCCEPipepline {

    private static final Logger LOG = LoggerFactory.getLogger(PubSubGcsSSCCEPipepline.class);

    public static final String BUCKET_PATH = "dataflow-requests";

    public static final String BUCKET_NAME = "myBucketName";

    public static final Duration ONE_DAY = Duration.standardDays(1);
    public static final Duration ONE_HOUR = Duration.standardHours(1);
    public static final Duration TEN_SECONDS = Duration.standardSeconds(10);

    public static final int MAX_EVENTS_IN_FILE = 100;

    public static final String PUBSUB_SUBSCRIPTION = "projects/myProjectId/subscriptions/requests-dataflow";

    private static class DoGCSWrite extends DoFn<String, Void>
        implements DoFn.RequiresWindowAccess {
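        // The GCS client is not serializable, so it is kept transient and
        // rebuilt (via the initializer block and readObject) when this DoFn
        // is deserialized on a worker.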

        public transient Storage storage;

        { init(); }

        public void init() { storage = StorageOptions.defaultInstance().service(); }

        private void readObject(java.io.ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            init();
        }

        @Override
        public void processElement(ProcessContext c) throws Exception {
            String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp());
            String blobName = String.format("%s/%s/%s", BUCKET_PATH, isoDate, c.pane().getIndex());

            BlobId blobId = BlobId.of(BUCKET_NAME, blobName);
            LOG.info("writing pane {} to blob {}", c.pane().getIndex(), blobName);
            storage.create(BlobInfo.builder(blobId).contentType("text/plain").build(), c.element().getBytes());
            LOG.info("sucessfully write pane {} to blob {}", c.pane().getIndex(), blobName);
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        options.as(DataflowPipelineOptions.class).setStreaming(true);
        Pipeline p = Pipeline.create(options);

        PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
                .subscription(PUBSUB_SUBSCRIPTION);

        PCollection<String> streamData = p.apply(readFromPubsub);

        PCollection<String> windows = streamData.apply(Window.<String>into(FixedWindows.of(ONE_HOUR))
                .withAllowedLateness(ONE_DAY)
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE))
                        .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE),
                                AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(TEN_SECONDS))))
                .discardingFiredPanes());

        windows.apply(ParDo.of(new DoGCSWrite()));

        p.run();
    }


}

[1] https://labs.spotify.com/2016/03/10/spotifys-event-delivery-the-road-to-the-cloud-part-iii/

Thanks to Sam McVeety for the solution. Here is the corrected code, for anyone reading:

package com.example.dataflow;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.gcloud.WriteChannel;
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobInfo;
import com.google.gcloud.storage.Storage;
import com.google.gcloud.storage.StorageOptions;
import org.joda.time.Duration;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;

public class PubSubGcsSSCCEPipepline {

    private static final Logger LOG = LoggerFactory.getLogger(PubSubGcsSSCCEPipepline.class);

    public static final String BUCKET_PATH = "dataflow-requests";

    public static final String BUCKET_NAME = "myBucketName";

    public static final Duration ONE_DAY = Duration.standardDays(1);
    public static final Duration ONE_HOUR = Duration.standardHours(1);
    public static final Duration TEN_SECONDS = Duration.standardSeconds(10);

    public static final int MAX_EVENTS_IN_FILE = 100;

    public static final String PUBSUB_SUBSCRIPTION = "projects/myProjectId/subscriptions/requests-dataflow";

    private static class DoGCSWrite extends DoFn<Iterable<String>, Void>
        implements DoFn.RequiresWindowAccess {

        public transient Storage storage;

        { init(); }

        public void init() { storage = StorageOptions.defaultInstance().service(); }

        private void readObject(java.io.ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            init();
        }

        @Override
        public void processElement(ProcessContext c) throws Exception {
            String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp());
            long paneIndex = c.pane().getIndex();
            String blobName = String.format("%s/%s/%s", BUCKET_PATH, isoDate, paneIndex);

            BlobId blobId = BlobId.of(BUCKET_NAME, blobName);

            LOG.info("writing pane {} to blob {}", paneIndex, blobName);
            WriteChannel writer = storage.writer(BlobInfo.builder(blobId).contentType("text/plain").build());
            LOG.info("blob stream opened for pane {} to blob {} ", paneIndex, blobName);
            int i=0;
            for (Iterator<String> it = c.element().iterator(); it.hasNext();) {
                i++;
                writer.write(ByteBuffer.wrap(it.next().getBytes()));
                LOG.info("wrote {} elements to blob {}", i, blobName);
            }
            writer.close();
            LOG.info("sucessfully write pane {} to blob {}", paneIndex, blobName);
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        options.as(DataflowPipelineOptions.class).setStreaming(true);
        Pipeline p = Pipeline.create(options);

        PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
                .subscription(PUBSUB_SUBSCRIPTION);

        PCollection<String> streamData = p.apply(readFromPubsub);
        PCollection<KV<String, String>> keyedStream =
                streamData.apply(WithKeys.of(new SerializableFunction<String, String>() {
                    public String apply(String s) { return "constant"; } }));

        PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream
                .apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_HOUR))
                        .withAllowedLateness(ONE_DAY)
                        .triggering(AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE))
                                .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE),
                                        AfterProcessingTime.pastFirstElementInPane()
                                                .plusDelayOf(TEN_SECONDS))))
                        .discardingFiredPanes())
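                // GroupByKey materializes each trigger firing as a pane that
                // bundles all buffered elements for the window.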
                .apply(GroupByKey.create());


        PCollection<Iterable<String>> windows = keyedWindows
                .apply(Values.<Iterable<String>>create());


        windows.apply(ParDo.of(new DoGCSWrite()));

        p.run();
    }

}
Sam McVeety

There's a gotcha here, which is that you'll need a GroupByKey in order for the panes to be aggregated appropriately. The Spotify example references this as "Materialization of panes is done in the 'Aggregate Events' transform, which is nothing else than a GroupByKey transform", but it's a subtle point. You'll need to supply a key in order to do this, and in your case it appears that a constant value will work.

  PCollection<String> streamData = p.apply(readFromPubsub);
  PCollection<KV<String, String>> keyedStream =
        streamData.apply(WithKeys.of(new SerializableFunction<String, String>() {
           public String apply(String s) { return "constant"; } }));

At this point, you can apply your windowing function, followed by a final GroupByKey to get the desired behavior:

  PCollection<String, Iterable<String>> keyedWindows = keyedStream.apply(...)
       .apply(GroupByKey.create());
  PCollection<Iterable<String>> windows = keyedWindows
       .apply(Values.<Iterable<String>>create());

Now, the elements arriving in processElement will be Iterable<String>, with a size of 100 or more.
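Correspondingly, the DoFn's input type changes from String to Iterable<String>; here is a minimal sketch of the updated signature (mirroring the corrected code above):

  private static class DoGCSWrite extends DoFn<Iterable<String>, Void>
      implements DoFn.RequiresWindowAccess {

      @Override
      public void processElement(ProcessContext c) throws Exception {
          // c.element() now holds the entire pane for this window,
          // so all of its messages can be streamed into one GCS object.
          for (String message : c.element()) {
              // write each message to the open WriteChannel
          }
      }
  }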

We've filed https://issues.apache.org/jira/browse/BEAM-184 to make this behavior clearer.
