卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章34956本站已运行393

在 Go SDK 中等待 AWS Athena 查询执行

在 go sdk 中等待 aws athena 查询执行

php小编百草在Go SDK中等待AWS Athena查询执行指南中,旨在帮助开发者优化查询性能和提高应用程序的响应速度。AWS Athena是一种无服务器查询服务,可以直接在S3存储桶中运行SQL查询,无需预先定义模式或进行数据加载。然而,由于查询的异步执行特性,开发者需要等待查询完成后才能获取结果。本指南将介绍如何使用Go SDK中的等待机制,以便在查询执行期间进行有效的等待,并在查询完成后获取结果,从而提供更好的用户体验和应用程序性能。

问题内容

我有一个运行 athena 查询的工作代码,并通过使用以下代码轮询 getqueryresults 返回的 error 来等待查询完成:

func getqueryresults(client *athena.client, queryid *string) []types.row {

    params := &athena.getqueryresultsinput{
        queryexecutionid: queryid,
    }

    data, err := client.getqueryresults(context.todo(), params)

    for err != nil {
        println(err.error())
        time.sleep(time.second)
        data, err = client.getqueryresults(context.todo(), params)
    }

    return data.resultset.rows
}

问题是,如果查询失败,我绝对没有办法打破循环。

例如,在 python 中我可以执行以下操作:

    while athena.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"][
        "Status"
    ]["State"] in ["RUNNING", "QUEUED"]:
        sleep(2)

我可以在 for 循环内进行类似 strings.contains(err.error(),"failed") 的检查,但我正在寻找一种更干净的方法。

我尝试寻找 go 的等效项,但没有成功。 go sdk有没有可以返回执行状态的函数?有没有更好的方法来进一步检查 go 中的错误而不是 err != nil

解决方法

sdk已提供重试功能。

这是一个使用 aws-sdk-go-v2 的示例。

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/athena"
    "github.com/aws/aws-sdk-go-v2/service/athena/types"
)

func main() {
    cfg := aws.NewConfig()
    ath := athena.NewFromConfig(*cfg)

    ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute*5)
    defer cancelFunc()

    rows, err := GetQueryResults(ctx, ath, aws.String("query-id"), 10)
    if err != nil {
        panic(err) // TODO: handle error
    }

    fmt.Println(rows)
}

func GetQueryResults(ctx context.Context, client *athena.Client, QueryID *string, attempts int) ([]types.Row, error) {
    t := time.NewTicker(time.Second * 5)
    defer t.Stop()

    attemptsFunc := func(o *athena.Options) { o.RetryMaxAttempts = attempts }

WAIT:
    for {
        select {
        case <-t.C:
            out, err := client.GetQueryExecution(ctx, &athena.GetQueryExecutionInput{
                QueryExecutionId: QueryID,
            }, attemptsFunc)
            if err != nil {
                return nil, err
            }

            switch out.QueryExecution.Status.State {
            case types.QueryExecutionStateCancelled,
                types.QueryExecutionStateFailed,
                types.QueryExecutionStateSucceeded:
                break WAIT
            }

        case <-ctx.Done():
            break WAIT
        }
    }

    data, err := client.GetQueryResults(ctx, &athena.GetQueryResultsInput{
        QueryExecutionId: QueryID,
    })
    if err != nil {
        return nil, err
    }

    return data.ResultSet.Rows, nil
}
卓越飞翔博客
上一篇: 获取基于原始类型的类型的reflect.Kind
下一篇: 返回列表
留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