Consuming AWS S3 Events with Knative Eventing, Functions and Apache Camel K

Published on: 2024-06-14

Author: Matthias Weßendorf, Senior Principal Software Engineer @ Red Hat

In this blog post you will learn how to easily consume events from an AWS S3 bucket in a Knative Function, using Knative Eventing and Apache Camel K. The post builds on the first post of the series on Apache Camel K and Knative.

One of the typical use-cases for a Serverless Function is to react to events delivered by an external source of events. A common example of this is to receive notifications from an AWS S3 bucket in a Serverless Function. But how can you receive those events in an on-premises environment, where the function runs on a custom Kubernetes setup instead of on AWS?

Installation

The installation of Apache Camel K offers a few choices, such as the CLI, Kustomize, OLM, or Helm. For example, the Helm installation is:

$ helm repo add camel-k https://apache.github.io/camel-k/charts/
$ helm install my-camel-k camel-k/camel-k
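
Once the chart is installed, you can verify that the Camel K operator is up before moving on; a quick check (assuming you installed into the current namespace) is:

$ kubectl get pods

Look for a camel-k-operator pod in the Running state.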

Besides Camel K we also need to have Knative Eventing installed, as described in the documentation.

Creating a Knative Broker instance

We are using a Knative Broker as the heart of our system, acting as an Event Mesh for both event producers and event consumers:

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  namespace: default
  name: my-broker

Now event producers can send events to it and event consumers can receive events.
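
To create the Broker, save the manifest (as broker.yaml, for example; the file name is just an assumption here), apply it, and check that it becomes ready:

$ kubectl apply -f broker.yaml
$ kubectl get broker my-broker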

Using Kamelets as Event Sources for AWS S3

In order to bind a Kamelet to a Knative component, like the broker we created above, we are using the Pipe API. A Pipe allows you to declaratively move data from a system described by a Kamelet towards a Knative resource, or from a Knative resource to another (external) system described by a Kamelet.

Below is a Pipe that uses a ready-to-use Kamelet, the aws-s3-source:

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: aws-s3-source-pipe
  annotations:
    trait.camel.apache.org/mount.config: "secret:aws-s3-credentials"
spec:
  integration:
    dependencies:
    - "camel:cloudevents"
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: aws-s3-source
    properties:
      bucketNameOrArn: "${aws.s3.bucketNameOrArn}"
      accessKey: "${aws.s3.accessKey}"
      secretKey: "${aws.s3.secretKey}"
      region: "${aws.s3.region}"
    dataTypes:
      out:
        scheme: aws2-s3
        format: application-cloudevents
  sink:
    dataTypes:
      in:
        scheme: http
        format: application-cloudevents
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: my-broker

The aws-s3-source Kamelet is referenced as the source of the Pipe and sends CNCF CloudEvents to the outbound sink when there is activity in the referenced bucket. Here we use the Knative Broker, which accepts CloudEvents. Later we will connect a Knative Function to the Broker to receive events from it.
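
The Pipe is applied like any other Kubernetes resource. Assuming the manifest was saved as aws-s3-source-pipe.yaml (the file name is an assumption):

$ kubectl apply -f aws-s3-source-pipe.yaml
$ kubectl get pipe aws-s3-source-pipe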

Note

The AWS S3 properties are stored in a secret, which is mounted into the Pipe via the trait.camel.apache.org/mount.config annotation.
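
A minimal sketch of creating that secret, assuming the properties are kept in a local application.properties file (file name and values below are assumptions, matching the keys referenced in the Pipe):

$ cat application.properties
aws.s3.bucketNameOrArn=my-bucket
aws.s3.accessKey=<your-access-key>
aws.s3.secretKey=<your-secret-key>
aws.s3.region=eu-west-1

$ kubectl create secret generic aws-s3-credentials --from-file=application.properties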

Creating a Knative Function as a consumer

In order to consume messages from the Knative Broker with a Knative Function, we will create a simple Golang function. Since the payload is sent to the function as CloudEvents, we use the built-in cloudevents template by executing the following command:

$ func create -l go -t cloudevents s3-logger

This gives you a new project in the s3-logger folder of your current directory, and it contains a Golang file for the Knative Function:

package function

import (
    "context"
    "fmt"

    "github.com/cloudevents/sdk-go/v2/event"
)

// Handle an event.
func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
    /*
     * YOUR CODE HERE
     *
     * Try running `go test`.  Add more tests as you code in `handle_test.go`.
     */

    fmt.Println("Received event")
    fmt.Println(ce) // echo to local output
    return &ce, nil // echo to caller
}

We can now modify the Handle function to print out a few attributes of the CloudEvent:

func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
    fmt.Println("Received S3 event notification")
    fmt.Println("CloudEvent Subject attribute: " + ce.Subject())
    fmt.Println("CloudEvent Source attribute:  " + ce.Source())

    // Some processing of the payload of the CloudEvent...

    return nil, nil
}

The above Handle function just prints the file name, represented by the subject attribute, and the fully qualified name of the bucket, which is stored in the source attribute of the received CloudEvent.

Note

Currently the aws-s3-source Kamelet sends the entire object, so the full file content is also accessible via the data property of the CloudEvent.
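
As a sketch, the object's content could be inspected from the handler via the CloudEvents Go SDK's Data() accessor. This variant of Handle is illustrative, not part of the generated template, and relies on the same imports as the file above:

func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
    // ce.Data() returns the raw payload bytes, i.e. the file content
    // shipped by the aws-s3-source Kamelet.
    payload := ce.Data()
    fmt.Printf("Object %q from %q: %d bytes\n", ce.Subject(), ce.Source(), len(payload))
    return nil, nil
}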

Subscribe the Knative Function to AWS S3 events

In order to receive the S3 event notifications, we need to subscribe the function to the broker that provides the events. But how do we know which events are available? For that we check which Knative EventTypes are available in the system:

$ kubectl get eventtypes.eventing.knative.dev 
NAME                                            TYPE                                      SOURCE                       SCHEMA   REFERENCE NAME   REFERENCE KIND   DESCRIPTION                             READY   REASON
et-my-broker-53bfa9803446c35c5a612c5a44a1c263   org.apache.camel.event.aws.s3.getObject   aws.s3.bucket.<bucketname>            my-broker        Broker           Event Type auto-created by controller   True

Now we know that in our namespace there are org.apache.camel.event.aws.s3.getObject events available on the my-broker broker.

We can now subscribe the function to that event type from the given broker:

$ func subscribe --filter type=org.apache.camel.event.aws.s3.getObject --source my-broker

This updates the func.yaml metadata of the project. When the function is deployed, the CLI will build and deploy the consumer and create a Knative Trigger for it, with matching filter attributes:

$ func deploy
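
Under the hood this results in a Knative Trigger similar to the following sketch (the generated Trigger name is an assumption; the CLI derives it from the function):

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: s3-logger-trigger
  namespace: default
spec:
  broker: my-broker
  filter:
    attributes:
      type: org.apache.camel.event.aws.s3.getObject
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: s3-logger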

Once deployed, the following output is visible in the logs of the Knative Function for each S3 event:

Received S3 event notification
CloudEvent Subject attribute: my-file.txt
CloudEvent Source attribute:  aws.s3.bucket.<bucketname>
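
If you are not watching the deploy output directly, the same lines can be tailed from the function's pod logs, for example:

$ kubectl logs -l serving.knative.dev/service=s3-logger -c user-container -f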

Conclusion

With Knative Eventing, Functions, and Apache Camel K it is possible to deliver notifications from third-party cloud services, such as AWS S3, to functions running on your own on-premises Kubernetes cluster.
