Build your first Dagster project

Welcome to Dagster! In this guide, you'll use Dagster to create a basic pipeline that:

* Extracts data from a CSV file
* Transforms the data by adding a new column
* Loads the transformed data into a new CSV file

To follow the steps in this guide, you'll need:

* Basic familiarity with Python
* Python and pip installed on your system

  1. Open the terminal and create a new directory for your project:

     ```bash
     mkdir dagster-quickstart
     cd dagster-quickstart
     ```

  2. Create and activate a virtual environment:

     * MacOS:

       ```bash
       python -m venv venv
       source venv/bin/activate
       ```

     * Windows:

       ```bash
       python -m venv venv
       venv\Scripts\activate
       ```

  3. Install Dagster and the required dependencies:

     ```bash
     pip install dagster dagster-webserver pandas
     ```
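To confirm the installation succeeded, you can print the installed version (the exact version number you see will differ):

```bash
dagster --version
```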

> **Info:** The project structure in this guide is simplified to allow you to get started quickly. When creating new projects, use `dagster project scaffold` to generate a complete Dagster project.
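For reference, scaffolding a complete project looks like this (the project name here is just an example):

```bash
dagster project scaffold --name my-dagster-project
```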

Next, you'll create a basic Dagster project that looks like this:

```
dagster-quickstart/
├── quickstart/
│   ├── __init__.py
│   └── assets.py
└── data/
    └── sample_data.csv
```
  1. To create the files and directories outlined above, run the following:

     ```bash
     mkdir quickstart data
     touch quickstart/__init__.py quickstart/assets.py
     touch data/sample_data.csv
     ```

  2. In the `data/sample_data.csv` file, add the following content:

     ```csv
     id,name,age,city
     1,Alice,28,New York
     2,Bob,35,San Francisco
     3,Charlie,42,Chicago
     4,Diana,31,Los Angeles
     ```

This CSV will act as the data source for your Dagster pipeline.
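If you'd like to sanity-check the file before building the pipeline, a quick pandas snippet (run from the project root; not part of the pipeline itself) prints the rows:

```python
import pandas as pd

# Load the sample data and preview it
df = pd.read_csv("data/sample_data.csv")
print(df.head())
```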

Now, create the assets for the ETL pipeline. Open `quickstart/assets.py` and add the following code:

```python
import pandas as pd

import dagster as dg


@dg.asset
def processed_data():
    # Read data from the CSV
    df = pd.read_csv("data/sample_data.csv")

    # Add an age_group column based on the value of age
    df["age_group"] = pd.cut(
        df["age"], bins=[0, 30, 40, 100], labels=["Young", "Middle", "Senior"]
    )

    # Save the processed data
    df.to_csv("data/processed_data.csv", index=False)
    return "Data loaded successfully"


# Tell Dagster about the assets that make up the pipeline by
# passing them to the Definitions object.
# This allows Dagster to manage the assets' execution and dependencies.
defs = dg.Definitions(assets=[processed_data])
```

This may seem unusual if you're used to task-based orchestration, where this pipeline would be three separate steps for extracting, transforming, and loading.

In Dagster, however, you model your pipelines using assets as the fundamental building block, rather than tasks.
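To make the asset-based model concrete, here is a minimal sketch of how the same pipeline could be split into one asset per ETL stage. The asset names and the in-memory hand-off between them are illustrative, not part of this guide's code:

```python
import pandas as pd

import dagster as dg


@dg.asset
def raw_data() -> pd.DataFrame:
    # Extract: read the source CSV
    return pd.read_csv("data/sample_data.csv")


@dg.asset
def transformed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    # Transform: depends on raw_data, which Dagster passes in by parameter name
    df = raw_data.copy()
    df["age_group"] = pd.cut(
        df["age"], bins=[0, 30, 40, 100], labels=["Young", "Middle", "Senior"]
    )
    return df


@dg.asset
def saved_data(transformed_data: pd.DataFrame) -> None:
    # Load: write the final result to disk
    transformed_data.to_csv("data/processed_data.csv", index=False)


defs = dg.Definitions(assets=[raw_data, transformed_data, saved_data])
```

Dagster infers the dependency graph from the parameter names, so `transformed_data` automatically runs after `raw_data`, and `saved_data` runs last.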

  1. In the terminal, navigate to your project's root directory and run:

     ```bash
     dagster dev -f quickstart/assets.py
     ```

  2. Open your web browser and navigate to http://localhost:3000, where you should see the Dagster UI.

  3. In the top navigation, click **Assets > View global asset lineage**.

  4. Click **Materialize** to run the pipeline.

  5. In the popup that displays, click **View**. This will open the **Run details** page, allowing you to view the run as it executes.

     Use the view buttons near the top left corner of the page to change how the run is displayed. You can also click the asset to view logs and metadata.
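If you prefer the terminal, you can also materialize the asset without the UI (a sketch using the same file layout as above):

```bash
dagster asset materialize --select processed_data -f quickstart/assets.py
```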

In your terminal, run:

```bash
cat data/processed_data.csv
```

You should see the transformed data, including the new `age_group` column:

```csv
id,name,age,city,age_group
1,Alice,28,New York,Young
2,Bob,35,San Francisco,Middle
3,Charlie,42,Chicago,Senior
4,Diana,31,Los Angeles,Middle
```
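You can also verify the result in Python, for example by counting the rows in each age group (a quick check, not part of the pipeline):

```python
import pandas as pd

# Tally how many rows landed in each age bucket
df = pd.read_csv("data/processed_data.csv")
print(df["age_group"].value_counts())
```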

Congratulations! You've just built and run your first pipeline with Dagster. From here, you can continue with the Dagster tutorial and explore the rest of the documentation to learn more.