Zeebe

Since Camel 3.21

Both producer and consumer are supported

The Zeebe: components provides the ability to interact with business processes in Zeebe.

In order to use the Zeebe component, Maven users will need to add the following dependency to their pom.xml:

Prerequisites

You must have access to a local zeebe instance. More information is available at Camunda Zeebe.

URI format

zeebe://[endpoint]?[options]

Configuring Options

Camel components are configured on two separate levels:

  • component level

  • endpoint level

Configuring Component Options

The component level is the highest level which holds general and common configurations that are inherited by the endpoints. For example a component may have security settings, credentials for authentication, urls for network connection and so forth.

Some components only have a few options, and others may have many. Because components typically have pre configured defaults that are commonly used, then you may often only need to configure a few options on a component; or none at all.

Configuring components can be done with the Component DSL, in a configuration file (application.properties|yaml), or directly with Java code.

Configuring Endpoint Options

Where you find yourself configuring the most is on endpoints, as endpoints often have many options, which allows you to configure what you need the endpoint to do. The options are also categorized into whether the endpoint is used as consumer (from) or as a producer (to), or used for both.

Configuring endpoints is most often done directly in the endpoint URI as path and query parameters. You can also use the Endpoint DSL and DataFormat DSL as a type safe way of configuring endpoints and data formats in Java.

A good practice when configuring options is to use Property Placeholders, which allows to not hardcode urls, port numbers, sensitive information, and other settings. In other words placeholders allows to externalize the configuration from your code, and gives more flexibility and reuse.

The following two sections lists all the options, firstly for the component followed by the endpoint.

Component Options

The Zeebe component supports 8 options, which are listed below.

Name Description Default Type

bridgeErrorHandler (consumer)

Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored.

false

boolean

lazyStartProducer (producer)

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

boolean

autowiredEnabled (advanced)

Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc.

true

boolean

clientId (security)

Client id to be used when requesting access token from OAuth authorization server.

String

clientSecret (security)

Client secret to be used when requesting access token from OAuth authorization server.

String

gatewayHost (security)

The gateway server hostname to connect to the Zeebe cluster.

localhost

String

gatewayPort (security)

The gateway server port to connect to the Zeebe cluster.

26500

int

oAuthAPI (security)

The authorization server’s URL, from which the access token will be requested.

String

Endpoint Options

The Zeebe endpoint is configured using URI syntax:

zeebe:operationName

with the following path and query parameters:

Path Parameters (1 parameters)

Name Description Default Type

operationName (common)

Required The operation to use.

Enum values:

  • startProcess

  • cancelProcess

  • publishMessage

  • completeJob

  • failJob

  • updateJobRetries

  • worker

  • throwError

  • deployResource

OperationName

Query Parameters (7 parameters)

Name Description Default Type

formatJSON (common)

Format the result in the body as JSON.

false

boolean

jobKey (consumer)

JobKey for the job worker.

String

timeout (consumer)

Timeout for job worker.

10

int

bridgeErrorHandler (consumer (advanced))

Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored.

false

boolean

exceptionHandler (consumer (advanced))

To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored.

ExceptionHandler

exchangePattern (consumer (advanced))

Sets the exchange pattern when the consumer creates an exchange.

Enum values:

  • InOnly

  • InOut

  • InOptionalOut

ExchangePattern

lazyStartProducer (producer (advanced))

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

boolean

Producer Endpoints:

Endpoint Description

startProcess

Creates and starts an instance of the specified process.

cancelProcess

Cancels a running process instance.

publishMessage

Publishes a message.

completeJob

Completes a job for a service task.

failJob

Fails a job.

updateJobRetries

Updates the number of retries for a job.

throwError

Throw an error to indicate that a business error has occurred.

deployResource

Deploy a process resource. Currently only supports process definitions.

The endpoints accept either Java request objects as shown in the examples below or JSON. In JSON camel case for property names is replaced with all lower case seperated by underscores, e.g. processId becomes process_id.

