Databricks
In addition to running Spark pipelines on an AWS EMR cluster, Mage also supports running Spark pipelines on a Databricks cluster.
Set up
Here is an overview of the steps required to use Mage with a Databricks cluster:
- Set up Databricks cluster
- Build docker image
- Start Mage
- Configure project’s metadata settings
- Sample pipeline with PySpark code
- Verify everything worked
If you get stuck, run into problems, or just want someone to walk you through these steps, please join our Slack.
1. Set up Databricks cluster
Set up a Databricks workspace and cluster by following the Databricks documentation.
2. Build docker image
Use the Dockerfile template from mage-ai/integrations/databricks/Dockerfile.
Update the databricks-connect version to match the version used in your Databricks cluster.
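The exact contents of the template may differ between releases, but the version is typically pinned as a pip requirement. An illustrative (not authoritative) pin looks like the following, where the version number is only a placeholder to replace with the one matching your cluster’s runtime:
pip install "databricks-connect==11.3.*"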
Build the Docker image with the command docker build -t mage_databricks . (the trailing dot sets the build context to the current directory).
3. Start Mage
Type this command in your terminal to start Mage using Docker (note: demo_project is the name of your project; you can change it to anything you want):
docker run -it --name mage_databricks -p 6789:6789 -v $(pwd):/home/src mage_databricks \
/app/run_app.sh mage start demo_project
SSH into the Docker container and configure databricks-connect following the Databricks Connect guide.
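For example, a minimal sketch assuming the container name mage_databricks from the command above (the exact configuration prompts depend on your databricks-connect version, so follow the Databricks Connect guide for specifics):
docker exec -it mage_databricks bash
databricks-connect configure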
4. Configure project’s metadata settings
Open your project’s metadata.yaml file located at the root of your project’s directory: demo_project/metadata.yaml (presuming your project is named demo_project).
Change the value for the key variables_dir to an S3 bucket that you want to use to store intermediate block output. For example, if your S3 bucket is named my-awesome-bucket, then the value for the key variables_dir should be s3://my-awesome-bucket.
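For reference, a minimal sketch of the relevant line in demo_project/metadata.yaml (leave the other keys in the file as they are; the bucket name is just the example from above):
variables_dir: s3://my-awesome-bucket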
5. Sample pipeline with PySpark code
- Create a new pipeline by going to File in the top left corner of the page and then clicking New pipeline.
- Open the pipeline’s metadata.yaml file and update the type to be databricks (see the sketch at the end of this step).
- Click + Data loader, then Generic (no template) to add a new data loader block.
- Paste the following sample code in the new data loader block:
from pandas import DataFrame
import io
import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


def data_from_internet():
    """Download the sample CSV dataset and load it into a pandas DataFrame."""
    url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/restaurant_user_transactions.csv'
    response = requests.get(url)
    return pd.read_csv(io.StringIO(response.text), sep=',')


@data_loader
def load_data(**kwargs) -> DataFrame:
    # Convert the pandas DataFrame into a Spark DataFrame using the Spark
    # session that Mage passes to the block via kwargs.
    df_spark = kwargs['spark'].createDataFrame(data_from_internet())
    return df_spark
- Click + Data exporter, then Generic (no template) to add a new data exporter block.
- Paste the following sample code in the new data exporter block (change the s3:// path to point to the bucket you created in a previous step):
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(df: DataFrame, **kwargs) -> None:
    # Write the Spark DataFrame to S3 as CSV files with a header row,
    # overwriting any existing output at this path.
    (
        df.write
        .option('delimiter', ',')
        .option('header', 'True')
        .mode('overwrite')
        .csv('s3://mage-spark-cluster/demo_project/demo_pipeline/')
    )
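For reference, the type change from the second bullet in this step is a single key in the pipeline’s metadata.yaml; a minimal sketch:
type: databricks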
6. Verify everything worked
Let’s use Spark to load the data from S3 that we just wrote:
- Click + Data loader, then Generic (no template) to add a new data loader block.
- Paste the following sample code in the new data loader block (change the s3:// path to point to the bucket you created in a previous step):
from pandas import DataFrame

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(**kwargs) -> DataFrame:
    # Read the CSV files written by the previous pipeline back into a
    # Spark DataFrame, inferring the schema from the data.
    df = (
        kwargs['spark'].read
        .format('csv')
        .option('header', 'true')
        .option('inferSchema', 'true')
        .option('delimiter', ',')
        .load('s3://mage-spark-cluster/demo_project/demo_pipeline/*')
    )
    return df
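If this block runs and returns the rows written in step 5, the Databricks integration is working end to end. For ad-hoc inspection you can also add standard PySpark calls such as df.show(5) or df.count() inside the block; these are generic Spark APIs rather than anything Mage-specific.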