传递给 ParDo 函数时变量/结构体值变为 nil

问题描述 投票:0回答:1

嗨,我正在尝试编写一个梁管道,我们首先从 bigquery 读取数据,然后对每一行进行转换(如果值存在于地图中,则使用它,否则将值插入地图中并使用它),然后写入它到大查询。我写了以下代码

// Row struct for bigquery
type Row struct {
    Operation      string    `bigquery:"operation"`
    SrcZone        string    `bigquery:"src_zone"`
    BucketLocation string    `bigquery:"bucket_location"`
    BucketName     string    `bigquery:"bucket_name"`

}

// UserDataMap struct for user data
type UserDataMap struct {
    mu   sync.Mutex
    data map[string]string
}

func init() {
    beam.RegisterFunction(transformRow)
    // beam.RegisterFunction(writeData)
    beam.RegisterType(reflect.TypeOf((*Row)(nil)).Elem())
    beam.RegisterType(reflect.TypeOf((*UserDataMap)(nil)).Elem())
}

func (dataMap *UserDataMap) ProcessElement(ctx context.Context, row Row, emit func(beam.X)) error {
    dataMap.mu.Lock()
    defer dataMap.mu.Unlock()

    fmt.Printf("map: %v\n", dataMap.data)

    if _, ok := dataMap.data[row.SrcZone]; !ok {
        dataMap.data[row.SrcZone] = uuid.New()
    }
    row.SrcZone = dataMap.data[row.SrcZone]
    emit(row)
    return nil
}

func main() {
    project := "pulkitaggarwal-gcs-prober"
    ipDataset := "benchmarks"
    ipTable := "latency_results"
    opDataset := "dataflow_dataset"
    opTable := "dataflow_table"

    ctx := context.Background()
    beam.Init()

    p := beam.NewPipeline()
    s := p.Root()

    rows := bigqueryio.Read(s, project, fmt.Sprintf("%s:%s.%s", project, ipDataset, ipTable), reflect.TypeOf(Row{}))

    dataMap := &UserDataMap{data: make(map[string]string)}
    transformedRows := beam.ParDo(s, dataMap, rows, beam.TypeDefinition{Var: beam.XType, T: reflect.TypeOf(Row{})})

    bigqueryio.Write(s, project, fmt.Sprintf("%s:%s.%s", project, opDataset, opTable), transformedRows)

    if err := beamx.Run(ctx, p); err != nil {
        fmt.Printf("failed to run pipeline: %v", err)
    }
}

但是它会引发错误恐慌:分配给 nil 映射中的条目。看起来 UserDataMap 被传递为 nil。

我什至尝试通过引用传递所有内容,但仍然遇到相同的错误。我尝试在 ParDo 函数中传递的任何参数都为零。我不知道为什么会发生这种情况,还是我遗漏了什么?

go google-cloud-dataflow apache-beam
1个回答
0
投票

如果没有真正理解管道及其设置方式,很明显,如果没有定义任何实际初始化

map
的构造函数,您基本上是在尝试写入
nil
映射,这会导致
panic
您看到了。

解决此问题的一个技巧是在获取锁后检查

map
是否为
nil

dataMap.mu.Lock()
defer dataMap.mu.Unlock()
if dataMap.data == nil { 
   dataMap.data = make(map[string]string) 
}

然而,正确的解决方案是使用一个构造函数来创建

UserDataMap
的初始化实例,例如:

func NewUserDataMap() *UserDataMap {
 return &UserDataMap{ data: make(map[string]string }
}

调用构造函数后,您会以某种方式将正确的初始化实例传递给管道。 另一种选择是使用可插入函数,并通过

main
中的匿名函数来确定所需的映射范围。

© www.soinside.com 2019 - 2024. All rights reserved.