Agregar y actualizar DAG (original) (raw)

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, se describe cómo administrar DAG en tu entorno de Cloud Composer.

Cloud Composer usa un bucket de Cloud Storage para almacenar los DAG de tu entorno de Cloud Composer. Tu entorno sincroniza los DAG de este bucket con los componentes de Airflow, como los trabajadores y los programadores de Airflow.

Antes de comenzar

Accede al bucket de tu entorno

Para acceder al bucket asociado con tu entorno, haz lo siguiente:

Console

  1. En la consola de Google Cloud, ve a la página Entornos.
    Ir a Entornos
  2. En la lista de entornos, busca una fila con el nombre de tu entorno y, en la columna Carpeta de DAG, haz clic en el vínculo DAG. Se abrirá la página Detalles del bucket. Muestra el contenido de la carpeta /dags en el bucket de tu entorno.

gcloud

gcloud CLI tiene comandos independientes para agregar y borrar DAGs en el bucket de tu entorno.

Si deseas interactuar con el bucket de tu entorno, también puedes usarGoogle Cloud CLI. Para obtener la dirección del bucket de tu entorno, ejecuta el siguiente comando de la CLI de gcloud:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Reemplaza lo siguiente:

Ejemplo:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Realiza una solicitud a la API de environments.get. En el recurso Entorno, en el recurso EnvironmentConfig, en el recurso dagGcsPrefix, se encuentra la dirección del bucket de tu entorno.

Ejemplo:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Usa la biblioteca google-auth para obtener credenciales y la biblioteca requests para llamar a la API de REST.

Agrega o actualiza un DAG

Para agregar o actualizar un DAG, mueve el archivo .py de Python del DAG a la carpeta /dags del bucket del entorno.

Console

  1. En la consola de Google Cloud, ve a la página Entornos.
    Ir a Entornos
  2. En la lista de entornos, busca una fila con el nombre de tu entorno y, en la columna Carpeta de DAG, haz clic en el vínculo DAG. Se abrirá la página Detalles del bucket. Muestra el contenido de la carpeta /dags en el bucket de tu entorno.
  3. Haz clic en Subir archivos. Luego, selecciona el archivo .py de Python para el DAG con el diálogo del navegador y confirma.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Reemplaza lo siguiente:

Ejemplo:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Actualiza un DAG que tenga ejecuciones activas del DAG

Si actualizas un DAG que tiene ejecuciones activas del DAG:

Actualiza los DAG que se ejecutan con frecuencia

Después de subir un archivo DAG, Airflow tarda en cargarse y actualizar y en actualizar el DAG. Si tu DAG se ejecuta con frecuencia, es posible que desees asegurarte de que el DAG use la versión actualizada del archivo de DAG. Para ello, deberás hacer lo siguiente:

  1. Pausa el DAG en la IU de Airflow.
  2. Sube un archivo DAG actualizado.
  3. Espera hasta ver las actualizaciones en la IU de Airflow. Esto significa que el programador analizó correctamente el DAG y lo actualizó en la base de datos de Airflow.
    Si la IU de Airflow muestra los DAG actualizados, esto no garantiza que los trabajadores de Airflow tengan la versión actualizada del archivo de DAG. Esto sucede porque los archivos de DAG se sincronizan de forma independiente para los programadores y trabajadores.
  4. Es posible que desees extender el tiempo de espera para asegurarte de que el archivo DAG esté sincronizado con todos los trabajadores de tu entorno. La sincronización se realiza varias veces por minuto. En un entorno en buen estado, esperar entre 20 y 30 segundos es suficiente para que todos los trabajadores se sincronicen.
  5. Si deseas asegurarte de que todos los trabajadores tengan la versión nueva del archivo de DAG, inspecciona los registros de cada trabajador individual (opcional). Para ello, deberás hacer lo siguiente:
    1. En la consola de Google Cloud, abre la pestaña Registros de tu entorno.
    2. Ve a Registros de Composer > Infraestructura > Sincronización de Cloud Storage y, luego, inspecciona los registros de cada trabajador de tu entorno. Busca el elemento de registro Syncing dags directory más reciente que tenga una marca de tiempo después de subir el archivo DAG nuevo. Si ves un elemento Finished syncing que lo sigue, los DAG se sincronizan con éxito en este trabajador.
  6. Reanuda el DAG.

Borra un DAG de tu entorno

Para borrar un DAG, quita el archivo .py de Python correspondiente al DAG de la carpeta /dags del bucket del entorno.

Console

  1. En la consola de Google Cloud, ve a la página Entornos.
    Ir a Entornos
  2. En la lista de entornos, busca una fila con el nombre de tu entorno y, en la columna Carpeta de DAG, haz clic en el vínculo DAG. Se abrirá la página Detalles del bucket. Muestra el contenido de la carpeta /dags en el bucket de tu entorno.
  3. Selecciona el archivo DAG, haz clic en Borrar y confirma la operación.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Reemplaza lo siguiente:

Ejemplo:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Quita un DAG de la IU de Airflow

Para quitar los metadatos de un DAG de la interfaz web de Airflow, sigue estos pasos:

IU de Airflow

  1. Ve a la IU de Airflow de tu entorno.
  2. En el caso del DAG, haz clic en Borrar DAG.

gcloud

En versiones de Airflow 1 anteriores a la 1.14.0, ejecuta el siguiente comando en gcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

En Airflow 2, Airflow 1.14.0 y versiones posteriores, ejecuta el siguiente comando en gcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Reemplaza lo siguiente:

¿Qué sigue?