嗨,我正在尝试编写一个梁管道,我们首先从 bigquery 读取数据,然后对每一行进行转换(如果值存在于地图中,则使用它,否则将值插入地图中并使用它),然后写入它到大查询。我写了以下代码
// Row struct for bigquery
type Row struct {
Operation string `bigquery:"operation"`
SrcZone string `bigquery:"src_zone"`
BucketLocation string `bigquery:"bucket_location"`
BucketName string `bigquery:"bucket_name"`
}
// UserDataMap struct for user data
type UserDataMap struct {
mu sync.Mutex
data map[string]string
}
func init() {
beam.RegisterFunction(transformRow)
// beam.RegisterFunction(writeData)
beam.RegisterType(reflect.TypeOf((*Row)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*UserDataMap)(nil)).Elem())
}
func (dataMap *UserDataMap) ProcessElement(ctx context.Context, row Row, emit func(beam.X)) error {
dataMap.mu.Lock()
defer dataMap.mu.Unlock()
fmt.Printf("map: %v\n", dataMap.data)
if _, ok := dataMap.data[row.SrcZone]; !ok {
dataMap.data[row.SrcZone] = uuid.New()
}
row.SrcZone = dataMap.data[row.SrcZone]
emit(row)
return nil
}
func main() {
project := "pulkitaggarwal-gcs-prober"
ipDataset := "benchmarks"
ipTable := "latency_results"
opDataset := "dataflow_dataset"
opTable := "dataflow_table"
ctx := context.Background()
beam.Init()
p := beam.NewPipeline()
s := p.Root()
rows := bigqueryio.Read(s, project, fmt.Sprintf("%s:%s.%s", project, ipDataset, ipTable), reflect.TypeOf(Row{}))
dataMap := &UserDataMap{data: make(map[string]string)}
transformedRows := beam.ParDo(s, dataMap, rows, beam.TypeDefinition{Var: beam.XType, T: reflect.TypeOf(Row{})})
bigqueryio.Write(s, project, fmt.Sprintf("%s:%s.%s", project, opDataset, opTable), transformedRows)
if err := beamx.Run(ctx, p); err != nil {
fmt.Printf("failed to run pipeline: %v", err)
}
}
但是它会引发错误恐慌:分配给 nil 映射中的条目。看起来 UserDataMap 被传递为 nil。
我什至尝试通过引用传递所有内容,但仍然遇到相同的错误。我尝试在 ParDo 函数中传递的任何参数都为零。我不知道为什么会发生这种情况,还是我遗漏了什么?
如果没有真正理解管道及其设置方式,很明显,如果没有定义任何实际初始化
map
的构造函数,您基本上是在尝试写入 nil
映射,这会导致 panic
您看到了。
解决此问题的一个技巧是在获取锁后检查
map
是否为 nil
:
dataMap.mu.Lock()
defer dataMap.mu.Unlock()
if dataMap.data == nil {
dataMap.data = make(map[string]string)
}
然而,正确的解决方案是使用一个构造函数来创建
UserDataMap
的初始化实例,例如:
func NewUserDataMap() *UserDataMap {
return &UserDataMap{ data: make(map[string]string }
}
调用构造函数后,您会以某种方式将正确的初始化实例传递给管道。 另一种选择是使用可插入函数,并通过
main
中的匿名函数来确定所需的映射范围。