Integrations
Spark and PySpark

Want to become a Sparkmage?

Kubernetes

When you run Mage and Spark in the same Kubernetes cluster, set the environment variable SPARK_MASTER_HOST in the Mage container to the URL of the Spark cluster's master node. Mage can then connect to your Spark cluster and execute PySpark code in Mage.

Here is an overview of the steps required to use Mage with Spark in a Kubernetes cluster.

1. Run Spark cluster in the Kubernetes cluster

You can use the following Helm commands to run a Spark cluster in the Kubernetes cluster:

helm repo add bitnami https://charts.bitnami.com/bitnami
helm install my-release bitnami/spark

Then follow the instructions from the output to get the Spark master WebUI URL:

kubectl port-forward --namespace default svc/my-release-spark-master-svc 80:80
echo "Visit http://127.0.0.1:80 to use your application"

Find the Spark Master URL at the top of the web page.

2. Build Mage docker image with Spark environment

  • Download the Dockerfile template for Mage with Spark environment.
  • Build the docker image with the command docker build -t mage_spark .

3. Run Mage in Kubernetes cluster with the SPARK_MASTER_HOST environment variable

  • Use the docker image built from step 2 as the container image in your Kubernetes yaml file.
  • Add the SPARK_MASTER_HOST environment variable with the Spark Master URL from step 1 to the container spec in your Kubernetes yaml file.
  • Then deploy Mage to Kubernetes with the updated yaml file (e.g., with kubectl apply -f <your-yaml-file>).

Here is the example Pod config:

apiVersion: v1
kind: Pod
metadata:
  name: mage-server
spec:
  containers:
  - name: mage-server
    image: mage_spark
    imagePullPolicy: Never
    ports:
    - containerPort: 6789
    volumeMounts:
    - name: mage-fs
      mountPath: /home/src
    env:
      - name: KUBE_NAMESPACE
        valueFrom:
          fieldRef:
            fieldPath: metadata.namespace
      - name: SPARK_MASTER_HOST
        value: "spark://spark_master_url:7077"
  volumes:
  - name: mage-fs
    hostPath:
      path: /path/to/mage_project
  serviceAccountName: mage-user

Updates

  • In Mage version 0.8.83 and above, you no longer need to specify the SPARK_MASTER_HOST environment variable.
  • Instead, you can specify the Spark config in the project’s metadata.yaml file, as described in the Custom Spark Session section.

4. Run PySpark code in Mage

When the Mage pod is running, you can create a “Standard (batch)” pipeline (python kernel) and then write PySpark code in any block.

In a Scratchpad block, you’ll need to manually create the Spark session with the following code:

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.master(os.getenv('SPARK_MASTER_HOST', 'local')).getOrCreate()

In other blocks, you can access the Spark session from the kwargs via kwargs['spark'].
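For instance, a block can handle both cases at once: use the session from kwargs when Mage provides one, and fall back to creating its own otherwise. The helper below is an illustrative sketch (it is not part of Mage's API) that mirrors the Scratchpad snippet above:

```python
import os


def get_session(kwargs):
    """Return the Spark session from kwargs, or build one from
    SPARK_MASTER_HOST (falling back to local mode)."""
    spark = kwargs.get('spark')
    if spark is None:
        # Import lazily so the helper is importable in non-Spark contexts.
        from pyspark.sql import SparkSession
        spark = (
            SparkSession.builder
            .master(os.getenv('SPARK_MASTER_HOST', 'local'))
            .getOrCreate()
        )
    return spark
```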


Standalone Spark cluster

1. Build Mage docker image with Spark environment

  • Download the Dockerfile template for Mage with Spark environment.
  • Build the docker image with the command docker build -t mage_spark .

2. Start Mage with SPARK_MASTER_HOST environment variable

Type this command in your terminal to start Mage using docker:

docker run -it --name mage_spark -e SPARK_MASTER_HOST='host' -p 6789:6789 -v $(pwd):/home/src mage_spark \
  /app/run_app.sh mage start demo_project

Notes

  • demo_project is the name of your project; you can change it to anything you want
  • Set the Spark cluster’s master node URL (e.g., spark://host:port) as the value for the environment variable SPARK_MASTER_HOST

Updates

  • In Mage version 0.8.83 and above, you no longer need to specify the SPARK_MASTER_HOST environment variable.
  • Instead, you can specify the Spark config in the project’s metadata.yaml file, as described in the Custom Spark Session section.

3. Run PySpark code in Mage

When Mage is running, you can create a “Standard (batch)” pipeline (python kernel) and then write PySpark code in any block.

In a Scratchpad block, you’ll need to manually create the Spark session with the following code:

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.master(os.getenv('SPARK_MASTER_HOST', 'local')).getOrCreate()

In other blocks, you can access the Spark session from the kwargs via kwargs['spark'].


Custom Spark Session

1. Build Mage docker image with Spark environment

  • Download the Dockerfile template for Mage with Spark environment.
  • Build the docker image with the command docker build -t mage_spark .

2. Start Mage using docker with the following command in your terminal

docker run -it --name mage_spark -p 6789:6789 -v $(pwd):/home/src mage_spark \
  /app/run_app.sh mage start demo_project

Notes

  • demo_project is the name of your project; you can change it to anything you want

3. Review the spark_config section in the metadata.yaml file under the project’s main folder, and make any necessary adjustments

spark_config:
  # Application name
  app_name: 'my spark app'
  # Master URL to connect to
  # e.g., spark_master: 'spark://host:port', or spark_master: 'yarn'
  spark_master: 'local'
  # Executor environment variables
  # e.g., executor_env: {'PYTHONPATH': '/home/path'}
  executor_env: {}
  # Jar files to be uploaded to the cluster and added to the classpath
  # e.g., spark_jars: ['/home/path/example1.jar']
  spark_jars: []
  # Path where Spark is installed on worker nodes,
  # e.g. spark_home: '/usr/lib/spark'
  spark_home: null
  # List of key-value pairs to be set in SparkConf
  # e.g., others: {'spark.executor.memory': '4g', 'spark.executor.cores': '2'}
  others: {}
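Conceptually, these fields map onto standard SparkConf settings. The sketch below is illustrative only (it is not Mage's actual config loader) and shows how the fields above might translate into SparkConf key/value pairs:

```python
def to_spark_conf_pairs(config):
    """Translate a spark_config mapping into SparkConf key/value pairs."""
    pairs = []
    if config.get('app_name'):
        pairs.append(('spark.app.name', config['app_name']))
    if config.get('spark_master'):
        pairs.append(('spark.master', config['spark_master']))
    # Executor environment variables become spark.executorEnv.* keys.
    for key, value in config.get('executor_env', {}).items():
        pairs.append((f'spark.executorEnv.{key}', value))
    if config.get('spark_jars'):
        pairs.append(('spark.jars', ','.join(config['spark_jars'])))
    # Any extra key-value pairs are passed through to SparkConf verbatim.
    for key, value in config.get('others', {}).items():
        pairs.append((key, value))
    return pairs
```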

4. Run PySpark code in Mage

When Mage is running, you can create a “Standard (batch)” pipeline (python kernel), add a block, and then write PySpark code that uses the Spark session via kwargs['spark'], e.g.,

spark = kwargs.get('spark')
print(spark.sql('select 1'))

In a Scratchpad block, you’ll need to manually create the Spark session with the following code:

from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.services.spark.config import SparkConfig
from mage_ai.services.spark.spark import get_spark_session
spark_config = SparkConfig.load(
    config={'repo_path': get_repo_path()})
spark = get_spark_session(spark_config)
print(spark.sql('select 1'))

Hadoop and Yarn cluster for Spark

1. Build Mage docker image with a Hadoop Yarn environment

  • Download the Dockerfile template for Mage with Hadoop environment.
  • Build the docker image with the command docker build -t mage_hadoop .

2. Start Mage with the following command in your terminal using docker

docker run -it --name mage_hadoop -p 6789:6789 -v $(pwd):/home/src mage_hadoop \
  /app/run_app_with_hadoop.sh mage start demo_project

Notes

  • demo_project is the name of your project; you can change it to anything you want

3. Run PySpark code in Mage using Hadoop Yarn

  • Change the metadata.yaml file in the main pipeline folder to include the following Spark settings:
spark_config:
  # Application name
  app_name: 'my spark yarn app'
  # Master URL to connect to
  # e.g., spark_master: 'spark://host:port', or spark_master: 'yarn'
  spark_master: 'yarn'
  # Executor environment variables
  # e.g., executor_env: {'PYTHONPATH': '/home/path'}
  executor_env: {}
  # Jar files to be uploaded to the cluster and added to the classpath
  # e.g., spark_jars: ['/home/path/example1.jar']
  spark_jars: []
  # Path where Spark is installed on worker nodes,
  # e.g. spark_home: '/usr/lib/spark'
  spark_home: null
  # List of key-value pairs to be set in SparkConf
  # e.g., others: {'spark.executor.memory': '4g', 'spark.executor.cores': '2'}
  others: {}
  • Create a new Standard (batch) pipeline and add a Data loader, then run it with the following code:
@data_loader
def load_data(*args, **kwargs):
    """
    Template code for loading data from any source.

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    # Specify your data loading logic here
    spark = kwargs.get('spark')
    spark_conf = spark.sparkContext.getConf()
    print('#' * 40)
    print(spark_conf.toDebugString())
    print('#' * 40)
    print(spark.sql('select 1'))
    return {}
  • Verify that the added Spark code is running through a Yarn master.

AWS

Here is an overview of the steps required to use Mage locally with Spark in AWS:

  1. Create an EC2 key pair
  2. Create an S3 bucket for Spark
  3. Start Mage
  4. Configure project’s metadata settings
  5. Launch EMR cluster
  6. Allow EMR connection permissions
  7. Sample pipeline with PySpark code
  8. Debugging
  9. Clean up

If you get stuck, run into problems, or just want someone to walk you through these steps, please join our Slack.

1. Create an EC2 key pair

If you don’t have an existing EC2 key pair that you use to SSH into EC2 instances, follow AWS’s guide on how to create an EC2 key pair.

Once you’ve created an EC2 key pair, note the name of the EC2 key pair and the full path of where you saved it on your local machine (we’ll need both later).

2. Create an S3 bucket for Spark

Using Spark on AWS EMR requires an AWS S3 bucket to store logs, scripts, etc.

Follow AWS’s guide to create an S3 bucket. You don’t need to add any special permissions or policies to this bucket.

Once you’ve created an S3 bucket, note the name of the bucket (we’ll need it later).

3. Start Mage

Using Mage with Spark is much easier if you use Docker.

Type this command in your terminal to start Mage using docker (Note: demo_project is the name of your project, you can change it to anything you want):

docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai \
  /app/run_app.sh mage start demo_project

4. Configure project’s metadata settings

Open your project’s metadata.yaml file located at the root of your project’s directory: demo_project/metadata.yaml (presuming your project is named demo_project).

Change the values for the keys mentioned in the following steps.

4a. ec2_key_name

Change the value for key ec2_key_name to equal the name of the EC2 key pair you created in an earlier step.

For example, if your EC2 key pair is named aws-ec2.pem or aws-ec2.pub, then the value for the key ec2_key_name should be aws-ec2.

4b. remote_variables_dir

Change the value for key remote_variables_dir to equal the S3 bucket you created in an earlier step.

For example, if your S3 bucket is named my-awesome-bucket, then the value for the key remote_variables_dir should be s3://my-awesome-bucket.
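Putting 4a and 4b together: both values derive mechanically from the artifacts of steps 1 and 2. Here is a small illustrative helper (not part of Mage) that captures the two rules:

```python
from pathlib import Path


def metadata_values(key_pair_path, bucket_name):
    """Derive the two metadata.yaml values from the EC2 key pair file
    and the S3 bucket name."""
    return {
        # 'aws-ec2.pem' or 'aws-ec2.pub' -> 'aws-ec2'
        'ec2_key_name': Path(key_pair_path).stem,
        'remote_variables_dir': f's3://{bucket_name}',
    }
```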

4c. Remove optional settings

You can remove the following 2 keys:

  1. master_security_group
  2. slave_security_group

Your final metadata.yaml file could look like this:

variables_dir: ./
remote_variables_dir: s3://my-awesome-bucket

emr_config:
  master_instance_type: "r5.4xlarge"
  slave_instance_type: "r5.4xlarge"
  ec2_key_name: aws-ec2

You may need to request an increase in quota limits for using those instance types.

For more information on how to view your quota limits and request an increase, check out this AWS document.

5. Launch EMR cluster

You’ll need an AWS Access Key ID and an AWS Secret Access Key. These are available from AWS’s IAM Management Console.

Once you’ve acquired those credentials, do the following:

5a. Create IAM roles for EMR

The following steps will create 2 IAM roles required for EMR.

  1. Install AWS CLI.
  2. Run this command in your terminal (reference):
aws emr create-default-roles

5b. Run script to create cluster

Using your AWS Access Key ID and an AWS Secret Access Key, run the following command in your terminal to launch an EMR cluster:

Using pip:

Note: You must have your AWS Access Key ID and an AWS Secret Access Key in your environment variables as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY respectively.

mage create_spark_cluster demo_project

Using Docker:

docker run -it \
  --env AWS_ACCESS_KEY_ID=your_key_id \
  --env AWS_SECRET_ACCESS_KEY=your_access_key \
  -v $(pwd):/home/src \
  mageai/mageai \
  mage create_spark_cluster demo_project

This script will take a few minutes to complete. Once finished, your terminal will output something like this:

Creating EMR cluster for project: /home/src/demo_project
Creating cluster...

{
  "JobFlowId": "j-3500M6WJOND9Q",
  "ClusterArn": "...",
  "ResponseMetadata": {
    "RequestId": "...",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "x-amzn-requestid": "...",
      "content-type": "application/x-amz-json-1.1",
      "content-length": "118",
      "date": "Wed, 17 Aug 2022 04:32:33 GMT"
    },
    "RetryAttempts": 0
  }
}

Cluster ID: j-3500M6WJOND9Q
Waiting for cluster, this typically takes several minutes...
Current status: STARTING..BOOTSTRAPPING.....WAITING
Cluster j-3500M6WJOND9Q is created
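The status line above comes from a polling loop. A simplified sketch of that wait logic (illustrative, not Mage's actual implementation), with the status source injected so it can run anywhere:

```python
import time


def wait_for_cluster(get_status, poll_seconds=30):
    """Poll a status callable until the cluster is ready (WAITING),
    raising if it terminates instead."""
    while True:
        status = get_status()
        if status == 'WAITING':
            return status
        if status in ('TERMINATED', 'TERMINATED_WITH_ERRORS'):
            raise RuntimeError(f'Cluster entered state {status}')
        time.sleep(poll_seconds)
```

Against a real cluster, get_status would wrap something like boto3's EMR describe_cluster call.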

6. Allow EMR connection permissions

There are 2 ways to connect to your EMR cluster remotely:

  1. Whitelist your IP
  2. SSH into EMR master node

Whitelist your IP

  1. Open the EC2 dashboard in AWS
  2. Click on “Security Groups” on the left side panel under the section “Network & Security”
  3. Find the security group named “ElasticMapReduce-master”
  4. Add a new inbound rule with the following values:
Type        Protocol  Port range  Source
Custom TCP  TCP       8998        My IP

This will allow your locally running Mage to remotely access AWS EMR.

SSH into EMR master node

1. Allow SSH access to EMR

Add an inbound rule to the EMR master node’s security group to allow SSH access by following these steps:

  1. Go to Amazon EMR.
  2. Click on the cluster you just created.
  3. Under the section Security and access, click the link next to Security groups for Master. The link could look like this: sg-0bb79fd041def8c5d (depending on your security group ID).
  4. In the “Security Groups” page that just opened up, click on the row with the “Security group name” value of “ElasticMapReduce-master”.
  5. Under the section “Inbound rules”, click the button on the right labeled “Edit inbound rules”.
  6. Scroll down and click “Add rule”.
  7. Change the “type” dropdown from Custom TCP to SSH.
  8. Change the “Source” dropdown from Custom to My IP.
  9. Click “Save rules” in the bottom right corner of the page.

2. SSH into master node

  1. Go to Amazon EMR.
  2. Click on the cluster you just created.
  3. Find the Master public DNS; it should look something like this: ec2-some-ip.us-west-2.compute.amazonaws.com.
  4. Make sure your EC2 key pair is read-only. Run the following command (change the location to wherever you saved your EC2 key pair locally): chmod 400 ~/.ssh/aws-ec2.pem
  5. In a separate terminal session, run the following command:
ssh -i [location of EC2 key pair file] \
  -L 0.0.0.0:9999:localhost:8998 \
  hadoop@[Master public DNS]

The command could look like this:

ssh -i ~/.ssh/aws-ec2.pem \
  -L 0.0.0.0:9999:localhost:8998 \
  hadoop@ec2-44-234-41-39.us-west-2.compute.amazonaws.com

7. Sample pipeline with PySpark code

If you aren’t using Docker and you installed Mage using pip, you must run the following commands in your terminal to use the pyspark kernel:

pip install sparkmagic
mkdir ~/.sparkmagic
wget https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/master/sparkmagic/example_config.json
mv example_config.json ~/.sparkmagic/config.json
sed -i 's/localhost:8998/host.docker.internal:9999/g' ~/.sparkmagic/config.json
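The sed line above is a plain text substitution. An equivalent, illustrative Python version (assuming the stock example_config.json layout, where each credentials entry carries a url field) makes the intent explicit:

```python
import json


def retarget_endpoints(config_text, new_endpoint='host.docker.internal:9999'):
    """Rewrite every localhost:8998 Livy endpoint in a sparkmagic
    config so it points at the SSH tunnel instead."""
    config = json.loads(config_text)
    for value in config.values():
        if isinstance(value, dict) and 'url' in value:
            value['url'] = value['url'].replace('localhost:8998', new_endpoint)
    return json.dumps(config, indent=2)
```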

  1. Create a new pipeline by going to File in the top left corner of the page and then clicking New pipeline.
  2. Change the pipeline’s kernel from python to pyspark. Click the button with the green dot and the word python next to it. This is located at the top of the page on the right side of your header.
  3. Click + Data loader, then Generic (no template) to add a new data loader block.
  4. Paste the following sample code in the new data loader block:
from pandas import DataFrame
import io
import pandas as pd
import requests


if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


def data_from_internet():
    url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/restaurant_user_transactions.csv'

    response = requests.get(url)
    return pd.read_csv(io.StringIO(response.text), sep=',')


@data_loader
def load_data(**kwargs) -> DataFrame:
    df_spark = kwargs['spark'].createDataFrame(data_from_internet())

    return df_spark
  5. Click + Data exporter, then Generic (no template) to add a new data exporter block.
  6. Paste the following sample code in the new data exporter block (change the s3://mage-spark-cluster path to the bucket you created in a previous step):
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(df: DataFrame, **kwargs) -> None:
    (
        df.write
        .option('delimiter', ',')
        .option('header', 'True')
        .mode('overwrite')
        .csv('s3://mage-spark-cluster/demo_project/demo_pipeline/')
    )

Verify everything worked

Let’s load the data from S3 that we just created using Spark:

  1. Click + Data loader, then Generic (no template) to add a new data loader block.
  2. Paste the following sample code in the new data loader block (change the s3://mage-spark-cluster path to the bucket you created in a previous step):
from pandas import DataFrame


if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(**kwargs) -> DataFrame:
    df = (
        kwargs['spark'].read
        .format('csv')
        .option('header', 'true')
        .option('inferSchema', 'true')
        .option('delimiter', ',')
        .load('s3://mage-spark-cluster/demo_project/demo_pipeline/*')
    )

    return df

8. Debugging

If you run into any problems, the first thing to try is restarting the kernel: Run > Restart kernel.

If that doesn’t work, restart the app by stopping the docker container and starting it again.

9. Clean up

Please make sure to terminate your EMR cluster when you’re done using it, to avoid unnecessary costs.


[WIP] GCP

Coming soon…


[WIP] Azure

Coming soon…