Build a Real-Time Analytics Service in Python with Neon and Kafka

Velocity Team
March 7, 2024

Latency is often a major bottleneck when developing cloud-native, data-intensive applications in a Kubernetes cluster. Whether you’re working with audio, video, or any other kind of data at scale, moving that data to and from your local machine during development almost always adds significant latency compared to a production cluster running in the cloud. The result is tedious dev sessions and guesswork about how the application will perform in the wild.

Today, we'll look at an alternative approach with Velocity as we develop and deploy a simple analytics service. Kafka will run in our Kubernetes cluster, and the Kafka stream will be written to a managed Postgres instance that we'll create in Neon.

Neon is an open-source, fully managed, cloud-native Postgres provider that separates compute from storage to support autoscaling, branching your database for development, and bottomless storage. Kafka, an open-source distributed event streaming platform for managing continuous streams of data, is purpose-built for high-throughput workloads, so it will handle our demo analytics stream with ease.

The app will be written in Python, and the full example includes a simple React frontend that will display the total number of “click” and “view” events over the past five minutes in a bar graph that refreshes every five seconds.
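To make the five-minute window concrete, here is a minimal sketch of how events could be counted per type. It assumes each event is a dict with an `event_type` string and a `timestamp` in epoch seconds; the function name `count_recent_events` is hypothetical and is not part of the project.

```python
import time
from collections import Counter

WINDOW_SECONDS = 5 * 60  # the five-minute window shown in the bar graph


def count_recent_events(events, now=None, window=WINDOW_SECONDS):
    """Count events per type whose timestamp falls inside the last `window` seconds."""
    now = int(time.time()) if now is None else now
    cutoff = now - window
    return Counter(
        event["event_type"]
        for event in events
        if cutoff <= event["timestamp"] <= now
    )
```

Passing `now` explicitly keeps the function deterministic, which makes it easy to check against a fixed list of sample events.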

Project architecture

The app will consist of a simple React frontend, two core backend services, and a data generation service. Each service will be deployed in Kubernetes, and we’ll also deploy an Ingress that will enable HTTP access from outside of the cluster. 

Mock analytics data will be streamed from a data service into Kafka, and a worker service will then read that data from Kafka, and feed it into our Neon Postgres instance. Finally, we'll create a web-api service that will query the Neon instance in order to display the analytics data.

And once we have the app deployed to our cluster, we’ll start a Velocity development session to showcase how Velocity can dramatically reduce lag when working on a data-intensive application that’s running in Kubernetes, so that the application performance in your dev environment is actually aligned with its performance in production.

The full project is available on GitHub. 

Create a managed Postgres DB with Neon

First, you'll need to create a free account with Neon. Then, create a project called “AnalyticsExample.”

Next, create a new database called “analytics” within the project by clicking the “Database” dropdown and selecting “Create new database”.

Mock analytics data

Next, we'll create our data service that will generate a continuous stream of mock analytics data at a regular interval, which will be passed to Kafka via a Kafka producer we'll define in Python with the `confluent-kafka` library.

import os
import time
import json
import random
from confluent_kafka import Producer

KAFKA_HOST = os.environ.get('KAFKA_HOST')

# Kafka configuration
bootstrap_servers = f'{KAFKA_HOST}:9092'
topic = 'analytics_topic'

# Create a Kafka producer instance
producer = Producer({'bootstrap.servers': bootstrap_servers})

# List of possible event types
event_types = ['click', 'purchase', 'login', 'logout', 'view']

# List of user IDs
user_ids = ['user123', 'user456', 'user789', 'userabc', 'userxyz']

# List of page IDs
page_ids = ['page123', 'page456', 'page789', 'pageabc', 'pagexyz']


# Mock analytics data
def run():
    while True:
        # Generate random values for event fields
        event_type = random.choice(event_types)
        user_id = random.choice(user_ids)
        page_id = random.choice(page_ids)
        duration = random.randint(1, 10)

        # Create the analytics event
        event = {
            'timestamp': int(time.time()),
            'event_type': event_type,
            'user_id': user_id,
            'page_id': page_id,
            'duration': duration
        }

        # Convert event data to JSON
        event_json = json.dumps(event)
        print(event_json)

        # Produce the event to the Kafka topic
        producer.produce(topic, value=event_json)

        # Flush the producer to ensure the message is sent
        producer.flush()

        # Sleep for a certain duration to control the data flow rate
        time.sleep(10)  # Adjust the sleep duration as needed


if __name__ == '__main__':
    run()
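If you want to check the generated events without a running Kafka broker, one option is to separate event creation from sending. The sketch below is an assumption about how that could look, not the project's code: `make_event` and `encode_event` are hypothetical helper names, and the random source is injectable so results are reproducible.

```python
import json
import random
import time

EVENT_TYPES = ['click', 'purchase', 'login', 'logout', 'view']
USER_IDS = ['user123', 'user456', 'user789', 'userabc', 'userxyz']
PAGE_IDS = ['page123', 'page456', 'page789', 'pageabc', 'pagexyz']


def make_event(rng=random, now=None):
    """Build one mock analytics event; `rng` and `now` can be injected for testing."""
    return {
        'timestamp': int(time.time()) if now is None else int(now),
        'event_type': rng.choice(EVENT_TYPES),
        'user_id': rng.choice(USER_IDS),
        'page_id': rng.choice(PAGE_IDS),
        'duration': rng.randint(1, 10),
    }


def encode_event(event):
    """Serialize an event to the UTF-8 JSON bytes that would be sent to Kafka."""
    return json.dumps(event).encode('utf-8')
```

With these helpers, the loop body above would reduce to producing `encode_event(make_event())` to the topic.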

Define the DB worker

Next, we'll need to define the Kafka consumer, which will allow the Worker service to read our Kafka data stream and write that data to our Neon Postgres instance. In order to write the data, we'll use the popular Python ORM SQLAlchemy.

import json
import os
import time
from confluent_kafka import Consumer
from db import Base, engine, SessionLocal
from models import AnalyticsData

# Create the database tables
Base.metadata.create_all(bind=engine)

KAFKA_HOST = os.environ.get('KAFKA_HOST')

# Kafka configuration
bootstrap_servers = f'{KAFKA_HOST}:9092'
topic = 'analytics_topic'
group_id = 'analytics_consumer_group'

# Create a Kafka consumer instance
consumer = Consumer({
    'bootstrap.servers': bootstrap_servers,
    'group.id': group_id
})

# Subscribe to the Kafka topic
consumer.subscribe([topic])

# Consume and process the analytics events
try:
    while True:
        # Poll for new messages, waiting up to one second
        message = consumer.poll(1.0)

        if message is None:
            continue

        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        # Retrieve the event data from the message
        event_json = message.value().decode('utf-8')

        try:
            # Parse the event data as JSON
            event = json.loads(event_json)

            # Create an instance of AnalyticsData, keeping the producer's
            # timestamp and falling back to the current time if it is missing
            analytics_data = AnalyticsData(
                timestamp=event.get('timestamp', int(time.time())),
                event_type=event['event_type'],
                user_id=event['user_id'],
                page_id=event['page_id'],
                duration=event['duration']
            )
        except json.JSONDecodeError:
            print("Invalid JSON format:", event_json)
            continue
        except (KeyError, TypeError, AttributeError) as exc:
            print(f"Malformed event ({exc!r}):", event_json)
            continue

        # Insert the analytics data into the database, and always release
        # the session so connections are not leaked across iterations
        db = SessionLocal()
        try:
            db.add(analytics_data)
            db.commit()
        finally:
            db.close()

        print("Received and stored event:", event)
finally:
    # Close the Kafka consumer so it leaves the consumer group cleanly
    consumer.close()
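The parsing step can also be pulled out into a small function that rejects malformed payloads before they reach the database. This is a sketch under the assumption that every event carries the five fields emitted by the data service; `parse_event` and `REQUIRED_FIELDS` are hypothetical names.

```python
import json

REQUIRED_FIELDS = ('timestamp', 'event_type', 'user_id', 'page_id', 'duration')


def parse_event(raw):
    """Decode a Kafka message payload; return the event dict, or None if it is malformed."""
    try:
        event = json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    # A valid payload must be a JSON object containing every required field
    if not isinstance(event, dict):
        return None
    if any(field not in event for field in REQUIRED_FIELDS):
        return None
    return event
```

In the consumer loop, a `None` result would simply be logged and skipped with `continue`.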

The above file imports two other files we'll have to define as well — db.py and models.py, in which we'll define the database connection and our SQL table respectively.

db.py

import os

from sqlalchemy import create_engine
# declarative_base is importable from sqlalchemy.orm as of SQLAlchemy 1.4;
# the older sqlalchemy.ext.declarative location is deprecated
from sqlalchemy.orm import declarative_base, sessionmaker

POSTGRES_USER = os.environ.get('POSTGRES_USER')
POSTGRES_PASSWORD = os.environ.get('POSTGRES_PASSWORD')
POSTGRES_HOST = os.environ.get('POSTGRES_HOST')
POSTGRES_DB = os.environ.get('POSTGRES_DB')

# Database configuration
DATABASE_URL = f'postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}/{POSTGRES_DB}'
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
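One thing to watch with an f-string URL is a password containing characters such as `@` or `/`, which would break the connection string. The sketch below percent-encodes the credentials with the standard library. The `sslmode=require` parameter is an assumption based on Neon connection strings typically requiring SSL; check it against the connection string shown in your Neon dashboard. `build_database_url` is a hypothetical helper name.

```python
from urllib.parse import quote_plus


def build_database_url(user, password, host, database, sslmode='require'):
    """Assemble a Postgres URL, percent-encoding the credentials.

    The sslmode default is an assumption; adjust it to match your provider.
    """
    return (
        f"postgresql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}/{database}?sslmode={sslmode}"
    )
```

The result can be passed to `create_engine` in place of the f-string above.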

models.py

from sqlalchemy import Column, Integer, String
from db import Base

class AnalyticsData(Base):
    __tablename__ = 'analytics_data'
    id = Column(Integer, primary_key=True, index=True)
    timestamp = Column(Integer)
    event_type = Column(String)
    user_id = Column(String)
    page_id = Column(String)
    duration = Column(Integer)

Define the Web-API service

With the above data flow complete, we can now write the `web-api` service, which will query the Neon Postgres instance, so we can display the analytics data written there by the `worker` service.

from datetime import datetime

from fastapi import FastAPI
from sqlalchemy import select

from db import Base, engine, SessionLocal
from models import AnalyticsData

Base.metadata.create_all(bind=engine)
app = FastAPI()


def to_epoch(date_str: str) -> int:
    # Convert a YYYY-MM-DD string to epoch seconds so it can be compared with
    # the Integer timestamp column (interpreted in the server's local timezone)
    return int(datetime.strptime(date_str, "%Y-%m-%d").timestamp())


def get_analytics_data(event_type: str, start_date: str = None, end_date: str = None):
    # Create the base select statement
    select_stmt = select(AnalyticsData).where(AnalyticsData.event_type == event_type)

    # Apply date filters if provided
    if start_date:
        select_stmt = select_stmt.where(AnalyticsData.timestamp >= to_epoch(start_date))
    if end_date:
        select_stmt = select_stmt.where(AnalyticsData.timestamp <= to_epoch(end_date))

    # Execute the select statement, and always release the session afterwards
    db = SessionLocal()
    try:
        return db.execute(select_stmt).scalars().all()
    finally:
        db.close()

@app.get("/api/click-analytics")
def get_click_analytics(start_date: str = None, end_date: str = None):
    return get_analytics_data("click", start_date, end_date)

@app.get("/api/view-analytics")
def get_view_analytics(start_date: str = None, end_date: str = None):
    return get_analytics_data("view", start_date, end_date)

Note that this service includes the same db.py and models.py files as the `worker` service, as it will also need to connect to the database, and query the “analytics” database we created above.
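To call these endpoints from a script or a test, the query string can be assembled with the standard library. This is a minimal sketch: the base URL is an assumption (port 8000 matches the uvicorn command used later, but the host depends on how you expose the service), and `build_analytics_url` is a hypothetical helper name.

```python
from urllib.parse import urlencode


def build_analytics_url(base_url, event_type, start_date=None, end_date=None):
    """Build the URL for the click or view analytics endpoint with optional date filters."""
    if event_type not in ('click', 'view'):
        raise ValueError(f"Unsupported event type: {event_type}")
    # Only include the filters that were actually provided
    params = {
        key: value
        for key, value in (('start_date', start_date), ('end_date', end_date))
        if value
    }
    query = f"?{urlencode(params)}" if params else ""
    return f"{base_url.rstrip('/')}/api/{event_type}-analytics{query}"
```

Dates use the same `YYYY-MM-DD` format that the service parses with `strptime`.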

Start a Kubernetes cluster

Now that our services are all defined, we can begin deploying the application in Kubernetes. If you have a cluster running, you can deploy it there to see the full benefit of Velocity. But for the purpose of easily running this example, we'll start a Minikube cluster. To do so, we can run the following with Docker installed locally.

brew install minikube
minikube config set memory 5120
minikube config set cpus 4
minikube start

We can then confirm that the cluster is up by running:

kubectl get all

Start a Kafka instance with Helm

Now that the cluster is up, we can deploy our various services. 

We'll use Helm, a popular package manager for Kubernetes. To install Helm on macOS, you can run the following:

brew install helm

With Helm installed, we can then run the following to start Kafka in our cluster:

helm repo add bitnami https://charts.bitnami.com/bitnami
helm install my-kafka bitnami/kafka --version 23.0.1

Containerize and deploy the remaining services

We'll first have to create a Dockerfile for each service we defined above, and then build those images in our Minikube cluster with the `minikube image build -t <image-name> .` command, run from the directory that contains the service's Dockerfile.

NOTE: for all services that connect to your Neon database, you’ll need to set the Neon host (the `POSTGRES_HOST` environment variable read in db.py) to the value provided in the Neon dashboard.

The Dockerfile for each service will be similar to the one shown below. Each service also needs its own requirements.txt file listing the Python packages that particular service requires.

# Use a lightweight Python image
FROM python:3.11-slim-buster

# Set the working directory
WORKDIR /app

# Copy the requirements file
COPY requirements.txt .

# Install the Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the source code
COPY main.py .
COPY db.py .
COPY models.py .

# Start the application using uvicorn
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

Note: the full project includes Dockerfiles and requirements.txt files for each service we defined above.

Once all of our services have been built in Minikube, we'll need to define YAML manifests for deploying those services in Kubernetes. 

data.yaml

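The following manifest defines a Kubernetes Deployment for the Data microservice we defined above. (Only the Deployment appears in this listing; an accompanying Kubernetes Service would be defined as a further item in the same list.)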

apiVersion: v1
kind: List
items:
  - apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: data-service
      labels:
        api: data-service
    spec:
      selector:
        matchLabels:
          api: data-service
      replicas: 1
      template:
        metadata:
          labels:
            api: data-service
        spec:
          containers:
            - name: data-service
              image: notifications-data:latest
              imagePullPolicy: IfNotPresent
              env:
                - name: KAFKA_HOST
                  value: my-kafka-headless
              ports:
                - name: data-service
                  containerPort: 8000
                  protocol: TCP

Again, as with the above section on containerizing our services, this process will need to be completed for each service we need to deploy. The full project includes a Helm chart with all of these services defined for your convenience.

Pass your Neon password to the above services as a Kubernetes Secret

To securely pass your Neon password as an environment variable, you’ll want to create a Kubernetes Secret. To create this, run the following from the command line, where <value> is your Neon password:

kubectl create secret generic neondb-credentials --from-literal=password=<value>
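On the Python side, the services read their database settings from environment variables, so a missing Secret mapping would otherwise surface only as a confusing connection error. The sketch below fails fast instead. It assumes the Deployment maps the Secret's `password` key to `POSTGRES_PASSWORD` (for example via `secretKeyRef`), which is not shown in this article; `load_db_settings` is a hypothetical helper name.

```python
import os

REQUIRED_VARS = ('POSTGRES_USER', 'POSTGRES_PASSWORD', 'POSTGRES_HOST', 'POSTGRES_DB')


def load_db_settings(environ=os.environ):
    """Return the database settings, raising early if any variable is unset or empty."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
    return {name: environ[name] for name in REQUIRED_VARS}
```

Calling this once at startup, before creating the engine, makes a misconfigured Deployment visible in the pod logs immediately.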

Deploy the services to Kubernetes

Finally, with each of the above services defined in a YAML manifest, we can apply each manifest to the cluster as follows, where <service-name> is the name of the manifest file (for example, data.yaml):

kubectl apply -f <service-name>.yaml -n default

View the Postgres table in Neon

After each of the services starts, the Data service passes events to Kafka, and the Worker service in turn writes them to Postgres. To view the stored rows, open the Neon dashboard, click the “Tables” tab in the sidebar menu, and then click “analytics_data”, the name of the table we defined with SQLAlchemy.

Develop the Web-API Service with Velocity

Now that the full application is deployed in Kubernetes, the process of developing it further can be rather tedious, as we would traditionally need to update our local source code, rebuild the image, push it to our registry — DockerHub in our case — and then redeploy the associated Kubernetes resources with the new image.

But with Velocity's free IDE plugin, you can connect to your running application in Kubernetes, and develop as you would locally.

Below, we go through the process of starting a Velocity development session, updating our local code, and automatically updating the remotely running code — the actual image that's running in Kubernetes — with our new code. This way, we don't have to go through all the above steps each time we want to develop or debug code that is running in Kubernetes. Instead, we can just write code, and Velocity updates the Kubernetes Deployment with our new code!

Conclusion

Neon is an open-source, highly scalable serverless Postgres provider, and Kafka is an open-source distributed event streaming platform built specifically to handle high-throughput workloads. Together, these services provide a strong foundation for handling and storing large streams of data efficiently.

Above, we demonstrated this capability by developing a microservice-based application that writes and retrieves analytics data generated continuously. We were able to further develop and debug the application in a very straightforward way after it had been deployed to Kubernetes with Velocity's IDE plugin.

If we hadn’t used Velocity, for every code change we made, we would have had to wait for all relevant CI processes to complete, rebuild the image, and deploy it to our Kubernetes cluster. Instead, we were able to simply update our code as if we were developing locally, and Velocity dynamically replaced our running image with one that included our current local code.
