Sources
Amazon SQS
Config
connector_type: amazon_sqs
queue_name: test-queue
batch_size: 10
wait_time_seconds: 1
message_deletion_method: AFTER_RECEIVED # AFTER_RECEIVED or MANUAL
serde_config:
serialization_method: JSON
Authentication
Here are the options to authenticate with the AWS SQS.
- Add the following keys and values to your environment variables
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
AWS_REGION
- If you deploy Mage on AWS ECS cluster, you can use ECS execution task role to authenticate. You can grant the ECS task permissions to access other AWS services by attaching IAM policies to this ECS task execution role.
If you get the botocore.exceptions.NoRegionError
error, try setting the AWS_DEFAULT_REGION
environment variable.
Pass raw message to transformer
serde_config:
serialization_method: RAW_VALUE
Message deletion method
We support two ways to delete messages:
- Delete the message in the data loader automatically after deserializing the message body.
- Set
message_deletion_method: AFTER_RECEIVED
in the source config. - The input of the transformer is the list of deserialized message body.
- Set
- Manually delete the message in transformer after processing the message.
-
Set
message_deletion_method: MANUAL
in the source config. -
The input of the transformer is the list of dictionary with the structure
{ 'parsed_msg_body': {'k1': 'v1', ...}, 'raw_message': sqs.Message(...) }
You can use the following example code to process the message and delete it after.
@transformer def transform(messages: List[Dict], *args, **kwargs): processed_msgs = [] for msg in messages: processed_msg = msg['parsed_msg_body'] # Add your own logic to process the message processed_msgs.append(processed_msg) msg['raw_message'].delete() return processed_msgs
-
Was this page helpful?