Running Serverless AI with Knative and Ceph
This post was originally written while I worked at Koor Technologies. Since that website is no longer online, I am republishing the article here.

Machine Learning (ML) and Large Language Models (LLMs) are ubiquitous these days, with applications ranging from generating images to writing articles (no, this article was written by a human 😉). Many GenAI and ML models are freely available online for tinkerers to use, but incorporating them into your business logic usually means paying for an API. That's why a wide range of Machine Learning as a Service (MLaaS) providers have emerged.
But what if you wanted to escape the Big Cloud providers and build your own ML services? Whether you do this to save costs or to keep processes in-house, it is worth considering deploying your ML models on Kubernetes. This tutorial builds on what we learned in part 1 of this series, where we deployed a basic serverless application with storage, and demonstrates deploying a pre-trained ML model using Knative and Ceph. This tutorial is self-contained though, so if you haven't read the first part, you can do that another time.
As shown in the figure below, the pièce de résistance of our setup is the pre-trained ML model. We are using instruct-pix2pix, an open-source model from Hugging Face based on Stable Diffusion, which applies text-based editing instructions to an image. We added some code around it to read the input image from a Ceph bucket and upload the output image to another Ceph bucket. We also use Knative Eventing to trigger the ML serverless function whenever an image is uploaded to the input bucket, via Ceph's bucket notification feature. You can replace the model with any other model that fits your needs to do more sophisticated image processing, generate accessible alt texts, or dream up an image based on prompts. The possibilities are endless.

All the code for this tutorial is freely available on GitHub.
This tutorial is also available as a video.
Infrastructure 🏭
Prerequisites
You need a working Kubernetes cluster that meets the minimum requirements of both Rook and Knative. This tutorial uses Rook v1.13.0 and Knative v1.12.0. In summary, a production cluster requires:
- Kubernetes v1.26 or newer.
- At least three nodes, each with at least 2 CPUs, 4 GB of memory, and 20 GB of disk storage.
- Raw devices, partitions, or logical volumes for Ceph's OSDs.
- (Optional) Access to GPUs in the Kubernetes cluster. A GPU makes the model run much faster, but CPU inference works too.
We ran this tutorial on our demo cluster, which we set up using Terraform and KubeOne on Hetzner Cloud.
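Before installing anything, it's worth a quick sanity check that your cluster satisfies these requirements. A minimal sketch, assuming kubectl already points at your cluster:

# The server version should be v1.26 or newer, and at least three nodes should be Ready
kubectl version
kubectl get nodes -o wide

# On each storage node: devices with an empty FSTYPE column are raw and can be consumed by Rook
lsblk -f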
Rook Ceph
Installing Rook using Helm is pretty straightforward. First, add the Helm repository and install the Rook Ceph operator. The default values are pretty solid, so you might not even need a custom values.yaml file.
helm repo add rook-release https://charts.rook.io/release
helm repo update
helm install --create-namespace --namespace rook-ceph rook-ceph rook-release/rook-ceph -f values.yaml
After that, we need to install the Rook Ceph cluster. Again, the default values are very sane, and they create a Ceph object bucket storage class called ceph-bucket, which we will later use to create an ObjectBucketClaim.
helm install --namespace rook-ceph rook-ceph-cluster \
--set operatorNamespace=rook-ceph rook-release/rook-ceph-cluster -f cluster-values.yaml
After a while, Rook should be ready. To check Ceph health, you can use the Rook toolbox or the Rook kubectl plugin:
$ kubectl rook-ceph ceph status
Info: running 'ceph' command with args: [status]
  cluster:
    id:     89bc2270-5e82-4833-99df-2d4338b0b81a
    health: HEALTH_OK

  services:
    mon: 3 daemons, quorum a,b,c (age 5d)
    mgr: a(active, since 5d), standbys: b
    mds: 1/1 daemons up, 1 hot standby
    osd: 3 osds: 3 up (since 5d), 3 in (since 5d)
    rgw: 1 daemon active (1 hosts, 1 zones)

  data:
    volumes: 1/1 healthy
    pools:   12 pools, 169 pgs
    objects: 436 objects, 668 KiB
    usage:   1.5 GiB used, 89 GiB / 90 GiB avail
    pgs:     169 active+clean

  io:
    client:   33 KiB/s rd, 0 B/s wr, 33 op/s rd, 21 op/s wr
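We can also verify that the two pieces we rely on later are in place: the RGW service that exposes the S3 API, and the ceph-bucket storage class. The names below assume the cluster chart's defaults:

kubectl -n rook-ceph get svc rook-ceph-rgw-ceph-objectstore
kubectl get storageclass ceph-bucket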
Knative Serving
There are many ways to install Knative, as described in the docs. The easiest, in my opinion, is to use the Knative Operator:
kubectl apply -f https://github.com/knative/operator/releases/download/knative-v1.12.2/operator.yaml
To install Knative Serving, create the following YAML file specifying the KnativeServing custom resource:
# serving.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: knative-serving
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeServing
metadata:
  name: knative-serving
  namespace: knative-serving
spec:
  ingress:
    kourier:
      enabled: true
  services:
    - name: kourier
      # Override annotations for the kourier service
      annotations:
        # A provider-specific annotation is necessary to access the functions using a load balancer
        # For Hetzner it is:
        load-balancer.hetzner.cloud/name: "my-load-balancer"
  config:
    autoscaler:
      # Retaining the function for longer keeps the machine learning model warm
      # and helps with debugging
      scale-to-zero-pod-retention-period: "20s"
    domain:
      # Replace with your domain if you would like to expose the service
      "your-subdomain.example.com": ""
    network:
      ingress-class: "kourier.ingress.networking.knative.dev"
kubectl apply -f serving.yaml
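Give the operator a moment to reconcile, then confirm that serving came up; the KnativeServing resource should report READY True:

kubectl get KnativeServing knative-serving -n knative-serving
kubectl get pods -n knative-serving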
Knative Eventing
We also need to install knative Eventing to handle bucket notifications. This is done by creating a KnativeEventing
custom resource. The Ceph bucket notifications event source is enabled from the same resource.
# eventing.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: knative-eventing
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec:
  source:
    ceph:
      enabled: true
$ kubectl apply -f eventing.yaml
$ kubectl get deployment -n knative-eventing
NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
ceph-controller         1/1     1            1           44s
ceph-webhook            1/1     1            1           43s
eventing-controller     1/1     1            1           3m5s
eventing-webhook        1/1     1            1           3m5s
imc-controller          1/1     1            1           3m1s
imc-dispatcher          1/1     1            1           3m
mt-broker-controller    1/1     1            1           2m58s
mt-broker-filter        1/1     1            1           2m59s
mt-broker-ingress       1/1     1            1           2m59s
pingsource-mt-adapter   0/0     0            0           3m5s
$ kubectl get KnativeEventing knative-eventing -n knative-eventing
NAME               VERSION   READY   REASON
knative-eventing   1.12.0    True
Tools 🧰
Knative Func
Knative func is a CLI that makes it easy to create and deploy functions as Knative Services. You can install it using Homebrew:
brew tap knative-extensions/kn-plugins
brew install func
Or from GitHub releases:
wget -O func https://github.com/knative/func/releases/download/knative-v1.12.0/func_$(go env GOOS)_$(go env GOARCH)
chmod +x func
sudo mv func /usr/local/bin
func version
s5cmd
We also need a tool to interact with Ceph's S3 API, such as s5cmd. It can be installed from GitHub releases or using Homebrew:
brew install peak/tap/s5cmd
Creating the Buckets 🪣
For our application we need two buckets: one to store the ML model's inputs and another to store the outputs. Rook handles bucket creation and access through a custom resource called ObjectBucketClaim (OBC). Creating an OBC in turn creates a bucket with an auto-generated name in Ceph, along with a ConfigMap and a Secret containing the information needed to access the bucket.
# obc.yaml
apiVersion: objectbucket.io/v1alpha1
kind: ObjectBucketClaim
metadata:
  name: knative-ml-inputs
spec:
  generateBucketName: ml-inputs
  storageClassName: ceph-bucket
---
apiVersion: objectbucket.io/v1alpha1
kind: ObjectBucketClaim
metadata:
  name: knative-ml-outputs
spec:
  generateBucketName: ml-outputs
  storageClassName: ceph-bucket
$ kubectl apply -f obc.yaml
$ kubectl describe objectbucketclaims/knative-ml-inputs
Name:         knative-ml-inputs
Namespace:    default
...
Spec:
  Bucket Name:          ml-inputs-9f6e8659-7bb5-4534-8579-9930ac8236dd
  Generate Bucket Name: ml-inputs
  Object Bucket Name:   obc-default-knative-ml-inputs
  Storage Class Name:   ceph-bucket
Status:
  Phase: Bound

$ kubectl describe configmap/knative-ml-inputs
Name:         knative-ml-inputs
Namespace:    default
Labels:       bucket-provisioner=rook-ceph.ceph.rook.io-bucket
Annotations:  <none>

Data
====
BUCKET_HOST:
----
rook-ceph-rgw-ceph-objectstore.rook-ceph.svc
BUCKET_NAME:
----
ml-inputs-882f3043-3fc8-43b8-8171-cb7872434197
BUCKET_PORT:
----
80
...

$ kubectl describe secret/knative-ml-inputs
Name:         knative-ml-inputs
Namespace:    default
Labels:       bucket-provisioner=rook-ceph.ceph.rook.io-bucket
Annotations:  <none>

Type:  Opaque

Data
====
AWS_ACCESS_KEY_ID:      20 bytes
AWS_SECRET_ACCESS_KEY:  40 bytes
The Code! 🧑‍💻
We will use a Knative function to load and execute the machine learning model. The model's repository on Hugging Face recommends the diffusers Python library and provides some sample code. We just need to add access to the Ceph buckets we created earlier and read the object's name from the incoming CloudEvent.
Using func, we create a Python function:
$ kn func create -l python ml
Created python function in /path/to/ml
$ tree ml -a
ml
├── app.sh
├── .func
├── .funcignore
├── func.py
├── func.yaml
├── .gitignore
├── Procfile
├── README.md
├── requirements.txt
└── test_func.py
Then, we need to expose the ConfigMaps and Secrets of the two buckets as environment variables. This can be done by running func config env add and following the prompts, but it is less tedious to add the following directly to ml/func.yaml. The {{ configMap:... }} and {{ secret:... }} values use func.yaml's templating syntax to reference the ConfigMaps and Secrets that the OBCs created:
run:
  envs:
    - name: INPUTS_BUCKET_HOST
      value: "{{ configMap:knative-ml-inputs:BUCKET_HOST }}"
    - name: INPUTS_BUCKET_PORT
      value: "{{ configMap:knative-ml-inputs:BUCKET_PORT }}"
    - name: INPUTS_BUCKET_NAME
      value: "{{ configMap:knative-ml-inputs:BUCKET_NAME }}"
    - name: INPUTS_ACCESS_KEY_ID
      value: "{{ secret:knative-ml-inputs:AWS_ACCESS_KEY_ID }}"
    - name: INPUTS_SECRET_ACCESS_KEY
      value: "{{ secret:knative-ml-inputs:AWS_SECRET_ACCESS_KEY }}"
    - name: OUTPUTS_BUCKET_HOST
      value: "{{ configMap:knative-ml-outputs:BUCKET_HOST }}"
    - name: OUTPUTS_BUCKET_PORT
      value: "{{ configMap:knative-ml-outputs:BUCKET_PORT }}"
    - name: OUTPUTS_BUCKET_NAME
      value: "{{ configMap:knative-ml-outputs:BUCKET_NAME }}"
    - name: OUTPUTS_ACCESS_KEY_ID
      value: "{{ secret:knative-ml-outputs:AWS_ACCESS_KEY_ID }}"
    - name: OUTPUTS_SECRET_ACCESS_KEY
      value: "{{ secret:knative-ml-outputs:AWS_SECRET_ACCESS_KEY }}"
All the information needed to access the buckets is now stored in environment variables. We can now fill in the code:
# ml/func.py
import os
import io
import pathlib

from parliament import Context
import boto3
import PIL.Image
import PIL.ImageOps
from diffusers import StableDiffusionInstructPix2PixPipeline, EulerAncestralDiscreteScheduler

# Access to the inputs bucket
in_endpoint = "http://" + os.environ["INPUTS_BUCKET_HOST"]
in_bucket = os.environ["INPUTS_BUCKET_NAME"]
in_s3 = boto3.client('s3',
                     endpoint_url=in_endpoint,
                     aws_access_key_id=os.environ["INPUTS_ACCESS_KEY_ID"],
                     aws_secret_access_key=os.environ["INPUTS_SECRET_ACCESS_KEY"])

# Access to the outputs bucket
out_endpoint = "http://" + os.environ["OUTPUTS_BUCKET_HOST"]
out_bucket = os.environ["OUTPUTS_BUCKET_NAME"]
out_s3 = boto3.client('s3',
                      endpoint_url=out_endpoint,
                      aws_access_key_id=os.environ["OUTPUTS_ACCESS_KEY_ID"],
                      aws_secret_access_key=os.environ["OUTPUTS_SECRET_ACCESS_KEY"])

# Load the model and keep it in memory for subsequent runs
model_id = "timbrooks/instruct-pix2pix"
pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained(model_id, safety_checker=None)
pipe.to("cpu")  # Change to "cuda" if you have a GPU
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)

prompt = "turn it into a cartoon"


def read_input_image(key: str) -> PIL.Image.Image:
    """
    Reads the image from the inputs bucket
    """
    print("Read input image", flush=True)
    file_byte_string = in_s3.get_object(Bucket=in_bucket, Key=key)["Body"].read()
    image = PIL.Image.open(io.BytesIO(file_byte_string))
    image = PIL.ImageOps.exif_transpose(image)
    image = image.convert("RGB")
    print("Input image size is " + str(image.size), flush=True)
    return image


def write_output_image(image: PIL.Image.Image, key: str):
    """
    Writes the image to the outputs bucket
    """
    print("Write output image")
    print("Output image size " + str(image.size), flush=True)
    # Save the image to an in-memory file
    file = io.BytesIO()
    file.name = pathlib.Path(key).name  # Lets Pillow figure out the file format
    image.save(file)
    file.seek(0)
    # Upload the image to S3
    out_s3.upload_fileobj(file, out_bucket, key)
    print("File is uploaded", flush=True)


def main(context: Context):
    """
    Called when an image is uploaded to the inputs bucket.
    Transforms the image using the prompt and uploads the result to the outputs bucket.
    """
    if context.cloud_event is None:
        return "A cloud event is required", 400
    event_attributes = context.cloud_event.get_attributes()
    key = event_attributes['subject']
    print("Key is " + key)
    image = read_input_image(key)
    result_images = pipe(prompt, image=image, num_inference_steps=10, image_guidance_scale=1).images
    write_output_image(result_images[0], key)
    return "Done", 200
Building and Deploying 🛠️
Let's build and deploy this function. Replace docker.io/<your_username> with your registry:
cd ml
kn func build --registry docker.io/<your_username>
kn func deploy
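Once the deploy finishes, the function shows up as a Knative Service; it should report READY True and print its URL:

kubectl get ksvc ml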
Bucket Notifications 🔔
We would like to notify our function of uploaded objects and process them through a queue. Knative Eventing receives incoming events through an event source, which passes them on to a broker that triggers the function. See the figure above.
Let's create a broker:
# broker.yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: MTChannelBasedBroker
  name: ml-broker
spec:
  delivery:
    backoffDelay: PT10M # 10 minutes to allow for long execution
A trigger that calls the ml function:
# trigger.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: ml-trigger
spec:
  broker: ml-broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: ml
And a Knative event source, together with a plain Kubernetes Service that Ceph's notifications will call:
# ceph-source.yaml
apiVersion: sources.knative.dev/v1alpha1
kind: CephSource
metadata:
  name: ml-ceph-source
  namespace: default
spec:
  port: "8888"
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: ml-broker
---
# This will be called by Ceph notifications.
# The address of this service will be http://ml-ceph-source-svc.default.svc:80
apiVersion: v1
kind: Service
metadata:
  name: ml-ceph-source-svc
  namespace: default
spec:
  selector:
    eventing.knative.dev/sourceName: ml-ceph-source
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8888
kubectl apply -f broker.yaml
kubectl apply -f ceph-source.yaml
kubectl apply -f trigger.yaml
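Before wiring up Ceph, check that the eventing pieces reconciled; each should report READY True (the commands below assume the resource names from the manifests above):

kubectl get broker ml-broker
kubectl get trigger ml-trigger
kubectl get cephsource ml-ceph-source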
Rook Ceph supports the creation of bucket notifications through the custom resources CephBucketNotification (which events a bucket notifies about) and CephBucketTopic (where the notifications go). We will direct the bucket notifications to the service we created earlier.
# notifications.yaml
apiVersion: ceph.rook.io/v1
kind: CephBucketTopic
metadata:
  name: ml-topic
spec:
  objectStoreName: ceph-objectstore
  objectStoreNamespace: rook-ceph
  endpoint:
    http:
      # This is the service URI from before
      uri: http://ml-ceph-source-svc.default.svc:80
      disableVerifySSL: true
      sendCloudEvents: true
---
apiVersion: ceph.rook.io/v1
kind: CephBucketNotification
metadata:
  name: ml-notification
spec:
  topic: ml-topic
  events:
    - s3:ObjectCreated:*
We also need to add a label to the ObjectBucketClaim of the inputs bucket to attach the notification to it:
kubectl apply -f notifications.yaml
kubectl patch obc/knative-ml-inputs --type merge \
-p '{"metadata": {"labels": {"bucket-notification-ml-notification":"ml-notification"}}}'
Let’s Run This! 🚀
Now we're ready to roll! Well, not quite. We need to access the bucket from our local machine first. We could expose the bucket using an ingress controller like NGINX, or we could simply use a port-forward.
(Optional) External access using an NGINX ingress
We are using cert-manager for the TLS certificates:
# issuers.yaml
apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
  name: letsencrypt-prod
  namespace: rook-ceph
spec:
  acme:
    # The ACME server URL
    server: https://acme-v02.api.letsencrypt.org/directory
    # Email address used for ACME registration (replace with yours)
    email: you@example.com
    # Name of a secret used to store the ACME account private key
    privateKeySecretRef:
      name: letsencrypt-prod
    # Enable the HTTP-01 challenge provider
    solvers:
      - http01:
          ingress:
            ingressClassName: nginx
helm repo add jetstack https://charts.jetstack.io
helm repo update
helm install \
cert-manager jetstack/cert-manager \
--namespace cert-manager \
--create-namespace \
--version v1.13.1 \
--set installCRDs=true
kubectl apply -f issuers.yaml
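The issuer must be ready before it can solve ACME challenges; the READY column should show True:

kubectl -n rook-ceph get issuer letsencrypt-prod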
# nginx-values.yaml
controller:
  config:
    use-forwarded-headers: "true"
    compute-full-forwarded-for: "true"
    use-proxy-protocol: "true"
  service:
    annotations:
      # This annotation is provider-specific
      load-balancer.hetzner.cloud/name: "your-lb"
helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
helm repo update
helm -n ingress-nginx install ingress-nginx ingress-nginx/ingress-nginx --create-namespace -f nginx-values.yaml
# ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: rook-ceph-rgw-external
  namespace: rook-ceph
  annotations:
    cert-manager.io/issuer: letsencrypt-prod
    nginx.ingress.kubernetes.io/use-regex: "true"
spec:
  ingressClassName: "nginx"
  tls:
    - hosts:
        - your-subdomain.example.com
      secretName: rook-ceph-rgw-external
  rules:
    - host: your-subdomain.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: rook-ceph-rgw-ceph-objectstore
                port:
                  number: 80
kubectl apply -f ingress.yaml
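Certificate issuance can take a minute or two. Once the certificate is ready, the endpoint should answer S3 requests; an anonymous request to the root typically returns an S3 XML response:

kubectl -n rook-ceph get certificate rook-ceph-rgw-external
curl https://your-subdomain.example.com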
(Optional) External access using a port-forward
You need to run this command in another terminal:
$ kubectl port-forward -n rook-ceph service/rook-ceph-rgw-ceph-objectstore 8888:80
Forwarding from 127.0.0.1:8888 -> 8080
Forwarding from [::1]:8888 -> 8080
Ok, now we’re ready for real. Let’s set up the credentials:
export S3_ENDPOINT_URL=https://your-subdomain.example.com
# or if using port-forward
# export S3_ENDPOINT_URL=http://localhost:8888
export INPUTS_BUCKET_NAME=$(kubectl -n default get cm knative-ml-inputs -o jsonpath='{.data.BUCKET_NAME}')
export INPUTS_ACCESS_KEY_ID=$(kubectl -n default get secret knative-ml-inputs -o jsonpath='{.data.AWS_ACCESS_KEY_ID}' | base64 --decode)
export INPUTS_SECRET_ACCESS_KEY=$(kubectl -n default get secret knative-ml-inputs -o jsonpath='{.data.AWS_SECRET_ACCESS_KEY}' | base64 --decode)
export OUTPUTS_BUCKET_NAME=$(kubectl -n default get cm knative-ml-outputs -o jsonpath='{.data.BUCKET_NAME}')
export OUTPUTS_ACCESS_KEY_ID=$(kubectl -n default get secret knative-ml-outputs -o jsonpath='{.data.AWS_ACCESS_KEY_ID}' | base64 --decode)
export OUTPUTS_SECRET_ACCESS_KEY=$(kubectl -n default get secret knative-ml-outputs -o jsonpath='{.data.AWS_SECRET_ACCESS_KEY}' | base64 --decode)
mkdir -p ~/.aws
cat > ~/.aws/credentials << EOF
[inputs]
aws_access_key_id = ${INPUTS_ACCESS_KEY_ID}
aws_secret_access_key = ${INPUTS_SECRET_ACCESS_KEY}
[outputs]
aws_access_key_id = ${OUTPUTS_ACCESS_KEY_ID}
aws_secret_access_key = ${OUTPUTS_SECRET_ACCESS_KEY}
EOF
To test our pipeline, we will use an image at the path input-imgs/pexels-justin-shaifer.jpg:
$ s5cmd --profile inputs cp input-imgs/pexels-justin-shaifer.jpg s3://$INPUTS_BUCKET_NAME
cp input-imgs/pexels-justin-shaifer.jpg s3://ml-inputs-882f3043-3fc8-43b8-8171-cb7872434197/pexels-justin-shaifer.jpg
$ s5cmd --profile inputs ls s3://$INPUTS_BUCKET_NAME
2024/02/28 07:00:47 29146 pexels-justin-shaifer.jpg
We can track the progress of the function using kubectl logs -f:
$ kubectl get pods
NAME                                                              READY   STATUS    RESTARTS   AGE
cephsource-my-ceph-source-416fe9a2-82f8-4f2a-9e73-7dd1545fg7qxc   1/1     Running   0          6d13h
knative-operator-5945688b59-lh2tq                                 1/1     Running   0          6d13h
ml-00001-deployment-9c5cbfb95-cn6hl                               2/2     Running   0          7s
operator-webhook-8557b8ddff-2kzp9                                 1/1     Running   0          6d13h
$ kubectl logs ml-00001-deployment-9c5cbfb95-cn6hl
...
Loading pipeline components...: 100%|██████████| 6/6 [00:00<00:00,  9.89it/s]
Key is pexels-justin-shaifer.jpg
Read input image
Input image size is (640, 427)
100%|██████████| 10/10 [00:54<00:00,  5.45s/it]
Write output image
Output image size (640, 424)
File is uploaded
When the function is done processing the image, you can find it in the output bucket:
$ s5cmd --profile outputs ls s3://$OUTPUTS_BUCKET_NAME
2024/02/28 07:05:16 13750 pexels-justin-shaifer.jpg
$ s5cmd --profile outputs cp s3://$OUTPUTS_BUCKET_NAME/pexels-justin-shaifer.jpg .
cp s3://ml-outputs-a560800a-9fa8-4f13-bc0d-3e787dbfbc7b/pexels-justin-shaifer.jpg pexels-justin-shaifer.jpg


Rook Ceph is a Perfect Match for Your AI Needs
As we saw in this tutorial, Rook Ceph effortlessly fits your Machine Learning storage needs. You can run them both on the same cluster or use a separate cluster to host your Ceph S3 buckets.