Examples

  • startProcess

    from("direct:start")
        .process(exchange -> {
            ProcessRequest request = new ProcessRequest();
            request.setProcessId("processId");
            request.setVariables(new HashMap<String,Object> ());
            exchange.getIn().setBody(request);
        })
        .to("zeebe://startProcess")
        .process(exchange -> {
            ProcessResponse body = exchange.getIn().getBody(ProcessResponse.class);
            if (body != null) {
                bool success = body.getSuccess();
                long processInstanceKey = body.getProcessInstanceKey();
            }
        });

JSON Request Example

    {
        "process_id" : "Process_0e3ldfm",
        "variables" : { "v1": "a", "v2": 10 }
    }

JSON Response Example

    {
        "success": true,
        "process_id": "Process_0e3ldfm",
        "process_instance_key": 2251799813688297,
        "process_version": 4,
        "process_key": 2251799813685906
    }
  • cancelProcess

    from("direct:start")
        .process(exchange -> {
            ProcessRequest request = new ProcessRequest();
            request.setProcessInstanceKey(123);
            exchange.getIn().setBody(request);
        })
        .to("zeebe://cancelProcess")
        .process(exchange -> {
            ProcessResponse body = exchange.getIn().getBody(ProcessResponse.class);
            if (body != null) {
                bool success = body.getSuccess();
            }
        });
  • publishMessage

    from("direct:start")
        .process(exchange -> {
            MessageRequest request = new MessageRequest();
            request.setCorrelationKey("messageKey");
            request.setTimeToLive(100);
            request.setVariables(new HashMap<String,Object>());
            request.setName("MessageName");
            exchange.getIn().setBody(request);
        })
        .to("zeebe://publishMessage")
        .process(exchange -> {
            MessageResponse body = exchange.getIn().getBody(MessageResponse.class);
            if (body != null) {
                bool success = body.getSuccess();
                String messageKey = body.getMessageKey();
            }
        });

JSON Request Example

    {
        "correlation_key" : "messageKey",
        "time-to-live" : 100,
        "variables" : { "v1": "a", "v2": 10 },
        "name" : "MessageName"
    }

JSON Response Example

    {
        "success": true,
        "correlation_key": "messageKey",
        "message_key": 2251799813688336
    }
  • completeJob

    from("direct:start")
        .process(exchange -> {
            JobRequest request = new JobRequest();
            request.setJobKey("jobKey");
            request.setVariables(new HashMap<String,Object>());
            exchange.getIn().setBody(request);
        })
        .to("zeebe://completeJob")
        .process(exchange -> {
            JobResponse body = exchange.getIn().getBody(JobResponse.class);
            if (body != null) {
                bool success = body.getSuccess();
            }
        });
  • failJob

    from("direct:start")
        .process(exchange -> {
            JobRequest request = new JobRequest();
            request.setJobKey("jobKey");
            request.setRetries(3);
            request.setErrorMessage("Error");
            exchange.getIn().setBody(request);
        })
        .to("zeebe://failJob")
        .process(exchange -> {
            JobResponse body = exchange.getIn().getBody(JobResponse.class);
            if (body != null) {
                bool success = body.getSuccess();
            }
        });
  • updateJobRetries

    from("direct:start")
        .process(exchange -> {
            JobRequest request = new JobRequest();
            request.setJobKey("jobKey");
            request.setRetries(3);
            exchange.getIn().setBody(request);
        })
        .to("zeebe://updateJobRetries")
        .process(exchange -> {
            JobResponse body = exchange.getIn().getBody(JobResponse.class);
            if (body != null) {
                bool success = body.getSuccess();
            }
        });
  • throwError

    from("direct:start")
        .process(exchange -> {
            JobRequest request = new JobRequest();
            request.setJobKey("jobKey");
            request.setErrorMessage("Error Message");
            request.setErrorCode("Error Code")
            exchange.getIn().setBody(request);
        })
        .to("zeebe://throwError")
        .process(exchange -> {
            JobResponse body = exchange.getIn().getBody(JobResponse.class);
            if (body != null) {
                bool success = body.getSuccess();
            }
        });
  • deployResource

    from("direct:start")
        .process(exchange -> {
            DeploymentRequest request = new DeploymentRequest();
            request.setName("process.bpmn");
            request.setContent(content.getBytes());
            exchange.getIn().setBody(request);
        })
        .to("zeebe://deployResource")
        .process(exchange -> {
            JobWorkerMessage body = exchange.getIn().getBody(JobWorkerMessage.class);
            if (body != null) {
                long key = body.getKey();
                String type = body.getType();
                Map<String,String> customHeaders = body.getCustomHeaders();
                long processInstanceKey = body.getProcessInstanceKey();
                String bpmnProcessId = body.getBpmnProcessId();
                int processDefinitionVersion = body.getProcessDefinitionVersion();
                long processDefinitionKey = body.getProcessDefinitionKey();
                String elementId = body.getElementId();
                long elementInstanceKey = body.getElementInstanceKey();
                String worker = body.getWorker();
                int retries = body.getRetries();
                long deadline = body.getDeadline();
                Map<String,Object> variables = body.getVariables();
            }
        });

