使用Go SDK进行云数据流的并行性问题

问题描述 投票:1回答:1

我在Go SDK上有Apache Beam代码实现,如下所述。管道有3个步骤。一个是textio.Read,另一个是CountLines,最后一步是ProcessLinesProcessLines步骤大约需要10秒钟。为简洁起见,我刚刚添加了睡眠功能。

我打电话给20名工人。当我运行管道时,我的期望是20名工人将并行运行,textio.Read从文件读取20行,ProcessLines将在10秒内执行20次并行执行。但是,管道并没有像那样工作。它目前的工作方式是textio.Read从文件读取一行,将数据推送到下一步并等待,直到ProcessLines步骤完成其10秒工作。没有并行性,整个管道中的文件只有一个行字符串。你能否澄清一下我对并行性的错误?如何更新代码以实现如上所述的并行性?

package main

import (
    "context"
    "flag"
    "time"

    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

// metrics to be monitored
var (
    input         = flag.String("input", "", "Input file (required).")
    numberOfLines = beam.NewCounter("extract", "numberOfLines")
    lineLen       = beam.NewDistribution("extract", "lineLenDistro")
)

func countLines(ctx context.Context, line string) string {
    lineLen.Update(ctx, int64(len(line)))
    numberOfLines.Inc(ctx, 1)

    return line
}

func processLines(ctx context.Context, line string) {
    time.Sleep(10 * time.Second)
}

func CountLines(s beam.Scope, lines beam.PCollection) beam.PCollection 
{
    s = s.Scope("Count Lines")

    return beam.ParDo(s, countLines, lines)
}

func ProcessLines(s beam.Scope, lines beam.PCollection) {
    s = s.Scope("Process Lines")

    beam.ParDo0(s, processLines, lines)
}

func main() {
    // If beamx or Go flags are used, flags must be parsed first.
    flag.Parse()
    // beam.Init() is an initialization hook that must be called on startup. On
    // distributed runners, it is used to intercept control.
    beam.Init()

    // Input validation is done as usual. Note that it must be after Init().
    if *input == "" {
        log.Fatal(context.Background(), "No input file provided")
    }

    p := beam.NewPipeline()
    s := p.Root()

    l := textio.Read(s, *input)
    lines := CountLines(s, l)
    ProcessLines(s, lines)

    // Concept #1: The beamx.Run convenience wrapper allows a number of
    // pre-defined runners to be used via the --runner flag.
    if err := beamx.Run(context.Background(), p); err != nil {
        log.Fatalf(context.Background(), "Failed to execute job: %v", err.Error())
    }
}

编辑:

在我得到关于问题的答案可能是由融合引起的之后,我改变了代码的相关部分,但它没有再次起作用。

现在第一步和第二步是并行工作,但第三步ProcessLines不是并行工作。我只进行了以下更改。谁能告诉我这是什么问题?

func AddRandomKey(s beam.Scope, col beam.PCollection) beam.PCollection {
    return beam.ParDo(s, addRandomKeyFn, col)
}

func addRandomKeyFn(elm beam.T) (int, beam.T) {
    return rand.Int(), elm
}

func countLines(ctx context.Context, _ int, lines func(*string) bool, emit func(string)) {
    var line string
    for lines(&line) {
        lineLen.Update(ctx, int64(len(line)))
        numberOfLines.Inc(ctx, 1)
        emit(line)
    }
}
func processLines(ctx context.Context, _ int, lines func(*string) bool) {
    var line string
    for lines(&line) {
        time.Sleep(10 * time.Second)
        numberOfLinesProcess.Inc(ctx, 1)
    }
}

func CountLines(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("Count Lines")
    keyed := AddRandomKey(s, lines)
    grouped := beam.GroupByKey(s, keyed)

    return beam.ParDo(s, countLines, grouped)
}

func ProcessLines(s beam.Scope, lines beam.PCollection) {
    s = s.Scope("Process Lines")
    keyed := AddRandomKey(s, lines)
    grouped := beam.GroupByKey(s, keyed)

    beam.ParDo0(s, processLines, grouped)
}
go google-cloud-dataflow apache-beam
1个回答
0
投票

许多MapReduce类型管道的高级运行器融合了可以在内存中一起运行的阶段。 Apache Beam和Dataflow也不例外。

这里发生的是你的管道的三个步骤被融合,并发生在同一台机器上。此外,遗憾的是,Go SDK目前不支持拆分Read转换。

为了在第三个变换中实现并行性,你可以打破ReadProcessLines之间的融合。您可以这样做,为您的线添加随机键,以及GroupByKey变换。

在Python中,它将是:

(p | beam.ReadFromText(...)
   | CountLines()
   | beam.Map(lambda x: (random.randint(0, 1000), x))
   | beam.GroupByKey()
   | beam.FlatMap(lambda k, v: v)  # Discard the key, and return the values
   | ProcessLines())

这将允许您并行化ProcessLines

© www.soinside.com 2019 - 2024. All rights reserved.