Send your AWS Step Functions execution result to Slack
We needed a notification system that sends the Step Functions execution result right after each execution finishes. Our solution sends a Slack notification with the status of each step, the start and end time of the whole process, and a link to the execution log.
In a nutshell, the following occurs:
- The Step Functions state machine runs daily.
- An EventBridge rule triggers the Lambda function “GetSFHistory” when the execution succeeds.
- “GetSFHistory” gets the Step Functions execution history, builds a Slack message and sends it to an SQS queue.
- The SQS queue triggers the Lambda function “SendSlackMessage”.
- “SendSlackMessage” sends the final message to Slack.
Set Up
- Create Step Functions
- Create SQS Queue and Lambda function GetSFHistory
- Create EventBridge Rule
- Create Slack webhook and Lambda function SendSlackMessage
- Update SQS Queue
Step Functions Diagram
Lambda Function: GetSFHistory
Get Step Functions Log
First, we need detailed information about the Step Functions execution. For that we used the GetExecutionHistory API, described in the AWS Step Functions API documentation: https://docs.aws.amazon.com/step-functions/latest/apireference/API_GetExecutionHistory.html
Example response from the API:
{
"events": [
...
{
"timestamp": 1525283875.612,
"stateExitedEventDetails": {
"output": "\"Hello World!\"",
"outputDetails": {
"truncated": false
},
"name": "HelloWorld"
},
"type": "PassStateExited",
"id": 3,
"previousEventId": 2
},
...
]
}
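In the raw API response, “timestamp” is expressed in epoch seconds, as in the example above, while boto3 parses it into a datetime object, which is why the function extract further down can call strftime on it directly. A minimal sketch of the same formatting applied to a raw epoch value, assuming UTC is the desired time zone (the helper name format_timestamp is hypothetical):

```python
from datetime import datetime, timezone


def format_timestamp(epoch_seconds: float) -> str:
    """Format an epoch timestamp (seconds) as DD-MM-YYYY HH:MM:SS in UTC."""
    dt = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    # %S drops the fractional part (.612 in the example)
    return dt.strftime("%d-%m-%Y %H:%M:%S")


# Timestamp taken from the example response above
print(format_timestamp(1525283875.612))  # 02-05-2018 17:57:55
```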
From the response we mainly use the “timestamp” and “type” fields. Some of the important event types are:
- Execution started: ‘ExecutionStarted’
- Execution ended successfully: ‘ExecutionSucceeded’
- Task state exited: ‘TaskStateExited’
- Lambda function ended successfully: ‘LambdaFunctionSucceeded’
- Lambda function ended in failure: ‘LambdaFunctionFailed’
- Service integration task (e.g. ECS) ended successfully: ‘TaskSucceeded’
- Service integration task (e.g. ECS) ended in failure: ‘TaskFailed’
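The event types above split into execution start/end markers and per-task outcomes. A small sketch of how a task outcome could be classified from its terminal event type; the function name task_outcome and the outcome labels are hypothetical. It uses explicit sets instead of the substring checks in the later extract, so an unrelated type such as ‘ExecutionSucceeded’ is not mistaken for a task result:

```python
# Terminal event types for a task, taken from the list above
SUCCESS_TYPES = {"LambdaFunctionSucceeded", "TaskSucceeded"}
FAILURE_TYPES = {"LambdaFunctionFailed", "TaskFailed"}


def task_outcome(event_type: str) -> str:
    """Map a task's terminal event type to 'succeeded', 'failed' or 'unknown'."""
    if event_type in SUCCESS_TYPES:
        return "succeeded"
    if event_type in FAILURE_TYPES:
        return "failed"
    return "unknown"


print(task_outcome("LambdaFunctionFailed"))  # failed
```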
Function extract:
def get_sf_history(event):
    client = boto3.client('stepfunctions')  # Connect to the Step Functions client
    lastexecutionarn = event['detail']['executionArn']
    # Execution ARNs end in ...:execution:<state machine name>:<execution name>
    statemachinename = lastexecutionarn.split(':')[-2]
    # maxResults caps a single page; histories longer than 300 events would need nextToken pagination
    response = client.get_execution_history(
        executionArn=lastexecutionarn, maxResults=300)
    name = []
    prev_id = []
    event_type = []
    start_time = ''
    end_time = ''
    for ev in response['events']:
        if ev['type'] == 'ExecutionStarted':
            start_time = ev['timestamp'].strftime("%d-%m-%Y %H:%M:%S")  # Start time of the execution
        if ev['type'] == 'TaskStateExited':
            name.append(ev['stateExitedEventDetails']['name'])  # Name of the task state
            prev_id.append(ev['previousEventId'])  # Id of the event just before the exit
        if ev['type'] == 'ExecutionSucceeded':
            end_time = ev['timestamp'].strftime("%d-%m-%Y %H:%M:%S")  # End time of the execution
    if prev_id:
        for ev in response['events']:
            for event_id in prev_id:
                if ev['id'] == event_id:
                    # The previous event's type tells how the task ended
                    # (Lambda function: LambdaFunctionSucceeded or LambdaFunctionFailed,
                    # ECS task: TaskSucceeded or TaskFailed)
                    event_type.append(ev['type'])
    if 'prod' in lastexecutionarn:  # Get the environment from the state machine name
        env = 'prod'
    else:
        env = 'test'
    return statemachinename, lastexecutionarn, start_time, name, end_time, event_type, env
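The nested loop compares every event against every stored previousEventId. Because event ids are unique within one execution history, a dictionary keyed by id can do the same pairing in a single pass, and it keeps each step name next to its result instead of in two parallel lists. A minimal sketch on plain dicts shaped like the API response; the helper name pair_steps and the sample events are hypothetical:

```python
def pair_steps(events):
    """Return (state name, type of the event preceding its exit) for each TaskStateExited event."""
    by_id = {ev["id"]: ev for ev in events}  # event ids are unique within an execution history
    steps = []
    for ev in events:
        if ev["type"] == "TaskStateExited":
            previous = by_id.get(ev["previousEventId"])
            steps.append((ev["stateExitedEventDetails"]["name"],
                          previous["type"] if previous else None))
    return steps


# Hypothetical, trimmed-down history with one Lambda step
sample = [
    {"id": 1, "type": "ExecutionStarted"},
    {"id": 2, "type": "TaskStateEntered", "previousEventId": 1},
    {"id": 3, "type": "LambdaFunctionSucceeded", "previousEventId": 2},
    {"id": 4, "type": "TaskStateExited", "previousEventId": 3,
     "stateExitedEventDetails": {"name": "LoadData"}},
]
print(pair_steps(sample))  # [('LoadData', 'LambdaFunctionSucceeded')]
```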
Build Slack message
Then we need to build the message in a format Slack can interpret. We added an image as an easy way of identifying the process; for example, if the ETL process involves an API, the image can depict that. The message contains the Step Functions name and ARN, when the execution started and finished, and whether each step succeeded or failed. Finally, it includes a URL that goes directly to the execution log, for anyone with access to the AWS account. To build the message we used the Slack Block Kit Builder: https://app.slack.com/block-kit-builder.
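The function extract that follows builds the payload by formatting a JSON string, which produces invalid JSON as soon as a value contains a double quote or a raw newline. A sketch of an alternative, assuming the same block layout: build the Block Kit payload from Python dicts and let json.dumps handle the escaping. The function name build_blocks_payload is hypothetical, and only a subset of the blocks is shown:

```python
import json


def build_blocks_payload(statemachinename, lastexecutionarn, steps, console_url):
    """Build a Slack Block Kit message as dicts; steps is a list of (name, event_type) pairs."""
    blocks = [
        {"type": "section",
         "text": {"type": "mrkdwn",
                  "text": f"*{statemachinename} ELT Process Report*\n\n*Execution Arn:* {lastexecutionarn}"}},
        {"type": "divider"},
    ]
    for step_name, event_type in steps:
        text = f"*Step:* {step_name} - *Status:* {event_type}"
        if "Succeeded" in event_type:
            text += " :white_check_mark:"
        elif "Failed" in event_type:
            text += " :x:"
        blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": text}})
    blocks.append({"type": "actions",
                   "elements": [{"type": "button",
                                 "text": {"type": "plain_text", "text": "State machine execution detail"},
                                 "url": console_url}]})
    # json.dumps escapes quotes and newlines, so the result is always valid JSON
    return json.dumps({"blocks": blocks})
```

The returned string can be used as the SQS message body unchanged, since SendSlackMessage forwards the body as-is.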
Function extract:
def build_slack_message(name, event_type, statemachinename, lastexecutionarn, start_time, end_time, aws_region):
    text = ''
    block = ''
    image_url = 'https://api.slack.com/img/blocks/bkb_template_images/approvalsNewDevice.png'
    alt_text_image = 'standard image'
    # One Block Kit section per step, with an emoji for the result
    for name_i, event_type_i in zip(name, event_type):
        if 'Succeeded' in event_type_i:
            result = ':white_check_mark:'
            text = f"*Step:* {name_i} - *Status:* {event_type_i} {result}"
        elif 'Failed' in event_type_i:
            result = ':x:'
            text = f"*Step:* {name_i} - *Status:* {event_type_i} {result}"
        else:
            text = f"*Step:* {name_i} - *Status:* {event_type_i}"
        block += f"""
            {{
                "type": "section",
                "text": {{
                    "type": "mrkdwn",
                    "text": "{text}"
                }}
            }}, """
    # \\n keeps an escaped newline in the JSON text; a raw newline inside a JSON string is invalid
    message = f"""
    {{
        "blocks": [
            {{
                "type": "section",
                "text": {{
                    "type": "mrkdwn",
                    "text": "*{statemachinename} ELT Process Report*\\n\\n*Execution Arn:* {lastexecutionarn}"
                }},
                "accessory": {{
                    "type": "image",
                    "image_url": "{image_url}",
                    "alt_text": "{alt_text_image}"
                }}
            }},
            {{
                "type": "section",
                "text": {{
                    "type": "mrkdwn",
                    "text": "*Start time:* {start_time}"
                }}
            }},
            {{
                "type": "section",
                "text": {{
                    "type": "mrkdwn",
                    "text": "*End time:* {end_time}"
                }}
            }},
            {{
                "type": "divider"
            }},
            {block}
            {{
                "type": "divider"
            }},
            {{
                "type": "actions",
                "elements": [
                    {{
                        "type": "button",
                        "text": {{
                            "type": "plain_text",
                            "text": "State machine execution detail",
                            "emoji": true
                        }},
                        "value": "{lastexecutionarn}",
                        "url": "https://console.aws.amazon.com/states/home?region={aws_region}#/executions/details/{lastexecutionarn}"
                    }}
                ]
            }}
        ]
    }}"""
    return message
Send message to SQS
Next, we send this message to an SQS queue. First, create the SQS queue to get its URL, which the code needs. We come back to the queue to finish its configuration after the second Lambda function is created.
Set the type to ‘Standard’ and leave the rest of the settings at their defaults.
Function extract:
def send_message_to_sqs(message, aws_region, env):
    sqs = boto3.client('sqs')
    sqs_name = 'SlackNotificationsQueue'
    queue_url = f'https://sqs.{aws_region}.amazonaws.com/{aws_account_id}/{sqs_name}'
    # Send the Slack message to the SQS queue; the ENV attribute tells SendSlackMessage which webhook to use
    response = sqs.send_message(
        QueueUrl=queue_url,
        DelaySeconds=2,
        MessageAttributes={
            'ENV': {
                'DataType': 'String',
                'StringValue': env
            }
        },
        MessageBody=message)
    return response
Lambda Handler
All three functions live in the same Lambda function, and the handler chains them:
import json
import boto3

# Client info: fill in your AWS region and account ID
aws_region = ''
aws_account_id = ''

def get_sf_history(event):
    ...

def build_slack_message(name, event_type, statemachinename, lastexecutionarn, start_time, end_time, aws_region):
    ...

def send_message_to_sqs(message, aws_region, env):
    ...

def lambda_handler(event, context):
    statemachinename, lastexecutionarn, start_time, name, end_time, event_type, env = get_sf_history(event)
    message = build_slack_message(name, event_type, statemachinename, lastexecutionarn, start_time, end_time, aws_region)
    send_message_to_sqs(message, aws_region, env)
    return {
        'statusCode': 200,
        'body': json.dumps('Code execution succeeded!')
    }
Once the Lambda function is created, add these policies to its execution role:
- AWSLambdaBasicExecutionRole (created with the lambda)
- AmazonSQSFullAccess
- AWSStepFunctionsReadOnlyAccess
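The two managed policies besides AWSLambdaBasicExecutionRole grant much more than GetSFHistory uses: the function only calls GetExecutionHistory and SendMessage. A hedged sketch of a narrower inline policy that could replace them, generated from Python so the ARNs can be templated. The region, account ID and state machine name below are placeholders, and the helper name get_sf_history_policy is hypothetical:

```python
import json


def get_sf_history_policy(region, account_id, state_machine_name, queue_name="SlackNotificationsQueue"):
    """Inline IAM policy allowing only the two API calls GetSFHistory makes (placeholder values assumed)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow",
             "Action": "states:GetExecutionHistory",
             # Executions of one state machine: ...:execution:<state machine name>:<execution name>
             "Resource": f"arn:aws:states:{region}:{account_id}:execution:{state_machine_name}:*"},
            {"Effect": "Allow",
             "Action": "sqs:SendMessage",
             "Resource": f"arn:aws:sqs:{region}:{account_id}:{queue_name}"},
        ],
    }


print(json.dumps(get_sf_history_policy("us-east-1", "111122223333", "MyStateMachine"), indent=2))
```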
EventBridge Rule
Next, create an EventBridge rule that triggers the Lambda function GetSFHistory. The rule type for this behaviour is “Rule with an event pattern”.
For the Event pattern we used:
{
"source": ["aws.states"],
"detail-type": ["Step Functions Execution Status Change"],
"detail": {
"status": ["SUCCEEDED"],
"stateMachineArn": [""]
}
}
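For reference, a trimmed-down, hypothetical example of the event this rule delivers to GetSFHistory, with invented account, region and names; only a few fields are shown. The sketch recovers the state machine name and the environment the same way get_sf_history does:

```python
# Hypothetical EventBridge event (values invented), trimmed to a few fields
sample_event = {
    "source": "aws.states",
    "detail-type": "Step Functions Execution Status Change",
    "detail": {
        "executionArn": "arn:aws:states:us-east-1:111122223333:execution:prod-DailyETL:run-42",
        "stateMachineArn": "arn:aws:states:us-east-1:111122223333:stateMachine:prod-DailyETL",
        "status": "SUCCEEDED",
    },
}

execution_arn = sample_event["detail"]["executionArn"]
# Execution ARNs end in ...:execution:<state machine name>:<execution name>
state_machine_name = execution_arn.split(":")[-2]
env = "prod" if "prod" in execution_arn else "test"
print(state_machine_name, env)  # prod-DailyETL prod
```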
Lambda Trigger: GetSFHistory
Lambda Function: SendSlackMessage
For the second Lambda function, we first need to create the Slack bot and get its webhook URL.
Create Slack Bot
We followed the Slack documentation (https://api.slack.com/messaging/webhooks) to get a webhook URL that lets the bot post messages to the intended channel. We saved the URL in AWS Systems Manager Parameter Store as a SecureString under an environment prefix (/{env}/SLACK_URL), so the same process can run in different environments, and we can change the bot without changing the code.
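The environment prefix means one parameter per environment. A sketch of that naming convention with a guard for unexpected values; the helper name slack_url_parameter is hypothetical, and the returned name is what would be passed to ssm.get_parameter(Name=..., WithDecryption=True):

```python
ALLOWED_ENVS = ("test", "prod")


def slack_url_parameter(env: str) -> str:
    """Return the Parameter Store name holding the Slack webhook URL for an environment."""
    if env not in ALLOWED_ENVS:
        # Refuse unknown environments rather than guessing a channel
        raise ValueError(f"Unknown environment: {env!r}")
    return f"/{env}/SLACK_URL"


print(slack_url_parameter("prod"))  # /prod/SLACK_URL
```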
Upload Lambda Function as a Zip File
The function gets the message from the SQS queue and sends it to Slack.
Function extract:
import json
import boto3
import requests

def lambda_handler(event, context):
    payload = event['Records'][0]['body']
    env = event['Records'][0]['messageAttributes']['ENV']['stringValue']  # ENV attribute set by GetSFHistory
    ssm = boto3.client('ssm', region_name='us-east-1')  # Read variables from SSM Parameter Store
    # Select the SSM parameter prefix based on the 'env' message attribute
    if env == 'test':
        prefix = '/test/'
    elif env == 'prod':
        prefix = '/prod/'
    # Unknown environment: raise an error instead of guessing which channel to post to
    else:
        raise ValueError('No valid value set for environment type (env)')
    slack_url = ssm.get_parameter(Name=prefix + 'SLACK_URL', WithDecryption=True)['Parameter']['Value']
    headers = {'Content-Type': 'application/json'}
    try:
        r = requests.post(slack_url, data=payload, headers=headers, timeout=10)
        r.raise_for_status()  # Treat HTTP error responses from Slack as failures too
    except requests.exceptions.RequestException as e:
        # Log and return normally, so the message is not retried from the queue
        print(f"Couldn't send the message: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps("Couldn't send the message")
        }
    return {
        'statusCode': 200,
        'body': json.dumps('Code ran successfully')
    }
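The handler above only reads Records[0]. When the SQS trigger's batch size is above 1, one invocation can carry several messages, so a loop over every record may be safer. A sketch, where post_to_slack is a hypothetical stand-in for the SSM lookup and HTTP call above and the sample event is invented:

```python
def handle_records(event, post_to_slack):
    """Call post_to_slack(env, payload) for every SQS record in the Lambda event."""
    sent = 0
    for record in event.get("Records", []):
        env = record["messageAttributes"]["ENV"]["stringValue"]  # set by GetSFHistory
        post_to_slack(env, record["body"])
        sent += 1
    return sent


# Hypothetical two-record event, trimmed to the fields used above
calls = []
sample = {"Records": [
    {"body": '{"blocks": []}', "messageAttributes": {"ENV": {"stringValue": "test", "dataType": "String"}}},
    {"body": '{"blocks": []}', "messageAttributes": {"ENV": {"stringValue": "prod", "dataType": "String"}}},
]}
print(handle_records(sample, lambda env, body: calls.append(env)))  # 2
```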
Because this function uses the requests library, which is not part of the Lambda Python runtime, the library has to be included in the deployment package. Therefore, we deployed the Lambda function as a zip file following these instructions: https://docs.aws.amazon.com/lambda/latest/dg/python-package.html
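If packaging a zip file is inconvenient, the standard library can post the message instead, so no extra dependency is needed. A sketch using urllib.request, which is a swapped-in technique and not what we deployed; the webhook URL is a hypothetical placeholder, and only the request construction runs here:

```python
import urllib.request


def build_slack_request(slack_url: str, payload: str) -> urllib.request.Request:
    """Build a POST request carrying the JSON payload for a Slack webhook."""
    return urllib.request.Request(
        slack_url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_slack_request("https://hooks.slack.com/services/T000/B000/XXXX", '{"text": "hello"}')
# To actually send it: urllib.request.urlopen(req, timeout=10)
print(req.get_method(), req.get_header("Content-type"))  # POST application/json
```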
Finally, add these permission policies to the Lambda function's execution role:
- AWSLambdaBasicExecutionRole (created with the lambda)
- AmazonSQSFullAccess
- AmazonSSMFullAccess
- kms_access
SQS Queue
As the last step, we finish the SQS configuration. Under ‘Lambda triggers’, add the Lambda function SendSlackMessage as a trigger, and edit the access policy as follows:
{
"Version": "2008-10-17",
"Id": "__default_policy_ID",
"Statement": [
{
"Sid": "__owner_statement",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam:::root"
},
"Action": "SQS:*",
"Resource": "arn:aws:sqs:::SlackNotificationsQueue"
},
{
"Sid": "__sender_statement",
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": "SQS:SendMessage",
"Resource": "arn:aws:sqs:::SlackNotificationsQueue"
},
{
"Sid": "__receiver_statement",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam:::role/service-role/"
},
"Action": [
"SQS:ChangeMessageVisibility",
"SQS:DeleteMessage",
"SQS:ReceiveMessage"
],
"Resource": "arn:aws:sqs:::SlackNotificationsQueue"
}
]
}