Getting started with Dagster using Docker: setting up a pipeline and running assets
Background:
In today’s data-driven world, efficiently managing and processing large volumes of data is crucial for businesses to gain insights and make informed decisions. Data pipelines play a vital role in this process, enabling data to flow seamlessly from source to destination, with transformations and validations along the way.
Introduction to Dagster:
Dagster is a powerful tool designed to simplify data pipeline development, deployment, and management. With its intuitive interface and robust architecture, Dagster empowers data engineers to build scalable and maintainable pipelines with ease.
Basic components:
The basic components of Dagster include:
- Assets: Represent data entities within a pipeline. They are defined using Python functions annotated with `@asset`. Assets encapsulate data transformation logic.
- Jobs: Define the execution logic of a pipeline. Jobs orchestrate the execution of one or more assets and can be composed of multiple steps or tasks.
- Schedules: Specify when jobs should be executed. Schedules are defined using Python functions annotated with `@schedule`, allowing for periodic or event-driven execution.
- Definitions: Aggregate all assets, jobs, and schedules within a pipeline. Definitions provide a unified interface for managing and orchestrating the pipeline’s components.
These components work together to define, schedule, and execute data pipelines within Dagster. Assets provide the data transformations, jobs orchestrate the execution flow, schedules dictate when jobs are triggered, and definitions aggregate and manage the pipeline’s components for seamless execution.
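To make this concrete, here is a minimal, self-contained sketch of how the four components fit together. The names are illustrative and not taken from the project code:

```python
from dagster import Definitions, asset, define_asset_job, schedule


# An asset: a Python function whose return value is the data it produces
@asset
def my_asset():
    return [1, 2, 3]


# A job: orchestrates the execution of one or more assets
my_job = define_asset_job(name="my_job", selection=["my_asset"])


# A schedule: specifies when the job should run (here, at the top of every hour)
@schedule(job=my_job, cron_schedule="0 * * * *")
def my_schedule(_context):
    return {}


# Definitions: aggregates assets, jobs, and schedules into one deployable unit
defs = Definitions(assets=[my_asset], jobs=[my_job], schedules=[my_schedule])
```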
Set up the Dagster Docker project
Here is the project code and the folder structure. Download and run the code on your machine.
dagster
├── dags
│ ├── assets
│ │ ├── __init__.py
│ │ └── myAsset.py
│ ├── jobs
│ │ └── __init__.py
│ └── schedules
│ └── __init__.py
├── docker
│ ├── dagster.yaml
│ ├── Dockerfile_for_dagster
│ ├── Dockerfile_for_my_code
│ └── workspace.yaml
├── docker-compose.yml
└── requirements.txt
Project code
In this project, we’ll explore the structure and setup of a project built with Dagster, a data orchestration tool for building data pipelines. Dagster allows for the creation, scheduling, and monitoring of data pipelines, making it easier to manage complex data workflows.
1. app.py

This code combines all asset, job, and schedule definitions into a single Dagster `Definitions` object named `defs`.
- `asset_definitions` contains the definitions of all assets, including `firstAsset`.
- `job_definitions` contains the definitions of all jobs, including `MyJob`.
- `schedule_definitions` contains the definitions of all schedules, including `my_job_schedule`.

These definitions are then combined into a `Definitions` object named `defs`, which represents and organizes the various components of a Dagster pipeline.
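A sketch of what app.py might look like; the import paths are assumptions based on the folder structure shown above and may differ in the actual project:

```python
from dagster import Definitions

# Import paths are assumptions based on the dags/ folder structure shown above
from dags.assets.myAsset import firstAsset
from dags.jobs import MyJob
from dags.schedules import my_job_schedule

asset_definitions = [firstAsset]
job_definitions = [MyJob]
schedule_definitions = [my_job_schedule]

# Combine all components into a single Definitions object named defs
defs = Definitions(
    assets=asset_definitions,
    jobs=job_definitions,
    schedules=schedule_definitions,
)
```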
2. myAsset.py
This code defines an asset using Dagster that extracts data. It uses the Pandas compute kind, indicating that data manipulation is done with Pandas. The asset is tagged with “data_extraction” and configured with a retry policy allowing a maximum of 3 retries with an exponential backoff delay. It generates a Pandas DataFrame containing simulated data about Slack alerts, logs successful execution, and returns the DataFrame as output along with metadata including the number of records, the size, and a preview of the DataFrame.
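A sketch of such an asset definition. The column names, tag value, base retry delay, and metadata keys are assumptions made for illustration:

```python
import pandas as pd
from dagster import AssetExecutionContext, Backoff, MetadataValue, Output, RetryPolicy, asset


@asset(
    compute_kind="pandas",
    op_tags={"data_extraction": "true"},  # tag value is an assumption
    # Retry up to 3 times with an exponentially growing delay (base delay assumed)
    retry_policy=RetryPolicy(max_retries=3, delay=2, backoff=Backoff.EXPONENTIAL),
)
def firstAsset(context: AssetExecutionContext):
    # Simulated Slack alert data stands in for a real extraction step
    df = pd.DataFrame(
        {
            "alert_id": [1, 2, 3],
            "channel": ["#alerts", "#alerts", "#oncall"],
            "message": ["disk usage high", "job failed", "latency spike"],
        }
    )
    context.log.info("firstAsset executed successfully")

    # Return the DataFrame with metadata: record count, size, and a preview
    return Output(
        df,
        metadata={
            "num_records": len(df),
            "size_bytes": int(df.memory_usage(deep=True).sum()),
            "preview": MetadataValue.md(df.head().to_markdown()),  # needs tabulate
        },
    )
```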
3. jobs/__init__.py
This code defines a job using Dagster, named “MyJob”, which is associated with the asset named “firstAsset”. It utilizes an in-process executor for execution and does not specify any particular graph or IO manager.
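One way this could be expressed, assuming the job is built with `define_asset_job` and the in-process executor mentioned above:

```python
from dagster import define_asset_job, in_process_executor

# A job targeting the single asset "firstAsset"; the in-process executor
# runs all steps within a single process
MyJob = define_asset_job(
    name="MyJob",
    selection=["firstAsset"],
    executor_def=in_process_executor,
)
```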
4. schedules/__init__.py

This code defines a schedule using Dagster, named “my_job_schedule”, which runs the job “MyJob” every 5 minutes according to the specified cron schedule. The schedule operates in the “US/Central” timezone. The `_context` parameter is accepted but not used within the schedule definition.
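A sketch of that schedule definition; the import path for `MyJob` is an assumption:

```python
from dagster import RunRequest, schedule

from dags.jobs import MyJob  # import path is an assumption


# Run MyJob every 5 minutes, evaluated in the US/Central timezone
@schedule(
    job=MyJob,
    cron_schedule="*/5 * * * *",
    execution_timezone="US/Central",
)
def my_job_schedule(_context):
    # The context argument is accepted but intentionally unused
    return RunRequest()
```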
5. docker-compose.yml
This Docker Compose file defines a multi-container Docker application consisting of several services:
- `docker_postgresql`: A PostgreSQL database service with specified environment variables for user, password, and database name.
- `docker_user_code`: A service for running user code. It builds an image using the Dockerfile located at `./docker/Dockerfile_for_my_code` and sets environment variables for connecting to the PostgreSQL database.
- `docker_webserver`: A service for the Dagster web server. It builds an image using the Dockerfile located at `./docker/Dockerfile_for_dagster` and sets up the entry point for launching the Dagster web server. It exposes port 3000 and maps it to the host system.
- `docker_daemon`: A service for the Dagster daemon. It builds an image using the same Dockerfile as `docker_webserver` and sets up the entry point for running the Dagster daemon.

The services are connected to a custom Docker network named `docker_example_network`, facilitating communication between them. Dependencies between services are defined using the `depends_on` directive to ensure that services start in the correct order.

Volumes are mounted for `/var/run/docker.sock` and `/tmp/io_manager_storage` to allow communication with the Docker daemon and to provide temporary storage for the IO manager.
Overall, this Docker Compose file sets up the infrastructure required to run a Dagster application, including the PostgreSQL database, user code execution environment, web server, and daemon for background processes.
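A condensed sketch of such a compose file. The image tag, credentials, environment variable names, and entrypoint flags are assumptions based on the description above:

```yaml
version: "3.8"

# Shared Postgres connection settings (placeholder credentials)
x-postgres-env: &postgres_env
  DAGSTER_POSTGRES_USER: postgres_user
  DAGSTER_POSTGRES_PASSWORD: postgres_password
  DAGSTER_POSTGRES_DB: postgres_db

services:
  docker_postgresql:
    image: postgres:15
    environment:
      POSTGRES_USER: postgres_user
      POSTGRES_PASSWORD: postgres_password
      POSTGRES_DB: postgres_db
    networks:
      - docker_example_network

  docker_user_code:
    build:
      context: .
      dockerfile: ./docker/Dockerfile_for_my_code
    environment: *postgres_env
    networks:
      - docker_example_network

  docker_webserver:
    build:
      context: .
      dockerfile: ./docker/Dockerfile_for_dagster
    entrypoint: ["dagster-webserver", "-h", "0.0.0.0", "-p", "3000", "-w", "workspace.yaml"]
    environment: *postgres_env
    ports:
      - "3000:3000"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    depends_on:
      - docker_postgresql
      - docker_user_code
    networks:
      - docker_example_network

  docker_daemon:
    build:
      context: .
      dockerfile: ./docker/Dockerfile_for_dagster
    entrypoint: ["dagster-daemon", "run"]
    environment: *postgres_env
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    depends_on:
      - docker_postgresql
      - docker_user_code
    networks:
      - docker_example_network

networks:
  docker_example_network:
    driver: bridge
```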
6. Dockerfile_for_dagster
This Dockerfile sets up an environment for running Dagster applications:
- It uses the `python:3.10-slim` base image.
- Installs the necessary Python packages for Dagster and its related components using `pip install`.
- Sets the `DAGSTER_HOME` environment variable to `/opt/dagster/dagster_home/`.
- Creates the directory specified by `DAGSTER_HOME` using `mkdir`.
- Copies the `dagster.yaml` and `workspace.yaml` files from the `./docker` directory to the `DAGSTER_HOME` directory.
- Sets the working directory to `DAGSTER_HOME`.
This Dockerfile prepares the environment by installing Dagster and its dependencies and setting up the necessary configuration files for running Dagster applications.
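A sketch of such a Dockerfile; the exact list of pip packages is an assumption:

```dockerfile
FROM python:3.10-slim

# Install Dagster and the components needed by the webserver and daemon
RUN pip install \
    dagster \
    dagster-graphql \
    dagster-webserver \
    dagster-postgres \
    dagster-docker

# Dagster reads its instance configuration from DAGSTER_HOME
ENV DAGSTER_HOME=/opt/dagster/dagster_home/
RUN mkdir -p $DAGSTER_HOME

# Instance and workspace configuration
COPY ./docker/dagster.yaml ./docker/workspace.yaml $DAGSTER_HOME

WORKDIR $DAGSTER_HOME
```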
7. Dockerfile_for_my_code
This Dockerfile sets up an environment for running user-specific Dagster code:
- It uses the `python:3.10-slim` base image.
- Copies the `requirements.txt` file from the local directory to `/opt/dagster/app/` in the Docker container.
- Sets the working directory to `/opt/dagster/app/`.
- Upgrades `pip` and installs the Python packages specified in `requirements.txt` using `pip install`.
- Copies the repository code located in `./dags/` from the local directory to `/opt/dagster/app/` in the Docker container.
- Exposes port `4000` for the Dagster gRPC server.
- Specifies the command to run when the Docker container starts, launching the Dagster gRPC server with the provided parameters (`-h`, `-p`, `-f`).
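A sketch of such a Dockerfile; the `-f app.py` target is an assumption based on the app.py file described earlier:

```dockerfile
FROM python:3.10-slim

# Install the project dependencies
COPY requirements.txt /opt/dagster/app/
WORKDIR /opt/dagster/app/
RUN pip install --upgrade pip && pip install -r requirements.txt

# Copy the repository code containing the asset, job, and schedule definitions
COPY ./dags/ /opt/dagster/app/

# Port used by the Dagster gRPC server
EXPOSE 4000

# Serve the definitions over gRPC so the webserver and daemon can load them
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f", "app.py"]
```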
8. workspace.yaml
The `load_from` section in the `workspace.yaml` file defines where the Dagster instance should load resources from. In this case, it specifies a gRPC server with the following details:
- Host: `docker_user_code`
- Port: `4000`

This configuration instructs the Dagster instance to load resources from the gRPC server running on the specified host (`docker_user_code`) and port (`4000`), and it assigns the location name `"veve_data_platform"` to this source.
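The corresponding workspace.yaml would look roughly like this:

```yaml
load_from:
  # Load definitions from the gRPC server exposed by the user-code container
  - grpc_server:
      host: docker_user_code
      port: 4000
      location_name: "veve_data_platform"
```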
9. dagster.yaml

This configuration in the `dagster.yaml` file defines various components and their configurations for the Dagster instance:
- Scheduler: Specifies the scheduler to be used (`DagsterDaemonScheduler`).
- Run Coordinator: Defines the run coordinator (`QueuedRunCoordinator`), which manages the execution of pipeline runs.
- Run Launcher: Configures the run launcher (`DockerRunLauncher`) for launching pipeline runs in Docker containers. It specifies environment variables and network settings, ensuring connectivity to other services.
- Run Storage: Specifies the storage backend for storing information about pipeline runs (`PostgresRunStorage`). It provides configurations for connecting to a PostgreSQL database, including hostname, username, password, and port.
- Schedule Storage: Defines the storage backend for storing information about pipeline schedules (`PostgresScheduleStorage`). Similar to run storage, it configures connections to a PostgreSQL database.
- Event Log Storage: Specifies the storage backend for storing event logs (`PostgresEventLogStorage`). Like the other storage backends, it sets up connections to a PostgreSQL database.
Overall, this configuration ensures that the Dagster instance is properly set up to manage scheduling, run execution, and event logging using PostgreSQL as the storage backend.
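A sketch of such a dagster.yaml. The module paths follow the public Dagster Docker deployment example, the environment variable names are assumptions, and a YAML anchor is used to avoid repeating the Postgres connection block:

```yaml
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    env_vars:
      - DAGSTER_POSTGRES_USER
      - DAGSTER_POSTGRES_PASSWORD
      - DAGSTER_POSTGRES_DB
    network: docker_example_network

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db: &postgres_db
      hostname: docker_postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db: *postgres_db

event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db: *postgres_db
```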
Summary
Dagster streamlines data pipeline development and management. The provided code demonstrates defining assets, jobs, and schedules within a structured project layout. Docker integration ensures consistent deployment across environments.
By configuring services like PostgreSQL and specifying execution parameters, users can easily deploy and manage complex data workflows. This discussion enhances understanding of Dagster’s modular architecture and Docker-based deployment, empowering effective data pipeline development and orchestration.