Often, in order to keep an application responsive at scale, you’ll need to pass long-running jobs to another process that can handle them asynchronously in the background. In this post, we’ll look at a simple way to achieve this with Python, Redis and Redis Queue.
Our project will involve a Flask API, and we’ll collect some basic information from incoming HTTP requests to simulate the collection and storage of user analytics as we move that data asynchronously to a MongoDB instance. Of course, this sort of queue could be used for anything that involves long running jobs, so don’t let the example limit your imagination.
The complete code example lives here if you want to get straight to running it first.
We’ll be using Redis and Redis Queue as a temporary datastore that our API can quickly write the incoming request data to. Then, our worker will read from the tail end of the queue on a first-come-first-served basis, and remove jobs from the queue as it handles them.
In this way, we could have any number of workers reading from the queue without knowing about each other, or what jobs have already been processed from the queue.
They each just take the last job and process it, whatever job that is. This is where the scalability of this solution comes in. As the number of requests increases, the time that the API takes to respond won’t increase in parallel, because it has a built-in “holding tank” where it can store a given job (in our case, the insertion of some request data into MongoDB).
So, you can increase the number of workers as the request volume increases. So, because the worker is a stand-alone process, you can scale it horizontally (i.e. increase the number of instances of just that process), which is always the target with cloud native applications.
We’ll need to define both the API that will be listening for requests and the worker that will be subscribing to the Redis Queue. Because both the API and the worker will need access to our async MongoDB insertion task, we’ll define that in a file called async_tasks.py in our root directory, so it can be imported by both modules.
# async_tasks.py
import pymongo
import os
MONGO_HOST = os.environ.get('MONGO_HOST')
MONGO_PORT = os.environ.get('MONGO_PORT')
conn_str = f'mongodb://{MONGO_HOST}:{MONGO_PORT}'
client = pymongo.MongoClient(conn_str, serverSelectionTimeoutMS=5000)
db = client.events_db
events = db.events
def insert_into_mongo(data):
events.insert_one(data)
Within your root directory for the project, create a directory called api with the following file structure:
api
├── Dockerfile
├── main.py
└── requirements.txt
Paste the following into main.py:
# main.py
import redis
from flask import Flask
from flask import request
from rq import Queue
from async_tasks import insert_into_mongo
app = Flask(__name__)
q = Queue(connection=redis.Redis(host='redis', port=6379, db=0))
@app.route('/')
def event():
data = {'request_type': request.method}
job = q.enqueue(insert_into_mongo, data)
print(job.result)
return str(job.__dict__)
if __name__ == '__main__':
app.run(debug='true', host='0.0.0.0', port='8888')
Next, the Dockerfile:
# Dockerfile
FROM python:3.10
COPY ./api/ .
COPY async_tasks.py .
RUN pip3 install -r requirements.txt
CMD python main.py
And finally, the requirements.txt:
Flask==2.1.2
pymongo==4.1.1
redis==4.3.4
rq==1.10.1
Next, create a directory called worker – again at the root level of your project – so your file system looks like this:
async-db-example
├── async_tasks.py
├── api
│ ├── Dockerfile
│ ├── main.py
│ └── requirements.txt
└── worker
├── Dockerfile
├── main.py
└── requirements.txt
Our worker needs access to the code that is being called when data is passed to the Redis Queue. In a simple case, we would just run the worker in the same root directory as our API and call it good, but in order to enforce the best practice of running a single process per container, we’ll need to define that job in a file that this second container will have access to.
To accomplish this, paste the following into worker/main.py.
from async_tasks import insert_into_mongo
And our Dockerfile once more:
FROM python:3.10
COPY ./worker/ .
COPY async_tasks.py .
RUN pip install -r requirements.txt
CMD ./run.sh
And finally, our requirements.txt:
pymongo==4.1.1
rq==1.10.1
Because we are copying the async_tasks.py file from our root directory into both the api and worker images, we’ll need to build the images from the root directory, like so:
docker build -t api:latest -f api/Dockerfile .
docker build -t worker:latest -f worker/Dockerfile .
With the above in place, we can define our required resources to deploy the application to a Kubernetes cluster.
As we do so, we’ll also define a Helm chart that will allow us to compile each of the following resources into a single Helm template with values drawn dynamically from a Values.yaml file.
The following Kubernetes resource definitions represent each of our four services we created above, along with the K8s resources required for networking the services together, as well as allowing us to submit a GET request via a web browser.
Generally speaking, each service consists of a Kubernetes deployment and ClusterIP service. Our API definition also includes an Ingress, which will allow web traffic to access it.
Paste the following into k8s/templates/api.yaml:
---
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
velocity.tech.v1/id: jeffs-api
velocity.tech.v1/dependsOn: mongo-db, jeffs-worker, redis
name: api
labels:
app: api
spec:
selector:
matchLabels:
api: api
replicas: 1
template:
metadata:
labels:
app: api
api: api
spec:
containers:
- name: api
image: jdvincent/api:latest
env:
- name: REDIS_HOST
value: {{ .Values.redis_host | toJson }}
- name: REDIS_PORT
value: {{ .Values.redis_port | toJson }}
- name: MONGO_HOST
value: {{ .Values.mongo_host | toJson }}
- name: MONGO_PORT
value: {{ .Values.mongo_port | toJson }}
ports:
- name: api-port
containerPort: 8888
protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
name: api
spec:
ports:
- port: 8888
targetPort: 8888
name: api
selector:
app: api
type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: api
spec:
ingressClassName: {{ .Values.ingress_class_name | toJson }}
rules:
- host: {{ .Values.ingress_host | toJson }}
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: api
port:
number: 8888
Notice in the above deployment, that we’re not hardcoding the MongoDB and Redis host and port names as environment variables. Rather, those values will be resolved by Helm when we convert these separate resources into a single Helm template.
Finally, the image we’re pulling has been built and pushed to Dockerhub for the sake of convenience, but it includes exactly the code defined above.
Paste the following into k8s/templates/mongodb.yaml:
---
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
velocity.tech.v1/id: mongo-db
name: mongo
labels:
app: mongo
spec:
selector:
matchLabels:
api: mongo
replicas: 1
template:
metadata:
labels:
app: mongo
api: mongo
spec:
containers:
- name: mongo
image: mongo:5.0
ports:
- name: mongo
containerPort: 27017
protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
name: mongo
spec:
ports:
- port: 27017
targetPort: 27017
name: mongo
selector:
app: mongo
type: ClusterIP
Paste the following into k8s/templates/redis.yaml:
---
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
velocity.tech.v1/id: redis
name: redis
labels:
app: redis
spec:
selector:
matchLabels:
api: redis
replicas: 1
template:
metadata:
labels:
app: redis
api: redis
spec:
containers:
- name: redis
image: redis:7.0.4
ports:
- name: redis-port
containerPort: 6379
protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
name: redis
spec:
ports:
- port: 6379
targetPort: 6379
name: redis
selector:
app: redis
type: ClusterIP
Paste the following into k8s/templates/worker.yaml:
---
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
velocity.tech.v1/id: worker
velocity.tech.v1/dependsOn: redis, mongo-db
name: worker
labels:
app: worker
spec:
selector:
matchLabels:
api: worker
replicas: 1
template:
metadata:
labels:
api: worker
spec:
containers:
- name: worker
image: jdvincent/worker:latest
env:
- name: MONGO_HOST
value: {{ .Values.mongo_host | toJson }}
- name: MONGO_PORT
value: {{ .Values.mongo_port | toJson }}
- name: REDIS_HOST
value: {{ .Values.redis_host | toJson }}
- name: REDIS_PORT
value: {{ .Values.redis_port | toJson }}
redis_port: "6379"
redis_host: redis
mongo_port: "27017"
mongo_host: mongo
ingress_host: null
ingress_class_name: kong
apiVersion: v2
name: async-db-worker
description: An example Helm Chart
type: application
version: 0.1.0
appVersion: "1.16.0"
Our file system should look like this when we’re finished:
async-db-example
├── api
│ ├── Dockerfile
│ ├── main.py
│ ├── base_api.py
│ └── requirements.txt
├── k8s
│ ├── Chart.yaml
│ ├── templates
│ │ ├── api.yml
│ │ ├── mongodb.yml
│ │ ├── redis.yml
│ │ └── worker.yml
│ └── values.yml
└── worker
├── Dockerfile
├── main.py
├── base_api.py
└── requirements.txt
minikube start
minikube addons enable kong
minikube tunnel
Then, in a second terminal window, run the following from the k8s directory:
helm template . --values values.yaml | kubectl apply -f -
Then, when you navigate to 127.0.0.1 in your web browser, an HTTP GET request will be sent to the Web API, which will forward analytics data related to that request to Redis. Next, the DB Worker will read that data from Redis, and write it to MongoDB.
You will see the following response from Redis to the Web API in your browser:
And you can see that the request data was written to Mongo by execing into the Mongo Pod. To do so, simply open the Minikube dashboard by running minikube dashboard and then click the exec icon on the Mongo Pod view. Once the terminal is open in the running Pod, you can run the following to see the stored data:
root@mongo-67c744c4c6-rf8ck:/# mongosh
Current Mongosh Log ID: 644bf2091ac3d3be92232199
Connecting to: mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.1
Using MongoDB: 5.0.17
Using Mongosh: 1.8.1
test> use events_db
switched to db events_db
events_db> db.events.find()
[ { _id: ObjectId("644bf1f2b3d1c4318165ea91"), request_type: 'GET' } ]
You can use asynchronous networking to keep microservice-based applications responsive when they need to handle large volumes of incoming web traffic. Above, we walked through the process of building and deploying a simple asynchronous DB Worker, which reads data from an in memory cache (Redis) and then writes that data to the database (MongoDB). By storing the incoming requests in Redis, we allow the Web API to handle larger volumes of traffic much more efficiently, so your application will scale well.
Python class called ProcessVideo
Python class called ProcessVideo
Thank you! Your submission has been received!