機械学習パイプラインLuigiのタスク同士の関係を良い感じに可視化する方法 (original) (raw)

はじめに

ドワンゴ教育事業でデータサイエンティストとして働いている中井です。

この記事では、PythonのパイプラインパッケージであるLuigiで構築したパイプラインにおいて、それを構成するタスク間の依存関係・タスクのグループ間(task_namespace で分けられる)の依存関係を良い感じに出力する方法についてお話しします。想定する読者はある程度Luigiを使ったことのある方としています。

Luigiではタスク全体の依存関係を出力できますが、大規模なタスクだともう少し荒い粒度であったり、全体のうちの一部だけ見たいといったこともあると思います。この記事を読むことでそのような荒い粒度の可視化やパイプラインの一部分に注目した可視化ができるようになります。この記事ではまずLuigiを使っていて課題に感じている部分について説明した後に、可視化対象のサンプルパイプラインについて少し触れて、そのパイプラインからの依存関係抽出と可視化について説明していきます。

なお、この記事で実装した可視化機能は記事の最後の付録にあります。その他のパイプラインのコードやサンプルデータもこちらで確認いただけます。

課題について

機械学習パイプラインの必要性とLuigiの導入について

現在、ZEN Studyでは学力定量化の取り組みを行っています。この一環として、問題の難易度推定やユーザの学力推定をする仕組みの構築を進めてきました。各種ログデータやマスターデータを合わせてデータ処理を行い妥当性のある難易度・学力推定をするためにはそれなりに大掛かりな仕組みが必要になることから、全体を適当な粒度でタスクに分割して設計・実装のできる機械学習パイプラインを導入することにしました。

機械学習パイプラインにおいて有名なAirflowでも良いのですがLuigiは比較的シンプルであることからこちらを選択しました。

実業務でLuigiを用いた時に生じた問題

多くのテーブルからデータを抽出して、加工・集計・データ分析をするパイプラインを構築するためには、どうしても多くのタスクを定義する必要が出てきてしまいます。
このように多くのタスクから構成されるパイプラインにおいて、タスクをグループ分けして階層化することはタスクの整理するうえで有効なアプローチです。そのために、Luigiではタスクを定義する際にクラス変数として task_namespace を定めておく方法があります。task_namespace でタスクをグループ分けしてLuigiに組み込まれている依存関係可視化機能で可視化した例が以下の画像です。パイプラインを構成する各タスクが {task_namespace}.{タスク名} と表示され、そのタスク間の依存関係が矢印で表現されていてわかりやすいですね。

Luigi組み込み機能による可視化の例

ですが、このパイプラインを構成するタスクの数が何十個にも増えてしまったときには、Luigiに組み込まれている可視化機能では何十個ものノードとそれらの間のエッジの織りなす複雑怪奇な巨大グラフが表示されてしまうことになります。人間が読み解くのはかなり無理がありそうですね……。

やりたいことと実装する機能

というわけで、パイプラインから依存関係を良い感じに抽出して可視化したい気持ちが生まれてきます。最終的なアウトプットで以下のようなことができるとパイプラインの設計・実装・レビュー時に嬉しいことがありそうです。

これらのことができるように以下を実装します!

やったこと

簡単な集計をするパイプラインの構築と今回実装した機能を使った依存関係抽出と可視化を行いました。

サンプルパイプラインの実装

まず、依存関係可視化対象のパイプラインについて説明します。

以下のようなe-learningサービスの問題の難易度や品質を測定するために10000名の被験者に20問の問題を解答してもらったことを想定したダミーデータから、不注意解答者を回答時間と正答率をもとに特定して、それらの解答を取り除いて各問題の正答率・平均解答時間を集計するパイプラインを構築しました。こちらのパイプラインのタスクの依存関係を可視化した例をあとで示します。なお、こちらのコードはこちらにあります。

user_id question_id response response_seconds
user_00001 q_001 1 14.14897598
user_00001 q_002 1 42.45777024
user_00001 q_003 1 13.62420403
... ... ... ...
user_10000 q_018 1 15.015039
user_10000 q_019 1 6.953223998
user_10000 q_020 0 32.19538489

依存関係の抽出と可視化

