Serverless EventBridge Pipes

An example of using Amazon EventBridge Pipes, with code written in Python and the AWS CDK

Serverless Advocate
9 min read · Feb 4, 2024

Preface

✔️ We discuss Amazon EventBridge Pipes and what they are.
✔️ We walk through an example in Python and the AWS CDK.

Introduction 👋🏽

In this article, we are going to cover Amazon EventBridge Pipes by building a fictitious dental app, which will be built using the AWS CDK and Python.

Our fictitious company ‘LJ Dentists’ that we are using for this article

The ‘LJ Dentists’ app lets people book an appointment with their closest dentist.

A person booking a dental appointment with their closest dentist for their son

If they have been before, we check their preferred contact method before sending an email, an SMS or no correspondence at all, with the check performed as part of an Amazon EventBridge pipe.

The son receives an SMS as he has been to this dentist before and has preferred contact details

You can find the full code example for the article here:

What are we building? 🛠️

We are going to allow patients to book appointments online using the following architecture:

We can see that:

  • Patients book appointments online using our Amazon API Gateway REST API.
  • A Lambda function writes the appointment details to a DynamoDB table.
  • DynamoDB streams capture changes to the table, and Amazon EventBridge Pipes uses the stream as its source, filtering so that only newly created records pass through.
  • We hydrate the appointment information on the stream record by invoking a Lambda function, which checks whether a matching record exists in the contact preferences table and adds the preference to the stream record if it does.
  • Finally, the pipe has an SQS queue as its target, where we store our appointments ready for processing.

👇 Before we go any further — please connect with me on LinkedIn for future blog posts and Serverless news https://www.linkedin.com/in/lee-james-gilmore/

What are Amazon EventBridge Pipes?

Amazon EventBridge Pipes helps you create point-to-point integrations between event producers and consumers with optional transform, filter and enrich steps. EventBridge Pipes reduces the amount of integration code you need to write and maintain when building event-driven applications. This is shown in the diagram below for our example:

We can see from the above diagram that we have:

  • DynamoDB streams as the source.
  • We add some additional filtering to ensure we only receive newly inserted records (i.e. not deletions or updates).
  • We enrich the appointment data from the stream with additional information using an enrichment Lambda function which reads contact information from a separate database.
  • We finally have a target set as an SQS queue so we can further process the appointment records.
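To make the four stages concrete, here is a minimal in-memory sketch in plain Python of how a record moves from source to target. It does not call any AWS service. The names `run_pipe`, `is_insert` and `enrich`, the sample records and the flat `email` field are all hypothetical and simplified for illustration; a real pipe is configured declaratively rather than written as a loop like this.

```python
# simplified stand-ins for DynamoDB stream records (only the fields we need,
# and without DynamoDB's typed attribute format)
stream_records = [
    {'eventName': 'INSERT', 'dynamodb': {'NewImage': {'id': 'a1', 'email': 'john.doe@example.com'}}},
    {'eventName': 'MODIFY', 'dynamodb': {'NewImage': {'id': 'a1', 'email': 'john.doe@example.com'}}},
    {'eventName': 'INSERT', 'dynamodb': {'NewImage': {'id': 'b2', 'email': 'jane.doe@example.com'}}},
]

# hypothetical contact preferences keyed by email address
contact_preferences = {'john.doe@example.com': 'email'}


def is_insert(record: dict) -> bool:
    # filter step: only keep newly inserted records
    return record['eventName'] == 'INSERT'


def enrich(record: dict) -> dict:
    # enrichment step: copy the appointment and add the preferred contact method
    appointment = dict(record['dynamodb']['NewImage'])
    appointment['preferredMethod'] = contact_preferences.get(appointment['email'], 'none')
    return appointment


def run_pipe(source: list, keep, enrichment, target: list) -> None:
    # source -> filter -> enrichment -> target
    for record in source:
        if keep(record):
            target.append(enrichment(record))


queue: list = []  # stands in for the SQS target
run_pipe(stream_records, is_insert, enrich, queue)
for message in queue:
    print(message)
```

Only the two inserted records reach the list standing in for the queue, and each one carries a `preferredMethod` value.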

It is worth noting that there are many other services and configurations we can use, which are shown at the link below:

https://aws.amazon.com/eventbridge/pipes/

Talking through key code 👨‍💻

Now let’s talk through some of the key code.

We start by splitting up our CDK Application into Stateful and Stateless resources (stacks) as we can see below:

#!/usr/bin/env python3
import os

import aws_cdk as cdk
from stateful.stateful import DentistsStatefulStack
from stateless.stateless import DentistsStatelessStack

app = cdk.App()

# we split our app into stateful and stateless resources
DentistsStatefulStack(app, "DentistsStatefulStack")
DentistsStatelessStack(app, "DentistsStatelessStack")

app.synth()

If we first look at our Stateful stack, we will see that we set up our two DynamoDB tables; one for the appointments, and one for the preferred contact details:

Note: On our main appointments table we add DynamoDB streams so we can listen to changes i.e. change data capture.

from aws_cdk import CfnOutput, RemovalPolicy, Stack
from aws_cdk import aws_dynamodb as dynamodb
from constructs import Construct


class DentistsStatefulStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # add the dynamodb table for storing appointments which has streams enabled
        table = dynamodb.Table(
            self, 'DentistTable',
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            table_name='DentistTable',
            stream=dynamodb.StreamViewType.NEW_IMAGE,
            removal_policy=RemovalPolicy.DESTROY,
            partition_key=dynamodb.Attribute(
                name='id',
                type=dynamodb.AttributeType.STRING
            )
        )

        # add the contact preferences dynamodb table whose id is the email address
        contact_table = dynamodb.Table(
            self, 'DentistContactsTable',
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            table_name='DentistContactsTable',
            removal_policy=RemovalPolicy.DESTROY,
            partition_key=dynamodb.Attribute(
                name='id',
                type=dynamodb.AttributeType.STRING
            )
        )

        # add a stack output for the table name
        CfnOutput(
            self, 'DentistDynamoDBTableName',
            value=table.table_name,
            description='Name of the DynamoDB table',
            export_name='DentistDynamoDBTableName'
        )

        # add a stack output for the contact table name
        CfnOutput(
            self, 'DentistContactDynamoDBTableName',
            value=contact_table.table_name,
            description='Name of the Contact Preferences DynamoDB table',
            export_name='DentistContactDynamoDBTableName'
        )

        # add a stack output for the table stream arn of the appointments table
        CfnOutput(
            self, 'DentistDynamoDBTableStreamArn',
            value=table.table_stream_arn,
            description='DynamoDB table stream ARN',
            export_name='DentistDynamoDBTableStreamArn'
        )

Now that we have our Stateful resources set up, in our Stateless stack, we first create our Lambda function for creating an appointment:

# create the lambda function for creating an appointment
create_appointment_lambda = aws_lambda.Function(
    self, 'CreateAppointment',
    runtime=aws_lambda.Runtime.PYTHON_3_12,
    handler='create_appointment.handler',
    code=aws_lambda.Code.from_asset(os.path.join(DIRNAME, 'src')),
    function_name='CreateAppointment',
    environment={
        'dynamodb_table': dynamodb_table_name,
    },
)

We also add a second Lambda function which will be used in our pipe to enrich the data it gets from DynamoDB streams:

# create the lambda function for retrieving contact details
get_contact_details = aws_lambda.Function(
    self, 'GetContactDetails',
    runtime=aws_lambda.Runtime.PYTHON_3_12,
    handler='get_contact_details.handler',
    code=aws_lambda.Code.from_asset(os.path.join(DIRNAME, 'src')),
    function_name='GetContactDetails',
    environment={
        'contacts_dynamodb_table': contacts_dynamodb_table_name,
    },
)

We next add an Amazon SQS queue which will be our target for our pipe:

# create the sqs queue which our pipe will send messages to (the target)
sqs_queue = sqs.Queue(
    self, 'AppointmentsQueue',
    queue_name='AppointmentsQueue',
    removal_policy=RemovalPolicy.DESTROY
)

We then create the IAM policy statements and role for the Amazon EventBridge pipe itself, which allow it to connect to the other resources for source, enrichment and target:

# create the pipe policy and role for the source (dynamodb streams)
pipe_source_policy = iam.PolicyStatement(
    actions=[
        'dynamodb:DescribeStream',
        'dynamodb:GetRecords',
        'dynamodb:GetShardIterator',
        'dynamodb:ListStreams'
    ],
    resources=[stream_arn],
    effect=iam.Effect.ALLOW
)

# create the target policy to allow the pipe to publish messages to sqs (target)
pipe_target_policy = iam.PolicyStatement(
    actions=['sqs:SendMessage'],
    resources=[sqs_queue.queue_arn],
    effect=iam.Effect.ALLOW
)

# create the policy to allow the pipe to invoke our lambda (enrichment)
pipe_enrichment_policy = iam.PolicyStatement(
    actions=['lambda:InvokeFunction'],
    resources=[get_contact_details.function_arn],
    effect=iam.Effect.ALLOW
)

# create the pipe role
pipe_role = iam.Role(
    self, 'PipeRole',
    assumed_by=iam.ServicePrincipal('pipes.amazonaws.com'),
)

# add the three policies to the role
pipe_role.add_to_policy(pipe_source_policy)
pipe_role.add_to_policy(pipe_target_policy)
pipe_role.add_to_policy(pipe_enrichment_policy)

We can now create our pipe using the following code:

# create the eventbridge pipe that has a filter just for new items in dynamodb
pipe = pipes.CfnPipe(
    self, 'Pipe',
    role_arn=pipe_role.role_arn,
    source=stream_arn,
    log_configuration=pipes.CfnPipe.PipeLogConfigurationProperty(
        cloudwatch_logs_log_destination=pipes.CfnPipe.CloudwatchLogsLogDestinationProperty(
            log_group_arn=log_group.log_group_arn
        ),
        level='INFO',
    ),
    name='DentistPipe',
    source_parameters=pipes.CfnPipe.PipeSourceParametersProperty(
        dynamo_db_stream_parameters=pipes.CfnPipe.PipeSourceDynamoDBStreamParametersProperty(
            starting_position='LATEST',
        ),
        filter_criteria=pipes.CfnPipe.FilterCriteriaProperty(
            filters=[pipes.CfnPipe.FilterProperty(
                pattern=json.dumps({'eventName': [{'prefix': 'INSERT'}]})
            )]
        ),
    ),
    enrichment=get_contact_details.function_arn,
    target=sqs_queue.queue_arn,
)
pipe.apply_removal_policy(RemovalPolicy.DESTROY)
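The filter pattern above uses prefix matching on `eventName`. As a rough illustration of which stream records that pattern lets through, the sketch below implements a tiny subset of pattern matching in plain Python (exact values and `prefix` rules on top-level string fields only). The `matches` helper is hypothetical and is not how EventBridge itself evaluates patterns.

```python
# the same pattern that is passed to the pipe's filter criteria
pattern = {'eventName': [{'prefix': 'INSERT'}]}


def matches(pattern: dict, event: dict) -> bool:
    # every field in the pattern must match at least one of its rules
    for field, rules in pattern.items():
        value = event.get(field)
        if not isinstance(value, str):
            return False
        matched = False
        for rule in rules:
            if isinstance(rule, dict) and 'prefix' in rule:
                matched = matched or value.startswith(rule['prefix'])
            else:
                matched = matched or value == rule
        if not matched:
            return False
    return True


# DynamoDB stream records carry one of these three event names
for name in ('INSERT', 'MODIFY', 'REMOVE'):
    print(name, matches(pattern, {'eventName': name}))
```

Because DynamoDB streams only emit `INSERT`, `MODIFY` and `REMOVE` as event names, an exact-match pattern of `{'eventName': ['INSERT']}` would select the same records as the prefix rule.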

Note: We also add the code for our Amazon API Gateway and Lambda function integration which you can see in the repo.

At this point, we have all of our infrastructure set up, but let’s have a quick look at the Lambda function code, starting with the ‘create_appointment.py’ file:

import json
import os
import uuid
from http import HTTPStatus

import boto3
from boto3.dynamodb.types import TypeSerializer

dynamodb_table = os.getenv('dynamodb_table')
dynamodb_client = boto3.client('dynamodb')
serializer = TypeSerializer()

def handler(event, context):
    try:
        # parse the request data from the event and grab the body
        request_data = json.loads(event['body'])

        # add a new appointment id using uuidv4
        request_data['id'] = str(uuid.uuid4())

        # serialize the payload into dynamodb format
        appointment_data = {k: serializer.serialize(v) for k, v in request_data.items()}

        # add the item to the dynamodb table
        dynamodb_client.put_item(TableName=dynamodb_table, Item=appointment_data)

        body = {
            'message': request_data,
            'statusCode': HTTPStatus.CREATED,
        }

        # send the response back to api gateway in the correct shape
        response = {
            'statusCode': HTTPStatus.CREATED,
            'body': json.dumps(body, indent=2),
            'headers': {
                'content-type': 'application/json',
            },
        }

    except Exception as e:
        response = {
            'statusCode': HTTPStatus.INTERNAL_SERVER_ERROR.value,
            'body': f'Exception={e}',
        }

    return response

We can see from the code above that we create a basic function which takes the appointment from the API Gateway event, adds a new unique ID (uuid), and writes the record to the appointments table.
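The serialization step converts plain Python values into DynamoDB's attribute-value format. The sketch below is a simplified, standard-library-only stand-in that shows the shape of the result; the `to_attribute_value` helper is hypothetical and is not boto3's implementation, and the `price` field in the sample body is an assumption made up for illustration. It also parses JSON numbers as `Decimal`, since boto3's `TypeSerializer` does not accept Python floats.

```python
import json
from decimal import Decimal


def to_attribute_value(value):
    # bool must be checked before int because bool is a subclass of int
    if isinstance(value, bool):
        return {'BOOL': value}
    if value is None:
        return {'NULL': True}
    if isinstance(value, str):
        return {'S': value}
    if isinstance(value, (int, Decimal)):
        # DynamoDB transports numbers as strings
        return {'N': str(value)}
    if isinstance(value, list):
        return {'L': [to_attribute_value(v) for v in value]}
    if isinstance(value, dict):
        return {'M': {k: to_attribute_value(v) for k, v in value.items()}}
    raise TypeError(f'Unsupported type: {type(value).__name__}')


# hypothetical request body; only appointment.patient.email is taken from the article
body = '{"appointment": {"patient": {"email": "john.doe@example.com"}, "price": 49.99}}'

# parse_float=Decimal avoids Python floats in the payload
request_data = json.loads(body, parse_float=Decimal)
item = {k: to_attribute_value(v) for k, v in request_data.items()}
print(json.dumps(item, indent=2))
```

If appointment payloads can contain decimal numbers, passing `parse_float=Decimal` to `json.loads` in the handler is one way to keep the real serializer from raising on floats.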

When records are added to the table, the changes are written to the table's stream, and our pipe picks them up as its source.

Next, let’s look at the enrichment Lambda function which is invoked as part of the pipe:

import os

import boto3
from boto3.dynamodb.types import TypeDeserializer

dynamodb_table = os.getenv('contacts_dynamodb_table')
dynamodb_client = boto3.client('dynamodb')
deserializer = TypeDeserializer()

def handler(event, context):
    # initialise the response so it is always defined if an exception is raised early
    response = {}

    try:
        # parse the request data from the stream event and grab the new image
        new_image_data = event[0]['dynamodb']['NewImage']

        response = {k: deserializer.deserialize(v) for k, v in new_image_data.items()}

        # get the email address from the stream record
        email_address = response['appointment']['patient']['email']
        print('Email: ', email_address)

        # check whether or not this email address exists in the contacts table
        account = dynamodb_client.get_item(
            TableName=dynamodb_table,
            Key={'id': {'S': email_address}}
        )

        if 'Item' in account:
            item = account['Item']

            # deserialize the dynamodb item if found
            account_data = {k: deserializer.deserialize(v) for k, v in item.items()}
            print('Contact information found: ', account_data)

            # add the account data onto the response so it's available on the sqs message
            response['preferredMethod'] = account_data['preferredMethod']
        else:
            # no contact record found, so default the preferred method to 'none'
            response['preferredMethod'] = 'none'
            print('Contact information not found.')

    except Exception as e:
        print(e)
        response = {
            'error': f'Exception={e}',
            'body': response,
        }

    print('response: ', response)
    return response

We can see from the code above that we take the patient's email address from the stream record (appointment.patient.email) and do a lookup on our contact preferences DynamoDB table. If a record exists, we take the preferred method and add it to the appointment record. If a matching record doesn’t exist, we add a preferredMethod of ‘none’.
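The lookup-with-fallback logic can be exercised locally without AWS. The sketch below uses a plain dictionary in place of the contacts table and a simplified deserializer that covers only the `S` and `M` attribute types. The names `from_attribute_value`, `add_preference` and `make_record`, along with the sample data, are hypothetical and only meant to illustrate the flow.

```python
def from_attribute_value(attr: dict):
    # simplified deserializer: strings and maps only
    if 'S' in attr:
        return attr['S']
    if 'M' in attr:
        return {k: from_attribute_value(v) for k, v in attr['M'].items()}
    raise TypeError(f'Unsupported attribute: {attr}')


def add_preference(stream_record: dict, contacts: dict) -> dict:
    # turn the typed NewImage into a plain dictionary
    new_image = stream_record['dynamodb']['NewImage']
    appointment = {k: from_attribute_value(v) for k, v in new_image.items()}

    # look up the contact preference by email, defaulting to 'none'
    email = appointment['appointment']['patient']['email']
    appointment['preferredMethod'] = contacts.get(email, {}).get('preferredMethod', 'none')
    return appointment


def make_record(appointment_id: str, email: str) -> dict:
    # build a minimal stream record in DynamoDB's attribute-value format
    return {'eventName': 'INSERT', 'dynamodb': {'NewImage': {
        'id': {'S': appointment_id},
        'appointment': {'M': {'patient': {'M': {'email': {'S': email}}}}},
    }}}


# in-memory stand-in for the contacts table, keyed by email address
contacts = {'john.doe@example.com': {'preferredMethod': 'email'}}

known = add_preference(make_record('a1', 'john.doe@example.com'), contacts)
unknown = add_preference(make_record('b2', 'new.patient@example.com'), contacts)
print(known['preferredMethod'], unknown['preferredMethod'])
```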

If you want to test this end to end, deploy the application and, if you want a preferredMethod to be applied, add the following item to the contacts DynamoDB table:

{
  "id": "john.doe@example.com",
  "preferredMethod": "email"
}

Note: There is a Postman file in the repo which is set up to let you test the functionality through the REST API.

If a record doesn’t exist it will default to ‘none’ as discussed above.
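Once the enriched appointments are on the queue, a consumer can branch on `preferredMethod`. The article does not show this step, so the following is only a sketch under assumptions: `choose_action` and this `handler` are hypothetical, `'sms'` is an assumed value for the preference, and each SQS message body is assumed to be the JSON returned by the enrichment function.

```python
import json


def choose_action(appointment: dict) -> str:
    # decide what to do based on the preference added by the enrichment step
    method = appointment.get('preferredMethod', 'none')
    if method == 'email':
        return f"email:{appointment['appointment']['patient']['email']}"
    if method == 'sms':
        return 'sms'
    return 'skip'


def handler(event, context):
    # an SQS-triggered Lambda receives messages under 'Records',
    # each with the message payload as a string in 'body'
    return [choose_action(json.loads(record['body'])) for record in event['Records']]


# two sample messages: one with a stored preference, one without
sample_event = {'Records': [
    {'body': json.dumps({
        'id': 'a1',
        'appointment': {'patient': {'email': 'john.doe@example.com'}},
        'preferredMethod': 'email',
    })},
    {'body': json.dumps({
        'id': 'b2',
        'appointment': {'patient': {'email': 'new.patient@example.com'}},
        'preferredMethod': 'none',
    })},
]}

print(handler(sample_event, None))
```

In a real consumer the returned strings would be replaced by calls to whichever email or SMS service is in use.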

Conclusion

I hope you found that useful as a practical example of using Amazon EventBridge Pipes in your Serverless solutions. Pipes allow us to remove the Lambda glue code we had to write in the past and give us a fantastic way of enriching data from a source before pushing that data to a target.

Wrapping up 👋🏽

I hope you enjoyed this article, and if you did then please feel free to share it and leave feedback!

Please go and subscribe to my YouTube channel for similar content!

I would love to connect with you also on any of the following:

https://www.linkedin.com/in/lee-james-gilmore/
https://twitter.com/LeeJamesGilmore

If you enjoyed the posts please follow my profile Lee James Gilmore for further posts/series, and don’t forget to connect and say Hi 👋

Please also use the ‘clap’ feature at the bottom of the post if you enjoyed it! (You can clap more than once!!)

About me

Hi, I’m Lee, an AWS Community Builder, Blogger, AWS certified cloud architect, and Global Head of Technology & Architecture based in the UK; currently working for City Electrical Factors (UK) & City Electric Supply (US), having worked primarily in full-stack JavaScript on AWS for the past 6 years.

I consider myself a serverless advocate with a love of all things AWS, innovation, software architecture, and technology.

*** The information provided represents my own personal views and I accept no responsibility for the use of the information. ***
