Build an Async DB Worker with MongoDB, Redis and Python and Deploy it in Kubernetes

Jeff Vincent
October 4, 2022

Often, in order to keep an application responsive at scale, you’ll need to pass long-running jobs to another process that can handle them asynchronously in the background. In this post, we’ll look at a simple way to achieve this with Python, Redis and Redis Queue.

Our project will involve a Flask API that collects some basic information from incoming HTTP requests, simulating the collection of user analytics, and moves that data asynchronously to a MongoDB instance. Of course, this sort of queue could be used for anything that involves long-running jobs, so don’t let the example limit your imagination.

The complete code example lives here if you want to get straight to running it first.

What we’ll end up with

We’ll be using Redis and Redis Queue as a temporary datastore that our API can quickly write the incoming request data to. Then, our worker will pull jobs off the queue on a first-come-first-served basis, removing each job from the queue as it handles it.

In this way, we could have any number of workers reading from the queue without knowing about each other, or what jobs have already been processed from the queue.

Each worker simply takes the next job in line and processes it, whatever job that is. This is where the scalability of this solution comes in. As the number of requests increases, the time that the API takes to respond won’t grow in step, because it has a built-in “holding tank” where it can store a given job (in our case, the insertion of some request data into MongoDB).

So, as the request volume grows, you can increase the number of workers. And because the worker is a stand-alone process, you can scale it horizontally (i.e. increase the number of instances of just that process), which is usually the goal with cloud native applications.
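To make the pattern concrete before we bring in Redis, here is a minimal sketch that uses only the Python standard library. The in-memory queue stands in for Redis Queue and the list stands in for MongoDB; the function name run_demo and its parameters are made up for this illustration and are not part of the project.

```python
import queue
import threading


def run_demo(num_jobs=6, num_workers=2):
    """Enqueue jobs quickly, then let independent workers drain the queue."""
    jobs = queue.Queue()   # stand-in for the Redis-backed queue
    stored = []            # stand-in for the MongoDB collection
    lock = threading.Lock()

    def worker():
        while True:
            data = jobs.get()      # blocks until a job is available
            if data is None:       # sentinel: no more work for this worker
                jobs.task_done()
                return
            with lock:
                stored.append(data)  # the slow write happens off the request path
            jobs.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()

    # The "API" side: putting a job on the queue returns immediately,
    # no matter how busy the workers are.
    for i in range(num_jobs):
        jobs.put({"request_type": "GET", "n": i})

    for _ in threads:
        jobs.put(None)     # one sentinel per worker
    jobs.join()
    for t in threads:
        t.join()
    return stored
```

Note that the workers never coordinate with one another. Each one just asks the queue for the next job, which is why adding more of them is enough to raise throughput.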

Workflow graphic

The Python portions

We’ll need to define both the API that will be listening for requests and the worker that will be subscribing to the Redis Queue. Because both the API and the worker will need access to our async MongoDB insertion task, we’ll define that in a file called async_tasks.py in our root directory, so it can be imported by both modules.

Our Async Tasks:

# async_tasks.py
import pymongo
import os

MONGO_HOST = os.environ.get('MONGO_HOST')
MONGO_PORT = os.environ.get('MONGO_PORT')

conn_str = f'mongodb://{MONGO_HOST}:{MONGO_PORT}'
client = pymongo.MongoClient(conn_str, serverSelectionTimeoutMS=5000)
db = client.events_db
events = db.events

def insert_into_mongo(data):
  events.insert_one(data)
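One thing to watch for in the file above: if MONGO_HOST or MONGO_PORT is not set, os.environ.get returns None and the f-string produces mongodb://None:None, which is not a usable address. As a sketch only, a small helper could fall back to defaults and reject a malformed port. The helper name build_mongo_uri is hypothetical, and the fallback values (mongo and 27017) are an assumption borrowed from the Helm values used later in this post.

```python
import os


def build_mongo_uri(env=os.environ):
    """Build the MongoDB connection string from environment-style settings."""
    # Assumed defaults: they mirror the Helm values used later in the post.
    host = env.get("MONGO_HOST", "mongo")
    port = env.get("MONGO_PORT", "27017")
    if not str(port).isdigit():
        raise ValueError(f"MONGO_PORT must be numeric, got {port!r}")
    return f"mongodb://{host}:{port}"
```

With a helper like this, conn_str could be assigned from build_mongo_uri() and the rest of async_tasks.py would stay the same.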

Our API:

Within your root directory for the project, create a directory called api with the following file structure:

api
├── Dockerfile
├── main.py
└── requirements.txt

Paste the following into main.py:

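# main.py
import os

import redis
from flask import Flask
from flask import request
from rq import Queue
from async_tasks import insert_into_mongo

# Read the Redis location from the environment; the defaults match the
# Redis service name and port used in this post.
REDIS_HOST = os.environ.get('REDIS_HOST', 'redis')
REDIS_PORT = int(os.environ.get('REDIS_PORT', 6379))

app = Flask(__name__)
q = Queue(connection=redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0))

@app.route('/')
def event():
  # Capture basic request info and hand the MongoDB write off to the worker
  data = {'request_type': request.method}
  job = q.enqueue(insert_into_mongo, data)
  print(job.result)  # None here: the worker has not processed the job yet
  return str(job.__dict__)

if __name__ == '__main__':
  app.run(debug=True, host='0.0.0.0', port=8888)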
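Because the job runs in another process, job.result is still None at the moment the API responds. If you would rather return something smaller than the whole job dictionary, one option is to report the job's id and status. The sketch below assumes that queue behaves like an rq.Queue and that the returned job exposes id, get_status() and result, as RQ jobs do; the helper name enqueue_event is hypothetical.

```python
def enqueue_event(queue, task, request_method):
    """Enqueue `task` with basic request data and return a small summary.

    `queue` is expected to behave like an rq.Queue.
    """
    data = {"request_type": request_method}
    job = queue.enqueue(task, data)
    # job.result stays None until a worker has finished the job, so the
    # summary reports the job id and its current status alongside it.
    return {
        "job_id": job.id,
        "status": job.get_status(),
        "result": job.result,
    }
```

Inside the Flask route, this could be called as enqueue_event(q, insert_into_mongo, request.method), and the returned dictionary can be sent back to the client in place of str(job.__dict__).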

Next, the Dockerfile:

# Dockerfile
FROM python:3.10
COPY ./api/ .
COPY async_tasks.py .
RUN pip3 install -r requirements.txt
CMD python main.py

And finally, the requirements.txt:

Flask==2.1.2
pymongo==4.1.1
redis==4.3.4
rq==1.10.1

Our Worker:

Next, create a directory called worker – again at the root level of your project – so your file system looks like this:

async-db-example
├── async_tasks.py
├── api
│   ├── Dockerfile
│   ├── main.py
│   └── requirements.txt
└── worker
    ├── Dockerfile
    ├── main.py
    └── requirements.txt

Our worker needs access to the code that is being called when data is passed to the Redis Queue. In a simple case, we would just run the worker in the same root directory as our API and call it good, but in order to enforce the best practice of running a single process per container, we’ll need to define that job in a file that this second container will have access to.

To accomplish this, paste the following into worker/main.py.

from async_tasks import insert_into_mongo

And our Dockerfile once more:

FROM python:3.10
COPY ./worker/ .
COPY async_tasks.py .
RUN pip install -r requirements.txt
CMD ./run.sh

And finally, our requirements.txt:

pymongo==4.1.1
rq==1.10.1
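The worker's Dockerfile starts the process through run.sh, which isn't shown in this post. As one possible stand-in (an assumption, not the original script), the worker could be started from Python with RQ's Worker class. The function names redis_url_from_env and run_worker are hypothetical, and the fallback host and port are assumed to match the Redis service defined later.

```python
import os


def redis_url_from_env(env=os.environ):
    """Build a Redis URL from REDIS_HOST and REDIS_PORT."""
    # Assumed defaults: the Redis service name and port used in this post.
    host = env.get("REDIS_HOST", "redis")
    port = env.get("REDIS_PORT", "6379")
    return f"redis://{host}:{port}/0"


def run_worker():
    """Start an RQ worker that processes jobs from the default queue."""
    # Imported here so the helper above can be used without RQ installed.
    import redis
    from rq import Queue, Worker

    connection = redis.Redis.from_url(redis_url_from_env())
    jobs = Queue(connection=connection)  # the same default queue the API writes to
    # work() blocks and handles jobs as they arrive. async_tasks.py must be
    # importable so that insert_into_mongo can be resolved by name.
    Worker([jobs], connection=connection).work()
```

Calling run_worker() at the bottom of worker/main.py would then be enough to start processing jobs when the container launches.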

Building our Docker images

Because we are copying the async_tasks.py file from our root directory into both the api and worker images, we’ll need to build the images from the root directory, like so:

docker build -t api:latest -f api/Dockerfile .
docker build -t worker:latest -f worker/Dockerfile .

Deploying in K8s

With the above in place, we can define our required resources to deploy the application to a Kubernetes cluster.

As we do so, we’ll also define a Helm chart that will allow us to render all of the following resources into a single manifest, with values drawn from a values.yaml file.

Our Helm Templates

The following Kubernetes resource definitions represent each of the four services in our application (the API, the worker, Redis and MongoDB), along with the K8s resources required for networking the services together and for allowing us to submit a GET request via a web browser.

Generally speaking, each service consists of a Kubernetes deployment and ClusterIP service. Our API definition also includes an Ingress, which will allow web traffic to access it.

API

Paste the following into k8s/templates/api.yaml:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    velocity.tech.v1/id: jeffs-api
    velocity.tech.v1/dependsOn: mongo-db, jeffs-worker, redis
  name: api
  labels:
    app: api
spec:
  selector:
    matchLabels:
      api: api
  replicas: 1
  template:
    metadata:
      labels:
        app: api
        api: api
    spec:
      containers:
        - name: api
          image: jdvincent/api:latest
          env:
            - name: REDIS_HOST
              value: {{ .Values.redis_host | toJson }}
            - name: REDIS_PORT
              value: {{ .Values.redis_port | toJson }}
            - name: MONGO_HOST
              value: {{ .Values.mongo_host | toJson }}
            - name: MONGO_PORT
              value: {{ .Values.mongo_port | toJson }}
          ports:
            - name: api-port
              containerPort: 8888
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: api
spec:
  ports:
    - port: 8888
      targetPort: 8888
      name: api
  selector:
    app: api
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: api
spec:
  ingressClassName: {{ .Values.ingress_class_name | toJson }}
  rules:
    - host: {{ .Values.ingress_host | toJson }}
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: api
                port:
                  number: 8888

Notice that the above deployment doesn’t hardcode the MongoDB and Redis hosts and ports in its environment variables. Rather, Helm fills in those values from values.yaml when it renders the templates.

Finally, the image we’re pulling has been built and pushed to Dockerhub for the sake of convenience, but it includes exactly the code defined above.

MongoDB

Paste the following into k8s/templates/mongodb.yaml:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    velocity.tech.v1/id: mongo-db
  name: mongo
  labels:
    app: mongo
spec:
  selector:
    matchLabels:
      api: mongo
  replicas: 1
  template:
    metadata:
      labels:
        app: mongo
        api: mongo
    spec:
      containers:
        - name: mongo
          image: mongo:5.0
          ports:
            - name: mongo
              containerPort: 27017
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: mongo
spec:
  ports:
    - port: 27017
      targetPort: 27017
      name: mongo
  selector:
    app: mongo
  type: ClusterIP

Redis

Paste the following into k8s/templates/redis.yaml:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    velocity.tech.v1/id: redis
  name: redis
  labels:
    app: redis
spec:
  selector:
    matchLabels:
      api: redis
  replicas: 1
  template:
    metadata:
      labels:
        app: redis
        api: redis
    spec:
      containers:
        - name: redis
          image: redis:7.0.4
          ports:
            - name: redis-port
              containerPort: 6379
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: redis
spec:
  ports:
    - port: 6379
      targetPort: 6379
      name: redis
  selector:
    app: redis
  type: ClusterIP

Worker

Paste the following into k8s/templates/worker.yaml:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    velocity.tech.v1/id: worker
    velocity.tech.v1/dependsOn: redis, mongo-db
  name: worker
  labels:
    app: worker
spec:
  selector:
    matchLabels:
      api: worker
  replicas: 1
  template:
    metadata:
      labels:
        api: worker
    spec:
      containers:
        - name: worker
          image: jdvincent/worker:latest
          env:
            - name: MONGO_HOST
              value: {{ .Values.mongo_host | toJson }}
            - name: MONGO_PORT
              value: {{ .Values.mongo_port | toJson }}
            - name: REDIS_HOST
              value: {{ .Values.redis_host | toJson }}
            - name: REDIS_PORT
              value: {{ .Values.redis_port | toJson }}

Our values.yaml file:

redis_port: "6379"
redis_host: redis
mongo_port: "27017"
mongo_host: mongo
ingress_host: null
ingress_class_name: kong

Our Chart.yaml file:

apiVersion: v2
name: async-db-worker
description: An example Helm Chart
type: application
version: 0.1.0
appVersion: "1.16.0"

Our file system should look like this when we’re finished:

async-db-example
├── async_tasks.py
├── api
│   ├── Dockerfile
│   ├── main.py
│   └── requirements.txt
├── k8s
│   ├── Chart.yaml
│   ├── templates
│   │   ├── api.yaml
│   │   ├── mongodb.yaml
│   │   ├── redis.yaml
│   │   └── worker.yaml
│   └── values.yaml
└── worker
    ├── Dockerfile
    ├── main.py
    └── requirements.txt

Run it locally in Minikube:

minikube start
minikube addons enable kong
minikube tunnel

Then, in a second terminal window, run the following from the k8s directory:

helm template . --values values.yaml | kubectl apply -f -

Then, when you navigate to 127.0.0.1 in your web browser, an HTTP GET request will be sent to the Web API, which will forward analytics data related to that request to Redis. Next, the DB Worker will read that data from Redis, and write it to MongoDB.

In your browser, you will see the API’s response: the string representation of the job that was just added to the Redis queue.

And you can see that the request data was written to Mongo by execing into the Mongo Pod. To do so, simply open the Minikube dashboard by running minikube dashboard and then click the exec icon on the Mongo Pod view. Once the terminal is open in the running Pod, you can run the following to see the stored data:

root@mongo-67c744c4c6-rf8ck:/# mongosh
Current Mongosh Log ID: 644bf2091ac3d3be92232199
Connecting to:          mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.1
Using MongoDB:          5.0.17
Using Mongosh:          1.8.1
test> use events_db
switched to db events_db
events_db> db.events.find()
[ { _id: ObjectId("644bf1f2b3d1c4318165ea91"), request_type: 'GET' } ]

Conclusion

You can use asynchronous processing to keep microservice-based applications responsive when they need to handle large volumes of incoming web traffic. Above, we walked through the process of building and deploying a simple asynchronous DB Worker, which reads data from an in-memory datastore (Redis) and then writes that data to the database (MongoDB). By queueing the incoming request data in Redis, we allow the Web API to respond quickly even as traffic grows, while the slower database writes happen in the background.

Join the discussion!

Have any questions or comments about this post? Maybe you have a similar project or an extension to this one that you'd like to showcase? Join the Velocity Discord server to ask away, or just stop by to talk K8s development with the community.