依存関係の抽出機能と抽出した依存関係の可視化機能の含まれた付録の luigi_dependency_graphs.pyを実装しました。

依存関係の抽出

依存関係を抽出してからそれを可視化するのですから、まずは依存関係抽出機能について解説すべきでしょう。
依存関係抽出機能を実装するにあたっての一番のキモは、luigi.Taskrequires メソッドと luigi.task.flatten 関数を組み合わせることによって以下のように_依存先のタスクをリストとして取得できること_にあります。これを用いて再帰的に依存先のタスクを取得していくことで依存関係の抽出機能を実装しています。

import luigi class TaskA1(luigi.Task): ... class TaskA2(luigi.Task): ... class TaskB(luigi.Task): def requires(self): return {'a1': TaskA1(), 'a2': TaskA2()} task = TaskB() luigi.task.flatten(task.requires())

それでは、luigi_dependency_graphs.py(の一部分)を見ながら、その中心的な関数である get_task_dependencies とその関連するクラス・関数の設計について見ていきましょう。

import textwrap from collections import defaultdict from dataclasses import dataclass from os import linesep from pathlib import Path from typing import Callable, Type

import graphviz import luigi

@dataclass(order=True, frozen=True) class TaskInfo: """ タスク情報を保持するデータクラス """

namespace: str
name: str
docstring: str

def extract_class_docstring(class_: Type) -> str: """ extract_class_docstring クラスのdocstringを抽出し、左詰に整形して返す。

Args:
    class_ (Type): docstring抽出対象のクラス

Returns:
    str: 左詰に整形されたクラスのdocstring。docstringが存在しない場合は空の文字列。
"""
if class_.__doc__ is None:
    return ""
return textwrap.dedent(class_.__doc__).strip()

def get_task_dependencies(task: luigi.Task) -> dict[TaskInfo, list[TaskInfo]]: """ get_task_dependencies タスクの依存関係の辞書(キー:luigiタスクの情報、値:そのタスクの依存しているタスクの情報についてのリスト)を返す。

Args:
    task (luigi.Task): 依存関係を調べたいタスク

Returns:
    dict[TaskInfo, list[TaskInfo]]: タスクの依存関係の辞書
"""
dependencies = defaultdict(list)
visited = set()

def _get_task_dependencies(task):
    if task in visited:
        return
    visited.add(task)

    docstring = extract_class_docstring(task.__class__)
    task_info = TaskInfo(task.task_namespace, task.__class__.__name__, docstring)
    dependencies[task_info]

    for req in luigi.task.flatten(task.requires()):
        req_docstring = extract_class_docstring(req.__class__)
        req_task_info = TaskInfo(
            req.task_namespace, req.__class__.__name__, req_docstring
        )
        dependencies[task_info].append(req_task_info)
        _get_task_dependencies(req)

_get_task_dependencies(task)

uniquified_dependencies = {k: sorted(set(v)) for k, v in dependencies.items()}
return uniquified_dependencies

抽出した依存関係の可視化

抽出した依存関係をもとにグラフを画像として出力する機能もluigi_dependency_graphs.py中に用意しました(save_task_dependencies_diagramsave_task_namespace_dependencies_diagram)。
これらの関数を使うことで以下のように画像を出力できます。

タスクのグループ間・タスク間の依存関係の例

Luigiの組み込みの可視化機能と今回実装した可視化機能による出力を比較します。

Luigi組み込み機能による可視化の例(再掲)

luigi_dependency_graphs.py を使って出力された画像では、Luigi組み込みのそれと比べて、全体の大雑把な流れの可視化と詳細かつ局所的な依存関係の可視化ができていることが見て取れると思います。この可視化はとくに大規模で複雑なパイプラインの依存関係を把握するときに威力を発揮して、設計・実装・レビューの効率化に寄与します。

まとめと感想

Luigiを使っていてつらいなと思っていたタスク依存関係の可視化を自分の思っていた通りの形で実現できました。依存関係抽出と可視化機能の含まれたluigi_dependency_graphs.pyを付録に載せていますので、Luigiを使っていて同じ辛さを感じている方がいればご活用いただきたいです。