Consumer Endpoints:

Endpoint Description

worker

Registers a job worker for a job type and provides messages for available jobs.

Example

    from("zeebe://jobWorker?jobKey=job1&timeout=20")
        .process(exchange -> {
            ProcessDeploymentResponse body = exchange.getIn().getBody(ProcessDeploymentResponse.class);
            if (body != null) {
                bool success = body.getSuccess();
                String bpmnProcessId = body.getBpmnProcessId();
                int version = body.getVersion();
                long processDefinitionKey = body.getProcessDefinitionKey();
                String resourceName = body.getResourceName();
            }
        })

camel-zeebe creates a route exchange per job type with a job in the body.

Message Headers

The Zeebe component supports 7 message header(s), which is/are listed below:

Name Description Default Type

CamelZeebeResourceName (producer)

Constant: RESOURCE_NAME

The name of the resource.

CamelZeebeIsSuccess (producer)

Constant: IS_SUCCESS

True if the operation was successful.

CamelZeebeErrorMessage (producer)

Constant: ERROR_MESSAGE

In case of an error, the error message.

CamelZeebeErrorCode (producer)

Constant: ERROR_CODE

In case of an error, the error code if available.

CamelZeebeBPMNProcessId (producer)

Constant: BPMN_PROCESS_ID

The process ID of a deployed process.

CamelZeebeVersion (producer)

Constant: VERSION

The version of a deployed process.

CamelZeebeProcessDefinitionKey (producer)

Constant: PROCESS_DEFINITION_KEY

The process definition key of a deployed process.

Dependencies

Maven users will need to add the following dependency to their pom.xml.

pom.xml

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-zeebe</artifactId>
    <version>${camel-version}</version>
</dependency>

where ${camel-version} must be replaced by the actual version of Camel.

Spring Boot Auto-Configuration

When using zeebe with Spring Boot make sure to use the following Maven dependency to have support for auto configuration:

<dependency>
  <groupId>org.apache.camel.springboot</groupId>
  <artifactId>camel-zeebe-starter</artifactId>
  <version>x.x.x</version>
  <!-- use the same version as your Camel core version -->
</dependency>

The component supports 9 options, which are listed below.

Name Description Default Type

camel.component.zeebe.autowired-enabled

Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc.

true

Boolean

camel.component.zeebe.bridge-error-handler

Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored.

false

Boolean

camel.component.zeebe.client-id

Client id to be used when requesting access token from OAuth authorization server.

String

camel.component.zeebe.client-secret

Client secret to be used when requesting access token from OAuth authorization server.

String

camel.component.zeebe.enabled

Whether to enable auto configuration of the zeebe component. This is enabled by default.

Boolean

camel.component.zeebe.gateway-host

The gateway server hostname to connect to the Zeebe cluster.

localhost

String

camel.component.zeebe.gateway-port

The gateway server port to connect to the Zeebe cluster.

26500

Integer

camel.component.zeebe.lazy-start-producer

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

Boolean

camel.component.zeebe.o-auth-a-p-i

The authorization server’s URL, from which the access token will be requested.

String