This project uses sample data from the Google NYC Taxi Pubsub to create a streaming pipeline in Mage that reads and transforms a sample stream, writing the results to S3. It then runs a batch pipeline in dbt that transforms that data and writes the output to a SCD Type-2 table in Postgres.

Read more or make updates to the repo at https://github.com/mage-ai/pubsub-devcontainer-demo.
Prerequisites

To run this project you’ll need VSCode (for the devcontainer) and the `docker compose` tool.

Let’s get started 🎉
1. Clone this repo: `git clone https://github.com/mage-ai/pubsub-devcontainer-demo`.
2. Create a `.env` file with the requisite variables. See the `.env.example` file for guidance.
3. Click “Reopen in Container” to start the devcontainer, or open the Command Palette and select “Dev Containers: Rebuild and Reopen in Container”. This will build a devcontainer (a Docker container) and install all dependencies for your project. This may take a few minutes.
4. Run `docker compose up` from the command line.
5. Navigate to `localhost:6789` when the container is ready. You should see the Mage UI!

By performing the above, VSCode creates an isolated environment, installs a few extensions, and builds and runs Docker as defined in `docker-compose.yaml`.
We’re using Devcontainers to create a consistent development environment for our project. This means we can be sure our code runs in the same environment as our teammates’ and that we can easily share our code with others.
As for the stream itself, you might notice `kafka` and `zookeeper` services in the `docker-compose.yaml` file. These are the tools we use to manage the stream: `kafka` handles the stream and `zookeeper` manages `kafka`. You can read more about both tools in their documentation.
The actual stream is managed by the `stream` container, which spins up a Python container and executes `./stream/bin/send_stream.py`, the script that reads the Pub/Sub topic data (`./stream/data/taxi-stream.json`) and sends it to the stream.
Data will begin streaming 30 seconds after the container starts.
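For intuition, here’s a minimal sketch of what a replay script like `send_stream.py` could look like. This is not the repo’s actual script: the `kafka-python` client, the broker address, the topic name, and the assumption that the data file is newline-delimited JSON are all illustrative placeholders.

```python
# Hypothetical sketch of a stream replay script (not the repo's send_stream.py).
# Assumed: kafka-python client, broker at "kafka:9092" (the compose service
# name), a placeholder topic "taxi_rides", and newline-delimited JSON input.
import json
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda record: json.dumps(record).encode("utf-8"),
)

with open("./stream/data/taxi-stream.json") as f:
    for line in f:
        # Send each saved Pub/Sub record to the topic, pausing briefly so the
        # replay behaves like a live stream.
        producer.send("taxi_rides", json.loads(line))
        time.sleep(0.5)

producer.flush()
```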
The project includes two pipelines: `kafka_demo` and `dbt_demo`. Click `kafka_demo`, then the “code” icon in the top left to open the pipeline code. Make sure you’ve created your `.env` file (see above), then click Execute pipeline in the bottom right to run your stream. You should see data flowing!

Nice! We’ve got a working stream. You can click Cancel pipeline after some sample data has been loaded or let the stream run.
Behind the scenes, we’re kicking off a Kafka + Zookeeper instance, creating a topic, and writing data to that topic. Mage is then pulling in this “simulated” stream, transforming it, and writing to S3!
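If you’re curious what the transformation step might look like in code, Mage streaming transformer blocks generally have the shape below. The body is a made-up pass-through filter, not the actual `kafka_demo` block; open the pipeline code in the UI to see the real one.

```python
# General shape of a Mage streaming transformer block; the filtering logic
# below is hypothetical, not the kafka_demo block's actual code.
from typing import Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(messages: List[Dict], *args, **kwargs):
    # Each element of `messages` is one event pulled from the Kafka topic.
    # Drop any empty payloads and pass the rest downstream to the exporter.
    return [msg for msg in messages if msg]
```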
The Docker setup also builds a Postgres database alongside our Mage app for running transformations in dbt. Navigate to the pipelines page and select `dbt_demo`, then select the code icon to open the `dbt` blocks. Note how we can read from our Mage data directly, without the need for an intermediate table.
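If you’d like to verify the dbt output outside of Mage, you can query the Postgres container directly. The sketch below is purely illustrative: the connection details and table name are placeholders, so substitute the values from your `.env` and `docker-compose.yaml`.

```python
# Hypothetical check of the dbt output table in Postgres. All connection
# details and the table name are placeholders -- take the real values from
# your .env / docker-compose.yaml.
import psycopg2

conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="postgres",
    user="postgres",
    password="postgres",
)

with conn, conn.cursor() as cur:
    cur.execute("SELECT * FROM taxi_rides_scd LIMIT 5;")  # placeholder table
    for row in cur.fetchall():
        print(row)

conn.close()
```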