GoとCloud Pub/SubをつかってBaseball Savantのクローラーを作ってみた (original) (raw)

最初に言っておきます, 「野球」っていうワード以上に「エンジニアリング」なエントリーです*1.

野球方面の人は最初の雰囲気を読んで最後まで読むか判断してください, 「オオタニサン」っていうワードは出てこず「Go」「Pub/Sub」「コンテナ」とかそんなのばっかり出てきます🙏

という前置きはさておき, 最近はGoでモノを作るのが好きになってきており, 今はというと,

といった事をしております, なお完成するとこんな感じになりそうな気がしています.

野球データ基盤第二弾の全体像(構想段階)

思ったよりGoで置き換えが可能なマイクロサービス多めで有ることに気が付きました.

その中でも特に「Baseball Savant(Statcast Search)からデータを収集してBigQueryに保存する」というデータ基盤らしい主要機能は現状すべて「Python + Cloud Functions」で処理しているのですが, これを「Go + Cloud Run」で丸っと書き換えを行っています.

Goで書き換え中のデータ基盤機能

作ってみた結果, GoとCloud Pub/Subの実践例として面白そうな気がしてきたのでコードを公開すると同時にブログとしてここに残そうと思います.

github.com

そんな当エントリーのお品書きはこちらとなります.

対象読者と前提知識

GoからGoogle Cloudのサービスを使ったことがある程度のリテラシーが必要となります.

cloud.google.com

pkg.go.dev

この辺の内容が理解できればこの先の話も伝わるかと思います.

なお, 基本的なGoとGoogle Cloudの説明は文脈と文章量の都合上, 必要最低限に抑えます.

やりたい事は何か

まずやりたいことですが,

  1. 毎朝9:00(JST)にクローラーを起動
  2. 前日のStatcastデータを「投手」「打者」ごとに取得
  3. 取得したデータを/YYYY-MM-DD/batter.csv/YYYY-MM-DD/pitcher.csv として保存
  4. 取得したCSVを読み込みBigQueryに保存 ※このエントリーでは扱いません

以上となります.

なおデータはこんな感じで使われます.

データの利用例

シンプルな構成で考えると「1〜4の処理を一つのプログラムとして作ってcrontabで順番に動かす」なのですが(いわゆる「モノリス」なパターン, 別にこの方法でも全然構わない), コードのメンテナンスがキツイのと「CSVだけほしい」みたいな時に自由に使えないので,

  1. 毎朝9:00(JST)にクローラーを起動 → 「trigger」マイクロサービスとして, Cloud Run Jobs*4等の定期実行*5を使って起動, 2以降の処理を起動(下図「処理イメージ」の①).
  2. 前日のStatcastデータを「投手」「打者」ごとに取得 → マイクロサービス「exporter」内でStatcast SearchのURLを組み立て, CSVとして取得(下図「処理イメージ」の②).
  3. 取得したデータを/YYYY-MM-DD/batter.csv/YYYY-MM-DD/pitcher.csv として保存 → マイクロサービス「exporter」内で取得結果をCloud Storageに保存(下図「処理イメージ」の②).
  4. 取得したCSVを読み込みBigQueryに保存 → マイクロサービス「importer」内でCSVを読み込みBigQueryにloading ※このエントリーでは扱いません&Go版は未実装.

という感じにマイクロサービスとして分けて考え, 構築することにしました*6.

処理イメージ

①のtriggerが以下のようなメッセージをPub/Sub TopicにPublishを行い,

{ "season": 2023, "player_type": "batter", "game_date": "2023-08-10T00:00:00Z" }

{ "season": 2023, "player_type": "pitcher", "game_date": "2023-08-10T00:00:00Z" }

②のexporterが上記のメッセージを受け取り(Pub/Sub的にはSubscribeされたメッセージをPullして)処理する.

といった流れになります.

cloud.google.com

基本的には上記Pub/Subクイックスタート「クライアント ライブラリを使用して Pub/Sub でメッセージをパブリッシュおよび受信する」のちょっとした応用でコード自体は開発できちゃいます.

ちなみにPub/Subを使う理由ですが,

といった所が採用した理由となります.

説明で結構な文字数を使った気がします(小声)が, 実装自体は実は大したことはしていません.

という流れとなります.

幸いにも, Baseball Savant(Statcast Search)は「日付(試合日)」「ポジション」をクエリにしてCSVを一発で引けるURLを用意してくれています.

2023-08-12時点の投手成績をCSVで取得

