Develop Kubernetes Native Applications with Velocity

Velocity Team
January 1, 2024

Kubernetes native applications are those that leverage Kubernetes-specific API objects, such as Batch/Jobs and Pods, not only as a means of deploying an application but as building blocks of the application itself. This architectural approach offers numerous advantages, such as the ability to run application processes asynchronously and to let the Kubernetes scheduler distribute those processes efficiently across your cluster's full resources.

However, while developing such an application, your individual services can only be effectively written and debugged in a running Kubernetes cluster, as they rely on the Kubernetes API and any additional cluster resources your architecture includes. 

Historically, this would have required you to update your code locally, commit your changes, wait for any relevant CI process to complete, build and push a container image, and pull the image into your Kubernetes cluster. Only then would you be able to see whether your code runs as expected.

Velocity dramatically simplifies and speeds up the development workflow for Kubernetes native applications. You simply write and debug code in your local IDE, and Velocity automatically syncs it to your cluster, so you can effectively develop directly in your cluster while working from your local machine.

This post will walk you through the process of developing a Kubernetes native application, and it will also demonstrate how much more easily and efficiently such applications can be developed with Velocity. 

What we’re building

Today we'll build a video frame stabilizer written in Python, which will require a significant amount of compute time and resources to process the videos we upload. To keep the application responsive while this video processing workflow runs, we'll use Kubernetes Batch/Jobs, letting Kubernetes schedule the processing asynchronously and in a way that makes the most of our cluster's compute resources.

That is, rather than running the video processing workflow in our “core” FastAPI application, which would bog down the API under any significant traffic load, the FastAPI application will call the Kubernetes API and request the creation of a Batch/Job that runs the video processing workflow as an independent, isolated workload. This way, the Kubernetes scheduler can spread this distinct workload across the full cluster's resources, and the FastAPI application can keep handling incoming requests while the video is processed.

The full project is available on GitHub.

Start a Kubernetes Cluster

To start, let's spin up a local cluster with Minikube. Because the video processing workflow is fairly resource-intensive, let's also increase the default memory and CPUs of our cluster, like so:

minikube config set memory 3500
minikube config set cpus 4
minikube start
minikube addons enable kong
minikube tunnel

Above, after configuring and starting the cluster, we enabled the kong add-on and started a tunnel session so that, once we deploy a Kubernetes Ingress, we can send requests to the application from outside the cluster.

Want to learn more about minikube? Check out our tutorial: https://www.youtube.com/watch?v=xiyWacQca2c

Create a virtual environment

Next, we'll create a virtual environment locally to make it simple to manage our dependencies as we work.

pip install pipenv
pipenv shell

The FastAPI video upload and play app (Kubernetes Deployment)

Let's create the “core” FastAPI application, which will allow us to upload videos, store them in MongoDB's GridFS, and stream the videos to the browser. First, we'll install our Python dependencies and write them to a requirements.txt:

pip install fastapi motor python-multipart uvicorn
pip freeze > requirements.txt

And now, let's define the API in a file called main.py:

import logging
import os
from fastapi import FastAPI, UploadFile
from fastapi.responses import StreamingResponse
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorGridFSBucket

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

app = FastAPI()

MONGO_HOST = os.environ.get('MONGO_HOST')
MONGO_PORT = os.environ.get('MONGO_PORT')

@app.on_event('startup')
async def get_mongo():
    video_db = AsyncIOMotorClient(f'mongodb://{MONGO_HOST}:{MONGO_PORT}').video
    app.fs = AsyncIOMotorGridFSBucket(video_db)

@app.post('/api/upload')
async def upload(file: UploadFile):
    if file.filename:
        grid_in = app.fs.open_upload_stream(
            file.filename, metadata={'contentType': 'video/mp4'})
        data = await file.read()
        await grid_in.write(data)
        await grid_in.close()

        response_text = f"""Successfully uploaded {file.filename}
                        View it in your browser at http://localhost/api/stream/{file.filename}"""
        return response_text
    return ''

@app.get('/api/stream/{filename}')
async def stream(filename: str):
    grid_out = await app.fs.open_download_stream_by_name(filename)

    async def read():
        while grid_out.tell() < grid_out.length:
            yield await grid_out.readchunk()

    return StreamingResponse(
        read(), media_type='video/mp4', headers={
            'Content-Length': str(grid_out.length)})

Above, we've defined a simple FastAPI application that connects to MongoDB at startup and exposes two HTTP endpoints — upload and stream. The upload endpoint accepts a POST request containing a video file as form data, which it writes to MongoDB. The stream endpoint accepts a GET request with the name of a previously uploaded file and streams the file back to the browser.

Notice that to stream the video, we are returning a FastAPI streaming response, and within that, we are reading the data in chunks asynchronously from MongoDB, which allows us to stream data to the browser as it is read from Mongo.
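
If you'd like to exercise the stream endpoint outside the browser once the app is deployed behind the ingress (below), here's a minimal client sketch. It assumes the requests package, which isn't one of the app's dependencies, and a previously uploaded file named wobbly.mov:

# Hypothetical client for the stream endpoint; run `pip install requests` first.
import requests

url = 'http://localhost/api/stream/wobbly.mov'
with requests.get(url, stream=True) as resp:
    resp.raise_for_status()
    with open('downloaded.mp4', 'wb') as f:
        # Consume the response in chunks, mirroring how the server reads
        # chunks from GridFS and streams them out.
        for chunk in resp.iter_content(chunk_size=256 * 1024):
            f.write(chunk)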

Dockerize the app

Next, we'll need to containerize the application we just wrote above with the following Dockerfile.

FROM python:3.10
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD uvicorn main:app --host 0.0.0.0 --port 8000

Then we can run the following to build the image directly inside Minikube (no registry push needed):

minikube image build -t fastapi-video-app:latest .

Deploy to Kubernetes

Now, to deploy the app to Kubernetes, we'll need to write two files which will define the MongoDB deployment and service, the FastAPI deployment and service, and the ingress that will route traffic to the FastAPI app.

mongodb.yaml

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mongo
  labels:
    app: mongo
spec:
  selector:
    matchLabels:
      app: mongo
  replicas: 1
  template:
    metadata:
      labels:
        app: mongo
    spec:
      containers:
        - name: mongo
          image: mongo:5.0
          ports:
            - name: mongo
              containerPort: 27017
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: mongo
spec:
  ports:
    - port: 27017
      targetPort: 27017
      name: mongo
  selector:
    app: mongo
  type: ClusterIP

web-api.yaml

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: web-api
  labels:
    app: web-api
spec:
  selector:
    matchLabels:
      app: web-api
  replicas: 1
  template:
    metadata:
      labels:
        app: web-api
    spec:
      containers:
        - name: web-api
          image: fastapi-video-app:latest
          imagePullPolicy: IfNotPresent
          env:
            - name: MONGO_HOST
              value: "mongo"
            - name: MONGO_PORT
              value: "27017"
          ports:
            - name: web-api
              containerPort: 8000
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: web-api
spec:
  ports:
    - port: 8000
      targetPort: 8000
      name: web-api
  selector:
    app: web-api
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: web-api
spec:
  ingressClassName: kong
  rules:
    - http:
        paths:
          - path: /api
            pathType: Prefix
            backend:
              service:
                name: web-api
                port:
                  number: 8000

Finally, with the above files ready, we can deploy the application by running the following:

kubectl apply -f mongodb.yaml -n default
kubectl apply -f web-api.yaml -n default
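
Before uploading anything, it's worth confirming that both Deployments are ready. kubectl get pods works, or, staying in Python, here's a small polling sketch. It's a diagnostic helper rather than part of the app; it needs the kubernetes package (pip install kubernetes, which we'll add to the app itself later anyway) and uses your local kubeconfig:

# readiness_check.py: hypothetical helper, run from your local machine.
import time

from kubernetes import client, config

config.load_kube_config(context='minikube')  # local kubeconfig, not in-cluster
apps_v1 = client.AppsV1Api()

for name in ('mongo', 'web-api'):
    while True:
        dep = apps_v1.read_namespaced_deployment(name, 'default')
        # ready_replicas is None until at least one pod passes its checks.
        if (dep.status.ready_replicas or 0) >= (dep.spec.replicas or 1):
            print(f'{name} is ready')
            break
        time.sleep(2)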

Upload a video

Now that the app is up and running, we can upload a video to our FastAPI app, and it will be stored in MongoDB by its filename, like so:

curl -X POST -F "file=@Documents/wobbly.mov" http://localhost/api/upload

NOTE: you'll need to update the above file path to point at a video you have on your local machine. 

Successfully uploaded wobbly.mov

View it in your browser at http://localhost/api/stream/wobbly.mov

After the upload completes, we'll see a response like the above, including a link to stream the video in the browser!

Start Velocity

Install Velocity from the JetBrains Marketplace, and then click the “run” button next to the default run configuration, “Setup Velocity.”

Then define a Velocity run configuration that targets the web-api deployment in the default namespace of your minikube context.

Click “Next” and then “Create,” and you'll see the Velocity session's output in the Velocity console:

"/Users/jeffvincent/Library/Application Support/JetBrains/PyCharm2023.1/plugins/Velocity/velocity" start deployment/web-api --events-server-port 52476 --kube-context minikube --namespace default --configuration-name jb-1-ntAV1bLkD4aUHBAf --send-diagnostics-on-err --python.pycharm-version 231.9011.38
[VELOCITY] [EnrichConfig] Using configuration: 'jb-1-ntAV1bLkD4aUHBAf'
[VELOCITY] Session Trace ID: 2b4e48e3936a102142d35fde2af1087b
[VELOCITY] Using CLI version: '0.28.0'
[VELOCITY] Starting events server (port 52476)
[VELOCITY] Client connected to events server
[VELOCITY] Fetching existing docker auth config
[VELOCITY] [ValidateTarget] Using context: 'minikube'
[VELOCITY] [ValidateTarget] Using namespace: 'default'
[VELOCITY] [ValidateTarget] Using target: 'deployment/web-api'
[VELOCITY] [ValidateTarget] Activating development session

Update the deployed app

Now, let's update our deployed application to include the Kubernetes SDK for Python, which we'll use to create the Batch/Job described above.

pip install kubernetes
pip freeze > requirements.txt

base_api.py

Next, let's write the logic that calls the Kubernetes API in a class called CreateProcessVideoJob. This base_api.py file defines a Kubernetes Job manifest with the Python SDK for Kubernetes, and that manifest references the container image batch-process-image:latest, which we'll build in just a bit.

import binascii
import os
from kubernetes import client


class CreateProcessVideoJob:
    def __init__(self, filename):
        self.filename = filename
        self.job_name = binascii.hexlify(os.urandom(16)).decode('utf-8')
        self.job = None
        self.batch_v1 = client.BatchV1Api()

    def create_job_object(self):
        env_vars = [
            client.V1EnvVar(name="MONGO_HOST", value="mongo"),
            client.V1EnvVar(name="MONGO_PORT", value="27017"),
        ]

        # Configure Pod template container
        container = client.V1Container(
            name=self.job_name,
            image="batch-process-image:latest", # We will build this image shortly.
            command=["python", "main.py", "--filename", self.filename],
            env=env_vars)

        # Create and configure a spec section
        template = client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(labels={"app": self.job_name}),
            spec=client.V1PodSpec(restart_policy="Never", containers=[container]))
        
        # Create the spec section of the Job
        spec = client.V1JobSpec(
            template=template,
            backoff_limit=4)

        # Instantiate the job object
        job = client.V1Job(
            api_version="batch/v1",
            kind="Job",
            metadata=client.V1ObjectMeta(name=self.job_name),
            spec=spec)

        self.job = job

    def create_job(self):
        api_response = self.batch_v1.create_namespaced_job(
            body=self.job,
            namespace="default")
        print("Job created. status='%s'" % str(api_response.status))

Notice that above, we've defined all the sections that you would see in a standard YAML manifest for a Batch/Job, but we've done so in Python.
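
Before wiring the class into FastAPI, here's a minimal sketch of how it's driven; it mirrors what main.py will do next. Note that load_incluster_config only works inside a pod with a mounted service account token, and, as we'll see shortly, the final call will be rejected until we grant that service account the right permissions:

# Usage sketch: this is what the FastAPI background task will boil down to.
from kubernetes import config

from base_api import CreateProcessVideoJob

config.load_incluster_config()  # requires an in-cluster service account token

job = CreateProcessVideoJob('wobbly.mov')
job.create_job_object()  # build the V1Job manifest in memory
job.create_job()         # POST it to the Kubernetes API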

main.py

And now, let's import the above module, base_api.py, and update the upload function so that it instantiates the CreateProcessVideoJob class and uses it to call the Kubernetes API from a FastAPI background task:

import logging
import os
from fastapi import FastAPI, BackgroundTasks, UploadFile
from fastapi.responses import StreamingResponse
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorGridFSBucket
from kubernetes import config

from base_api import CreateProcessVideoJob

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

app = FastAPI()

MONGO_HOST = os.environ.get('MONGO_HOST')
MONGO_PORT = os.environ.get('MONGO_PORT')

config.load_incluster_config()

@app.on_event('startup')
async def get_mongo():
    video_db = AsyncIOMotorClient(f'mongodb://{MONGO_HOST}:{MONGO_PORT}').video
    app.fs = AsyncIOMotorGridFSBucket(video_db)

async def _process_video(filename):
    create_batch_job = CreateProcessVideoJob(filename)
    create_batch_job.create_job_object()
    create_batch_job.create_job()

@app.post('/api/upload')
async def upload(file: UploadFile, background_tasks: BackgroundTasks):
    if file.filename:
        grid_in = app.fs.open_upload_stream(
            file.filename, metadata={'contentType': 'video/mp4'})
        data = await file.read()
        await grid_in.write(data)
        await grid_in.close()

        background_tasks.add_task(_process_video, file.filename)
        base_filename = os.path.splitext(file.filename)[0]
        output_filename = f"{base_filename}_stabilized.mp4"
        response_text = f"""Successfully uploaded {file.filename}
                        View it in your browser at http://localhost/api/stream/{file.filename}
                        Kubernetes batch job is running; when complete,
                        you can view the processed video at http://localhost/api/stream/{output_filename}"""
        return response_text
    return ''

@app.get('/api/stream/{filename}')
async def stream(filename: str):
    grid_out = await app.fs.open_download_stream_by_name(filename)

    async def read():
        while grid_out.tell() < grid_out.length:
            yield await grid_out.readchunk()

    return StreamingResponse(
        read(), media_type='video/mp4', headers={
            'Content-Length': str(grid_out.length)})

Develop the Batch/Job workflow

Next, we'll need to define the video processing workflow, which will run as a container in the Batch/Job we're creating. For this, we'll define a Python class called ProcessVideo, which takes a filename as a parameter when we instantiate it. Save the following as main.py in a directory of its own (separate from the FastAPI app's main.py), since the Job runs it with python main.py:

import argparse
import asyncio
import os
import cv2
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorGridFSBucket
from vidgear.gears import VideoGear

MONGO_HOST = os.environ.get('MONGO_HOST')
MONGO_PORT = os.environ.get('MONGO_PORT')


class ProcessVideo:
    def __init__(self, filename):
        self.raw_video = None
        self.processed_video = None
        self.filename = filename
        self.base_filename = None
        self.video_db = AsyncIOMotorClient(
            f'mongodb://{MONGO_HOST}:{MONGO_PORT}').video
        self.fs = AsyncIOMotorGridFSBucket(self.video_db)

    async def download_video(self):
        cursor = self.fs.find(
            {'filename': self.filename}, no_cursor_timeout=True)
        async for grid_out in cursor:
            self.raw_video = await grid_out.read()
            with open('video.mp4', 'wb') as f:
                f.write(self.raw_video)

    def _stream_to_file(self):
        self.base_filename = os.path.splitext(self.filename)[0]
        output_filename = f"{self.base_filename}_stabilized.mp4"
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")

        # Open the VideoWriter
        output_writer = None
        frame_size = None

        while True:
            frame = self.processed_video.read()
            if frame is None:
                break

            if output_writer is None:
                frame_size = (frame.shape[1], frame.shape[0])
                output_writer = cv2.VideoWriter(
                    output_filename, fourcc, 30.0, frame_size)

            output_writer.write(frame)

        if output_writer is not None:
            output_writer.release()
            print(f"Processed frames saved to {output_filename}")
        else:
            print("No frames were processed.")

    def process_video(self):
        self.processed_video = \
            VideoGear(source='video.mp4', stabilize=True).start()
        self._stream_to_file()

    async def upload_video(self):
        filename = f"{self.base_filename}_stabilized.mp4"
        grid_in = self.fs.open_upload_stream(
            filename, metadata={'contentType': 'video/mp4'})
        with open(filename, 'rb') as file:
            data = file.read()
        await grid_in.write(data)
        await grid_in.close()


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--filename', dest='filename', type=str, required=True)
    _args = parser.parse_args()
    return _args


async def main(filename):
    pv = ProcessVideo(filename=filename)
    await pv.download_video()
    pv.process_video()
    await pv.upload_video()


if __name__ == '__main__':
    args = parse_args()
    asyncio.run(main(args.filename))

Build the container image

Create the following Dockerfile:

FROM python:3.10
RUN pip install motor opencv-python vidgear
RUN apt-get update && apt-get install -y libgl1-mesa-glx
COPY main.py .

And then build the image in Minikube as follows:

minikube image build -t batch-process-image:latest .

Upload another video

curl -X POST -F "file=@Documents/wobbly2.mov" http://localhost/api/upload

Now, when you upload a video, you'll see an error traceback in the Velocity console.

This is the power of Velocity! You can develop Kubernetes native applications and see Kubernetes-specific tracebacks as if you were developing locally!

If you look closely, the error is a 403 response from the Kubernetes API. In other words, the FastAPI application is reaching the Kubernetes API, but the default Kubernetes Service Account (under which the FastAPI app runs) doesn't have permission to create the Job it is requesting. To fix this, we'll need to give the default Service Account permission to create Batch/Jobs in the default namespace.
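
If you'd like to confirm the diagnosis before fixing it, you can ask the API server whether the pod's service account may create Jobs. The quickest route is kubectl auth can-i, but as a sketch, the same check with the Python SDK from inside the web-api pod would look like this (a diagnostic aid, not part of the app):

# permission_probe.py: hypothetical diagnostic, run inside the web-api pod.
from kubernetes import client, config

config.load_incluster_config()
authz = client.AuthorizationV1Api()

# Ask the API server: can I (the pod's service account) create batch/jobs?
review = client.V1SelfSubjectAccessReview(
    spec=client.V1SelfSubjectAccessReviewSpec(
        resource_attributes=client.V1ResourceAttributes(
            namespace='default', group='batch', resource='jobs', verb='create')))

result = authz.create_self_subject_access_review(review)
print('can create jobs:', result.status.allowed)  # False until we add a Role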

The Kubernetes Role and RoleBinding

To do this, we'll create the following Role and RoleBinding. The Role, called jobs-creator, specifies the “default” namespace, the “batch” API group, the “jobs” resource type within that API group, and the actions or “verbs” the Role grants.

Then, we'll associate this Role with the default Service Account by creating a related RoleBinding called jobs-creator-binding. Notice that this RoleBinding specifies the default Service Account and the Role we just defined, jobs-creator.

role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: jobs-creator
  namespace: default
rules:
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: jobs-creator-binding
  namespace: default
subjects:
- kind: ServiceAccount
  name: default
  namespace: default
roleRef:
  kind: Role
  name: jobs-creator
  apiGroup: rbac.authorization.k8s.io

We can create the above resources with kubectl, just as we did with the others:

kubectl apply -f role.yaml -n default

Upload a video again

Now, upload another video, and you will see output similar to that shown below, as the FastAPI application now has permission to create a Batch/Job in the default namespace!

curl -X POST -F "file=@Documents/wobbly3.mov" http://localhost/api/upload

Velocity console:

web-api-6b879ccb6d-2q6bs web-api INFO:     10.244.0.2:48314 - "POST /api/upload HTTP/1.1" 200 OK
web-api-6b879ccb6d-2q6bs web-api DEBUG:kubernetes.client.rest:response body: {"kind":"Job","apiVersion":"batch/v1","metadata":{"name":"64c5474f5b28bf8b10eb7625abc63a43","namespace":"default","uid":"78feec89-dcc6-494d-a7ef-b554e2b6f731","resourceVersion":"34460","generation":1,"creationTimestamp":"2023-09-06T15:57:36Z","labels":{"app":"64c5474f5b28bf8b10eb7625abc63a43","batch.kubernetes.io/controller-uid":"78feec89-dcc6-494d-a7ef-b554e2b6f731","batch.kubernetes.io/job-name":"64c5474f5b28bf8b10eb7625abc63a43","controller-uid":"78feec89-dcc6-494d-a7ef-b554e2b6f731","job-name":"64c5474f5b28bf8b10eb7625abc63a43"},"annotations":{"batch.kubernetes.io/job-tracking":""},"managedFields":[{"manager":"OpenAPI-Generator","operation":"Update","apiVersion":"batch/v1","time":"2023-09-06T15:57:36Z","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:labels":{".":{},"f:app":{}}},"f:spec":{"f:backoffLimit":{},"f:completionMode":{},"f:completions":{},"f:parallelism":{},"f:suspend":{},"f:template":{"f:metadata":{"f:labels":{".":{},"f:app":{}}},"f:spec":{"f:containers":{"k:{\"name\":\"64c5474f5b28bf8b10eb7625abc63a43\"}":{".":{},"f:command":{},"f:env":{".":{},"k:{\"name\":\"MONGO_HOST\"}":{".":{},"f:name":{},"f:value":{}},"k:{\"name\":\"MONGO_PORT\"}":{".":{},"f:name":{},"f:value":{}}},"f:image":{},"f:imagePullPolicy":{},"f:name":{},"f:resources":{},"f:terminationMessagePath":{},"f:terminationMessagePolicy":{}}},"f:dnsPolicy":{},"f:restartPolicy":{},"f:schedulerName":{},"f:securityContext":{},"f:terminationGracePeriodSeconds":{}}}}}}]},"spec":{"parallelism":1,"completions":1,"backoffLimit":4,"selector":{"matchLabels":{"batch.kubernetes.io/controller-uid":"78feec89-dcc6-494d-a7ef-b554e2b6f731"}},"template":{"metadata":{"creationTimestamp":null,"labels":{"app":"64c5474f5b28bf8b10eb7625abc63a43","batch.kubernetes.io/controller-uid":"78feec89-dcc6-494d-a7ef-b554e2b6f731","batch.kubernetes.io/job-name":"64c5474f5b28bf8b10eb7625abc63a43","controller-uid":"78feec89-dcc6-494d-a7ef-b554e2b6f731","job-name":"64c5474f5b28bf8b10eb7625abc63a43"}},"spec":{"containers":[{"name":"64c5474f5b28bf8b10eb7625abc63a43","image":"jdvincent/batch-process-image:latest","command":["python","main.py","--filename","wobbly3.mov"],"env":[{"name":"MONGO_HOST","value":"mongo"},{"name":"MONGO_PORT","value":"27017"}],"resources":{},"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","imagePullPolicy":"Always"}],"restartPolicy":"Never","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","securityContext":{},"schedulerName":"default-scheduler"}},"completionMode":"NonIndexed","suspend":false},"status":{}}

curl response:

"Successfully uploaded wobbly.mov
View it in your browser at http://localhost/api/stream/wobbly3.mov
Kubernetes batch job is running; when complete,                        
you can view the processed video at http://localhost/api/stream/wobbly3_stabilized.mp4"

Check the Status of the Kubernetes Job

Now you can track the progress of the video processing job in the Minikube dashboard (opened with minikube dashboard) or from the command line.

The running Batch/Job is our video being processed! When it completes, you'll be able to navigate to the link returned from the curl upload, i.e. http://localhost/api/stream/<video_basename>_stabilized.mp4, to stream the stabilized video!
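
If you prefer the terminal to the dashboard, kubectl get jobs shows the same information, or you can poll the Job status with the SDK. Here's a quick local sketch (the field names come from the V1JobStatus model, and it uses your local kubeconfig):

# job_status.py: hypothetical helper, run from your local machine.
from kubernetes import client, config

config.load_kube_config(context='minikube')
batch_v1 = client.BatchV1Api()

for job in batch_v1.list_namespaced_job(namespace='default').items:
    s = job.status
    # active/succeeded/failed are pod counts; None means zero.
    print(f'{job.metadata.name}: active={s.active} '
          f'succeeded={s.succeeded} failed={s.failed}')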

Conclusion

Kubernetes native applications are those that are not only deployed in Kubernetes, but rather are built from Kubernetes API objects. Above, we saw how this application design approach can allow compute-intensive processes to run as independent and asynchronous workflows via Batch/Jobs, and we saw how Velocity can dramatically simplify the process of developing complex workflows such as this within a running Kubernetes cluster.

Without Velocity, we would have had to update our local code, wait for all relevant CI processes to complete, build a container image, and deploy that new image to Kubernetes in order to see if our updated code worked. But with Velocity, we were able to simply develop the service while it was deployed to our cluster, and see our updated code running almost immediately. 
