Getting started with Dagster using Docker: set up a pipeline and run assets

Jouneid Raza
6 min read · Jun 10, 2024


Background:

In today’s data-driven world, efficiently managing and processing large volumes of data is crucial for businesses to gain insights and make informed decisions. Data pipelines play a vital role in this process, enabling the seamless data flow from source to destination, with transformations and validations along the way.

Introduction to Dagster:

Dagster is a powerful tool designed to simplify data pipeline development, deployment, and management. With its intuitive interface and robust architecture, Dagster empowers data engineers to build scalable and maintainable pipelines with ease.

Basic components:

The basic components of Dagster include:

  1. Assets: Represent data entities within a pipeline. They are defined using Python functions annotated with @asset. Assets encapsulate data transformation logic.
  2. Jobs: Define the execution logic of a pipeline. Jobs orchestrate the execution of one or more assets and can be composed of multiple steps or tasks.
  3. Schedules: Specify when jobs should be executed. Schedules are defined using Python functions annotated with @schedule, allowing for periodic or event-driven execution.
  4. Definitions: Aggregate all assets, jobs, and schedules within a pipeline. Definitions provide a unified interface for managing and orchestrating the pipeline’s components.

These components work together to define, schedule, and execute data pipelines within Dagster. Assets provide the data transformations, jobs orchestrate the execution flow, schedules dictate when jobs are triggered, and definitions aggregate and manage the pipeline’s components for seamless execution.

Set up the Dagster Docker project

Here is the project code and the folder structure. Download and run the code on your machine.

dagster
├── dags
│   ├── assets
│   │   ├── __init__.py
│   │   └── myAsset.py
│   ├── jobs
│   │   └── __init__.py
│   └── schedules
│       └── __init__.py
├── docker
│   ├── dagster.yaml
│   ├── Dockerfile_for_dagster
│   ├── Dockerfile_for_my_code
│   └── workspace.yaml
├── docker-compose.yml
└── requirements.txt

Project code

In this project, we’ll explore the structure and setup of a Dagster project. Dagster is a data orchestration tool used for building data pipelines; it allows for the creation, scheduling, and monitoring of pipelines, making it easier to manage complex data workflows.

  1. app.py

This code combines all asset, job, and schedule definitions into a single Dagster Definitions object named defs.

  • asset_definitions contains the definitions of all assets, including firstAsset.
  • job_definitions contains the definitions of all jobs, including MyJob.
  • schedule_definitions contains the definitions of all schedules, including my_job_schedule.

These definitions are then combined into a Definitions object named defs. This object can be used to represent and organize the various components of a Dagster pipeline.
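Since the post does not show the file contents inline, here is a minimal sketch of what app.py could look like. The import paths and the load_assets_from_modules call are assumptions based on the folder structure above, not the author’s exact code:

# app.py - a minimal sketch; import paths are assumptions based on the folder structure
from dagster import Definitions, load_assets_from_modules

from assets import myAsset             # assumed package path for the assets module
from jobs import MyJob                 # job name as described in the post
from schedules import my_job_schedule  # schedule name as described in the post

# Collect every asset defined in the myAsset module (including firstAsset)
asset_definitions = load_assets_from_modules([myAsset])
job_definitions = [MyJob]
schedule_definitions = [my_job_schedule]

# Combine assets, jobs, and schedules into a single Definitions object
defs = Definitions(
    assets=asset_definitions,
    jobs=job_definitions,
    schedules=schedule_definitions,
)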

2. myAsset.py

This code defines a Dagster asset that extracts data. It uses the pandas compute kind, indicating data manipulation with pandas. The asset is tagged with “data_extraction” and configured with a retry policy of at most 3 retries with exponential backoff. It generates a pandas DataFrame containing simulated data about Slack alerts, logs successful execution, and returns the DataFrame along with metadata including the number of records, the size, and a preview of the DataFrame.
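A sketch of such an asset follows. The decorator options mirror the description (pandas compute kind, a data_extraction tag, exponential-backoff retries), but the column names, log message, and metadata keys are illustrative assumptions:

# myAsset.py - a sketch of the asset described above; field names are illustrative
import pandas as pd
from dagster import (
    AssetExecutionContext,
    Backoff,
    MetadataValue,
    Output,
    RetryPolicy,
    asset,
)

@asset(
    compute_kind="pandas",                # shown as the compute kind in the Dagster UI
    op_tags={"type": "data_extraction"},  # tag used to label the asset
    retry_policy=RetryPolicy(max_retries=3, backoff=Backoff.EXPONENTIAL),
)
def firstAsset(context: AssetExecutionContext):
    # Simulated Slack-alert data; a real pipeline would pull this from an API or database
    df = pd.DataFrame(
        {
            "alert_id": [1, 2, 3],
            "channel": ["#alerts", "#alerts", "#incidents"],
            "message": ["disk usage high", "job failed", "latency spike"],
        }
    )
    context.log.info("firstAsset executed successfully")

    # Return the DataFrame plus metadata shown in the Dagster UI
    return Output(
        df,
        metadata={
            "num_records": len(df),
            "size_bytes": int(df.memory_usage(deep=True).sum()),
            # to_markdown() requires the tabulate package
            "preview": MetadataValue.md(df.head().to_markdown()),
        },
    )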

3. jobs/__init__.py

This code defines a job using Dagster, named “MyJob”, which is associated with the asset named “firstAsset”. It utilizes an in-process executor for execution and does not specify any particular graph or IO manager.
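A sketch of that job definition, assuming it is built with define_asset_job and selects the asset by name:

# jobs/__init__.py - a sketch; the job selects the asset by name
from dagster import define_asset_job, in_process_executor

# Job that materializes only firstAsset, using the in-process executor
MyJob = define_asset_job(
    name="MyJob",
    selection=["firstAsset"],
    executor_def=in_process_executor,
)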

4. schedules/__init__.py

This code defines a schedule using Dagster, named “my_job_schedule”, which runs the job “MyJob” every 5 minutes according to the specified cron schedule. The schedule operates in the “US/Central” timezone. The schedule function receives a _context parameter that is not used within the definition.
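A sketch of that schedule using the @schedule decorator, with the cron expression and timezone from the description; the import path is an assumption:

# schedules/__init__.py - a sketch of the schedule described above
from dagster import RunRequest, schedule

from jobs import MyJob  # assumed import path

@schedule(
    job=MyJob,
    cron_schedule="*/5 * * * *",      # every 5 minutes
    execution_timezone="US/Central",
)
def my_job_schedule(_context):
    # _context is accepted but not used, as noted above
    return RunRequest()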

5. docker-compose.yml

This Docker Compose file defines a multi-container Docker application consisting of several services:

docker_postgresql: A PostgreSQL database service with specified environment variables for user, password, and database name.

docker_user_code: A service for running user code. It builds an image using the Dockerfile located at ./docker/Dockerfile_for_my_code and sets environment variables for connecting to the PostgreSQL database.

docker_webserver: A service for the Dagster web server. It builds an image using the Dockerfile located at ./docker/Dockerfile_for_dagster and sets up the entry point for launching the Dagster web server. It exposes port 3000 and maps it to the host system.

docker_daemon: A service for the Dagster daemon. It builds an image using the same Dockerfile as docker_webserver and sets up the entry point for running the Dagster daemon.

The services are connected to a custom Docker network named docker_example_network, facilitating communication between them. Dependencies between services are defined using the depends_on directive to ensure that services are started in the correct order.

Volumes are mounted for /var/run/docker.sock and /tmp/io_manager_storage to allow communication with the Docker daemon and to provide temporary storage for the IO manager.

Overall, this Docker Compose file sets up the infrastructure required to run a Dagster application, including the PostgreSQL database, user code execution environment, web server, and daemon for background processes.
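A trimmed sketch of such a docker-compose.yml, modeled on Dagster’s standard Docker deployment example; the credentials, image name, and DAGSTER_CURRENT_IMAGE variable are placeholders and assumptions, not the author’s exact file:

# docker-compose.yml - a trimmed sketch; credentials and image names are placeholders
version: "3.7"

services:
  docker_postgresql:
    image: postgres:11
    container_name: docker_postgresql
    environment:
      POSTGRES_USER: "postgres_user"
      POSTGRES_PASSWORD: "postgres_password"
      POSTGRES_DB: "postgres_db"
    networks:
      - docker_example_network

  docker_user_code:
    build:
      context: .
      dockerfile: ./docker/Dockerfile_for_my_code
    container_name: docker_user_code
    image: docker_user_code_image
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
      DAGSTER_CURRENT_IMAGE: "docker_user_code_image"
    networks:
      - docker_example_network

  docker_webserver:
    build:
      context: .
      dockerfile: ./docker/Dockerfile_for_dagster
    entrypoint: ["dagster-webserver", "-h", "0.0.0.0", "-p", "3000", "-w", "workspace.yaml"]
    container_name: docker_webserver
    ports:
      - "3000:3000"
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - docker_example_network
    depends_on:
      - docker_postgresql
      - docker_user_code

  docker_daemon:
    build:
      context: .
      dockerfile: ./docker/Dockerfile_for_dagster
    entrypoint: ["dagster-daemon", "run"]
    container_name: docker_daemon
    restart: on-failure
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - docker_example_network
    depends_on:
      - docker_postgresql
      - docker_user_code

networks:
  docker_example_network:
    driver: bridge
    name: docker_example_network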

6. Dockerfile_for_dagster

This Dockerfile sets up an environment for running Dagster applications:

It uses the python:3.10-slim base image.

Installs the necessary Python packages for Dagster and its related components using pip install.

Sets the DAGSTER_HOME environment variable to /opt/dagster/dagster_home/.

Creates the directory specified by DAGSTER_HOME using mkdir.

Copies dagster.yaml and workspace.yaml files from the ./docker directory to the DAGSTER_HOME directory.

Sets the working directory to DAGSTER_HOME.

This Dockerfile prepares the environment by installing Dagster and its dependencies and setting up the necessary configuration files for running Dagster applications.
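A sketch of that Dockerfile; the exact package list is an assumption based on a typical Dagster deployment with Postgres storage and the Docker run launcher:

# Dockerfile_for_dagster - a sketch; the package list is an assumption
FROM python:3.10-slim

# Dagster core plus the webserver, daemon dependencies, Postgres storage, and Docker run launcher
RUN pip install \
    dagster \
    dagster-graphql \
    dagster-webserver \
    dagster-postgres \
    dagster-docker

# DAGSTER_HOME holds the instance configuration
ENV DAGSTER_HOME=/opt/dagster/dagster_home/
RUN mkdir -p $DAGSTER_HOME

# Copy the instance and workspace configuration into DAGSTER_HOME
COPY ./docker/dagster.yaml ./docker/workspace.yaml $DAGSTER_HOME

WORKDIR $DAGSTER_HOME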

7. Dockerfile_for_my_code

This Dockerfile sets up an environment for running user-specific Dagster code:

It uses the python:3.10-slim base image.

Copies the requirements.txt file from the local directory to /opt/dagster/app/ in the Docker container.

Sets the working directory to /opt/dagster/app/.

Upgrades pip and installs the Python packages specified in requirements.txt using pip install.

Copies the pipeline code located in ./dags/ from the local directory to /opt/dagster/app/ in the Docker container.

Exposes port 4000 for the Dagster gRPC server.

Specifies the command to run when the Docker container starts, launching the Dagster gRPC server with the provided parameters (-h, -p, -f).
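A sketch of that Dockerfile; the file passed to -f is assumed to be the app.py that exposes the Definitions object:

# Dockerfile_for_my_code - a sketch; the -f target is an assumption
FROM python:3.10-slim

# Install the pipeline's Python dependencies
COPY requirements.txt /opt/dagster/app/
WORKDIR /opt/dagster/app/
RUN pip install --upgrade pip && pip install -r requirements.txt

# Copy the pipeline code (assets, jobs, schedules) into the image
COPY ./dags/ /opt/dagster/app/

# Port used by the Dagster gRPC server that serves this code
EXPOSE 4000

# Launch the gRPC server, pointing it at the file that exposes the Definitions object
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f", "app.py"]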

8. workspace.yaml

The load_from section in the workspace.yaml file defines where the Dagster instance should load resources from. In this case:

  • It specifies a gRPC server with the following details:
  • Host: docker_user_code
  • Port: 4000

This configuration instructs the Dagster instance to load resources from the gRPC server running on the specified host (docker_user_code) and port (4000), and it assigns the location name "veve_data_platform" to this source.
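That configuration is only a few lines:

# workspace.yaml - load user code from the gRPC server in the docker_user_code container
load_from:
  - grpc_server:
      host: docker_user_code
      port: 4000
      location_name: "veve_data_platform"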

9. dagster.yaml

This configuration in the dagster.yaml file defines various components and their configurations for the Dagster instance:

  • Scheduler: Specifies the scheduler to be used (DagsterDaemonScheduler).
  • Run Coordinator: Defines the run coordinator (QueuedRunCoordinator), which manages the execution of pipeline runs.
  • Run Launcher: Configures the run launcher (DockerRunLauncher) for launching pipeline runs in Docker containers. It specifies environment variables and network settings, ensuring connectivity to other services.
  • Run Storage: Specifies the storage backend for storing information about pipeline runs (PostgresRunStorage). It provides configurations for connecting to a PostgreSQL database, including hostname, username, password, and port.
  • Schedule Storage: Defines the storage backend for storing information about pipeline schedules (PostgresScheduleStorage). Similar to run storage, it configures connections to a PostgreSQL database.
  • Event Log Storage: Specifies the storage backend for storing event logs (PostgresEventLogStorage). Like the other storage backends, it sets up connections to a PostgreSQL database.

Overall, this configuration ensures that the Dagster instance is properly set up to manage scheduling, run execution, and event logging using PostgreSQL as the storage backend.
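A sketch of that dagster.yaml, following the layout of Dagster’s Docker deployment example; the hostname and environment variable names are assumptions kept consistent with the compose file sketched above:

# dagster.yaml - a sketch; hostname and env var names are assumptions
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    env_vars:
      - DAGSTER_POSTGRES_USER
      - DAGSTER_POSTGRES_PASSWORD
      - DAGSTER_POSTGRES_DB
    network: docker_example_network
    container_kwargs:
      auto_remove: true

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db: &postgres_db
      hostname: docker_postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db: *postgres_db

event_log_storage:
  module: dagster_postgres.event_log_storage
  class: PostgresEventLogStorage
  config:
    postgres_db: *postgres_db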

Summary

Dagster streamlines data pipeline development and management. The provided code demonstrates defining assets, jobs, and schedules within a structured project layout. Docker integration ensures consistent deployment across environments.

By configuring services like PostgreSQL and specifying execution parameters, users can easily deploy and manage complex data workflows. This discussion enhances understanding of Dagster’s modular architecture and Docker-based deployment, empowering effective data pipeline development and orchestration.

Feel free to contact me on LinkedIn, follow me on Instagram, or leave a message on WhatsApp (+923225847078) if you have any queries.

Happy learning :)
