snowflake.core.task.dagv1.DAG | Snowflake Documentation

class snowflake.core.task.dagv1.DAG(name: str, *, schedule: Cron | timedelta | None = None, warehouse: str | None = None, user_task_managed_initial_warehouse_size: str | None = None, error_integration: str | None = None, comment: str | None = None, task_auto_retry_attempts: int | None = None, allow_overlapping_execution: bool | None = None, user_task_timeout_ms: int | None = None, suspend_task_after_num_failures: int | None = None, config: dict[str, Any] | None = None, session_parameters: dict[str, Any] | None = None, stage_location: str | None = None, imports: list[str | tuple[str, str]] | None = None, packages: list[str | ModuleType] | None = None, use_func_return_value: bool = False)

Bases: object

A graph of tasks composed of a single root task and additional tasks, organized by their dependencies.

Snowflake doesn’t have a first-class task graph entity, so this is a client-side object representation that manages Task relationships. A root Task and its successors logically form a task graph, or DAG (Directed Acyclic Graph). Refer to Task graphs.

When a task graph is deployed, all child tasks are created in Snowflake. A dummy Task is created as the root. A task’s predecessor is the dummy task if it’s added to the task graph with no other predecessors.
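The predecessor rule described above can be pictured with a small, library-free sketch. It is only a toy model, not how snowflake.core implements deployment: `ToyTask` and `resolve_predecessors` are hypothetical names, and naming the root after the DAG is an assumption made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task; not snowflake.core's DAGTask."""
    name: str
    predecessors: list[str] = field(default_factory=list)


def resolve_predecessors(dag_name: str, tasks: list[ToyTask]) -> dict[str, list[str]]:
    """Map each task name to its effective predecessors.

    A task with no explicit predecessors is attached to the root.
    This sketch names the root after the DAG (an assumption).
    """
    return {
        t.name: list(t.predecessors) if t.predecessors else [dag_name]
        for t in tasks
    }


tasks = [
    ToyTask("EXTRACT"),                         # no predecessors: hangs off the root
    ToyTask("LOAD", predecessors=["EXTRACT"]),  # explicit predecessor is kept
    ToyTask("AUDIT"),                           # no predecessors: hangs off the root
]
plan = resolve_predecessors("TEST_DAG", tasks)
print(plan)
```

In this model, every task is reachable from the single root, which is what lets a root Task and its successors form one task graph.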

Example

from datetime import timedelta
from snowflake.snowpark import Session
from snowflake.core.task.dagv1 import DAG, DAGOperation, DAGTask, DAGTaskBranch

dag = DAG(
    "TEST_DAG",
    schedule=timedelta(minutes=10),
    use_func_return_value=True,
    warehouse="TESTWH_DAG",
    packages=["snowflake-snowpark-python"],
    stage_location="@TESTDB_DAG.TESTSCHEMA_DAG.TEST_STAGE_DAG"
)

def task1(session: Session) -> None:
    session.sql("select 'task1'").collect()

def task2(session: Session) -> None:
    session.sql("select 'task2'").collect()

def cond(session: Session) -> str:
    # Return the name of the task to branch to.
    return 'TASK1'

with dag:
    task1 = DAGTask("TASK1", definition=task1, warehouse="TESTWH_DAG")
    task2 = DAGTask("TASK2", definition=task2, warehouse="TESTWH_DAG")
    condition = DAGTaskBranch("COND", definition=cond, warehouse="TESTWH_DAG")
    condition >> [task1, task2]

# schema is assumed to be an existing schema resource for the target schema.
dag_op = DAGOperation(schema)
dag_op.deploy(dag, mode="orReplace")
dag_op.run(dag)

Note: When defining a task branch handler, simply return the name of the task you want to jump to. The task name is case-sensitive, and it has to match the name property in DAGTask. For example, in the sample code above, return 'TASK1'; 'TEST_DAG$TASK1', 'task1', and 'Task1' will not be considered an exact match.

Refer to snowflake.core.task.Task for the details of each property.
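The name-matching rule for branch handlers can be illustrated with a plain string comparison. This is a minimal sketch of the rule as stated in the note, not the library's internal check; `matches_task_name` is a hypothetical helper.

```python
def matches_task_name(returned: str, task_names: list[str]) -> bool:
    """Exact, case-sensitive comparison against the DAGTask names."""
    return returned in task_names


# Names as given to DAGTask in the example, without the DAG prefix.
names = ["TASK1", "TASK2"]

candidates = ["TASK1", "TEST_DAG$TASK1", "task1", "Task1"]
results = {c: matches_task_name(c, names) for c in candidates}
for candidate, ok in results.items():
    print(f"{candidate!r}: {'match' if ok else 'no match'}")
```

Only the string that equals the DAGTask name character for character passes, so the value a handler returns is best copied from the name used when the task was defined.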

Attributes

tasks

Returns a list of the tasks in this task graph.

Methods

add_task(task: DAGTask) → None

Add a child task to this task graph.

Parameters:

task (DAGTask) – The child task to be added to this task graph.

Examples

Add a task to a previously created DAG:

child_task = DAGTask(
    "child_task",
    "select 'child_task'",
    warehouse="test_warehouse"
)
dag.add_task(child_task)

get_finalizer_task() → DAGTask | None

Get the finalizer task for the dag.

Examples

Get the finalizer task from a previously created DAG:

finalizer_task = dag.get_finalizer_task()

get_task(task_name: str) → DAGTask | None

Get a child task from this task graph based on task name.

Parameters:

task_name (str) – The name of the task to be retrieved from this task graph.

Examples

Get a task from a previously created DAG:

task = dag.get_task("child_task")
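Because get_task (like get_finalizer_task) is annotated as returning DAGTask | None, callers may want to handle the None case explicitly. The sketch below shows one way to do that. `require_task` is a hypothetical helper, and `StubDag` is a hypothetical stand-in used only so the example runs without a Snowflake connection; neither is part of snowflake.core.

```python
def require_task(dag, task_name: str):
    """Return the named task, or raise KeyError if get_task() returns None."""
    task = dag.get_task(task_name)
    if task is None:
        raise KeyError(f"no task named {task_name!r} in this task graph")
    return task


class StubDag:
    """Hypothetical stand-in exposing only get_task(); not the real DAG class."""

    def __init__(self, tasks: dict):
        self._tasks = dict(tasks)

    def get_task(self, task_name: str):
        # Returns None for unknown names, mirroring the Optional return type.
        return self._tasks.get(task_name)


dag = StubDag({"child_task": "select 'child_task'"})

found = require_task(dag, "child_task")

try:
    require_task(dag, "no_such_task")
    missing_raised = False
except KeyError:
    missing_raised = True
```

Failing early with a clear error tends to be easier to debug than an AttributeError on None further down the call chain.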