Running Serverless AI with Knative and Ceph

This post was originally written while I worked at Koor Technologies. Since that website is no longer online, I am republishing that article here.

Machine Learning (ML) and Large Language Models (LLMs) are ubiquitous these days. They have applications ranging from generating images to writing articles (no, this article was written by a human 😉). There are many GenAI & ML models available online that tinkerers can use for free, but you would need to pay to incorporate them into your business logic. That’s why a wide range of Machine Learning as a Service (MLaaS) providers has emerged.

But what if you wanted to escape the Big Cloud providers and build your own ML services? Whether you do this to save costs or to build internal processes, it could be worth it to consider deploying your ML models on Kubernetes. This tutorial builds on what we learned in part 1 of this series, where we deployed a basic serverless application with storage, and demonstrates deploying a pre-trained ML model using Knative and Ceph. This tutorial is self-contained though, so if you haven’t read the first part, you can do that another time.

As shown in the figure below, the pièce de résistance of our setup is the pre-trained ML model. We are using an open-source model from Hugging Face called instruct-pix2pix, which is based on Stable Diffusion. This model applies image editing prompts to an image. We added some code around it to read the input image from a Ceph bucket and upload the output image to another Ceph bucket. We are also using Knative Eventing to trigger the ML serverless function whenever an image is uploaded to the input Ceph bucket, using the bucket notification feature in Ceph. You can replace the model with any other model that fits your needs to do more sophisticated image processing, generate accessible alt-texts, or dream up an image based on prompts. The possibilities are endless.

Using Ceph bucket notifications to trigger the pre-trained ML model

All the code for this tutorial is freely available on GitHub.

This tutorial is also available as a video:

Infrastructure 🏭

Prerequisites

You need a working Kubernetes cluster that meets the minimum requirements of both Rook and Knative. This tutorial uses Rook v1.13.0 and Knative v1.12.0. This is a summary of the requirements for a production cluster:

  • Kubernetes v1.26 or newer.
  • At least three nodes, each having at least 2 CPUs, 4 GB of memory, and 20 GB of disk storage.
  • Raw devices / partitions / logical volumes
  • (Optional) Access to GPUs in the Kubernetes cluster. GPUs help the model run faster, but you can run it without them.

We ran this using our demo cluster, which we set up using Terraform and Kubeone on Hetzner Cloud.

Rook Ceph

Installing Rook using Helm is pretty straightforward. First, you need to add the Helm repository and install the Rook-Ceph operator. The default values are pretty solid, so you might not need a values.yaml file at all; in that case, drop the -f values.yaml flag from the command below.

helm repo add rook-release https://charts.rook.io/release
helm repo update
helm install --create-namespace --namespace rook-ceph rook-ceph rook-release/rook-ceph -f values.yaml

After that, we need to install the Rook Ceph cluster. Again, the default values are very sane, and they create a Ceph object bucket storage class called ceph-bucket, which we will later use to create an ObjectBucketClaim.

helm install --namespace rook-ceph rook-ceph-cluster \
    --set operatorNamespace=rook-ceph rook-release/rook-ceph-cluster -f cluster-values.yaml

After a while, Rook should be ready. To check Ceph health, you can use the Rook toolbox or the Rook kubectl plugin:

$ kubectl rook-ceph ceph status
Info: running 'ceph' command with args: [status]
  cluster:
    id:     89bc2270-5e82-4833-99df-2d4338b0b81a
    health: HEALTH_OK

  services:
    mon: 3 daemons, quorum a,b,c (age 5d)
    mgr: a(active, since 5d), standbys: b
    mds: 1/1 daemons up, 1 hot standby
    osd: 3 osds: 3 up (since 5d), 3 in (since 5d)
    rgw: 1 daemon active (1 hosts, 1 zones)

  data:
    volumes: 1/1 healthy
    pools:   12 pools, 169 pgs
    objects: 436 objects, 668 KiB
    usage:   1.5 GiB used, 89 GiB / 90 GiB avail
    pgs:     169 active+clean

  io:
    client:   33 KiB/s rd, 0 B/s wr, 33 op/s rd, 21 op/s wr
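If you would rather check the cluster health from a script than read the output by eye, the same information is available as JSON. The sketch below assumes you have obtained the JSON form of the status (for example with ceph status --format json in the toolbox) and that the health summary sits under health.status. The function name is_healthy is hypothetical.

```python
import json


def is_healthy(status_json: str) -> bool:
    """Return True when the Ceph status JSON reports HEALTH_OK."""
    status = json.loads(status_json)
    # Assumption: the overall health string is stored under health.status
    return status.get("health", {}).get("status") == "HEALTH_OK"


if __name__ == "__main__":
    # A trimmed-down sample; real output contains many more fields
    sample = '{"health": {"status": "HEALTH_OK"}}'
    print(is_healthy(sample))
```

Anything other than HEALTH_OK, including a missing health field, is treated as unhealthy, which is the conservative choice for a readiness check.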

Knative Serving

There are many ways to install Knative as described in the docs. The easiest in my opinion is to use the Knative Operator:

kubectl apply -f https://github.com/knative/operator/releases/download/knative-v1.12.2/operator.yaml

To install Knative Serving, create the following yaml file specifying the KnativeServing custom resource:

# serving.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: knative-serving
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeServing
metadata:
  name: knative-serving
  namespace: knative-serving
spec:
  ingress:
    kourier:
      enabled: true
  services:
    - name: kourier
      # Override annotations for the kourier service
      annotations:
        # A provider-specific annotation is necessary to access the functions using a load balancer
        # For Hetzner it is:
        load-balancer.hetzner.cloud/name: "my-load-balancer"
  config:
    autoscaler:
      # Retaining the function for longer keeps the machine learning model warm
      # and helps with debugging
      scale-to-zero-pod-retention-period: "20s"
    domain:
      # Replace with your domain if you would like to expose the service
      "your-subdomain.example.com": ""
    network:
      ingress-class: "kourier.ingress.networking.knative.dev"

kubectl apply -f serving.yaml

Knative Eventing

We also need to install Knative Eventing to handle bucket notifications. This is done by creating a KnativeEventing custom resource. The Ceph bucket notifications event source is enabled in the same resource.

# eventing.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: knative-eventing
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec:
  source:
    ceph:
      enabled: true

$ kubectl apply -f eventing.yaml
$ kubectl get deployment -n knative-eventing
NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
ceph-controller         1/1     1            1           44s
ceph-webhook            1/1     1            1           43s
eventing-controller     1/1     1            1           3m5s
eventing-webhook        1/1     1            1           3m5s
imc-controller          1/1     1            1           3m1s
imc-dispatcher          1/1     1            1           3m
mt-broker-controller    1/1     1            1           2m58s
mt-broker-filter        1/1     1            1           2m59s
mt-broker-ingress       1/1     1            1           2m59s
pingsource-mt-adapter   0/0     0            0           3m5s
$ kubectl get KnativeEventing knative-eventing -n knative-eventing
NAME               VERSION   READY   REASON
knative-eventing   1.12.0    True

Tools 🧰

Knative Func

Knative func is a CLI that makes it easy for users to create and deploy functions as Knative Services. You can install it using Homebrew:

brew tap knative-extensions/kn-plugins
brew install func

Or from GitHub releases:

wget -O func https://github.com/knative/func/releases/download/knative-v1.12.0/func_$(go env GOOS)_$(go env GOARCH)
chmod +x func
sudo mv func /usr/local/bin
func version

s5cmd

We also need a tool to interact with Ceph’s S3 API, like s5cmd. It can be installed from GitHub releases or using Homebrew:

brew install peak/tap/s5cmd

Creating the Buckets 🪣

For our application, we need two buckets: one to store the ML model’s inputs and another to store the outputs. Rook handles bucket creation and access through a custom resource called ObjectBucketClaim (OBC). An OBC in turn creates a bucket with an auto-generated name in Ceph, along with a ConfigMap and a Secret holding the information needed to access the bucket.

# obc.yaml
apiVersion: objectbucket.io/v1alpha1
kind: ObjectBucketClaim
metadata:
  name: knative-ml-inputs
spec:
  generateBucketName: ml-inputs
  storageClassName: ceph-bucket
---
apiVersion: objectbucket.io/v1alpha1
kind: ObjectBucketClaim
metadata:
  name: knative-ml-outputs
spec:
  generateBucketName: ml-outputs
  storageClassName: ceph-bucket

$ kubectl apply -f obc.yaml
$ kubectl describe objectbucketclaims/knative-ml-inputs
Name:         knative-ml-inputs
Namespace:    default
...
Spec:
  Bucket Name:           ml-inputs-9f6e8659-7bb5-4534-8579-9930ac8236dd
  Generate Bucket Name:  ml-inputs
  Object Bucket Name:    obc-default-knative-ml-inputs
  Storage Class Name:    ceph-bucket
Status:
  Phase:  Bound
$ kubectl describe configmap/knative-ml-inputs
Name:         knative-ml-inputs
Namespace:    default
Labels:       bucket-provisioner=rook-ceph.ceph.rook.io-bucket
Annotations:  <none>

Data
====
BUCKET_HOST:
----
rook-ceph-rgw-ceph-objectstore.rook-ceph.svc
BUCKET_NAME:
----
ml-inputs-882f3043-3fc8-43b8-8171-cb7872434197
BUCKET_PORT:
----
80
...
$ kubectl describe secret/knative-ml-inputs
Name:         knative-ml-inputs
Namespace:    default
Labels:       bucket-provisioner=rook-ceph.ceph.rook.io-bucket
Annotations:  <none>

Type:  Opaque

Data
====
AWS_ACCESS_KEY_ID:      20 bytes
AWS_SECRET_ACCESS_KEY:  40 bytes
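Any S3 client needs an endpoint URL for the bucket. Here is a minimal sketch of building one from ConfigMap values like those shown above. The helper name endpoint_from_configmap is hypothetical, and the http scheme is an assumption that matches the in-cluster RGW service listening on port 80 in this setup.

```python
def endpoint_from_configmap(data: dict, scheme: str = "http") -> str:
    """Build an S3 endpoint URL from the BUCKET_HOST and BUCKET_PORT keys."""
    host = data["BUCKET_HOST"]
    # BUCKET_PORT is optional here; without it we fall back to the scheme default
    port = data.get("BUCKET_PORT", "")
    return f"{scheme}://{host}:{port}" if port else f"{scheme}://{host}"


if __name__ == "__main__":
    configmap_data = {
        "BUCKET_HOST": "rook-ceph-rgw-ceph-objectstore.rook-ceph.svc",
        "BUCKET_PORT": "80",
    }
    print(endpoint_from_configmap(configmap_data))
```

Including the port explicitly keeps the client working if the object store is later exposed on a non-default port.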

The Code! 🧑‍💻

We will use a Knative function to load and execute the Machine Learning model. The model’s repository on Hugging Face recommends using the diffusers Python library and has some sample code. We just need to add access to the Ceph buckets we created earlier and use CloudEvents to get the object’s name.

Using func, we create a Python function.

$ kn func create -l python ml
Created python function in /path/to/ml
$ tree ml -a
ml
├── app.sh
├── .func
├── .funcignore
├── func.py
├── func.yaml
├── .gitignore
├── Procfile
├── README.md
├── requirements.txt
└── test_func.py

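Then, we need to add the ConfigMaps and Secrets for the two buckets as environment variables. This can be done by running func config env add and following the prompts, but it is less tedious to add the following directly to ml/func.yaml:

run:
  envs:
    - name: INPUTS_BUCKET_HOST
      value: '{{ configMap:knative-ml-inputs:BUCKET_HOST }}'
    - name: INPUTS_BUCKET_PORT
      value: '{{ configMap:knative-ml-inputs:BUCKET_PORT }}'
    - name: INPUTS_BUCKET_NAME
      value: '{{ configMap:knative-ml-inputs:BUCKET_NAME }}'
    - name: INPUTS_ACCESS_KEY_ID
      value: '{{ secret:knative-ml-inputs:AWS_ACCESS_KEY_ID }}'
    - name: INPUTS_SECRET_ACCESS_KEY
      value: '{{ secret:knative-ml-inputs:AWS_SECRET_ACCESS_KEY }}'
    - name: OUTPUTS_BUCKET_HOST
      value: '{{ configMap:knative-ml-outputs:BUCKET_HOST }}'
    - name: OUTPUTS_BUCKET_PORT
      value: '{{ configMap:knative-ml-outputs:BUCKET_PORT }}'
    - name: OUTPUTS_BUCKET_NAME
      value: '{{ configMap:knative-ml-outputs:BUCKET_NAME }}'
    - name: OUTPUTS_ACCESS_KEY_ID
      value: '{{ secret:knative-ml-outputs:AWS_ACCESS_KEY_ID }}'
    - name: OUTPUTS_SECRET_ACCESS_KEY
      value: '{{ secret:knative-ml-outputs:AWS_SECRET_ACCESS_KEY }}'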

All the information needed to access the buckets is now stored in environment variables. We can now fill in the code:

# ml/func.py
import os
import io
import pathlib
from parliament import Context
import boto3
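# Import the submodules explicitly; a bare "import PIL" does not load them
import PIL.Image
import PIL.ImageOps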
from diffusers import StableDiffusionInstructPix2PixPipeline, EulerAncestralDiscreteScheduler

# Access to the inputs bucket
in_endpoint = "http://" + os.environ["INPUTS_BUCKET_HOST"]
in_bucket = os.environ["INPUTS_BUCKET_NAME"]
in_s3 = boto3.client('s3',
    endpoint_url=in_endpoint,
    aws_access_key_id=os.environ["INPUTS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["INPUTS_SECRET_ACCESS_KEY"])

# Access to the outputs bucket
out_endpoint = "http://" + os.environ["OUTPUTS_BUCKET_HOST"]
out_bucket = os.environ["OUTPUTS_BUCKET_NAME"]
out_s3 = boto3.client('s3',
    endpoint_url=out_endpoint,
    aws_access_key_id=os.environ["OUTPUTS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["OUTPUTS_SECRET_ACCESS_KEY"])

# Load the model and keep in memory for subsequent runs
model_id = "timbrooks/instruct-pix2pix"
pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained(model_id, safety_checker=None)
pipe.to("cpu") # Change to cuda if you have a gpu
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)

prompt = "turn it into a cartoon"

def read_input_image(key: str) -> PIL.Image.Image:
    """
    Reads the image from the inputs bucket
    """
    print("Read input image", flush=True)
    file_byte_string = in_s3.get_object(Bucket=in_bucket, Key=key)["Body"].read()
    image = PIL.Image.open(io.BytesIO(file_byte_string))
    image = PIL.ImageOps.exif_transpose(image)
    image = image.convert("RGB")
    print("Input image size is " + str(image.size), flush=True)
    return image

def write_output_image(image: PIL.Image.Image, key: str):
    """
    Writes the image to the output bucket
    """
    print("Write output image")
    print("Output image size " + str(image.size), flush=True)
    # Save the image to an in-memory file
    file = io.BytesIO()
    file.name = pathlib.Path(key).name # Lets pillow figure out the file format
    image.save(file)
    file.seek(0)

    # Upload image to s3
    out_s3.upload_fileobj(file, out_bucket, key)
    print("File is uploaded", flush=True)

def main(context: Context):
    """
    Called when uploading an image to in_bucket.
    Transforms the image using the prompt and uploads the result using out_bucket
    """
    if context.cloud_event is None:
        return "A cloud event is required", 400

    event_attributes = context.cloud_event.get_attributes()
    key = event_attributes['subject']
    print("Key is " + key)

    image = read_input_image(key)
    result_images = pipe(prompt, image=image, num_inference_steps=10, image_guidance_scale=1).images
    write_output_image(result_images[0], key)
    return "Done", 200
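Because write_output_image relies on the key's file extension so that Pillow can infer the output format, a key without a recognizable image extension would fail at save time, after the model has already run. A minimal sketch of a guard that main could call before reading the image follows. The helper name and the allow-list of suffixes are hypothetical and should be adjusted to the formats you expect.

```python
import pathlib

# Hypothetical allow-list; extend it with whatever formats your Pillow build supports
SUPPORTED_SUFFIXES = {".jpg", ".jpeg", ".png", ".webp"}


def is_supported_image(key: str) -> bool:
    """Return True if the object key ends with a known image extension."""
    # Object keys use forward slashes, so parse them as POSIX paths
    suffix = pathlib.PurePosixPath(key).suffix.lower()
    return suffix in SUPPORTED_SUFFIXES


if __name__ == "__main__":
    for key in ("pexels-justin-shaifer.jpg", "notes.txt"):
        print(key, is_supported_image(key))
```

Rejecting unsupported keys early avoids spending a minute of inference on an object that cannot be written back.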

Building and Deploying 🛠️

Let’s build and deploy this function. Replace docker.io/<your_username> with your registry.

cd ml
kn func build --registry docker.io/<your_username>
kn func deploy
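Before wiring up bucket notifications, it can be handy to poke the function by hand. The sketch below builds an HTTP request that carries a CloudEvent in binary mode, where the attributes travel as ce-* headers, with the object key in the subject attribute, which is what main reads. It assumes the function accepts binary-mode CloudEvents over HTTP. The URL, source, and type values are placeholders rather than what the Ceph source actually sends.

```python
import urllib.request
import uuid


def build_cloud_event_request(url: str, subject: str) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying a binary-mode CloudEvent."""
    headers = {
        "ce-specversion": "1.0",
        "ce-id": str(uuid.uuid4()),
        "ce-source": "manual-test",               # placeholder source
        "ce-type": "com.example.object.created",  # placeholder event type
        "ce-subject": subject,                    # the object key the function will read
        "Content-Type": "application/json",
    }
    return urllib.request.Request(url, data=b"{}", headers=headers, method="POST")


if __name__ == "__main__":
    # Placeholder URL; replace it with wherever your function is reachable
    request = build_cloud_event_request("http://localhost:8080", "pexels-justin-shaifer.jpg")
    print(request.get_method(), request.full_url)
    print(request.header_items())
```

Passing the request to urllib.request.urlopen would actually send it. Keep in mind that the function will then try to read that key from the inputs bucket, so the object has to exist.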

Bucket Notifications 🔔

We would like to notify our function of uploaded objects and process them in a queue. Knative Eventing handles incoming events through an event source, which can be passed on to a broker to trigger a function. See the figure above.

Let’s create a broker,

# broker.yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: MTChannelBasedBroker
  name: ml-broker
spec:
  delivery:
    backoffDelay: PT10M # 10 minutes to allow for long execution (ISO 8601: P10M would mean 10 months)

A trigger that calls the ml function,

# trigger.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: ml-trigger
spec:
  broker: ml-broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: ml

And a Knative event source that forwards the events to the broker:

# ceph-source.yaml
apiVersion: sources.knative.dev/v1alpha1
kind: CephSource
metadata:
  name: ml-ceph-source
  namespace: default
spec:
  port: "8888"
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: ml-broker
---
# This will be called by Ceph notifications.
# The address of this service will be http://ml-ceph-source-svc.default.svc:80
apiVersion: v1
kind: Service
metadata:
  name: ml-ceph-source-svc
  namespace: default
spec:
  selector:
    eventing.knative.dev/sourceName: ml-ceph-source
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8888

kubectl apply -f broker.yaml
kubectl apply -f ceph-source.yaml
kubectl apply -f trigger.yaml

Rook Ceph supports the creation of bucket notifications through the custom resources CephBucketNotification (which buckets will process notifications) and CephBucketTopic (where the notifications will go). We will direct the bucket notifications to the service we created earlier.

# notifications.yaml
apiVersion: ceph.rook.io/v1
kind: CephBucketTopic
metadata:
  name: ml-topic
spec:
  objectStoreName: ceph-objectstore
  objectStoreNamespace: rook-ceph
  endpoint:
    http:
      # This is the service uri from before
      uri: http://ml-ceph-source-svc.default.svc:80
      disableVerifySSL: true
      sendCloudEvents: true
---
apiVersion: ceph.rook.io/v1
kind: CephBucketNotification
metadata:
  name: ml-notification
spec:
  topic: ml-topic
  events:
    - s3:ObjectCreated:*

We also need to add a label to the ObjectBucketClaim of the input bucket to apply the notifications to it.

kubectl apply -f notifications.yaml
kubectl patch obc/knative-ml-inputs --type merge \
    -p '{"metadata": {"labels": {"bucket-notification-ml-notification":"ml-notification"}}}'
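To get a feel for what a notification carries, here is a minimal sketch that pulls the bucket name and object key out of a notification body. It assumes the payload follows the S3 event notification layout (a Records list with s3.bucket.name and s3.object.key), which Ceph's bucket notifications are designed to be compatible with. The sample payload is made up and heavily trimmed.

```python
import json
from typing import List, Tuple


def extract_objects(payload: str) -> List[Tuple[str, str]]:
    """Return (bucket, key) pairs for every record in an S3-style notification."""
    records = json.loads(payload).get("Records", [])
    return [
        (record["s3"]["bucket"]["name"], record["s3"]["object"]["key"])
        for record in records
    ]


if __name__ == "__main__":
    # Illustrative payload only; real notifications contain many more fields
    sample = json.dumps({
        "Records": [
            {
                "eventName": "ObjectCreated:Put",
                "s3": {
                    "bucket": {"name": "ml-inputs-example"},
                    "object": {"key": "pexels-justin-shaifer.jpg"},
                },
            }
        ]
    })
    print(extract_objects(sample))
```

In this tutorial the function never parses this body itself, since it reads the key from the CloudEvent's subject attribute.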

Let’s Run This! 🚀

Now we’re ready to roll! Well, not quite. We need to access the bucket from our local machine first. We could expose the bucket using an ingress like Nginx, or we could simply use a port-forward.

(Optional) External access using an nginx ingress

We are using cert-manager for the TLS certificates:

# issuers.yaml
apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
  name: letsencrypt-prod
  namespace: rook-ceph
spec:
  acme:
    # The ACME server URL
    server: https://acme-v02.api.letsencrypt.org/directory
    # Email address used for ACME registration
    email: <your_email> # Replace with your own email address
    # Name of a secret used to store the ACME account private key
    privateKeySecretRef:
      name: letsencrypt-prod
    # Enable the HTTP-01 challenge provider
    solvers:
      - http01:
          ingress:
            ingressClassName: nginx

helm repo add jetstack https://charts.jetstack.io
helm repo update

helm install \
  cert-manager jetstack/cert-manager \
  --namespace cert-manager \
  --create-namespace \
  --version v1.13.1 \
  --set installCRDs=true

kubectl apply -f issuers.yaml

As well as nginx for ingress:

# nginx-values.yaml
controller:
  config:
    use-forwarded-headers: "true"
    compute-full-forwarded-for: "true"
    use-proxy-protocol: "true"
  service:
    annotations:
      # This annotation is provider-specific
      load-balancer.hetzner.cloud/name: "your-lb"

helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
helm repo update
helm -n ingress-nginx install ingress-nginx ingress-nginx/ingress-nginx --create-namespace -f deploy/nginx-values.yaml

Then we create an ingress:

# ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: rook-ceph-rgw-external
  namespace: rook-ceph
  annotations:
    cert-manager.io/issuer: letsencrypt-prod
    nginx.ingress.kubernetes.io/use-regex: "true"
spec:
  ingressClassName: "nginx"
  tls:
    - hosts:
        - your-subdomain.example.com
      secretName: rook-ceph-rgw-external
  rules:
    - host: your-subdomain.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: rook-ceph-rgw-ceph-objectstore
                port:
                  number: 80

kubectl apply -f deploy/ingress.yaml

(Optional) External access using a port-forward

You need to run this command in another terminal:

$ kubectl port-forward -n rook-ceph service/rook-ceph-rgw-ceph-objectstore 8888:80
Forwarding from 127.0.0.1:8888 -> 8080
Forwarding from [::1]:8888 -> 8080

Ok, now we’re ready for real. Let’s set up the credentials:

export S3_ENDPOINT_URL=https://your-subdomain.example.com
# or if using port-forward
# export S3_ENDPOINT_URL=http://localhost:8888

export INPUTS_BUCKET_NAME=$(kubectl -n default get cm knative-ml-inputs -o jsonpath='{.data.BUCKET_NAME}')
export INPUTS_ACCESS_KEY_ID=$(kubectl -n default get secret knative-ml-inputs -o jsonpath='{.data.AWS_ACCESS_KEY_ID}' | base64 --decode)
export INPUTS_SECRET_ACCESS_KEY=$(kubectl -n default get secret knative-ml-inputs -o jsonpath='{.data.AWS_SECRET_ACCESS_KEY}' | base64 --decode)

export OUTPUTS_BUCKET_NAME=$(kubectl -n default get cm knative-ml-outputs -o jsonpath='{.data.BUCKET_NAME}')
export OUTPUTS_ACCESS_KEY_ID=$(kubectl -n default get secret knative-ml-outputs -o jsonpath='{.data.AWS_ACCESS_KEY_ID}' | base64 --decode)
export OUTPUTS_SECRET_ACCESS_KEY=$(kubectl -n default get secret knative-ml-outputs -o jsonpath='{.data.AWS_SECRET_ACCESS_KEY}' | base64 --decode)

mkdir -p ~/.aws
cat > ~/.aws/credentials << EOF
[inputs]
aws_access_key_id = ${INPUTS_ACCESS_KEY_ID}
aws_secret_access_key = ${INPUTS_SECRET_ACCESS_KEY}

[outputs]
aws_access_key_id = ${OUTPUTS_ACCESS_KEY_ID}
aws_secret_access_key = ${OUTPUTS_SECRET_ACCESS_KEY}
EOF
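If one of the kubectl lookups above fails, the corresponding variable is empty and the credentials file ends up with blank values, which only shows up later as an authentication error. A minimal sketch of a sanity check over the file's contents is shown below, using only the standard library. The function name missing_profiles is hypothetical, and the profile names match the ones written above.

```python
import configparser
from typing import List, Sequence


def missing_profiles(credentials_text: str,
                     required: Sequence[str] = ("inputs", "outputs")) -> List[str]:
    """Return the required profiles that are absent or have empty keys."""
    # Disable interpolation so a '%' in a secret is never treated as a template
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(credentials_text)
    missing = []
    for name in required:
        if not parser.has_section(name):
            missing.append(name)
            continue
        section = parser[name]
        if not section.get("aws_access_key_id") or not section.get("aws_secret_access_key"):
            missing.append(name)
    return missing


if __name__ == "__main__":
    # Made-up content where the outputs profile was written with empty values
    text = (
        "[inputs]\naws_access_key_id = AKIAEXAMPLE\naws_secret_access_key = secret\n\n"
        "[outputs]\naws_access_key_id = \naws_secret_access_key = \n"
    )
    print(missing_profiles(text))
```

To check the real file, read ~/.aws/credentials into a string and pass it in; an empty list means both profiles look usable.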

To test our pipeline, we will use an image at the path input-imgs/pexels-justin-shaifer.jpg.

$ s5cmd --profile inputs cp input-imgs/pexels-justin-shaifer.jpg s3://$INPUTS_BUCKET_NAME
cp input-imgs/pexels-justin-shaifer.jpg s3://ml-inputs-882f3043-3fc8-43b8-8171-cb7872434197/pexels-justin-shaifer.jpg
$ s5cmd --profile inputs ls s3://$INPUTS_BUCKET_NAME
2024/02/28 07:00:47             29146  pexels-justin-shaifer.jpg

We can track the progress of the function using kubectl logs -f:

$ kubectl get pods
NAME                                                              READY   STATUS        RESTARTS   AGE
cephsource-my-ceph-source-416fe9a2-82f8-4f2a-9e73-7dd1545fg7qxc   1/1     Running       0          6d13h
knative-operator-5945688b59-lh2tq                                 1/1     Running       0          6d13h
ml-00001-deployment-9c5cbfb95-cn6hl                               2/2     Running       0          7s
operator-webhook-8557b8ddff-2kzp9                                 1/1     Running       0          6d13h
$ kubectl logs ml-00001-deployment-9c5cbfb95-cn6hl
...
Loading pipeline components...: 100%|██████████| 6/6 [00:00<00:00,  9.89it/s]0, 495MB/s]
Key is pexels-justin-shaifer.jpg
Read input image
Input image size is (640, 427)
100%|██████████| 10/10 [00:54<00:00,  5.45s/it]
Write output image
Output image size (640, 424)
File is uploaded
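Note that the logs report an input of 640×427 but an output of 640×424. A likely explanation is that Stable Diffusion pipelines work on latents downsampled by a factor of 8, so image dimensions get rounded down to a multiple of 8. This is an assumption about the pipeline's preprocessing, not something the logs state. The arithmetic can be sketched as:

```python
from typing import Tuple


def round_down_to_multiple(size: Tuple[int, int], multiple: int = 8) -> Tuple[int, int]:
    """Round each dimension down to the nearest multiple (8 is an assumed factor)."""
    width, height = size
    return (width - width % multiple, height - height % multiple)


if __name__ == "__main__":
    print(round_down_to_multiple((640, 427)))
```

If you need the output to match the input size exactly, resizing or padding the input to a multiple of 8 beforehand is one way to keep control over how the pixels are adjusted.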

When the function is done processing the image, you can find it in the output bucket:

$ s5cmd --profile outputs ls s3://$OUTPUTS_BUCKET_NAME
2024/02/28 07:05:16             13750  pexels-justin-shaifer.jpg
$ s5cmd --profile outputs cp s3://$OUTPUTS_BUCKET_NAME/pexels-justin-shaifer.jpg .
cp s3://ml-outputs-a560800a-9fa8-4f13-bc0d-3e787dbfbc7b/pexels-justin-shaifer.jpg pexels-justin-shaifer.jpg

Left: Photo by Justin Shaifer from Pexels, Right: The output image using the prompt "turn it into a cartoon"
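Since inference takes a while, especially on CPU, a script that uploads an image usually has to wait for the result to appear in the output bucket. Here is a minimal polling sketch. The bucket listing is passed in as a callable so that the helper stays independent of any particular S3 client. The name wait_for_object and the default timings are hypothetical.

```python
import time
from typing import Callable, Iterable


def wait_for_object(list_keys: Callable[[], Iterable[str]],
                    key: str,
                    timeout: float = 600.0,
                    interval: float = 5.0,
                    sleep: Callable[[float], None] = time.sleep,
                    clock: Callable[[], float] = time.monotonic) -> bool:
    """Poll list_keys() until key shows up; return False once timeout has passed."""
    deadline = clock() + timeout
    while True:
        if key in set(list_keys()):
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


if __name__ == "__main__":
    # Stand-in for a real bucket listing: the key appears on the third poll
    polls = {"count": 0}

    def fake_listing():
        polls["count"] += 1
        return ["pexels-justin-shaifer.jpg"] if polls["count"] >= 3 else []

    found = wait_for_object(fake_listing, "pexels-justin-shaifer.jpg", interval=0.01)
    print(found, polls["count"])
```

With boto3, list_keys could wrap a list_objects_v2 call on the output bucket and return the Key of each entry under Contents.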

Rook Ceph is a Perfect Match for Your AI Needs

As we saw in this tutorial, Rook Ceph fits your Machine Learning storage needs with little effort. You can run your ML workloads and Ceph on the same cluster, or use a separate cluster to host your Ceph S3 buckets.
