Build an Event-Driven Architecture with FastAPI and Redis Pub/Sub & Deploy it in Kubernetes

Cloud-native applications are made up of a collection of microservices. Generally speaking, these services run in containers, with each container being responsible for a single process within the larger scope of the application.

There are lots of benefits to this approach – container images are kept small, which allows for more efficient deployments. Isolated processes within the larger application can be scaled up as needed, rather than having to scale the entire application, which reduces hosting costs. And development teams can work on different aspects of an application at the same time, which speeds up development cycles for new features and bug fixes.

The common theme here is efficiency. Microservice-based apps that leverage event-driven architectures take this line of thinking a step further. They are – in some ways – more complex than request/response-based microservice architectures, but they can also be that much more efficient in terms of both the rate of development and application processing speed.

How do event-driven architectures increase the rate of feature development?

This diagram of the project we’ll build today illustrates this concept. Its flow begins with a web-api that publishes some data to a Redis channel. Notice that all subsequent networking happens in the same way. Data is published to a Redis channel, rather than being sent directly from one service to the next, as you would find in a REST implementation of an app like this.

Event-Driven Architecture


Because the services aren’t networked together directly, it is much simpler to add new services as needed. In the above diagram, we are adding “some future service” simply by publishing some data to a “future channel” in Redis and having our new service subscribe to that channel in exactly the same way that each of the other services is already doing on the existing Redis channels.

In fact, we may not even need to create a new channel in Redis if the data that needs to be acted upon is already being published to an existing channel. But either way, with event-driven architectures, new services can be very easily added without changing any existing services, which makes it much faster to add new features to an application – especially when different teams are responsible for developing different services within the larger app.
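The decoupling described above can be sketched without Redis at all. The following is a minimal, in-process stand-in for a broker. The Broker class and the variable names are hypothetical and not part of the project. It only aims to show that a new subscriber can be attached to an existing channel without modifying the publisher or the existing subscriber.

```python
from collections import defaultdict
from typing import Callable


class Broker:
    """Hypothetical in-memory stand-in for a pub/sub broker such as Redis."""

    def __init__(self) -> None:
        # Maps a channel name to the list of subscriber callbacks.
        self._subscribers = defaultdict(list)

    def subscribe(self, channel: str, handler: Callable[[str], None]) -> None:
        self._subscribers[channel].append(handler)

    def publish(self, channel: str, message: str) -> int:
        # Deliver to every current subscriber and report how many received it.
        for handler in self._subscribers[channel]:
            handler(message)
        return len(self._subscribers[channel])


broker = Broker()
received_by_existing = []
received_by_future = []

# An existing service subscribes to a channel.
broker.subscribe("raw_input", received_by_existing.append)
first_count = broker.publish("raw_input", "first")

# Later, "some future service" subscribes to the same channel.
# Neither the publisher nor the existing subscriber changes.
broker.subscribe("raw_input", received_by_future.append)
second_count = broker.publish("raw_input", "second")
```

As with Redis Pub/Sub, a subscriber that joins late in this sketch only sees messages published after it subscribed.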

Why is processing speed so important in cloud-native applications?

The faster the rate at which a cloud-native application can handle requests, the less horizontal scaling will be required. This means that the same amount of traffic can be handled with less infrastructure overhead, which means it will cost less to host the application in the cloud.

Event-driven architectures can dramatically increase the number of requests a given application can handle, because each service operates independently of the rest of the application. The web-api illustrated above, for example, only needs to parse an incoming HTTP request and publish data to Redis before it can move on to handling the next request.

A broker like Redis, in turn, can handle a very large number of messages without scaling, since relaying a pub/sub message involves almost no computation and no disk I/O. An application service, by contrast, might call other services or third-party APIs, or simply compute for a long time.

This same principle is true for each of the other application services included within the app, as they are all networked through Redis in the same way.

Building our example application with FastAPI, Redis and MongoDB

As a fun, but still practical example of the above concepts, we’ll build a web app that takes an email address, a latitude and a longitude and then employs an event-driven architecture to compute that location’s distance from Velocity’s offices. Once we have the distance computed, we’ll save it in a MongoDB instance and then query the DB before we write an “email message” to a local log.

The full project is available on GitHub, and each of the Redis “consumer” services is built similarly enough that we won’t detail every line of them here. But there are a few key points that are specific to FastAPI and our asyncio-based services that we will walk through in detail.

FastAPI

First, the FastAPI portion of the application. Here, we have a simple HTTP API that handles a GET and a POST request, which allows us to return some simple HTML for our index view and to handle some form data sent via a POST request from the browser.

import json
from fastapi import FastAPI, Form, BackgroundTasks
from fastapi.responses import HTMLResponse
from redis import pub

from index import index_view

app = FastAPI()

@app.get("/")
async def index():
   return HTMLResponse(content=index_view, status_code=200)

async def publish_to_redis(data: dict):
   await pub.publish('raw_input', json.dumps(data))

@app.post("/distance")
async def get_distance(background_tasks: BackgroundTasks,
                       email: str = Form(),
                       lat: str = Form(),
                       long: str = Form()):
   data = {'email': email, 'lat': lat, 'long': long}
   background_tasks.add_task(publish_to_redis, data)

   return HTMLResponse(content=index_view, status_code=201)

Things get interesting when we pass the incoming form data to BackgroundTasks, a built-in feature in FastAPI that lets us queue a function to run after the response has been sent, so the client does not have to wait for that function to finish.

Specifically, we have defined an asynchronous function called publish_to_redis, which does exactly what you would think – it publishes some data to a Redis channel. But, if you look closely at the above snippet, you’ll see that we never await that function directly in the "/distance" route handler.

In the body of this get_distance function, we parse the incoming form data, build ourselves a Python dictionary, and then add our publish_to_redis function, along with our newly built payload, to an instance of the BackgroundTasks class via its included add_task method.

Handing this function call to the BackgroundTasks class keeps our responses fast, because the API does not wait for the publish to complete before it returns to the client. An async task like this one still runs on the application's event loop, just after the response has gone out.
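The ordering that matters here, response first and background work second, can be illustrated with a small sketch that uses only the standard library. It does not use FastAPI. The names handle_request, fake_publish, serve and events are hypothetical and only mimic the sequence of a handler queuing a task and the server running it once the response has been produced.

```python
import asyncio

events = []


async def fake_publish(data: dict) -> None:
    # Stand-in for publishing to Redis; records when it actually runs.
    await asyncio.sleep(0)
    events.append(f"published {data['email']}")


async def handle_request(data: dict, queued: list) -> str:
    # The handler only queues the work, much as add_task does, then returns.
    queued.append((fake_publish, data))
    events.append("response ready")
    return "201 Created"


async def serve(data: dict) -> str:
    queued = []
    response = await handle_request(data, queued)
    events.append("response sent")
    # Queued tasks run only after the response has gone out.
    for func, arg in queued:
        await func(arg)
    return response


status = asyncio.run(serve({"email": "user@example.com"}))
```

In this sketch the publish step is recorded last, which is the property the route handler above relies on.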

Dockerfile

FROM python:3.10
COPY ./src/redis.py .
COPY ./src/api_service/ .
RUN pip install -r requirements.txt
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]

This Dockerfile is super straightforward, but it’s worth detailing here because of the way that we are starting the FastAPI service. Notice that we aren’t calling a Python file directly. Instead, we’re starting the app with uvicorn, a production-ready ASGI web server implementation built specifically for asynchronous applications written in Python.

Redis

Throughout the application, including in the web api above, when we call pub.publish() we're using an aioredis client to publish and subscribe to our various Redis channels. This – again – helps keep our app fast: Redis is an in-memory store, and our calls to it are asynchronous, so a service can do other work while it waits on the network.

import os
import aioredis

REDIS_HOST = os.environ.get('REDIS_HOST')
REDIS_PORT = os.environ.get('REDIS_PORT')

redis = aioredis.from_url(f'redis://{REDIS_HOST}:{REDIS_PORT}')
psub = redis.pubsub()
pub = aioredis.Redis.from_url(f'redis://{REDIS_HOST}:{REDIS_PORT}',
   decode_responses=True)
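One detail worth noting in the module above is that os.environ.get returns None when a variable is unset, which would produce a URL like redis://None:None. A minimal sketch of one way to guard against that follows. The build_redis_url helper and its localhost and 6379 defaults are assumptions for illustration, not part of the project.

```python
import os


def build_redis_url(env=None) -> str:
    """Build a redis:// URL, falling back to assumed local defaults."""
    env = os.environ if env is None else env
    host = env.get("REDIS_HOST", "localhost")  # assumed default host
    # int() raises ValueError early if the port is not numeric.
    port = int(env.get("REDIS_PORT", "6379"))  # assumed default port
    return f"redis://{host}:{port}"


url_default = build_redis_url({})
url_cluster = build_redis_url({"REDIS_HOST": "redis", "REDIS_PORT": "6379"})
```

Passing the environment in as a plain mapping keeps the helper easy to exercise without touching the real process environment.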

Distance Service (Data Processing Redis Consumer)

Next, we have an example of our Redis consumers. This is the “Data Processing Service” in our diagram above. We’ll be using geopy to calculate the distance between the submitted lat/long and Velocity’s offices.

import asyncio
import json
from geopy.distance import geodesic
from redis import psub, pub

async def reader():
   async with psub as p:
       await p.subscribe('raw_input')
       if p is not None:
           while True:
               # Returns None when no message is waiting on the channel.
               message = await p.get_message(ignore_subscribe_messages=True)
               # Yield to the event loop between polls.
               await asyncio.sleep(0)
               if message is not None:
                   data = json.loads(message['data'])
                   try:
                       cd = CalculateDistance(data)
                       cd.distance_from_velocity()
                       calculated_data = json.dumps({
                           'email': data['email'],
                           'distance_from_velocity': cd.distance})
                       await pub.publish('calculated_distance', calculated_data)
                   except Exception as e:
                       # Log the failure and keep listening.
                       print(str(e))

class CalculateDistance:
   def __init__(self, data):
       self.input_lat = data['lat']
       self.input_long = data['long']
       self.velocity_lat = 32.080058
       self.velocity_long = 34.864535
       self.distance = None
     
   def distance_from_velocity(self):
       self.distance = geodesic(
           (self.input_lat, self.input_long),
           (self.velocity_lat, self.velocity_long)).miles

if __name__ == '__main__':
   asyncio.run(reader())

Above, we have defined a class to handle the distance calculation, and we have an async function called reader. The reader uses an async with statement on the psub object we defined in our redis.py file, so that the pub/sub connection is cleaned up when the block exits.

The reader function subscribes to the Redis channel raw_input, which the web_api publishes user input to. Once subscribed, it polls that channel indefinitely in a while loop, skipping any poll that returns None. Then, for each message that it receives, it parses the included data (the Python dict we created above) and uses those values to compute the distance.

Finally, it builds a new dict, serialized as calculated_data, which it then publishes to the Redis channel calculated_distance in exactly the same way that the web_api published to the raw_input channel.
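The message transformation performed by this service can be sketched with the standard library alone. Note the swap: the service above uses geopy's geodesic distance, while the sketch below uses the simpler haversine (great-circle) formula, so its results will differ slightly from the real service. The function names and the Earth radius constant are assumptions for illustration.

```python
import json
import math

VELOCITY = (32.080058, 34.864535)  # coordinates taken from the service above
EARTH_RADIUS_MILES = 3958.8        # approximate mean Earth radius


def haversine_miles(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * EARTH_RADIUS_MILES * math.asin(math.sqrt(a))


def process(raw_message: str) -> str:
    """Turn a raw_input payload into a calculated_distance payload."""
    data = json.loads(raw_message)
    # Form fields arrive as strings, so convert before doing arithmetic.
    distance = haversine_miles(float(data["lat"]), float(data["long"]), *VELOCITY)
    return json.dumps({"email": data["email"],
                       "distance_from_velocity": distance})


incoming = json.dumps({"email": "user@example.com",
                       "lat": "32.080058", "long": "34.864535"})
outgoing = json.loads(process(incoming))
```

Keeping the transformation in a plain function like process makes it possible to test the logic separately from the Redis polling loop.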

DB Service

Our db service listens to the calculated_distance channel in the same way that the distance service listens to the raw_input channel. Instead of doing some data processing, though, it inserts a record into MongoDB. In fact, each of the Redis consumers defined in the project works this way.

This service also publishes a message of its own. Rather than publishing some newly computed data, it publishes only the user’s email address to the Redis channel user:email. By naming our Redis channels according to the data that a consumer of that channel will receive, we make them much more intuitive for future development efforts that might make use of them.

import asyncio
import json
from redis import psub, pub
from mongo import users

async def reader():
   async with psub as p:
       await p.subscribe('calculated_distance')
       if p is not None:
           while True:
               message = await p.get_message(ignore_subscribe_messages=True)
               await asyncio.sleep(0)
               if message is not None:
                   data = json.loads(message['data'])
                   insert_into_mongo(data)
                   # Pass only the email address on to the next service.
                   await pub.publish('user:email', data['email'])

def insert_into_mongo(data):
   # Store the email and its computed distance as one document.
   users.insert_one(data)

if __name__ == '__main__':
   asyncio.run(reader())

Email Service

This service is responsible for the last step in the application’s flow. It – again – listens to a Redis channel in the same way as the above Redis consumers (Distance and DB), but this time, it takes the data from the Redis message (just the email address) and uses that to query MongoDB. Then, it logs an “email” message addressed to the email it received from Redis, including the associated distance_from_velocity value that it gets from MongoDB.
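The lookup-and-format step can be sketched without MongoDB. In the sketch below a plain list stands in for the collection, and the latest_record and format_email helpers are hypothetical names. It only loosely mirrors the reverse $natural sort with limit(1) used in the service, under the assumption that list order represents insertion order.

```python
def latest_record(records, email):
    """Return the most recently inserted record for an email, or None."""
    # Walk the list backwards so that the newest match wins.
    for record in reversed(records):
        if record["email"] == email:
            return record
    return None


def format_email(email, record):
    return (f"Email sent to: {email}; Message: You are "
            f"{record['distance_from_velocity']} miles from Velocity.")


# Hypothetical stand-in for the MongoDB collection, in insertion order.
records = [
    {"email": "user@example.com", "distance_from_velocity": 10.5},
    {"email": "other@example.com", "distance_from_velocity": 3.0},
    {"email": "user@example.com", "distance_from_velocity": 42.0},
]

message_data = b"user@example.com"  # the payload arrives as bytes
email = message_data.decode("utf-8")
record = latest_record(records, email)
line = format_email(email, record)
```

A real service would also need to decide what to do when no record is found, which is why latest_record returns None rather than raising.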

import asyncio
import logging
from redis import psub, pub
from mongo import users

logging.basicConfig(filename='sent_emails',
                   filemode='a',
                   format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                   datefmt='%H:%M:%S',
                   level=logging.DEBUG)

async def reader():
   async with psub as p:
       await p.subscribe('user:email')
       if p is not None:
           while True:
               message = await p.get_message(ignore_subscribe_messages=True)
               await asyncio.sleep(0)
               if message is not None:
                   # The payload arrives as bytes, so decode it first.
                   email = message['data'].decode('utf-8')
                   # Newest first by natural (roughly insertion) order.
                   data_from_db = users.find({"email": email}).limit(1).sort([('$natural', -1)])
                   for i in data_from_db:
                       send_email(email, i)

def send_email(email, data):
   logging.info(f'Email sent to: {email}; Message: You are {data["distance_from_velocity"]} miles from Velocity.')

if __name__ == '__main__':
   asyncio.run(reader())

Deploy in Kubernetes

Again, all of the required resources are available in GitHub, so we’ll just walk through some unique aspects of several of the included K8s resource definitions. Additionally, to make it easier to deploy to multiple environments, we will include these definitions in the templates directory of a Helm chart.

If you aren’t familiar with Helm, and are coming from a Python background, you can basically think of it as making Jinja templates out of our K8s resource files that we can very easily populate with different values as needed.
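To make the analogy concrete for Python readers, here is a minimal sketch of the idea using string.Template from the standard library. Helm itself uses Go templates, not Jinja or string.Template, so this only illustrates the concept of filling one resource file with different values per environment. The template fragment and the local_values dict are made up for illustration.

```python
from string import Template

# A fragment shaped like the env section of a Deployment, with placeholders.
template = Template(
    "env:\n"
    "  - name: REDIS_HOST\n"
    "    value: \"$redis_host\"\n"
    "  - name: REDIS_PORT\n"
    "    value: \"$redis_port\"\n"
)

# One set of values per environment; only this dict changes between them.
local_values = {"redis_host": "redis", "redis_port": "6379"}
rendered = template.substitute(local_values)
```

Swapping in a different values dict would render the same fragment for another environment without editing the template.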

The K8s resource definitions

Each of the services in the app includes at least a Deployment, and some include a ClusterIP K8s Service. The FastAPI service includes both of the above and a K8s Ingress, which allows HTTP traffic to hit the API.

K8s Ingress


This diagram illustrates the different K8s resources required to deploy the app in Kubernetes. First, we have the FastAPI portion. There, we have an ingress that allows web traffic to enter the K8s cluster. That traffic is then routed to the FastAPI deployment inside of the cluster, but in order for that to be possible, we have to include a ClusterIP service, because the ingress needs an exposed IP and port to send information to.

Next in the list, we have the Redis and MongoDB deployments, which also have to be available for other services to connect to, so they also require a ClusterIP service.

And finally, we have the Redis consumers (i.e., “Data Processing,” “DB Service,” and “Email Service”). Because all the networking both to and between these services takes place via Redis, these services aren’t receiving any network calls directly. Instead, they are each listening to a given channel in Redis, doing something when they receive a message, and publishing something back to Redis when they’ve finished. So, these services only require a K8s Deployment for our app to work in Kubernetes.

The FastAPI K8s Resource Definition

The following FastAPI K8s resource definition illustrates the structure of all the required YAML files. As noted above, it includes a Deployment, a ClusterIP Service and an Ingress. The container image, like all the others included in the project, is built according to the Dockerfile included in the src/<service> directory in GitHub.

---
apiVersion: apps/v1
kind: Deployment
metadata:
 annotations:
   velocity.tech.v1/id: web-api
   velocity.tech.v1/dependsOn: redis
 name: web-api
 labels:
   app: web-api
spec:
 selector:
   matchLabels:
     api: web-api
 replicas: 1
 template:
   metadata:
     labels:
       app: web-api
       api: web-api
   spec:
     containers:
       - name: web-api
         image: jdvincent/web_api:latest
         env:
           - name: REDIS_HOST
             value: {{ .Values.redis_host | toJson  }}
           - name: REDIS_PORT
             value: {{ .Values.redis_port | toJson  }}
         ports:
           - name: web-api
             containerPort: 8000
             protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
 name: web-api
spec:
 ports:
   - port: 8000
     targetPort: 8000
     name: web-api
 selector:
   app: web-api
 type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
 name: web-api
spec:
 ingressClassName: {{ .Values.ingress_class_name | toJson }}
 rules:
   - host: {{ .Values.ingress_host | toJson }}
     http:
       paths:
         - path: /
           pathType: Prefix
           backend:
             service:
               name: web-api
               port:
                 number: 8000

Notice that the Ingress backend service name (web-api) matches the name of the ClusterIP Service, and that the label on the Deployment’s pod template (app: web-api) matches the selector defined on the Service. Additionally, the Ingress port matches the Service port, and the Service targetPort matches the Deployment’s containerPort. This is how these three distinct K8s resources are made to work as a single unit – or service – within the larger application.
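These alignment rules are easy to get wrong by hand, so it can help to see them written as explicit checks. The sketch below uses flattened dicts with made-up keys rather than the real K8s schema, and the wiring_problems helper is hypothetical. The values are copied from the YAML above.

```python
deployment = {
    "pod_labels": {"app": "web-api", "api": "web-api"},
    "container_port": 8000,
}
service = {
    "name": "web-api",
    "selector": {"app": "web-api"},
    "port": 8000,
    "target_port": 8000,
}
ingress = {"backend_service": "web-api", "backend_port": 8000}


def wiring_problems(deployment, service, ingress):
    """Return a list of mismatches; an empty list means everything aligns."""
    problems = []
    if ingress["backend_service"] != service["name"]:
        problems.append("Ingress backend does not name the Service")
    if ingress["backend_port"] != service["port"]:
        problems.append("Ingress port does not match the Service port")
    if service["target_port"] != deployment["container_port"]:
        problems.append("Service targetPort does not match containerPort")
    # Every selector key/value pair must appear among the pod labels.
    if not service["selector"].items() <= deployment["pod_labels"].items():
        problems.append("Service selector does not match the pod labels")
    return problems


problems = wiring_problems(deployment, service, ingress)
broken = wiring_problems(deployment, {**service, "target_port": 80}, ingress)
```

With the values from the definition above, the first call reports no problems, while the deliberately broken targetPort in the second call is flagged.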

Run it in minikube

To run the project on a local K8s cluster in minikube, we’ll need the following values.yaml file:

redis_port: "6379"
redis_host: redis
mongo_port: "27017"
mongo_host: mongo
ingress_host: null
ingress_class_name: kong

We can then start the cluster and apply the rendered chart with the commands below. Note that minikube tunnel runs in the foreground, so it needs its own terminal.

minikube start
minikube addons enable kong
minikube tunnel
helm template . --values values.yaml | kubectl apply -f -

Run it in Velocity

Velocity is a new development platform that allows you to run microservice-based applications both locally and in a remote K8s cluster simultaneously in order to speed development. You can sign up for a free Velocity account here.

In order to run this same project in a Velocity development environment, we only need to swap the values.yaml file defined above for the following, saved as velocity-values.yml to match the command below:

redis_port: "{velocity.v1:redis.exposures(port=redis).port}"
redis_host: "{velocity.v1:redis.exposures(port=redis).host}"
mongo_port: "{velocity.v1:mongodb.exposures(port=mongo).port}"
mongo_host: "{velocity.v1:mongodb.exposures(port=mongo).host}"
ingress_host: "api-{velocity.v1.domainSuffix}"
ingress_class_name: null

We can then spin up our Velocity environment with Helm and veloctl, like so:

helm template . --values velocity-values.yml | veloctl env create -f -