# Scurri Webhooks

Scurri Webhooks enable event notifications from Scurri to your systems. When significant events occur, such as tracking updates, Scurri can automatically push relevant data to your configured destinations, eliminating the need for continuous polling.

## Destination Types

Scurri Webhooks currently support the following destination types:

- **HTTPS endpoints**
- **AWS SNS Topics**

You must select a destination type when creating a new Webhook configuration. Each type has its own configuration requirements and authorization schemes. More detailed information about each type is provided below.

### AWS SNS Topics

AWS Simple Notification Service (SNS) is a fully managed messaging service that enables you to decouple and scale microservices, distributed systems, and serverless applications.

How it works:

- An AWS SNS Topic is created to act as a communication channel.
- Scurri publishes messages to the SNS Topic.
- Your system can subscribe (via AWS SQS queues, Lambda functions, or HTTP endpoints) to the SNS Topic to receive messages.

Each published message contains a payload with details about the event that triggered the Webhook, such as tracking updates.

Benefits of using AWS SNS Topics:

- **Scalability**: SNS can handle a large volume of messages.
- **Decoupling**: Your system is decoupled from Scurri, allowing for a more flexible architecture and easier maintenance.
- **Multiple Subscribers**: Multiple systems can subscribe to the same SNS Topic, allowing for easy distribution of messages to different services.
- **Reliability**: AWS SNS provides built-in retry mechanisms and message durability to ensure that messages are delivered reliably.
- **Performance**: SNS is designed for high throughput and low latency.

#### Setting up AWS SNS Topics

To set up AWS SNS Topics, you can choose between two approaches: self-managed SNS Topic or Scurri-managed SNS Topic.

**Self-managed SNS Topic**

In this case, you create the SNS Topic in your AWS account and grant Scurri permission to publish messages to the Topic. You can then set up your Webhook configuration with the ARN (Amazon Resource Name) of this Topic, and we will publish event payloads there.

There are several key benefits to this approach:

- **Ease of Setup**: Webhook configurations are self-managed. By giving us publisher permission to your Topic, you can set up your Webhook configuration independently with little to no intervention from Scurri support staff.
- **Self-Management**: Setting the Topic up yourself lets you manage access and change subscribers on the fly without the need to contact Scurri support.

**Scurri-managed SNS Topic**

In this case, Scurri manages and maintains the SNS Topic, to which you are given subscriber access.

There are several key benefits to this approach:

- **Ease of Use**: Scurri will create the Topic for you, and you can simply subscribe to it. This is a good option if you are not familiar with AWS SNS or do not want to manage the Topic yourself.
- **Scurri Support**: If you encounter any issues with the Topic, Scurri support can help you resolve them quickly, as we have full control over the Topic.

#### Example: Use AWS SQS to Subscribe to AWS SNS Topic

There are many different ways to consume data from an SNS Topic; SQS is just one of them.

##### Setting up SQS

To set up an SQS queue to listen to Scurri Webhooks events from an SNS Topic, you can use the AWS CloudFormation template provided below.
This template creates an SQS queue and subscribes it to the specified SNS Topic:

```json
{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Description": "SQS queue to listen to tracking updates",
  "Parameters": {
    "Topic": {
      "Description": "ARN of the SNS Topic to subscribe to",
      "Type": "String"
    }
  },
  "Resources": {
    "Queue": {
      "Type": "AWS::SQS::Queue",
      "Properties": {}
    },
    "QueueAccessPolicy": {
      "Type": "AWS::SQS::QueuePolicy",
      "Properties": {
        "PolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": { "AWS": "*" },
              "Action": "SQS:SendMessage",
              "Resource": { "Fn::GetAtt": ["Queue", "Arn"] },
              "Condition": {
                "ArnEquals": {
                  "aws:SourceArn": { "Ref": "Topic" }
                }
              }
            }
          ]
        },
        "Queues": [{ "Ref": "Queue" }]
      }
    },
    "Subscription": {
      "Type": "AWS::SNS::Subscription",
      "Properties": {
        "Endpoint": { "Fn::GetAtt": ["Queue", "Arn"] },
        "Region": "eu-west-1",
        "Protocol": "sqs",
        "RawMessageDelivery": false,
        "TopicArn": { "Ref": "Topic" }
      }
    }
  },
  "Outputs": {
    "QueueURL": {
      "Description": "URL of newly created SQS Queue",
      "Value": { "Ref": "Queue" }
    },
    "QueueARN": {
      "Description": "ARN of newly created SQS Queue",
      "Value": { "Fn::GetAtt": ["Queue", "Arn"] }
    },
    "QueueName": {
      "Description": "Name of newly created SQS Queue",
      "Value": { "Fn::GetAtt": ["Queue", "QueueName"] }
    }
  }
}
```

Next, save the template as `scurri-subscription.json` and run the command below to create the CloudFormation stack, replacing the `<TOPIC_ARN>` placeholder with the ARN of the Topic you want to subscribe to.

```shell
aws cloudformation \
  create-stack \
  --capabilities CAPABILITY_IAM \
  --stack-name "scurri-tracking-updates" \
  --template-body "file://scurri-subscription.json" \
  --parameters ParameterKey=Topic,ParameterValue="<TOPIC_ARN>"
```

##### Consuming data from SQS

Once you have set up the SQS queue, you can consume messages from it. Below is a Python script that demonstrates how to consume data from the SQS queue subscribed to the SNS Topic. This script uses the `boto3` library to interact with AWS SQS and processes messages received from the queue.
```python
import boto3
import json
import logging
import time

# Replace with your SQS queue URL
SQS_QUEUE_URL = ""
AWS_REGION = "eu-west-1"
# Max number of messages to retrieve in one call (1-10)
MAX_MESSAGES = 5
# Sleep interval between polling attempts (in seconds).
# MAX_MESSAGES / SLEEP_INTERVAL = rate limit e.g. 5 msgs/sec
SLEEP_INTERVAL = 1
# Visibility timeout in seconds (how long the message is hidden while processing)
VISIBILITY_TIMEOUT = 30
# Long polling wait time in seconds (to reduce latency and costs)
WAIT_TIME_SECONDS = 20

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

sqs_client = boto3.client('sqs', region_name=AWS_REGION)


def do_something_with_data(data):
    """
    Placeholder function to represent processing of the extracted data.
    Replace this with actual business logic
    (e.g., saving to DB, calling another service, etc.)
    """
    logger.info(f"Data processed: {data}")


def process_sns_message(sqs_message):
    """
    Extracts and processes the actual data from the SQS message body.

    If 'Raw Message Delivery' is NOT enabled on the SNS subscription,
    the SQS message contains a JSON wrapper from SNS.
    """
    try:
        # The SQS message body is a string, which we load as JSON
        sns_wrapper = json.loads(sqs_message['Body'])

        # The actual data/payload from the original publisher is in the 'Message' field
        payload_string = sns_wrapper.get('Message')
        if not payload_string:
            logger.warning(f"Message ID {sqs_message.get('MessageId')} has no 'Message' payload.")
            return False

        # Assuming the actual payload is also a JSON string
        data_payload = json.loads(payload_string)

        # --- Core Processing Logic ---
        logger.info(f"Processing message from Topic ARN: {sns_wrapper.get('TopicArn')}")
        logger.info(f"Received data type: {data_payload.get('event_type')}")

        # Save the data to a database, call another service, etc.
        do_something_with_data(data_payload)

        # Return True if processing was successful
        return True
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse message body (JSON error): {e}")
        # Return False to indicate failure, allowing the message to
        # eventually be retried or moved to DLQ
        return False
    except Exception as e:
        logger.error(f"An unexpected error occurred during processing: {e}")
        return False


def consume_messages():
    """
    Main loop for polling, processing, and deleting messages from the SQS queue.
    """
    logger.info(f"Starting SQS consumer on queue: {SQS_QUEUE_URL}")

    while True:
        try:
            # 1. Receive Messages (Long Polling)
            response = sqs_client.receive_message(
                QueueUrl=SQS_QUEUE_URL,
                MaxNumberOfMessages=MAX_MESSAGES,
                VisibilityTimeout=VISIBILITY_TIMEOUT,
                WaitTimeSeconds=WAIT_TIME_SECONDS  # Enable long polling
            )

            messages = response.get('Messages', [])
            if not messages:
                logger.info("No new messages. Waiting...")
                continue

            logger.info(f"Received {len(messages)} messages. Starting processing.")
            messages_to_delete = []

            for message in messages:
                receipt_handle = message['ReceiptHandle']

                # 2. Process Message
                if process_sns_message(message):
                    # 3. If processing succeeds, prepare to delete the message
                    messages_to_delete.append({
                        'Id': message['MessageId'],
                        'ReceiptHandle': receipt_handle
                    })
                else:
                    # If processing fails, do NOT delete.
                    # The message will reappear after the Visibility Timeout.
                    logger.error(f"Processing failed for Message ID {message['MessageId']}. Will be retried.")

            # 4. Delete Messages in a batch
            if messages_to_delete:
                delete_response = sqs_client.delete_message_batch(
                    QueueUrl=SQS_QUEUE_URL,
                    Entries=messages_to_delete
                )
                if delete_response.get('Failed'):
                    logger.error(f"Failed to delete {len(delete_response['Failed'])} messages: {delete_response['Failed']}")
                else:
                    logger.info(f"Successfully deleted {len(delete_response['Successful'])} messages.")
        except Exception as e:
            logger.critical(f"A critical error occurred in the main loop: {e}")

        # Pause between polling attempts to cap the consumption rate
        time.sleep(SLEEP_INTERVAL)


if __name__ == "__main__":
    consume_messages()
```

The script in the example above implements the following three steps:

1. Poll the SQS queue to check whether any messages are available.
2. Process the data.
3. Delete the successfully processed messages from the queue.

### HTTPS Destination Type

The HTTPS destination type allows you to receive data from Scurri via a secure HTTP endpoint. This is useful for integrating Scurri with your existing systems or applications that can handle HTTP requests.

Scurri Webhooks deliver each event via an HTTPS POST request to the provided endpoint URL. The request body contains a JSON payload with details about the event that triggered the Webhook, such as tracking updates.

#### Authentication Types

Scurri Webhooks support multiple authentication types for HTTPS destinations.

##### Basic Authentication

Basic authentication sends a username and password in the `Authorization` header in the format:

```
Authorization: basic <base64-encoded username:password>
```

where the `username:password` string is Base64-encoded. For example, the string `scurri:connect` becomes `c2N1cnJpOmNvbm5lY3Q=`.

The username and password must be provided when configuring the Webhook, and are stored securely on Scurri's servers.

##### Token-based Authentication

Token-based authentication sends an authentication token in the `Authorization` header in the format:

```
Authorization: token <token>
```

The token must be provided when configuring the Webhook, and is stored securely on Scurri's servers.
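
On the receiving side, the Basic and token headers described above can be checked before a payload is accepted. The following is a minimal sketch using only the Python standard library. The function names (`build_basic_header`, `is_valid_basic_header`, `is_valid_token_header`) are hypothetical and not part of any Scurri library, and comparing the scheme case-insensitively is an assumption rather than documented Scurri behaviour.

```python
import base64
import hmac


def build_basic_header(username: str, password: str) -> str:
    """Build the header value in the documented format: 'basic ' + base64('username:password')."""
    credentials = f"{username}:{password}".encode("utf-8")
    return "basic " + base64.b64encode(credentials).decode("ascii")


def _split_header(header_value: str):
    """Split 'scheme credentials' into a lower-cased scheme and the raw credentials."""
    scheme, _, credentials = header_value.strip().partition(" ")
    return scheme.lower(), credentials.strip()


def _constant_time_equal(left: str, right: str) -> bool:
    """Compare two strings without leaking timing information."""
    return hmac.compare_digest(left.encode("utf-8"), right.encode("utf-8"))


def is_valid_basic_header(header_value: str, username: str, password: str) -> bool:
    """Return True if the Authorization header carries the expected Basic credentials."""
    scheme, credentials = _split_header(header_value)
    expected = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return scheme == "basic" and _constant_time_equal(credentials, expected)


def is_valid_token_header(header_value: str, token: str) -> bool:
    """Return True if the Authorization header carries the expected token."""
    scheme, credentials = _split_header(header_value)
    return scheme == "token" and _constant_time_equal(credentials, token)
```

For instance, `build_basic_header("scurri", "connect")` returns `"basic c2N1cnJpOmNvbm5lY3Q="`, matching the encoded value shown above. The constant-time comparison is a defensive choice so that response timing does not reveal how much of a guessed credential was correct.
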
##### OAuth2

OAuth2-based authentication retrieves a token from an authorization endpoint. This token is then used to authenticate against your Webhook destination endpoint when sending data. The supported grant type is "client credentials".

##### Custom Authentication

Custom authentication supports authentication via a non-standard header, e.g. `x-api-key`. At least one header must be specified to use this authentication type.

#### Example: How to Retrieve Data over HTTPS

To receive data from Scurri Webhooks via HTTPS, you need to set up an HTTPS endpoint that can handle POST requests. Here is an example of how to set up a simple HTTPS endpoint using Python, the FastAPI framework and Celery for asynchronous task processing:

```python
import logging

from fastapi import FastAPI, HTTPException

# Celery tasks defined in your own `tasks` module
from tasks import handle_tracking_events, handle_other_event

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()


@app.post("/webhook")
async def handle_webhook(payload: dict):
    """
    Handles incoming POST requests to the /webhook endpoint.
    """
    logger.info(f"Received webhook event: {payload.get('event_type')}")

    try:
        # Example: Process different event types
        if payload.get('event_type') == "tracking_update":
            # Delay task to handle tracking info asynchronously
            handle_tracking_events.delay(payload)
        elif payload.get('event_type') == "another_event_type":
            # Delay task to handle another event asynchronously
            handle_other_event.delay(payload)
        else:
            logger.warning(f"Unhandled event type received: {payload.get('event_type')}")
    except Exception as e:
        logger.error(f"Error processing webhook: {e}")
        raise HTTPException(status_code=400, detail="Invalid payload")

    return {"status": "success", "message": "Webhook received and queued for processing"}
```

With this approach, the server receives the inbound request, queues it for asynchronous processing, and responds straight away.
This way, your endpoint quickly acknowledges receipt of the Webhook and avoids holding up the Scurri system with long processing times. Scurri Webhooks expect a timely response from your endpoint, and will time out if it takes too long to respond.
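
The acknowledge-first pattern described above does not depend on Celery. As a minimal sketch, assuming an in-process queue is acceptable (unlike a Celery broker, it does not survive a restart), a handler can enqueue the payload and return immediately while a background thread does the slow work. The names `enqueue_event`, `process_event` and `processed` are hypothetical and only serve the illustration.

```python
import logging
import queue
import threading

logger = logging.getLogger(__name__)

events = queue.Queue()  # payloads waiting to be processed
processed = []          # stand-in for a database or downstream service


def process_event(payload: dict) -> None:
    """Placeholder for slow business logic (DB writes, calls to other services)."""
    processed.append(payload.get("event_type"))


def worker() -> None:
    """Drain the queue forever, one payload at a time."""
    while True:
        payload = events.get()
        try:
            process_event(payload)
        except Exception:
            logger.exception("Failed to process webhook payload")
        finally:
            # Mark the item as handled so events.join() can return
            events.task_done()


def enqueue_event(payload: dict) -> dict:
    """Called from the request handler: queue the payload and acknowledge at once."""
    events.put(payload)
    return {"status": "success"}


# Daemon thread so the worker does not block interpreter shutdown
threading.Thread(target=worker, daemon=True).start()
```

A request handler would call `enqueue_event(payload)` and return its result as the response body, so the response time stays independent of how long `process_event` takes.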