curl 'https://baseballsavant.mlb.com/statcast_search/csv?all=true&hfPT=&hfAB=&hfGT=R%7C&hfPR=&hfZ=&stadium=&hfBBL=&hfNewZones=&hfPull=&hfC=&hfSit=&hfOuts=&opponent=&pitcher_throws=&batter_stands=&hfSA=&hfInfield=&team=&position=&hfOutfield=&hfRO=&home_road=&hfFlag=&hfBBT=&metric_1=&hfInn=&min_pitches=0&min_results=0&group_by=name&sort_col=pitches&player_event_sort=api_p_release_speed&sort_order=desc&min_pas=0&type=details&&player_type=pitcher&game_date_gt=2023-08-12&game_date_lt=2023-08-12&hfSea=2023%7C' > pitcher.csv

上記をうまくExporter内で実装してあげることでいい感じにできそうです&実際にできました.

Trigger

Pub/Sub的にはPublisherの役割です.

おさらい的な話を書くと, 以下のSchemaで定義されたメッセージをpublishできたらOKです.

{ "season": 2023, "player_type": "batter", "game_date": "2023-08-10T00:00:00Z" }

{ "season": 2023, "player_type": "pitcher", "game_date": "2023-08-10T00:00:00Z" }

このコードは「クライアント ライブラリを使用して Pub/Sub でメッセージをパブリッシュおよび受信する」のちょっとした応用で実装できちゃいます.

README.mdのサンプルから抜粋.

package main

import ( "cloud.google.com/go/pubsub" "context" "encoding/json" "log" "time" )

type PlayerType string

const ( PITCHER PlayerType = "pitcher" BATTER PlayerType = "batter" )

type Form struct { Season int validate:"required, min=2015,max=2999" json:"season" PlayerType PlayerType validate:"required" json:"player_type" GameDate time.Time validate:"required" json:"game_date" }

func Publish(ctx context.Context, topic *pubsub.Topic, form Form) { value, err := json.Marshal(form) if err != nil { log.Printf("Request Error: %s", err) } result := topic.Publish(ctx, &pubsub.Message{ Data: value, }) id, err := result.Get(ctx) if err != nil { log.Printf("Publish Error: %s", err) } log.Printf("Published a message; msg ID: %v\n", id) }

func NewPubSubClient(ctx context.Context, projectId string) *pubsub.Client { client, err := pubsub.NewClient(ctx, projectId) if err != nil { log.Fatalf("pubsub.NewClient: %v", err) } return client }

func Topic(client *pubsub.Client, topicID string) *pubsub.Topic { topic := client.Topic(topicID) return topic }

func main() {

t := time.Now().UTC()
gameDate := t.Add(-2 * time.Hour * 24)

ctx := context.Background()
client := NewPubSubClient(ctx, GoogleCloudProjectID)
topicExporter := Topic(client, PubTopicIDExporter)


formBatter := Form{Season: 2023, GameDate: gameDate, PlayerType: BATTER}
log.Printf("export batter game_date: %s", formBatter.GameDate)
Publish(ctx, topicExporter, formBatter)
log.Print("export batter end")


formPitcher := Form{Season: 2023, GameDate: gameDate, PlayerType: PITCHER}
log.Printf("export pitcher game_date: %s", formPitcher.GameDate)
Publish(ctx, topicExporter, formPitcher)
log.Print("export pitcher end")

}

全体で100行もいかない程度で書けちゃいました.

Exporter

Triggerは単なるメッセージを投げる役割なのでシュッとしたコードになりましたが, 受け手のExporterはやることが多いので分けて紹介します.

main

main.go自体はシンプルです.

「Pub/SubをSubscribeしてからの処理」をサラッと書いています.

package main

import ( "context" "fmt" "log"

"cloud.google.com/go/pubsub"
"github.com/Shinichi-Nakagawa/baseball-savant-crawler-go/gcp"
"github.com/Shinichi-Nakagawa/baseball-savant-crawler-go/savant"

)

func main() {

ctx := context.Background()
gcs := gcp.NewStorageClient(ctx)
bucket := gcp.GetBucket(gcs, GcsBucketName)
client := gcp.NewPubSubClient(ctx, GoogleCloudProjectID)


sub := gcp.Subscription(client, PubSubSubscriptionID)
sub.ReceiveSettings.MaxOutstandingMessages = 10


err := sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
    
    form, err2 := savant.CreateForm(string(msg.Data))
    if err2 != nil {
        log.Print(err2)
        return
    }

    
    query := savant.Query(form)
    filename := savant.Filename(form)

    
    body, _ := savant.FetchAndAsString(query)
    gcp.WriteObject(bucket, fmt.Sprintf("%s/%s", GcsPathName, filename), body, ctx)
    log.Print(fmt.Sprintf("saved: %s", filename))
    msg.Ack()
})
if err != nil {
    log.Fatalf("sub.Receive: %s", err)
}

}