また、これをもとにご自身で依存関係抽出・可視化のコードを書いてみるのもいかがでしょうか?筆が乗ってくると結構楽しいかもしれませんよ!

We are hiring!

株式会社ドワンゴの教育事業では、一緒に未来の当たり前の教育をつくるメンバーを募集しています。

私の所属するチームではさまざまなバックグラウンドを持つ企画職とエンジニア、データサイエンティストが一丸となってプロジェクトを進めています。得意なこと不得意なことを補い合いつつお互いの領域に染みだすことのできる退屈しないチームです!

カジュアル面談も行っています。お気軽にご連絡ください!

カジュアル面談応募フォームはこちら

www.nnn.ed.nico

開発チームの取り組み、教育事業の今後については、他の記事や採用資料をご覧ください。

speakerdeck.com

付録

実装した luigi_dependency_graphs.py をここからコピペいただけば、みなさんの環境でもLuigiの依存関係をこの記事のように可視化できます!

その他のコード・データもこちらにありますので、この記事の内容を再現いただけます。

luigi_dependency_graphs.py

import textwrap from collections import defaultdict from dataclasses import dataclass from os import linesep from pathlib import Path from typing import Callable, Type

import graphviz import luigi

@dataclass(order=True, frozen=True) class TaskInfo: """ タスク情報を保持するデータクラス """

namespace: str
name: str
docstring: str

def extract_class_docstring(class_: Type) -> str: """ extract_class_docstring クラスのdocstringを抽出し、左詰に整形して返す。

Args:
    class_ (Type): docstring抽出対象のクラス

Returns:
    str: 左詰に整形されたクラスのdocstring。docstringが存在しない場合は空の文字列。
"""
if class_.__doc__ is None:
    return ""
return textwrap.dedent(class_.__doc__).strip()

def get_task_dependencies(task: luigi.Task) -> dict[TaskInfo, list[TaskInfo]]: """ get_task_dependencies タスクの依存関係の辞書(キー:luigiタスクの情報、値:そのタスクの依存しているタスクの情報についてのリスト)を返す。

Args:
    task (luigi.Task): 依存関係を調べたいタスク

Returns:
    dict[TaskInfo, list[TaskInfo]]: タスクの依存関係の辞書
"""
dependencies = defaultdict(list)
visited = set()

def _get_task_dependencies(task):
    if task in visited:
        return
    visited.add(task)

    docstring = extract_class_docstring(task.__class__)
    task_info = TaskInfo(task.task_namespace, task.__class__.__name__, docstring)
    dependencies[task_info]

    for req in luigi.task.flatten(task.requires()):
        req_docstring = extract_class_docstring(req.__class__)
        req_task_info = TaskInfo(
            req.task_namespace, req.__class__.__name__, req_docstring
        )
        dependencies[task_info].append(req_task_info)
        _get_task_dependencies(req)

_get_task_dependencies(task)

uniquified_dependencies = {k: sorted(set(v)) for k, v in dependencies.items()}
return uniquified_dependencies

def get_task_namespace_dependencies(task: luigi.Task) -> dict[str, list[str]]: """ get_task_namespace_dependencies タスクの名前空間(task_namespace)の依存関係の辞書(キー:luigiタスクの名前空間、値:その名前空間の依存している名前空間のリスト)を返す。

Args:
    task (luigi.Task): 依存関係を調べたいタスク

Returns:
    dict[str, list[str]]: タスクの名前空間(task_namespace)の依存関係の辞書
"""
dependencies = defaultdict(list)
visited = set()

def _get_task_namespace_dependencies(task):
    if task in visited:
        return
    visited.add(task)

    dependencies[task.task_namespace]
    for req in luigi.task.flatten(task.requires()):
        if task.task_namespace != req.task_namespace:
            
            dependencies[task.task_namespace].append(req.task_namespace)

        _get_task_namespace_dependencies(req)

_get_task_namespace_dependencies(task)

uniquified_dependencies = {k: sorted(set(v)) for k, v in dependencies.items()}
return uniquified_dependencies

