snowflake.core.task.dagv1 | Snowflake Documentation

High-level, client-side representation of task graphs.

This set of higher-level classes provides a more convenient way to create, deploy, and manage task graphs than the lower-level Task APIs in snowflake.core.task. A task graph is a directed acyclic graph (DAG) of tasks.
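To make the DAG idea concrete, here is a minimal sketch that uses only Python's standard library, not the Snowflake API. The task names are hypothetical. `graphlib.TopologicalSorter` (Python 3.9+) produces an order in which every task appears after the tasks it depends on, which is the ordering guarantee a task graph relies on.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "load": set(),              # root task, no predecessors
    "transform": {"load"},      # runs after "load"
    "aggregate": {"transform"}, # runs after "transform"
    "report": {"transform"},    # also runs after "transform"
}

# static_order() yields the tasks so that predecessors always come first.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The relative order of "aggregate" and "report" is not fixed, because neither depends on the other. Adding a dependency that closes a loop would make `static_order()` raise `graphlib.CycleError`, which is why the graph has to be acyclic.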

Example 1: Create a task graph that has two tasks.

>>> from datetime import timedelta
>>> from snowflake.snowpark import Session
>>> from snowflake.snowpark.functions import sum as sum_
>>> from snowflake.core.task import StoredProcedureCall
>>> from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
>>> def dosomething(session: Session) -> None:
...     df = session.table("target")
...     df.group_by("a").agg(sum_("b")).write.save_as_table("agg_table")
>>> with DAG("my_dag", schedule=timedelta(days=1)) as dag:
...     # Create a task that runs some SQL.
...     dag_task1 = DAGTask(
...         "dagtask1",
...         "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v")
...     # Create a task that runs a Python function.
...     dag_task2 = DAGTask(
...         "dagtask2",
...         StoredProcedureCall(
...             dosomething, stage_location="@mystage",
...             packages=["snowflake-snowpark-python"]
...         ),
...         warehouse="test_warehouse")

The shift operators (>> and <<) specify task relationships: dag_task1 >> dag_task2 makes dag_task2 run after dag_task1.

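>>> dag_task1 >> dag_task2
>>> # root is assumed to be an existing snowflake.core.Root instance, for example Root(session).
>>> schema = root.databases["MYDB"].schemas["MYSCHEMA"]
>>> dag_op = DAGOperation(schema)
>>> dag_op.deploy(dag)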
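The shift-operator syntax relies on Python operator overloading. The following is a minimal sketch of that general technique, not the library's actual implementation: `Node` is a hypothetical stand-in for a task class, and it only handles single nodes on the right-hand side.

```python
from __future__ import annotations


class Node:
    """Hypothetical stand-in for a task; not the real DAGTask class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.successors: list[Node] = []
        self.predecessors: list[Node] = []

    def __rshift__(self, other: Node) -> Node:
        # a >> b: record that b runs after a.
        self.successors.append(other)
        other.predecessors.append(self)
        return other  # returning the right operand allows a >> b >> c

    def __lshift__(self, other: Node) -> Node:
        # a << b: record that a runs after b.
        other >> self
        return other


a, b, c, d = Node("a"), Node("b"), Node("c"), Node("d")
a >> b >> c  # a before b, b before c
d << c       # d runs after c
```

Returning the right-hand operand from both methods is a design choice that keeps chained expressions readable, since `a >> b >> c` is evaluated as `(a >> b) >> c`.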

Example 2: Create a task graph that uses a Cron schedule, a branch task, and function return values as task return values.

>>> from snowflake.snowpark import Session
>>> from snowflake.core import Root, CreateMode
>>> from snowflake.core.task import Cron
>>> from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
>>> session = Session.builder.create()
>>> test_stage = "mystage"
>>> test_dag = "mydag"
>>> test_db = "mydb"
>>> test_schema = "public"
>>> test_warehouse = "testwh_python"
>>> root = Root(session)
>>> schema = root.databases[test_db].schemas[test_schema]
>>> def task_handler1(session: Session) -> None:
...     pass  # do something
>>> def task_handler2(session: Session) -> None:
...     pass  # do something
>>> def task_handler3(session: Session) -> None:
...     pass  # do something
>>> def task_branch_handler(session: Session) -> str:
...     # do something
...     return "task3"
>>> try:
...     with DAG(
...         test_dag,
...         schedule=Cron("10 * * * *", "America/Los_Angeles"),
...         stage_location=test_stage,
...         packages=["snowflake-snowpark-python"],
...         warehouse=test_warehouse,
...         use_func_return_value=True,
...     ) as dag:
...         task1 = DAGTask(
...             "task1",
...             task_handler1,
...         )
...         task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
...         task2 = DAGTask("task2", task_handler2)
...         task1 >> task1_branch
...         task1_branch >> [task2, task_handler3]  # after >> you can use a DAGTask or a function.
...     op = DAGOperation(schema)
...     op.deploy(dag, mode=CreateMode.or_replace)
... finally:
...     session.close()
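The schedule in Example 2, Cron("10 * * * *", "America/Los_Angeles"), uses a five-field cron expression. Read as standard cron, it fires at minute 10 of every hour in the given time zone. The sketch below only labels the fields so the expression is easier to read; `describe_cron` and `CRON_FIELDS` are hypothetical helpers, not part of the Snowflake API, and the code does no validation beyond counting fields.

```python
# Field names for a standard five-field cron expression, in order.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expr: str) -> dict[str, str]:
    """Map each field of a five-field cron expression to its name."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expr!r}")
    return dict(zip(CRON_FIELDS, parts))


fields = describe_cron("10 * * * *")
print(fields)
```

Here the minute field is "10" and every other field is the wildcard "*", which is what makes the schedule hourly.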

Classes

DAG(name, *[, schedule, warehouse, ...])
    A graph of tasks composed of a single root task and additional tasks, organized by their dependencies.

DAGTask(name, definition, *[, condition, ...])
    Represents a child Task of a task graph.

DAGRun()
    Contains the history of a task graph run in Snowflake.

DAGOperation(schema)
    APIs to manage task graph child task operations.