baseball savant

Baseball Savant(Statcast Search)周りの処理.

といってもやることは,

以上となります.

/savant/main.goに書いた主要な処理はこちらです.

package savant

import ( "fmt" "io" "net/http" )

const ( BASE_URL = "https://baseballsavant.mlb.com/statcast_search/csv" PARAMETERS = "all=true&hfPT=&hfAB=&hfGT=R%7C&hfPR=&hfZ=&stadium=&hfBBL=&hfNewZones=&hfPull=&hfC=&hfSit=&hfOuts=&opponent=&pitcher_throws=&batter_stands=&hfSA=&hfInfield=&team=&position=&hfOutfield=&hfRO=&home_road=&hfFlag=&hfBBT=&metric_1=&hfInn=&min_pitches=0&min_results=0&group_by=name&sort_col=pitches&player_event_sort=api_p_release_speed&sort_order=desc&min_pas=0&type=details&" QUERY_FORMAT = "player_type=%s&game_date_gt=%s&game_date_lt=%s&hfSea=%d" FILE_FORMAT = "%s/%s.csv" GAME_DT_FORMAT = "2006-01-02" )

func Query(form Form) string { params := fmt.Sprintf(QUERY_FORMAT, form.PlayerType, form.GameDate.Format(GAME_DT_FORMAT), form.GameDate.Format(GAME_DT_FORMAT), form.Season, ) return fmt.Sprintf("%s?%s&%s%s", BASE_URL, PARAMETERS, params, "%7C") }

func Filename(form Form) string { filename := fmt.Sprintf(FILE_FORMAT, form.GameDate.Format(GAME_DT_FORMAT), form.PlayerType, ) return filename }

func FetchAndAsString(query string) (string, error) {

resp, err := http.Get(query)
if err != nil {
    return "", err
}
defer resp.Body.Close()


body, err := io.ReadAll(resp.Body)
if err != nil {
    return "", err
}

return string(body), nil

}

/savant/form.goにFormなどのI/Fを定義.

package savant

import ( "encoding/json" "time" )

type PlayerType string

const ( PITCHER PlayerType = "pitcher" BATTER PlayerType = "batter" )

func CreateForm(value string) (Form, error) { var form Form if err := json.Unmarshal([]byte(value), &form); err != nil { return form, err

}
return form, nil

}

type Form struct { Season int validate:"required, min=2015,max=2999" json:"season" PlayerType PlayerType validate:"required" json:"player_type" GameDate time.Time validate:"required" json:"game_date" }

Google Cloud周りの処理

以下を参考に写経して試した後, 整理しました.

cloud.google.com

pkg.go.dev

整理して作った結果がこちら.

/gcp/pubsub.goにPub/Sub処理を固めて,

package gcp

import ( "cloud.google.com/go/pubsub" "context" "log" )

func NewPubSubClient(ctx context.Context, projectId string) *pubsub.Client { client, err := pubsub.NewClient(ctx, projectId) if err != nil { log.Fatalf("pubsub.NewClient: %v", err) } return client }

func Subscription(client *pubsub.Client, subID string) *pubsub.Subscription { sub := client.Subscription(subID) return sub }

/gcp/gcs.goにCloud Storageに保存する処理を実装.

package gcp

import ( "cloud.google.com/go/storage" "context" "fmt" "log" )

func NewStorageClient(ctx context.Context) *storage.Client { client, err := storage.NewClient(ctx) if err != nil { log.Fatalf("storage.NewClient: %v", err) } return client }

func GetBucket(client *storage.Client, name string) *storage.BucketHandle { bucket := client.Bucket(name) return bucket

}

func WriteObject(bkt *storage.BucketHandle, name string, value string, ctx context.Context) { obj := bkt.Object(name) w := obj.NewWriter(ctx) if _, err := fmt.Fprintf(w, value); err != nil { log.Fatalf("fmt.Fprintf: %v", err) } if err := w.Close(); err != nil { log.Fatalf("w.Close: %v", err) } }

毎回思うのですが, Google Cloudのクイックスタートは写経で動くので楽です, その後のアレンジに集中できるのは強みかもしれません.

結び

結果として手元で意図通り動いたので,

を引き続き進めつつ, BigQueryに保存する処理もどうにかしたいと思っています.

今回は元々Pythonで作っていたものを元にGoで作り直すというアプローチで開発しましたが, Goは本当に使いやすいなと思いました.

しばらくこのまま開発をして, 何かしらの続編を年内に出せたらと思っています.

そしてそろそろ野球の新ネタも...乞うご期待ください, 最後までお読みいただきありがとうございました.