Asynchronous Video Streaming in Python with FastAPI and MongoDB's GridFS

Jeff Vincent
October 24, 2022

In this post, we build a simple video streaming app that demonstrates async features both in FastAPI and in the interactions it makes with our MongoDB database with Motor, the async Python driver. We’ll also demonstrate how asyncio can be used to network multiple microservices together.

FastAPI is built to support asyncio, Python’s standard library for asynchronous programming. Together with the language’s async and await keywords, asyncio lets a task pause while it waits on input/output (I/O) instead of running to completion in a single step. This improves throughput for I/O-bound code because a pending operation doesn’t stop the flow of the application: it can immediately move on to the next task while it waits for the output from some external process.

Today, we’ll build a simple video streaming app that demonstrates this feature both in FastAPI and in the interactions it makes with our MongoDB database with Motor, the async Python driver. We’ll also demonstrate how asyncio can be used to network multiple microservices together with AIOHTTP, an async HTTP client for Python.

We’ll interact with MongoDB in two ways – both with “standard” documents and with GridFS, a feature that allows you to store large files as chunks of binary data, which is how we will store and stream our video files.

Finally, for good measure, we’ll Dockerize our services and deploy the app in Kubernetes.

So, when finished we’ll have covered the following:

  • Asyncio – the underlying library that makes all the async stuff in Python work
  • FastAPI – an API framework that supports asyncio
  • Motor – an async driver for MongoDB
  • AIOHTTP – async HTTP for microservice networking
  • GridFS – a component of MongoDB for storing and streaming blobs of data
  • Kubernetes – a container orchestration platform

The full project is available on GitHub.

Application diagram

As you can see, the application consists of a Web API, an Auth service, and a MongoDB instance. We'll create two Mongo databases, users (for storing user data) and video_db (for storing metadata about user videos) each with its own collection, and a GridFS instance for storing and streaming the video files themselves.

GridFS, in turn, consists of a default bucket called fs in which there are two more collections – chunks and files. The chunks collection houses chunks of binary data that make up a larger file. The files collection stores file metadata related to any files uploaded into the fs bucket.

The chunks and files collections are otherwise the same as any other collection you would create in MongoDB. That is, they store data as documents that can later be queried according to fields written on the documents themselves. For example, we’ll create a random hash as a name for the uploaded videos, which we can then use to query the fs bucket when we need to stream a given, uploaded video.
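To make the relationship between the two collections concrete, here is a minimal pure-Python sketch of how one file maps onto a files document plus several chunks documents. It does not talk to MongoDB; split_into_gridfs_docs is a hypothetical helper, the hex string stands in for a real ObjectId, and the field names follow the GridFS layout described above.

```python
import os
from datetime import datetime, timezone

# GridFS splits files into 255 KiB chunks by default.
DEFAULT_CHUNK_SIZE = 255 * 1024


def split_into_gridfs_docs(filename: str, data: bytes,
                           chunk_size: int = DEFAULT_CHUNK_SIZE):
    """Return (files_doc, chunks_docs) shaped like GridFS documents."""
    file_id = os.urandom(12).hex()  # hypothetical stand-in for an ObjectId
    files_doc = {
        "_id": file_id,
        "filename": filename,
        "length": len(data),
        "chunkSize": chunk_size,
        "uploadDate": datetime.now(timezone.utc),
        "metadata": {"contentType": "video/mp4"},
    }
    # Each chunk points back at its file and records its position `n`.
    chunks_docs = [
        {"files_id": file_id, "n": n, "data": data[start:start + chunk_size]}
        for n, start in enumerate(range(0, len(data), chunk_size))
    ]
    return files_doc, chunks_docs


files_doc, chunks_docs = split_into_gridfs_docs("a1b2c3", bytes(600_000))
print(files_doc["length"], [len(c["data"]) for c in chunks_docs])
# 600000 [261120, 261120, 77760]
```

Streaming a file back is then a matter of finding the files document by filename and reading its chunks in order of n.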

Illustration of chunks and files collections

The Python Portion

We won’t review the Python modules in full here, as you can simply clone the repo from GitHub. Instead, we’ll look at specific excerpts that each demonstrate one or more of the concepts outlined above.

NOTE: The application is very simple and not production-ready. For example, it is never recommended to store plain text passwords, but we are doing so here for the sake of keeping things simple and focused.

Asyncio

For starters, it is important to understand what asyncio is and how it works at a high level, as it will make writing async def functions much more straightforward. Asyncio is a Python library built around event loops, which I think of as a train running on a single, looping track. Passengers can get on and off the train at different points, but the train keeps moving on the same path regardless of who is riding on it.

Train on a looped track

Event loops work in roughly the same way. There is a processing loop (the train on the track), and your application hands tasks to it as coroutines, which are functions defined with async def (i.e., passengers get on the train). Inside a coroutine, the await keyword pauses that task until a result is ready (i.e., the passenger gets off and waits at the station), but the loop keeps going and runs other tasks in the meantime. The application’s flow therefore isn’t “blocked” while it waits for a response. This, in a nutshell, is what makes asyncio so much more efficient than synchronous Python for I/O-bound work.
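Here is a minimal sketch of that idea using only the standard library. The fetch coroutine and its delays are made up for illustration; asyncio.sleep stands in for a network or database call.

```python
import asyncio
import time


async def fetch(name: str, delay: float) -> str:
    # asyncio.sleep stands in for waiting on a network or database call.
    await asyncio.sleep(delay)
    return f"{name} done"


async def main() -> list:
    # gather() runs the three coroutines concurrently on one event loop.
    return await asyncio.gather(
        fetch("a", 0.2), fetch("b", 0.2), fetch("c", 0.2)
    )


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

The three waits overlap, so the total time is close to the longest single wait rather than the sum of all three.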

A program can also run more than one event loop at the same time (typically one per thread) – think multiple trains on multiple tracks. This also means that you, as the “engineer,” need to make sure the passengers, or tasks to be executed, get on the right train. If this sounds complex, that’s probably because it is. Thankfully, we won’t need to manage event loops by hand in this application: FastAPI and the server that runs it take care of that, which makes things relatively simple.

Web API

First, let’s look at making an asynchronous connection to MongoDB with the default asyncio event loop in our FastAPI app. To use the above analogy, this is the main train on the main track. We’ll want to make the connection to MongoDB with this event loop so that our route-handling functions can access it, and thus make use of the connection.

To do so in FastAPI, we can register a startup handler with the @app.on_event('startup') decorator, as shown below. Without this, Motor, the asyncio-based MongoDB driver, may create its connection on a different event loop from the one serving requests, and our route handlers won’t be able to use it.

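from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorGridFSBucket

# app, MONGO_HOST and MONGO_PORT are defined at module level (see the repo)
@app.on_event('startup')
async def get_mongo():
    # Create the client inside the running event loop so Motor binds to it
    video_db = AsyncIOMotorClient(f'mongodb://{MONGO_HOST}:{MONGO_PORT}').video
    app.library = video_db.library  # collection holding video metadata
    app.fs = AsyncIOMotorGridFSBucket(video_db)  # default "fs" bucket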

@app.get('/')
async def index(request: Request):
    # .get() avoids a KeyError when nobody is logged in
    email = request.session.get('email')
    if email:
        videos = await _get_videos(request)
        return HTMLResponse(f'<h3>Logged in as "{email}"</h3>{views.upload_block}{videos}{views.logout_block}')
    return HTMLResponse(views.sign_up_login_block)

async def _get_videos(request: Request):
    # find() returns a cursor immediately; to_list() awaits the actual query
    videos = app.library.find({'email': request.session['email']})
    docs = await videos.to_list(None)
    video_urls = ''
    for doc in docs:
        filename = doc['filename']
        url = f'{PROTOCOL}://{HOST}/stream/{filename}'
        video_urls += f'<br><a href="{url}" target="_blank">{url}</a>'
    return video_urls

Above, we’re calling an async function we’ve defined locally, _get_videos. We can use the await keyword when we call it because it was defined with async def. _get_videos queries the MongoDB library collection for all videos uploaded by the currently logged-in user, whom we track with sessions that we’ll look at below. The find call returns an async Motor cursor right away; awaiting to_list(None) runs the query and loads every matching document into a list, which we then loop over to build the links.
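Motor cursors can also be consumed one document at a time with async for, which avoids holding every document in memory at once. The sketch below shows that pattern; render_links is a hypothetical helper, and fake_cursor is a stand-in for app.library.find(...) so the example runs without a database.

```python
import asyncio
from html import escape


async def render_links(cursor, protocol: str, host: str) -> str:
    """Build the link list from any async iterable of library documents."""
    links = []
    async for doc in cursor:  # works with a Motor cursor as well
        url = f"{protocol}://{host}/stream/{escape(doc['filename'])}"
        links.append(f'<a href="{url}" target="_blank">{url}</a>')
    return "<br>".join(links)


async def fake_cursor():
    # Hypothetical stand-in for app.library.find({'email': ...})
    for name in ("aaa111", "bbb222"):
        await asyncio.sleep(0)  # pretend to wait on the database
        yield {"filename": name}


markup = asyncio.run(render_links(fake_cursor(), "http", "127.0.0.1"))
print(markup)
```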

Asyncio-Compatible Libraries

It is important to use asyncio-compatible libraries, like Motor and AIOHTTP, within your FastAPI app so that the operations they perform can be handed to the event loop. Otherwise, they are “blocking” operations: the whole app has to wait for each one to return before it can continue on to its next task.
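The difference is easy to measure with the standard library alone. In this sketch, time.sleep plays the role of a blocking library call and asyncio.sleep the role of an asyncio-compatible one; asyncio.to_thread (Python 3.9+) is one common way to keep a blocking call from stalling the loop when no async alternative exists. The function names are made up for illustration.

```python
import asyncio
import time


async def blocking_call() -> None:
    time.sleep(0.2)  # blocks the entire event loop while it sleeps


async def cooperative_call() -> None:
    await asyncio.sleep(0.2)  # yields control back to the event loop


async def offloaded_call() -> None:
    # Runs the blocking function in a worker thread instead.
    await asyncio.to_thread(time.sleep, 0.2)


async def timed(make_call, count: int = 3) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(make_call() for _ in range(count)))
    return time.perf_counter() - start


async def compare() -> dict:
    return {
        "blocking": await timed(blocking_call),
        "cooperative": await timed(cooperative_call),
        "offloaded": await timed(offloaded_call),
    }


timings = asyncio.run(compare())
print({name: round(seconds, 2) for name, seconds in timings.items()})
```

The blocking variant takes roughly three times as long, because each call has to finish before the next one can start.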

AIOHTTP

Asynchronous HTTP networking with AIOHTTP works similarly. Here, we have two routes, /sign-up and /login. Both listen for a POST request from the browser, and both parse some form data (an email and a password) before forwarding that data to the Auth service. We’re using AIOHTTP so that the request isn’t blocking: the await keyword “frees up” the Web API while it waits for a response from the Auth service, so it can handle the next incoming request while the Auth service interacts with the DB.

@app.post('/sign-up')
async def sign_up(email: str = Form(), password: str = Form()):
    user_data = {'email': email, 'password': password}
    try:
        # Forward the form data to the Auth service without blocking the event loop
        async with aiohttp.ClientSession() as session:
            async with session.post(f'http://{AUTH_HOST}:{AUTH_PORT}/sign-up', data=user_data) as response:
                r = await response.text()
    except Exception as e:
        return HTMLResponse(f'<h3>Error: {str(e)}</h3>{views.sign_up_login_block}')

    # A '1' in the body means the Auth service rejected the request
    if '1' in r:
        return HTMLResponse(f'<h3>An account already exists with that email</h3>{views.sign_up_login_block}')
    return HTMLResponse(views.sign_up_login_block)

@app.post('/login')
async def login(request: Request, email: str = Form(), password: str = Form()):
    user_data = {'email': email, 'password': password}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(f'http://{AUTH_HOST}:{AUTH_PORT}/login', data=user_data) as response:
                r = await response.text()
    except Exception as e:
        return HTMLResponse(f'<h3>Error: {str(e)}</h3>{views.sign_up_login_block}')

    if '1' in r:
        return HTMLResponse(f'<h3>Login failed</h3>{views.sign_up_login_block}')
    # Remember the user in the session cookie for later requests
    request.session['email'] = email
    videos = await _get_videos(request)
    return HTMLResponse(f'<h3>Logged in as "{email}"</h3>{views.upload_block}{videos}{views.logout_block}')

In the /login route handler, we’re also setting session data for the logged-in user. We have access to the request.session property via Starlette, the underlying framework upon which FastAPI is built. This property gives us a Python dictionary; Starlette’s session middleware serializes it into a signed cookie that is sent back to the browser, which we can then use to identify which logged-in user is making a given request to the Web API.
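To illustrate what a signed cookie buys us, here is a simplified standard-library sketch. It is not Starlette’s implementation (Starlette relies on the itsdangerous package); sign_session, load_session, and SECRET_KEY are hypothetical names used only for this example.

```python
import base64
import hashlib
import hmac
import json

SECRET_KEY = b"change-me"  # hypothetical secret; never hard-code a real one


def sign_session(session: dict) -> str:
    """Serialize the session and append an HMAC so tampering is detectable."""
    payload = base64.urlsafe_b64encode(json.dumps(session).encode("utf-8"))
    signature = hmac.new(SECRET_KEY, payload, hashlib.sha256).hexdigest()
    return f"{payload.decode('ascii')}.{signature}"


def load_session(cookie: str) -> dict:
    """Return the session dict, or an empty dict if the signature is wrong."""
    payload, _, signature = cookie.rpartition(".")
    expected = hmac.new(
        SECRET_KEY, payload.encode("utf-8"), hashlib.sha256
    ).hexdigest()
    if not hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8")):
        return {}  # tampered or malformed cookie: treat as logged out
    return json.loads(base64.urlsafe_b64decode(payload))


cookie = sign_session({"email": "user@example.com"})
print(load_session(cookie))        # {'email': 'user@example.com'}
print(load_session("x" + cookie))  # {}
```

Note that signing only prevents the browser from altering the session; the contents are encoded, not encrypted, so anything stored there is readable by the user.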

GridFS and FastAPI’s Background Tasks

Next, we’ll look at uploading files to GridFS with a FastAPI BackgroundTask. Notice we’ve written several helper functions: _generate_hash, _add_library_record, and _upload. Each of these functions is used in the /upload route handler.

First, we’re using _generate_hash to create a unique filename for every file being uploaded. Next, we’re passing _add_library_record and _upload to an instance of the FastAPI BackgroundTasks class with its add_task method. The background_tasks instance queues these tasks and runs them after the response has been sent, so the browser doesn’t have to wait for them to finish. Because both tasks are async functions, they are awaited on the application’s event loop, which works well for long-running, I/O-bound jobs like file uploads.

async def _generate_hash():
    # 16 random bytes, hex-encoded, give a 32-character filename
    return binascii.hexlify(os.urandom(16)).decode('utf-8')

async def _add_library_record(email: str, hash: str):
    data = {'email': email, 'filename': hash}
    await app.library.insert_one(data)

async def _upload(file: UploadFile, hash: str):
    grid_in = app.fs.open_upload_stream(
        hash, metadata={'contentType': 'video/mp4'})
    data = await file.read()  # reads the whole file into memory
    await grid_in.write(data)
    await grid_in.close()  # uploaded on close

@app.post('/upload')
async def upload(request: Request, file: UploadFile, background_tasks: BackgroundTasks):
    # .get() avoids a KeyError when nobody is logged in
    if request.session.get('email'):
        if file.filename:
            hash = await _generate_hash()
            background_tasks.add_task(_upload, file, hash)
            background_tasks.add_task(_add_library_record, request.session['email'], hash)
            # Background tasks run after the response, so this list
            # may not include the new upload yet
            videos = await _get_videos(request)
            return HTMLResponse(f'{views.upload_block}{videos}{views.video_library_block}{views.logout_block}')
        return HTMLResponse(f'<h3>Please select a file to upload</h3>{views.upload_block + views.logout_block}')
    return HTMLResponse(f'<h3>Please log in</h3>{views.sign_up_login_block}')
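As a rough mental model, add_task only queues work; the queued tasks run once the response has been produced. The sketch below imitates that ordering with the standard library. MiniBackgroundTasks, upload_handler, and serve_one_request are hypothetical stand-ins, not FastAPI’s classes or internals.

```python
import asyncio


class MiniBackgroundTasks:
    """Simplified stand-in that mimics the add_task queueing behavior."""

    def __init__(self):
        self._tasks = []

    def add_task(self, func, *args):
        self._tasks.append((func, args))  # nothing runs yet

    async def run_all(self):
        for func, args in self._tasks:
            await func(*args)  # awaited in the order they were added


events = []
library = []


async def add_library_record(filename: str) -> None:
    await asyncio.sleep(0)  # pretend to wait on the database
    library.append(filename)
    events.append("record added")


async def upload_handler(tasks: MiniBackgroundTasks) -> list:
    tasks.add_task(add_library_record, "abc123")
    events.append("handler returned")
    return list(library)  # snapshot taken before the task has run


async def serve_one_request() -> list:
    tasks = MiniBackgroundTasks()
    body = await upload_handler(tasks)
    events.append("response sent")
    await tasks.run_all()  # background work happens after the response
    return body


body = asyncio.run(serve_one_request())
print(body, library, events)
# [] ['abc123'] ['handler returned', 'response sent', 'record added']
```

This ordering is why the video list returned by the /upload handler may not yet contain the file that was just uploaded: the library record is written only after the response has gone out.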

Finally, let’s look at streaming files from GridFS. To do so, we’ll use the GridFS method open_download_stream_by_name to request a given file one chunk at a time, which we will read – again, chunk by chunk – with an asynchronous generator. We'll then yield each chunk of binary data, one at a time, to the browser with a FastAPI StreamingResponse that takes our async generator read() as its first argument.

@app.get('/stream/{filename}')
async def stream(filename: str, request: Request):
    # .get() avoids a KeyError when nobody is logged in
    if request.session.get('email'):
        grid_out = await app.fs.open_download_stream_by_name(filename)

        async def read():
            # Yield one GridFS chunk at a time until the whole file is sent
            while grid_out.tell() < grid_out.length:
                yield await grid_out.readchunk()

        return StreamingResponse(read(), media_type='video/mp4',
            headers={'Content-Length': str(grid_out.length)})
    return HTMLResponse(f'<h3>Please log in</h3>{views.sign_up_login_block}')
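The generator pattern can be tried out without MongoDB. In this sketch, FakeGridOut is a hypothetical stand-in that exposes only the three members the handler relies on (tell, length, and readchunk); the read generator has the same loop as the handler above.

```python
import asyncio


class FakeGridOut:
    """Hypothetical stand-in for the object returned by GridFS."""

    def __init__(self, data: bytes, chunk_size: int):
        self._data = data
        self._chunk_size = chunk_size
        self._pos = 0
        self.length = len(data)

    def tell(self) -> int:
        return self._pos

    async def readchunk(self) -> bytes:
        await asyncio.sleep(0)  # pretend to fetch the next chunk from MongoDB
        chunk = self._data[self._pos:self._pos + self._chunk_size]
        self._pos += len(chunk)
        return chunk


async def read(grid_out):
    # Same loop as in the route handler: one chunk per iteration.
    while grid_out.tell() < grid_out.length:
        yield await grid_out.readchunk()


async def collect() -> list:
    grid_out = FakeGridOut(b"0123456789", chunk_size=4)
    return [chunk async for chunk in read(grid_out)]


chunks = asyncio.run(collect())
print(chunks)  # [b'0123', b'4567', b'89']
```

Because each chunk is yielded as soon as it is read, only one chunk needs to be held in memory at a time, however large the video is.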

Deploy in Kubernetes with Helm

In the provided repo, you’ll also find a k8s directory with the resources you’ll need to deploy the application in Kubernetes with Helm. Below, we’ll look at examples of each of the file types this requires.

Dockerize our services

First, in both the web_api and auth directories, you’ll find a Dockerfile. This file tells Docker how to build the service into an image, which we can then pull into Kubernetes in order to run it. Here, we’re starting with a base image of python:3.10 into which we copy our source code. Then, we install our dependencies with the RUN command, and finally, we define a command (CMD) that will be executed when the container that is based on this image is spun up.

FROM python:3.10
COPY ./src/web_api/ .
RUN pip install -r requirements.txt
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

NOTE: There is no need to build the Docker image unless you’d like to, as it has already been built and pushed to DockerHub, so that it can be pulled by the Kubernetes resource definitions provided in the sample repository.

Helm templates

Next, let’s look at an example of an included Helm template. This file defines the required K8s resources for the Web API service and their networking connections. These include a Deployment (which itself manages further K8s resources), a ClusterIP Service, and an Ingress.

  • Deployment: this is where the Docker image we defined above will be spun up into a container. It will run inside of one or more Pods, which are controlled by a ReplicaSet, and they will receive requests through the ClusterIP Service described next.
  • ClusterIP Service: this K8s resource provides other services within the larger cluster a hostname and an IP address to send requests to.
  • Ingress: this resource is what exposes the application to external HTTP requests.

The above K8s resources work together to form a single “service” within the larger application deployment.

The other services included in the project – Auth and MongoDB – both consist of a Deployment and a ClusterIP service, but no Ingress, as they aren’t receiving HTTP requests from the internet directly.

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: web-api
  labels:
    app: web-api
spec:
  selector:
    matchLabels:
      api: web-api
  replicas: 3
  template:
    metadata:
      labels:
        app: web-api
        api: web-api
    spec:
      containers:
        - name: web-api
          image: jdvincent/video_api:latest
          env:
            - name: PROTOCOL
              value: {{ .Values.protocol | toJson }}
            - name: HOST
              value: {{ .Values.host | toJson }}
            - name: AUTH_HOST
              value: {{ .Values.auth_host | toJson }}
            - name: AUTH_PORT
              value: {{ .Values.auth_port | toJson }}
            - name: MONGO_HOST
              value: {{ .Values.mongo_host | toJson }}
            - name: MONGO_PORT
              value: {{ .Values.mongo_port | toJson }}
          ports:
            - name: web-api
              containerPort: 8000
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: web-api
spec:
  ports:
    - port: 8000
      targetPort: 8000
      name: web-api
  selector:
    app: web-api
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: web-api
spec:
  ingressClassName: {{ .Values.ingress_class_name | toJson }}
  rules:
    - host: {{ .Values.ingress_host | toJson }}
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: web-api
                port:
                  number: 8000
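The env entries in the Deployment above become environment variables inside the container, which is where names like AUTH_HOST and MONGO_PORT in the Python excerpts come from. Here is a minimal sketch of how a service might read them; the Settings class, load_settings, and the fallback defaults are assumptions for illustration, not code from the repo.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    protocol: str
    host: str
    auth_host: str
    auth_port: str
    mongo_host: str
    mongo_port: str

    @property
    def auth_url(self) -> str:
        # Inside the cluster, the Service name doubles as the hostname.
        return f"http://{self.auth_host}:{self.auth_port}"

    @property
    def mongo_url(self) -> str:
        return f"mongodb://{self.mongo_host}:{self.mongo_port}"


def load_settings(env=os.environ) -> Settings:
    # Fallback values are assumptions chosen to mirror the chart's values.yaml.
    return Settings(
        protocol=env.get("PROTOCOL", "http"),
        host=env.get("HOST", "127.0.0.1"),
        auth_host=env.get("AUTH_HOST", "auth"),
        auth_port=env.get("AUTH_PORT", "8000"),
        mongo_host=env.get("MONGO_HOST", "mongo"),
        mongo_port=env.get("MONGO_PORT", "27017"),
    )


settings = load_settings({"AUTH_HOST": "auth", "AUTH_PORT": "8000"})
print(settings.auth_url, settings.mongo_url)
# http://auth:8000 mongodb://mongo:27017
```

Passing a plain dictionary instead of os.environ, as in the last lines, makes the loader easy to exercise outside a container.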

Helm values

Helm charts include one or more values.yaml files that provide an easy way to configure the templates with different values, so the application can be deployed in different environments. Notice above that certain values are populated with an expression like the following: {{ .Values.ingress_class_name | toJson }}. This tells Helm to replace the expression with the associated value from the values.yaml file, encoded as JSON. For example, this particular expression will resolve to "kong", because that’s what we’ve defined as the ingress_class_name below.

auth_host: auth
auth_port: "8000"
mongo_port: "27017"
mongo_host: mongo
ingress_host: null
ingress_class_name: kong
host: "127.0.0.1"
protocol: "http"
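To see what the toJson pipeline does to these values, here is a small Python imitation. It is not Helm: the render function is a hypothetical helper that handles only the flat {{ .Values.key | toJson }} form used in this chart, with json.dumps standing in for toJson.

```python
import json
import re

values = {
    "auth_port": "8000",
    "ingress_host": None,
    "ingress_class_name": "kong",
}

PATTERN = re.compile(r"\{\{\s*\.Values\.(\w+)\s*\|\s*toJson\s*\}\}")


def render(template: str, values: dict) -> str:
    """Mimic `{{ .Values.<key> | toJson }}` for flat keys only."""
    return PATTERN.sub(lambda m: json.dumps(values[m.group(1)]), template)


template = """\
value: {{ .Values.auth_port | toJson  }}
host: {{ .Values.ingress_host | toJson }}
ingressClassName: {{ .Values.ingress_class_name | toJson }}"""

rendered = render(template, values)
print(rendered)
# value: "8000"
# host: null
# ingressClassName: "kong"
```

The quoting matters for the ports: Kubernetes requires env values to be strings, so keeping "8000" quoted in values.yaml means it is rendered as a string rather than a bare number.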

Run it in Minikube

minikube start
minikube addons enable kong
minikube tunnel  # keeps running; use a separate terminal for the commands below
cd k8s
helm template . --values values.yaml | kubectl apply -f -

Conclusion

FastAPI supports Python’s async and await keywords and the asyncio library, which together allow for asynchronous operations. Above, we walked through using FastAPI to build a microservice-based video streaming application that stores mp4 videos in, and streams them from, a MongoDB GridFS bucket. In doing so, we covered async HTTP networking within the application, along with FastAPI’s background tasks feature, which lets you process long-running jobs after a response has been sent. Then, we containerized our APIs and walked through deploying them in Kubernetes.

Join the discussion!

Have any questions or comments about this post? Maybe you have a similar project or an extension to this one that you'd like to showcase? Join the Velocity Discord server to ask away, or just stop by to talk K8s development with the community.