def extract_summary_and_details(docstring: str) -> tuple[str, str]: """ docstringからサマリーと詳細を抽出する。

Args:
    docstring (str): サマリーと詳細の抽出対象のdocstring

Returns:
    tuple[str, str]: サマリー・詳細
"""
lines = docstring.strip().splitlines()
summary = lines[0].strip() if lines else ""
details = linesep.join(lines[1:]).strip() if len(lines) > 1 else ""

return summary, details

def format_task(task: TaskInfo) -> str: """ graphviz表示用にluigi.Taskのクラス定義から抽出したタスク情報を整形する。

Args:
    task (TaskInfo): luigi.Taskのクラス定義から抽出したタスク情報

Returns:
    str: 整形されたタスク情報についての文字列
"""
if task.docstring == "":
    return f"{task.namespace}.{task.name}"
summary, _ = extract_summary_and_details(task.docstring)
return f"{task.namespace}.{task.name}{linesep}{linesep}{summary}"

def save_task_dependencies_diagram( task: luigi.Task, save_dir: str | Path, output_format: str = "png", info_format_function: Callable[[TaskInfo], str] = format_task, ) -> None: """ Luigiタスクの依存関係をGraphvizを用いて図として保存する。

この関数は、指定されたLuigiタスクに基づいて、タスク間の依存関係を解析し、
Graphvizを使用して依存関係を視覚化する図を生成する。
生成された図は、指定されたディレクトリに保存される。

Args:
    task (luigi.Task): 依存関係を視覚化する対象のLuigiタスク
    save_dir (str | Path):  図を保存するディレクトリのパス。ディレクトリが存在しない場合には新規に作成される。
    output_format (str, optional): 図の出力形式。対応する形式(例: "pdf", "svg")を指定可能。
    info_format_function (Callable[[TaskInfo], str], optional): `TaskInfo`からノード情報を生成する関数
"""
_save_dir = Path(save_dir)
_save_dir.mkdir(exist_ok=True, parents=True)

task_dependencies = get_task_dependencies(task=task)

for task_namespace in {i.namespace for i in task_dependencies.keys()}:
    internal_graph_data = {
        k: v for k, v in task_dependencies.items() if k.namespace == task_namespace
    }
    dot = graphviz.Digraph()
    dot.attr(label=task_namespace)
    dot.attr("node", shape="rectangle", style="filled", fillcolor="lightblue")
    for node in internal_graph_data:
        dot.node(info_format_function(node))
    for node, edges in internal_graph_data.items():
        for edge in edges:
            dot.edge(info_format_function(edge), info_format_function(node))

    save_path = Path(
        _save_dir, task.__class__.__name__, f"{task_namespace}.{output_format}"
    )
    Path(_save_dir, task.__class__.__name__).mkdir(exist_ok=True, parents=True)
    dot.render(str(save_path.with_suffix("")), format=output_format)

def save_task_namespace_dependencies_diagram( task: luigi.Task, save_path: str | Path ) -> None: """ Luigiタスクの名前空間依存関係をGraphvizを用いて図として保存する。

この関数は、指定されたLuigiタスクに基づいて、タスク間の名前空間依存関係を解析し、
Graphvizを使用して依存関係を視覚化する図を生成する。
生成された図は、指定されたファイルパスに保存される。

Args:
    task (luigi.Task): 依存関係を調べる対象のLuigiタスク
    save_path (str | Path): 図を保存するファイルのパス。拡張子(例: ".png", ".pdf")を含める必要がある。

Raises:
    ValueError: 保存先のパスに拡張子が含まれていない場合に発生
"""
_save_path = Path(save_path)
if not _save_path.suffix:
    raise ValueError(f"save_path: {save_path}に拡張子が含まれていません。")

ext = _save_path.suffix.lstrip(".")
task_namespace_dependencies = get_task_namespace_dependencies(task=task)

group_dot = graphviz.Digraph()
group_dot.attr(label="Task Group Dependencies Diagram")
group_dot.attr("node", shape="rectangle", style="filled", fillcolor="lightblue")
for node in task_namespace_dependencies:
    group_dot.node(node)
for node, edges in task_namespace_dependencies.items():
    for edge in edges:
        group_dot.edge(edge, node)
group_dot.render(str(_save_path.with_suffix("")), format=ext)