Running a Pipe

The Pipe is a concept that enables the user to create a "composable" Event Driven Architecture design. A Pipe binds a source endpoint to a sink endpoint, where each endpoint represents an external entity: any Camel URI or a Kubernetes resource such as a Kamelet, a Kafka (Strimzi) resource or a Knative resource.

Make sure you’re familiar with the concept of Kamelet before continuing.

The operator is in charge of transforming a binding between a source and a sink into a running Integration, taking care of all the building and transformation required.

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-log
spec:
  sink:
    uri: log:bar
  source:
    uri: timer:foo

The example above is the simplest way to show how to "connect" a Camel URI source to a Camel URI sink. Save it as timer-to-log.yaml and run it with kubectl apply -f timer-to-log.yaml. Once it is applied, you can check the status of your Pipe:

kubectl get pipe -w

NAME           PHASE      REPLICAS
timer-to-log   Creating
timer-to-log   Ready      0
timer-to-log   Ready      1

The operator has taken the Pipe and created an Integration from its configuration. The Integration is the resource that actually runs your application, and you can inspect it as well, for example with kubectl get integration:

NAME             PHASE     READY   RUNTIME PROVIDER   RUNTIME VERSION   CATALOG VERSION   KIT                        REPLICAS
timer-to-log     Running   True    quarkus            3.8.1             3.8.1             kit-crbgrhmn5tgc73cb1tl0   1

Sources, Sinks and Actions

The development of a Pipe should be limited to binding a source to a sink. However, sometimes you may need to perform a slight transformation while consuming the events. In such a case you can include a set of actions, declared as steps, that take care of it.

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-log
spec:
  sink:
    uri: log:bar
  source:
    uri: timer:foo
  steps:
  - uri: https://gist.githubusercontent.com/squakez/48b4ebf24c2579caf6bcb3e8a59fa509/raw/c7d9db6ee5e8851f5dc6a564172d85f00d87219c/gistfile1.txt

In the example above, each event passes through an intermediate resource, declared under steps, which fills the content with some value before the event reaches the sink.
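
A step does not have to be a URI. As a minimal sketch, an action can also be referenced as a Kamelet. This assumes that the timer-source, json-deserialize-action, insert-field-action and log-sink Kamelets from the Apache Kamelet catalog are available in your cluster; the Pipe name, the message, and the field name and value are illustrative only.

```yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-log-with-action   # hypothetical name
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: timer-source            # assumed to be installed from the Kamelet catalog
    properties:
      message: '{"text":"hello"}'   # illustrative JSON payload
      contentType: application/json
  steps:
  # Actions run in order between the source and the sink
  - ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: json-deserialize-action # parses the JSON body so a field can be added
  - ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: insert-field-action     # assumed to be installed from the Kamelet catalog
    properties:
      field: origin                 # illustrative field name
      value: timer                  # illustrative field value
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: log-sink                # assumed to be installed from the Kamelet catalog
```

Each step uses the same structure as the source and the sink: either a uri or a ref, plus optional properties.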

Differences with Integrations

The simple examples above may make you wonder what the differences are between a Pipe and an Integration. The Integration is meant for any generic Camel workload where you have complex business logic to perform, whereas the Pipe is more useful when you have events and you want to emit or consume them in a connector-style approach.

Most of the time you will have consumer applications (one Pipe) which are consuming events from a topic (Kafka, Kamelet or Knative) and producer applications (another Pipe) producing to a topic.

The Camel K operator allows you to reference Kafka (Strimzi) and Knative custom resources directly as endpoints.
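
To illustrate the consumer side, here is a minimal sketch of a Pipe that reads from a Strimzi KafkaTopic and logs each event. It assumes a Strimzi-managed topic exists in the same namespace; the topic name my-topic and the Pipe name are hypothetical, and the KafkaTopic apiVersion should match the Strimzi version installed in your cluster.

```yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: kafka-to-log               # hypothetical name
spec:
  source:
    ref:
      kind: KafkaTopic
      apiVersion: kafka.strimzi.io/v1beta1   # adjust to your Strimzi version
      name: my-topic               # hypothetical topic managed by Strimzi
  sink:
    uri: log:info                  # plain Camel URI sink, as in the first example
```

A producer application would be a second Pipe with the same KafkaTopic reference used as the sink instead of the source.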

Examples

Here are some other examples involving Kamelets, Knative and Kafka.

Binding Kamelets

One development pattern that emerges is connector development. You can consider a Kamelet as a connector endpoint, and therefore bind source and sink Kamelets together to perform some logic. In this example, for instance, we’re moving data from an AWS Kinesis source to a PostgreSQL database.

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: from-kinesis-to-pgdb
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: aws-kinesis-source
    properties:
      region: my-region
      stream: my-stream
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: postgresql-sink
    properties:
      databaseName: my-db
      password: my-pwd
      query: INSERT INTO accounts (username,city) VALUES (:#username,:#city)
      serverName: localhost
      username: my-usr

Consuming events from a Kafka topic

Another typical use case is consuming events from, or producing events to, a KafkaTopic custom resource (managed by the Strimzi operator) or Knative resources:

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: beer-event-source
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: beer-source
    properties:
      period: 5000
  sink:
    ref:
      kind: KafkaTopic
      apiVersion: kafka.strimzi.io/v1beta1
      name: beer-events
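
A Knative resource can be referenced in the same way. The following is a minimal sketch, assuming Knative Eventing is installed, a Channel named messages already exists in the namespace, and the timer-source Kamelet is available; the Pipe name, the channel name and the message are hypothetical.

```yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: timer-to-channel           # hypothetical name
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: timer-source           # assumed to be installed from the Kamelet catalog
    properties:
      message: Hello               # illustrative payload
  sink:
    ref:
      kind: Channel
      apiVersion: messaging.knative.dev/v1
      name: messages               # hypothetical Knative channel
```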

Using Kamel CLI

Camel K works very well with any Kubernetes compatible user interface (a CLI such as kubectl or oc, or any visual tooling). However, we also provide a simple CLI that helps you perform most Pipe tasks more easily: the kamel CLI.