The Problem: Silence in the Face of Workflow Anomalies

While managing SQS-based workflows, I ran into a perplexing issue with SNS alarms (CloudWatch alarms that notify through an SNS topic). I relied on these alarms to notify me when messages landed in my SQS queues, especially when they ended up in the Dead Letter Queue (DLQ). However, the alarms stayed silent when my workflow ran multiple times and messages kept accumulating in the DLQ.

Upon closer inspection, the root cause surfaced: the SNS alarm remained in the ALARM state because the DLQ was never cleared. An alarm sends its notification only when its state changes, so if my workflow ran several times a day and developers ignored or did not manually clear the DLQ, the alarm never returned to the OK state and therefore never fired again. As a result, important email notifications were not sent, and issues lingered unnoticed in the workflow.
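
To make this failure mode concrete, here is a minimal sketch of an alarm that notifies only on state changes. The Alarm class, its threshold, and the queue depths are hypothetical and only model the behavior described above; this is not how the real alarm service is implemented.

```python
from dataclasses import dataclass, field


@dataclass
class Alarm:
    """Toy alarm that notifies only when its state changes."""
    threshold: int = 0
    state: str = "OK"
    notifications: list = field(default_factory=list)

    def evaluate(self, dlq_depth: int) -> None:
        new_state = "ALARM" if dlq_depth > self.threshold else "OK"
        if new_state != self.state:
            # A notification is recorded on the transition, not on every evaluation.
            self.notifications.append(f"{self.state} -> {new_state}")
            self.state = new_state


alarm = Alarm()
# Hypothetical DLQ depth after each workflow run; nobody clears the queue in between.
for depth in [0, 2, 5, 9]:
    alarm.evaluate(depth)

print(alarm.notifications)
print(alarm.state)
```

Only the first failing run changes the state, so the later runs add messages to the DLQ without producing any further notification.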

The Exploration: Unveiling Potential Solutions

To address this challenge, I explored different ways to solve the problem. My goal was to find a robust method that didn’t involve manual intervention or introduce unnecessary complexities. Here’s a glimpse into the options I considered:

  1. Lambda Function for Email Notifications: The conventional approach involves using a Lambda function to check the SQS queues and send email notifications. However, this approach seemed resource-intensive and introduced additional complexities.
  2. Scheduled Lambda Invocations: Automate the Lambda function to run at scheduled intervals. While this solves the problem, it adds the overhead of scheduling and managing Lambda executions.
  3. Enhanced SNS Alarm Configuration: Modify the SNS alarm configuration so that it notifies again while the DLQ still holds messages. Unfortunately, this approach proved challenging due to the inherent limitations of SNS alarm settings.

Solution Architecture

After careful consideration, I built a solution using AWS Step Functions, a versatile orchestration service. This Step Function workflow checks the SQS queues for messages after each run of the main workflow.

Step 1: DLQMessageReceiver

DLQMessageReceiver is the starting point of our state machine, the entry gate where everything begins. Here’s its definition:


"DLQMessageReceiver": {
  "Type": "Parallel",
  "Branches": [
    {
      "StartAt": "ProcessDLQ",
      "States": {
        "ProcessDLQ": {
          ...
        }
      }
    }
  ],
  "Parameters": {
    "DLQs": [
      {
        "QueueUrl": "first_dlq_queue_url"
      },
      {
        "QueueUrl": "second_dlq_queue_url"
      }
    ]
  },
  "End": true
}

This part sets up the overall structure of our state machine. It’s a Parallel state, which can run several branches at once, although in our case we use a single branch. Here’s what it does:

  • The Parameters block lists the DLQs we want to check for unprocessed messages, for example “first_dlq_queue_url” and “second_dlq_queue_url”. These are placeholders for the real queue URLs.
  • That list is passed to the branch as its input.
  • The branch starts at ProcessDLQ, the next step, which processes each DLQ to see if there are any unprocessed messages.

Note: This solution is designed to be scalable. Monitoring an additional Dead Letter Queue (DLQ) only requires adding its URL to the DLQs list; the rest of the state machine stays unchanged, so it can accommodate a growing number of queues as workflow demands evolve.
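
As a rough illustration of that point, the Parameters payload can be generated from a plain list of queue URLs. The helper name build_dlq_parameters and the third queue URL are hypothetical; the first two URLs are the placeholders used in this article.

```python
import json


def build_dlq_parameters(queue_urls):
    """Return a Parameters payload in the shape DLQMessageReceiver uses."""
    return {"DLQs": [{"QueueUrl": url} for url in queue_urls]}


# Placeholder URLs from the article, plus a hypothetical third queue to show
# that only the list changes when another DLQ is monitored.
params = build_dlq_parameters(
    ["first_dlq_queue_url", "second_dlq_queue_url", "third_dlq_queue_url"]
)
print(json.dumps(params, indent=2))
```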

Step 2: ProcessDLQ

Now, let’s write the code for processing each DLQ. This is where the real action happens:

"ProcessDLQ": {
  "Type": "Map",
  "ItemsPath": "$.DLQs",
  "MaxConcurrency": 1,
  "Iterator": {
    "StartAt": "ReceiveMessages",
    "States": {
      "ReceiveMessages": {
        ...
      }
    }
  },
  "End": true
}

In this step, we’re using a Map state, which allows us to iterate over each DLQ in the list. Here’s what each part does:

  • "Type": "Map": This indicates that we’re using a Map state, which iterates over a list of items (in our case, DLQs).
  • "ItemsPath": "$.DLQs": This specifies the path to the array of DLQs that we defined earlier in the DLQMessageReceiver step.
  • "MaxConcurrency": 1: This ensures that only one DLQ is processed at a time to avoid overwhelming our system.
  • "Iterator": This is where we define what happens during each iteration, starting with the ReceiveMessages state.
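
The iteration described above can be pictured with a small Python sketch. It is a simplified model, not the Step Functions runtime: resolve_items and run_map are hypothetical helpers, and only simple "$.key" paths are handled.

```python
def resolve_items(state_input, items_path):
    """Resolve a simple '$.key' reference path against the state input."""
    if not items_path.startswith("$."):
        raise ValueError("only simple '$.key' paths are supported in this sketch")
    value = state_input
    for key in items_path[2:].split("."):
        value = value[key]
    return value


def run_map(state_input, items_path, iterator):
    """Apply `iterator` to each item in order, one at a time (MaxConcurrency 1)."""
    return [iterator(item) for item in resolve_items(state_input, items_path)]


state_input = {
    "DLQs": [
        {"QueueUrl": "first_dlq_queue_url"},
        {"QueueUrl": "second_dlq_queue_url"},
    ]
}
# Each iteration receives one element of the DLQs array as its input.
visited = run_map(state_input, "$.DLQs", lambda item: item["QueueUrl"])
print(visited)
```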

Step 3: ReceiveMessages

Let’s move forward and write the code for the ReceiveMessages state. This state is responsible for retrieving messages from each DLQ, which is itself an SQS queue.

Here’s the structure of the ReceiveMessages state within the Iterator of the ProcessDLQ step:

"ReceiveMessages": {
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
  "Parameters": {
    "QueueUrl.$": "$.QueueUrl"
  },
  "Next": "CheckMessages",
  "ResultPath": "$.receivedMessage"
}


Now, let’s break down what each part does:

  • "Type": "Task": This indicates that we’re using a Task state, which performs a specific action.
  • "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage": This specifies the ARN (Amazon Resource Name) of the AWS SDK service integration for the SQS receiveMessage action. It tells AWS Step Functions which service and action to call.
  • "Parameters": {"QueueUrl.$": "$.QueueUrl"}: Here, we’re passing the QueueUrl parameter to the receiveMessage action. The ".$" suffix on the key means the value is read from the state input, so $.QueueUrl references the QueueUrl of the current DLQ being processed.
  • "Next": "CheckMessages": This specifies the next state to transition to after the task (receiving messages) is completed.
  • "ResultPath": "$.receivedMessage": This stores the task’s result (the received messages) under receivedMessage in the state’s output, alongside the original input such as QueueUrl.
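
The effect of ResultPath is easy to miss, so here is a minimal sketch of it. The helper apply_result_path and the sample task result are hypothetical and heavily trimmed; a real receiveMessage response carries more fields.

```python
import copy


def apply_result_path(state_input, result, result_path):
    """Insert `result` into a copy of the input at a single-level '$.key' path."""
    if not result_path.startswith("$.") or "." in result_path[2:]:
        raise ValueError("only single-level '$.key' paths are supported in this sketch")
    output = copy.deepcopy(state_input)
    output[result_path[2:]] = result
    return output


item = {"QueueUrl": "first_dlq_queue_url"}
# Hypothetical, trimmed-down task result used only for illustration.
task_result = {"Messages": [{"MessageId": "example-id", "Body": "example body"}]}

output = apply_result_path(item, task_result, "$.receivedMessage")
print(output)
```

Because the original input is preserved, later states can still read $.QueueUrl, which the alert message relies on.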

Now, we’re ready to move on to the next step, which is checking if messages were received from the SQS queue.

Step 4: CheckMessages

Let’s proceed by writing the code for the CheckMessages state. This state is responsible for determining whether any messages were received from the DLQ being processed.

Here’s the structure of the CheckMessages state within the Iterator of the ProcessDLQ step:

"CheckMessages": {
  "Type": "Choice",
  "Choices": [
    {
      "Variable": "$.receivedMessage.Messages",
      "IsPresent": true,
      "Next": "PublishAlert"
    }
  ],
  "Default": "EndStateMachine"
}

Now, let’s break down what each part does:

  • "Type": "Choice": This indicates that we’re using a Choice state, which allows us to make decisions based on the output of the previous state.
  • "Choices": This is an array of conditions to evaluate. In our case, we’re checking if the “Messages” array is present in the output of the ReceiveMessages state.
  • "Variable": "$.receivedMessage.Messages": This specifies the variable to evaluate, which is the “Messages” array from the output of the ReceiveMessages state.
  • "IsPresent": true: This condition checks if the “Messages” array is present.
  • "Next": "PublishAlert": If the “Messages” array is present, we transition to the PublishAlert state to send an alert about unprocessed messages.
  • "Default": "EndStateMachine": If the “Messages” array is not present, we transition to EndStateMachine, a Succeed state (shown in the complete code at the end) that ends the current iteration because no unprocessed messages were found in that DLQ.
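
The decision made by this Choice state can be sketched in a few lines of Python. The helpers is_present and choose_next are hypothetical, and the sketch assumes, as the state machine does, that the response for an empty queue has no Messages key at all. Note that a presence check only tests whether the path exists, so an empty list would still count as present.

```python
def is_present(data, path):
    """Return True if a simple dotted reference path such as '$.a.b' resolves."""
    current = data
    for key in path[2:].split("."):
        if not isinstance(current, dict) or key not in current:
            return False
        current = current[key]
    return True


def choose_next(state_input):
    """Mirror the CheckMessages rule: alert only if Messages is present."""
    if is_present(state_input, "$.receivedMessage.Messages"):
        return "PublishAlert"
    return "EndStateMachine"


with_messages = {
    "QueueUrl": "first_dlq_queue_url",
    "receivedMessage": {"Messages": [{"Body": "example body"}]},
}
without_messages = {"QueueUrl": "second_dlq_queue_url", "receivedMessage": {}}

print(choose_next(with_messages))
print(choose_next(without_messages))
```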

Now that we have defined the CheckMessages state, we’re ready to move on to the final step, which is publishing an alert if unprocessed messages are found.

Step 5: PublishAlert

Let’s proceed by writing the code for the final step, which is the PublishAlert state. This state is responsible for publishing an alert to an SNS topic if unprocessed messages are found in the SQS queue.

Here’s the structure of the PublishAlert state:

"PublishAlert": {
  "Type": "Task",
  "Resource": "arn:aws:states:::sns:publish",
  "Parameters": {
    "TopicArn": "${AlertSNSTopicArn}",
    "Message.$": "States.Format('ALERT: Unprocessed messages found in: {}', $.QueueUrl)"
  },
  "End": true
}
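
The Message field is built with the States.Format intrinsic function, which fills each {} placeholder with the next argument. The Python function below is a rough emulation written for illustration only; states_format is a hypothetical name, and the sketch ignores the escaping rules of the real intrinsic.

```python
def states_format(template, *args):
    """Fill each '{}' in `template` with the next argument, left to right."""
    parts = template.split("{}")
    if len(parts) - 1 != len(args):
        raise ValueError("number of '{}' placeholders must match number of arguments")
    result = parts[0]
    for arg, tail in zip(args, parts[1:]):
        result += str(arg) + tail
    return result


# Input of the PublishAlert state for the first queue (placeholder URL).
state_input = {"QueueUrl": "first_dlq_queue_url"}
message = states_format(
    "ALERT: Unprocessed messages found in: {}", state_input["QueueUrl"]
)
print(message)
```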


Let’s take a closer look at the execution of the devised Step Function solution:

1. Email Sent: In the first iteration, for the first queue, the Step Function executed successfully. The workflow found unprocessed messages in the specified Dead Letter Queue (DLQ), retrieved them, and sent an alert email through the configured SNS topic.

2. Email Not Sent: In the second iteration, for the second queue, the Step Function also executed successfully, but no email notification was sent because the second DLQ held no messages during this execution.

The Benefits

  1. Automated Oversight: The Step Function solution automates the workflow oversight process, eliminating the need for manual intervention.
  2. Developer Relief: Developers no longer have to clear queues manually just so that the next failure triggers a notification, reducing their workload and allowing them to focus on more critical aspects of development.
  3. Timely Notifications: Email notifications are now sent promptly whenever unprocessed messages are detected, ensuring quick awareness of potential issues.
  4. Scalability: The solution is designed to handle any number of queues. The Map state loops over the list of DLQs, so accommodating growing workflow demands only means extending that list.

Here is the complete code for the Step Function setup:

{
  "Comment": "This state machine checks for unprocessed messages in specified Dead Letter Queues (DLQs) and sends an alert if any are found.",
  "StartAt": "DLQMessageReceiver",
  "States": {
    "DLQMessageReceiver": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "ProcessDLQ",
          "States": {
            "ProcessDLQ": {
              "Type": "Map",
              "ItemsPath": "$.DLQs",
              "MaxConcurrency": 1,
              "Iterator": {
                "StartAt": "ReceiveMessages",
                "States": {
                  "ReceiveMessages": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
                    "Parameters": {
                      "QueueUrl.$": "$.QueueUrl"
                    },
                    "Next": "CheckMessages",
                    "ResultPath": "$.receivedMessage"
                  },
                  "CheckMessages": {
                    "Type": "Choice",
                    "Choices": [
                      {
                        "Variable": "$.receivedMessage.Messages",
                        "IsPresent": true,
                        "Next": "PublishAlert"
                      }
                    ],
                    "Default": "EndStateMachine"
                  },
                  "PublishAlert": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::sns:publish",
                    "Parameters": {
                      "TopicArn": "${AlertSNSTopicArn}",
                      "Message.$": "States.Format('ALERT: Unprocessed messages found in: {}', $.QueueUrl)"
                    },
                    "End": true
                  },
                  "EndStateMachine": {
                    "Type": "Succeed"
                  }
                }
              },
              "End": true
            }
          }
        }
      ],
      "Parameters": {
        "DLQs": [
          {
            "QueueUrl": "first_dlq_queue_url"
          },
          {
            "QueueUrl": "second_dlq_queue_url"
          }
        ]
      },
      "End": true
    }
  }
}
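
Before deploying a definition like this, it can help to check that every transition points at a state that exists. The sketch below is a hypothetical helper, not an official validator; it inspects only Next, Default, and the Next of each Choice rule, and it uses the iterator states from the definition above reduced to their transitions.

```python
def missing_targets(states):
    """Return (state, target) pairs whose Next/Default target is not defined."""
    missing = []
    for name, state in states.items():
        targets = [state.get("Next"), state.get("Default")]
        targets += [choice.get("Next") for choice in state.get("Choices", [])]
        for target in targets:
            if target is not None and target not in states:
                missing.append((name, target))
    return missing


# Iterator states from the definition above, reduced to their transitions.
iterator_states = {
    "ReceiveMessages": {"Type": "Task", "Next": "CheckMessages"},
    "CheckMessages": {
        "Type": "Choice",
        "Choices": [{"Next": "PublishAlert"}],
        "Default": "EndStateMachine",
    },
    "PublishAlert": {"Type": "Task", "End": True},
    "EndStateMachine": {"Type": "Succeed"},
}
print(missing_targets(iterator_states))

# Without EndStateMachine, the Default of CheckMessages has nowhere to go.
partial = {k: v for k, v in iterator_states.items() if k != "EndStateMachine"}
print(missing_targets(partial))
```

The second call shows why the EndStateMachine state has to be part of the iterator: the Step 4 snippet refers to it, but it is only defined in the complete code.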