卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章35246本站已运行394

Apache Beam 从 Go 中的 PCollection 中选择前 N 行

apache beam 从 go 中的 pcollection 中选择前 n 行

Apache Beam 是一个开源的分布式数据处理框架,它提供了一种统一的编程模型,可以在不同的批处理和流处理引擎上运行。最近,Apache Beam 的 Go SDK 中新增了一个非常有用的功能——从 PCollection 中选择前 N 行。这个功能对于需要对大型数据集进行采样或者快速预览的场景非常有帮助。在本文中,我们将介绍如何在 Apache Beam 的 Go SDK 中使用这个功能,并展示一些实际的示例代码。让我们开始吧!

问题内容

我有一个 pcollection,我需要从中选择 n 个最大的行。我正在尝试使用 go 创建一个数据流管道并陷入困境。

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 5},
        {"adam", 8},
        {"john", 3},
        {"ben", 1},
        {"jose", 1},
        {"bryan", 1},
        {"kim", 1},
        {"tim", 1},
    }
    initial := beam.createlist(s, userlist)

    pc2 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, initial)

    beam.pardo0(s, printrow, pc2)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}

从上面的代码中,我需要根据 user.age 选择前 5 行 我发现链接顶部包具有相同的功能,但它说它返回单个元素 pcollection。有什么不同?

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(less)
}

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func less(a, b User) bool {
    return a.Age < b.Age
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    best := top.Largest(s, initial, 5, less)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, best)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

我像上面一样添加了选择前 5 行的函数,但出现错误 []main.user is not allocate to main.user

我需要与以前相同格式的 pcollection,因为我需要进一步处理。我怀疑这是因为 top.largest 函数返回单个元素 pcollection。关于如何转换格式有什么想法吗?

解决方法

最好的 pcollection 是 []user

所以尝试一下...

pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
    for _, row := range rows {
        emit(row)
    }
}, best)
卓越飞翔博客
上一篇: 尝试使用 golang 客户端为 google 工作区目录创建架构后,请求的身份验证范围不足
下一篇: 返回列表
留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