In addition to running Spark pipelines in an AWS EMR cluster, Mage also supports running Spark pipelines in a Databricks cluster.

Set up

Here is an overview of the steps required to use Mage with a Databricks cluster:

  1. Set up Databricks cluster
  2. Build docker image
  3. Start Mage
  4. Configure project’s metadata settings
  5. Sample pipeline with PySpark code
  6. Verify everything worked

If you get stuck, run into problems, or just want someone to walk you through these steps, please join our Slack.

1. Set up Databricks cluster

Set up a Databricks workspace and cluster by following the Databricks documentation.

2. Build docker image

Use the Dockerfile template from mage-ai/integrations/databricks/Dockerfile.

Update the databricks-connect version in the Dockerfile to match the version used in your Databricks cluster.
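For example, if your cluster runs Databricks Runtime 10.4, the databricks-connect install line in the Dockerfile might look something like the line below (the version shown is illustrative; pin it to the release that matches your cluster's runtime):

RUN pip install --no-cache-dir databricks-connect==10.4.*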

Build the Docker image with the following command:

docker build -t mage_databricks .

3. Start Mage

Type this command in your terminal to start Mage using Docker (note: demo_project is the name of your project; you can change it to anything you want):

docker run -it --name mage_databricks -p 6789:6789 -v $(pwd):/home/src mage_databricks \
  /app/run_app.sh mage start demo_project

SSH into the Docker container and configure databricks-connect following the guide.
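As a rough sketch (the exact prompts depend on your databricks-connect version), the configuration flow inside the container looks something like this; have your workspace URL, a personal access token, and the cluster ID ready:

databricks-connect configure   # prompts for host, token, cluster ID, org ID, and port
databricks-connect test        # verifies the connection to your Databricks cluster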

4. Configure project’s metadata settings

Open your project’s metadata.yaml file located at the root of your project’s directory: demo_project/metadata.yaml (presuming your project is named demo_project).

Change the value for the key variables_dir to an S3 bucket that you want to use to store intermediate block output.

For example, if your S3 bucket is named my-awesome-bucket, then the value for the key variables_dir should be s3://my-awesome-bucket.
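The relevant part of demo_project/metadata.yaml would then look something like this (all other keys in the file stay unchanged):

variables_dir: s3://my-awesome-bucket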

5. Sample pipeline with PySpark code

  1. Create a new pipeline by going to File in the top left corner of the page and then clicking New pipeline.
  2. Open the pipeline’s metadata.yaml file and update the type to be databricks.
  3. Click + Data loader, then Generic (no template) to add a new data loader block.
  4. Paste the following sample code in the new data loader block:
from pandas import DataFrame
import io
import pandas as pd
import requests


if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


def data_from_internet():
    """Download a sample CSV over HTTP and return it as a pandas DataFrame."""
    url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/restaurant_user_transactions.csv'

    response = requests.get(url)
    return pd.read_csv(io.StringIO(response.text), sep=',')


@data_loader
def load_data(**kwargs) -> DataFrame:
    # Convert the pandas DataFrame into a Spark DataFrame using the Spark
    # session that Mage passes to the block via kwargs.
    df_spark = kwargs['spark'].createDataFrame(data_from_internet())

    return df_spark
  5. Click + Data exporter, then Generic (no template) to add a new data exporter block.
  6. Paste the following sample code in the new data exporter block (change the S3 path s3://mage-spark-cluster/demo_project/demo_pipeline/ to a path in the bucket you configured in a previous step):
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(df: DataFrame, **kwargs) -> None:
    # Write the Spark DataFrame to S3 as comma-delimited CSV files with a
    # header row, overwriting any existing output at the path.
    (
        df.write
        .option('delimiter', ',')
        .option('header', 'True')
        .mode('overwrite')
        .csv('s3://mage-spark-cluster/demo_project/demo_pipeline/')
    )

6. Verify everything worked

Let’s use Spark to load the data from S3 that we just wrote:

  1. Click + Data loader, then Generic (no template) to add a new data loader block.
  2. Paste the following sample code in the new data loader block (change the S3 path s3://mage-spark-cluster/demo_project/demo_pipeline/* to the path you used in the data exporter block):
from pandas import DataFrame


if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(**kwargs) -> DataFrame:
    # Read the CSV files written by the data exporter block back into a
    # Spark DataFrame, inferring the schema from the data.
    df = (
        kwargs['spark'].read
        .format('csv')
        .option('header', 'true')
        .option('inferSchema', 'true')
        .option('delimiter', ',')
        .load('s3://mage-spark-cluster/demo_project/demo_pipeline/*')
    )

    return df