Config

connector_type: amazon_sqs
queue_name: test-queue
batch_size: 10
wait_time_seconds: 1
message_deletion_method: AFTER_RECEIVED   # AFTER_RECEIVED or MANUAL

serde_config:
  serialization_method: JSON

Authentication

Use one of the following options to authenticate with Amazon SQS:

  1. Add the following keys and values to your environment variables:
    • AWS_ACCESS_KEY_ID
    • AWS_SECRET_ACCESS_KEY
    • AWS_REGION
  2. If you deploy Mage on an AWS ECS cluster, you can authenticate with the ECS task execution role. Attach IAM policies to this role to grant the ECS task permission to access other AWS services.

If you get a botocore.exceptions.NoRegionError, try setting the AWS_DEFAULT_REGION environment variable.
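
Before starting the pipeline, it can help to confirm that the variables from option 1 are actually set. The following is a minimal sketch, not part of Mage or boto3; the helper name `missing_aws_env_vars` is hypothetical and exists only for illustration.

```python
import os
from typing import List, Mapping, Optional

# Environment variables listed under option 1 above.
REQUIRED_AWS_VARS = ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_REGION')


def missing_aws_env_vars(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required AWS variables that are unset or empty.

    Hypothetical helper for illustration; it is not part of Mage or boto3.
    """
    # Fall back to the real process environment when no mapping is passed in.
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]
```

Calling `missing_aws_env_vars()` with no arguments inspects the current process environment. An empty result only means the variables are present, not that the credentials are valid. The check does not apply when you authenticate through the ECS role instead.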

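Pass raw message to transformer

To pass the raw message to the transformer instead of a deserialized body, set serialization_method to RAW_VALUE: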

serde_config:
  serialization_method: RAW_VALUE
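
With RAW_VALUE, any decoding becomes the transformer's job. The sketch below assumes each item reaches the transformer as the undecoded message body string; that shape is an assumption, so verify it against your pipeline. The helper name `parse_raw_bodies` is hypothetical. It decodes JSON where possible and keeps malformed bodies instead of raising.

```python
import json
from typing import Any, Dict, List


def parse_raw_bodies(raw_bodies: List[Any]) -> List[Dict[str, Any]]:
    """Decode raw bodies as JSON, flagging the ones that cannot be decoded.

    Hypothetical helper; assumes each item is the raw message body.
    """
    parsed = []
    for body in raw_bodies:
        try:
            parsed.append({'ok': True, 'value': json.loads(body)})
        except (TypeError, ValueError):
            # json.JSONDecodeError is a subclass of ValueError; TypeError
            # covers bodies that are not str, bytes, or bytearray.
            parsed.append({'ok': False, 'value': body})
    return parsed
```

Keeping the undecodable bodies in the output, rather than dropping them, lets a later step log or reroute them.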

Message deletion method

We support two ways to delete messages:

  1. Delete the message in the data loader automatically after deserializing the message body.
    • Set message_deletion_method: AFTER_RECEIVED in the source config.
    • The input to the transformer is a list of deserialized message bodies.
  2. Manually delete the message in the transformer after processing it.
    • Set message_deletion_method: MANUAL in the source config.
    • The input to the transformer is a list of dictionaries with the following structure:

      {
        'parsed_msg_body': {'k1': 'v1', ...},
        'raw_message': sqs.Message(...)
      }
      

      The following example code processes each message and then deletes it:

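      from typing import Dict, List

      if 'transformer' not in globals():
          from mage_ai.data_preparation.decorators import transformer


      @transformer
      def transform(messages: List[Dict], *args, **kwargs):
          processed_msgs = []
          for msg in messages:
              processed_msg = msg['parsed_msg_body']
              # Add your own logic to process the message
              processed_msgs.append(processed_msg)
              # Delete the message from the queue once it has been processed
              msg['raw_message'].delete()
          return processed_msgs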
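
The example above deletes every message unconditionally. A variant that deletes a message only when processing succeeds might look like the sketch below. A message that is not deleted becomes visible on the queue again after its visibility timeout, so it can be retried. The names `process` and `transform_and_delete` are hypothetical, and the Mage `@transformer` decorator is omitted so that the function can run outside Mage.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def process(body: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical placeholder for your own processing logic."""
    return body


def transform_and_delete(
    messages: List[Dict[str, Any]],
    handler: Callable[[Dict[str, Any]], Dict[str, Any]] = process,
) -> List[Dict[str, Any]]:
    """Process MANUAL-mode messages, deleting only the ones that succeed."""
    processed_msgs = []
    for msg in messages:
        try:
            processed_msg = handler(msg['parsed_msg_body'])
        except Exception:
            # Leave the message on the queue; SQS makes it visible again
            # after the visibility timeout so it can be retried.
            logger.exception('Failed to process message; not deleting it')
            continue
        processed_msgs.append(processed_msg)
        # Delete only after processing has succeeded.
        msg['raw_message'].delete()
    return processed_msgs
```

To use this inside a pipeline, call `transform_and_delete(messages)` from the decorated `transform` function and return its result.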
      
