GoとCloud Pub/SubをつかってBaseball Savantのクローラーを作ってみた (original) (raw)
最初に言っておきます, 「野球」っていうワード以上に「エンジニアリング」なエントリーです*1.
野球方面の人は最初の雰囲気を読んで最後まで読むか判断してください, 「オオタニサン」っていうワードは出てこず「Go」「Pub/Sub」「コンテナ」とかそんなのばっかり出てきます🙏
という前置きはさておき, 最近はGoでモノを作るのが好きになってきており, 今はというと,
- PyCon JP 2022およびデブサミでお披露目した「野球データ基盤」の刷新をGoで少しずつやる.
- 野球データ基盤を構成するマイクロサービスのうち, Goでやったほうが効率良さそうなサービスをGoに書き換え.
- クラウドサービスは引き続きGoogle Cloud(これはマスト)だが, マイクロサービスのワークロードはCloud Runに統一*2, 可能なら「マルチコンテナ*3」を採用してサービスを一つに統一したい.
といった事をしております, なお完成するとこんな感じになりそうな気がしています.
野球データ基盤第二弾の全体像(構想段階)
思ったよりGoで置き換えが可能なマイクロサービス多めで有ることに気が付きました.
その中でも特に「Baseball Savant(Statcast Search)からデータを収集してBigQueryに保存する」というデータ基盤らしい主要機能は現状すべて「Python + Cloud Functions」で処理しているのですが, これを「Go + Cloud Run」で丸っと書き換えを行っています.
Goで書き換え中のデータ基盤機能
作ってみた結果, GoとCloud Pub/Subの実践例として面白そうな気がしてきたのでコードを公開すると同時にブログとしてここに残そうと思います.
そんな当エントリーのお品書きはこちらとなります.
対象読者と前提知識
GoからGoogle Cloudのサービスを使ったことがある程度のリテラシーが必要となります.
この辺の内容が理解できればこの先の話も伝わるかと思います.
なお, 基本的なGoとGoogle Cloudの説明は文脈と文章量の都合上, 必要最低限に抑えます.
やりたい事は何か
まずやりたいことですが,
- 毎朝9:00(JST)にクローラーを起動
- 前日のStatcastデータを「投手」「打者」ごとに取得
- 取得したデータを
/YYYY-MM-DD/batter.csv
と/YYYY-MM-DD/pitcher.csv
として保存 - 取得したCSVを読み込みBigQueryに保存 ※このエントリーでは扱いません
以上となります.
なおデータはこんな感じで使われます.
データの利用例
シンプルな構成で考えると「1〜4の処理を一つのプログラムとして作ってcrontabで順番に動かす」なのですが(いわゆる「モノリス」なパターン, 別にこの方法でも全然構わない), コードのメンテナンスがキツイのと「CSVだけほしい」みたいな時に自由に使えないので,
- 毎朝9:00(JST)にクローラーを起動 → 「trigger」マイクロサービスとして, Cloud Run Jobs*4等の定期実行*5を使って起動, 2以降の処理を起動(下図「処理イメージ」の①).
- 前日のStatcastデータを「投手」「打者」ごとに取得 → マイクロサービス「exporter」内でStatcast SearchのURLを組み立て, CSVとして取得(下図「処理イメージ」の②).
- 取得したデータを
/YYYY-MM-DD/batter.csv
と/YYYY-MM-DD/pitcher.csv
として保存 → マイクロサービス「exporter」内で取得結果をCloud Storageに保存(下図「処理イメージ」の②). - 取得したCSVを読み込みBigQueryに保存 → マイクロサービス「importer」内でCSVを読み込みBigQueryにloading ※このエントリーでは扱いません&Go版は未実装.
という感じにマイクロサービスとして分けて考え, 構築することにしました*6.
処理イメージ
①のtriggerが以下のようなメッセージをPub/Sub TopicにPublishを行い,
{ "season": 2023, "player_type": "batter", "game_date": "2023-08-10T00:00:00Z" }
{ "season": 2023, "player_type": "pitcher", "game_date": "2023-08-10T00:00:00Z" }
②のexporterが上記のメッセージを受け取り(Pub/Sub的にはSubscribeされたメッセージをPullして)処理する.
といった流れになります.
基本的には上記Pub/Subクイックスタート「クライアント ライブラリを使用して Pub/Sub でメッセージをパブリッシュおよび受信する」のちょっとした応用でコード自体は開発できちゃいます.
ちなみにPub/Subを使う理由ですが,
- 使い慣れているから(最大の理由).
- サービスの性質上, 外部公開(publicに外に出す)ものではなく, すべてinternalなネットワークに置きたかった.
- 「内部限定のWeb APIにしてアクセスする」方法もあるが, Pub/Subでのピタゴラスイッチの方が利点が多い*7.
といった所が採用した理由となります.
説明で結構な文字数を使った気がします(小声)が, 実装自体は実は大したことはしていません.
- Trigger(Pub/Sub的にはPublisher)がPub/Subにメッセージを投げる
- Exporter(Pub/Sub的にはSubscriber)がメッセージを受け取り,
- パラメーターからURLを組み立てる
- http getしてCSVを取得
- 取得したCSVをCloud Storageに保存
という流れとなります.
幸いにも, Baseball Savant(Statcast Search)は「日付(試合日)」「ポジション」をクエリにしてCSVを一発で引けるURLを用意してくれています.
2023-08-12時点の投手成績をCSVで取得
上記をうまくExporter内で実装してあげることでいい感じにできそうです&実際にできました.
Trigger
Pub/Sub的にはPublisherの役割です.
おさらい的な話を書くと, 以下のSchemaで定義されたメッセージをpublishできたらOKです.
{ "season": 2023, "player_type": "batter", "game_date": "2023-08-10T00:00:00Z" }
{ "season": 2023, "player_type": "pitcher", "game_date": "2023-08-10T00:00:00Z" }
このコードは「クライアント ライブラリを使用して Pub/Sub でメッセージをパブリッシュおよび受信する」のちょっとした応用で実装できちゃいます.
※README.mdのサンプルから抜粋.
package main
import ( "cloud.google.com/go/pubsub" "context" "encoding/json" "log" "time" )
type PlayerType string
const ( PITCHER PlayerType = "pitcher" BATTER PlayerType = "batter" )
type Form struct {
Season int validate:"required, min=2015,max=2999" json:"season"
PlayerType PlayerType validate:"required" json:"player_type"
GameDate time.Time validate:"required" json:"game_date"
}
func Publish(ctx context.Context, topic *pubsub.Topic, form Form) { value, err := json.Marshal(form) if err != nil { log.Printf("Request Error: %s", err) } result := topic.Publish(ctx, &pubsub.Message{ Data: value, }) id, err := result.Get(ctx) if err != nil { log.Printf("Publish Error: %s", err) } log.Printf("Published a message; msg ID: %v\n", id) }
func NewPubSubClient(ctx context.Context, projectId string) *pubsub.Client { client, err := pubsub.NewClient(ctx, projectId) if err != nil { log.Fatalf("pubsub.NewClient: %v", err) } return client }
func Topic(client *pubsub.Client, topicID string) *pubsub.Topic { topic := client.Topic(topicID) return topic }
func main() {
t := time.Now().UTC()
gameDate := t.Add(-2 * time.Hour * 24)
ctx := context.Background()
client := NewPubSubClient(ctx, GoogleCloudProjectID)
topicExporter := Topic(client, PubTopicIDExporter)
formBatter := Form{Season: 2023, GameDate: gameDate, PlayerType: BATTER}
log.Printf("export batter game_date: %s", formBatter.GameDate)
Publish(ctx, topicExporter, formBatter)
log.Print("export batter end")
formPitcher := Form{Season: 2023, GameDate: gameDate, PlayerType: PITCHER}
log.Printf("export pitcher game_date: %s", formPitcher.GameDate)
Publish(ctx, topicExporter, formPitcher)
log.Print("export pitcher end")
}
全体で100行もいかない程度で書けちゃいました.
Exporter
Triggerは単なるメッセージを投げる役割なのでシュッとしたコードになりましたが, 受け手のExporterはやることが多いので分けて紹介します.
main
main.go自体はシンプルです.
「Pub/SubをSubscribeしてからの処理」をサラッと書いています.
package main
import ( "context" "fmt" "log"
"cloud.google.com/go/pubsub"
"github.com/Shinichi-Nakagawa/baseball-savant-crawler-go/gcp"
"github.com/Shinichi-Nakagawa/baseball-savant-crawler-go/savant"
)
func main() {
ctx := context.Background()
gcs := gcp.NewStorageClient(ctx)
bucket := gcp.GetBucket(gcs, GcsBucketName)
client := gcp.NewPubSubClient(ctx, GoogleCloudProjectID)
sub := gcp.Subscription(client, PubSubSubscriptionID)
sub.ReceiveSettings.MaxOutstandingMessages = 10
err := sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
form, err2 := savant.CreateForm(string(msg.Data))
if err2 != nil {
log.Print(err2)
return
}
query := savant.Query(form)
filename := savant.Filename(form)
body, _ := savant.FetchAndAsString(query)
gcp.WriteObject(bucket, fmt.Sprintf("%s/%s", GcsPathName, filename), body, ctx)
log.Print(fmt.Sprintf("saved: %s", filename))
msg.Ack()
})
if err != nil {
log.Fatalf("sub.Receive: %s", err)
}
}
baseball savant
Baseball Savant(Statcast Search)周りの処理.
といってもやることは,
- URLを作る
- 保存先のファイル名・パスを決める
以上となります.
/savant/main.goに書いた主要な処理はこちらです.
package savant
import ( "fmt" "io" "net/http" )
const ( BASE_URL = "https://baseballsavant.mlb.com/statcast_search/csv" PARAMETERS = "all=true&hfPT=&hfAB=&hfGT=R%7C&hfPR=&hfZ=&stadium=&hfBBL=&hfNewZones=&hfPull=&hfC=&hfSit=&hfOuts=&opponent=&pitcher_throws=&batter_stands=&hfSA=&hfInfield=&team=&position=&hfOutfield=&hfRO=&home_road=&hfFlag=&hfBBT=&metric_1=&hfInn=&min_pitches=0&min_results=0&group_by=name&sort_col=pitches&player_event_sort=api_p_release_speed&sort_order=desc&min_pas=0&type=details&" QUERY_FORMAT = "player_type=%s&game_date_gt=%s&game_date_lt=%s&hfSea=%d" FILE_FORMAT = "%s/%s.csv" GAME_DT_FORMAT = "2006-01-02" )
func Query(form Form) string { params := fmt.Sprintf(QUERY_FORMAT, form.PlayerType, form.GameDate.Format(GAME_DT_FORMAT), form.GameDate.Format(GAME_DT_FORMAT), form.Season, ) return fmt.Sprintf("%s?%s&%s%s", BASE_URL, PARAMETERS, params, "%7C") }
func Filename(form Form) string { filename := fmt.Sprintf(FILE_FORMAT, form.GameDate.Format(GAME_DT_FORMAT), form.PlayerType, ) return filename }
func FetchAndAsString(query string) (string, error) {
resp, err := http.Get(query)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}
/savant/form.goにFormなどのI/Fを定義.
package savant
import ( "encoding/json" "time" )
type PlayerType string
const ( PITCHER PlayerType = "pitcher" BATTER PlayerType = "batter" )
func CreateForm(value string) (Form, error) { var form Form if err := json.Unmarshal([]byte(value), &form); err != nil { return form, err
}
return form, nil
}
type Form struct {
Season int validate:"required, min=2015,max=2999" json:"season"
PlayerType PlayerType validate:"required" json:"player_type"
GameDate time.Time validate:"required" json:"game_date"
}
Google Cloud周りの処理
以下を参考に写経して試した後, 整理しました.
整理して作った結果がこちら.
/gcp/pubsub.goにPub/Sub処理を固めて,
package gcp
import ( "cloud.google.com/go/pubsub" "context" "log" )
func NewPubSubClient(ctx context.Context, projectId string) *pubsub.Client { client, err := pubsub.NewClient(ctx, projectId) if err != nil { log.Fatalf("pubsub.NewClient: %v", err) } return client }
func Subscription(client *pubsub.Client, subID string) *pubsub.Subscription { sub := client.Subscription(subID) return sub }
/gcp/gcs.goにCloud Storageに保存する処理を実装.
package gcp
import ( "cloud.google.com/go/storage" "context" "fmt" "log" )
func NewStorageClient(ctx context.Context) *storage.Client { client, err := storage.NewClient(ctx) if err != nil { log.Fatalf("storage.NewClient: %v", err) } return client }
func GetBucket(client *storage.Client, name string) *storage.BucketHandle { bucket := client.Bucket(name) return bucket
}
func WriteObject(bkt *storage.BucketHandle, name string, value string, ctx context.Context) { obj := bkt.Object(name) w := obj.NewWriter(ctx) if _, err := fmt.Fprintf(w, value); err != nil { log.Fatalf("fmt.Fprintf: %v", err) } if err := w.Close(); err != nil { log.Fatalf("w.Close: %v", err) } }
毎回思うのですが, Google Cloudのクイックスタートは写経で動くので楽です, その後のアレンジに集中できるのは強みかもしれません.
結び
結果として手元で意図通り動いたので,
- Cloud Runにホスト
- Trigger部分をCloud Run Jobsで動かしてバッチ処理化
を引き続き進めつつ, BigQueryに保存する処理もどうにかしたいと思っています.
今回は元々Pythonで作っていたものを元にGoで作り直すというアプローチで開発しましたが, Goは本当に使いやすいなと思いました.
しばらくこのまま開発をして, 何かしらの続編を年内に出せたらと思っています.
そしてそろそろ野球の新ネタも...乞うご期待ください, 最後までお読みいただきありがとうございました.