SQL

Since Camel 1.4

Both producer and consumer are supported

The SQL component allows you to work with databases using JDBC queries. The difference between this component and the JDBC component is that with SQL the query is a property of the endpoint, and the message payload supplies the parameters passed to the query.

This component uses spring-jdbc behind the scenes for the actual SQL handling.

Maven users will need to add the following dependency to their pom.xml for this component:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-sql</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

The SQL component also supports:

  • a JDBC based repository for the Idempotent Consumer EIP pattern. See further below.

  • a JDBC based repository for the Aggregator EIP pattern. See further below.

URI format

This component can be used as a Transactional Client.

The SQL component uses the following endpoint URI notation:

sql:select * from table where id=# order by name[?options]

You can use named parameters by using the :#name_of_the_parameter style as shown:

sql:select * from table where id=:#myId order by name[?options]

When using named parameters, Camel looks up the names in the following order of precedence:

  1. from the message body, if it is a java.util.Map

  2. from the message headers

If a named parameter cannot be resolved, then an exception is thrown.
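The lookup order above can be sketched in plain Java. This is only an illustration of the precedence rule under simplifying assumptions, not Camel's internal code; the class and method names (NamedParameterLookup, resolve) are hypothetical.

```java
import java.util.Map;
import java.util.NoSuchElementException;

final class NamedParameterLookup {

    // Resolves a named parameter: a Map body is consulted first, then the headers.
    static Object resolve(String name, Object body, Map<String, Object> headers) {
        if (body instanceof Map) {
            Map<?, ?> map = (Map<?, ?>) body;
            if (map.containsKey(name)) {
                return map.get(name);
            }
        }
        if (headers.containsKey(name)) {
            return headers.get(name);
        }
        // Mirrors the documented behaviour: an unresolved name is an error.
        throw new NoSuchElementException("Cannot resolve named parameter: " + name);
    }
}
```

With a Map body containing myId and a header of the same name, resolve("myId", body, headers) returns the value from the body.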

You can use Simple expressions as parameters as shown:

sql:select * from table where id=:#${exchangeProperty.myId} order by name[?options]

Notice that the standard ? symbol that denotes the parameters to an SQL query is substituted with the # symbol, because the ? symbol is used to specify options for the endpoint. The ? symbol replacement can be configured on a per-endpoint basis.
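To make the substitution concrete, here is a small plain-Java sketch of replacing the placeholder with the JDBC ? marker. It assumes a purely textual replacement with no SQL parsing, so a placeholder character inside a quoted literal is replaced as well. The class and method names (PlaceholderSketch, toJdbc) are hypothetical, and the sketch does not handle named parameters.

```java
final class PlaceholderSketch {

    // Replaces every occurrence of the placeholder with the JDBC '?' marker.
    // No SQL parsing is done, so placeholders inside quoted literals change too.
    static String toJdbc(String query, String placeholder) {
        return query.replace(placeholder, "?");
    }
}
```

For example, toJdbc("select * from table where id=# order by name", "#") yields the query with id=? in place of id=#.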

You can externalize your SQL queries to files in the classpath or file system as shown:

sql:classpath:sql/myquery.sql[?options]

The myquery.sql file is on the classpath and is just a plain text file:

-- this is a comment
select *
from table
where
  id = :#${exchangeProperty.myId}
order by
  name

In the file you can spread the SQL over multiple lines and format it as you wish. You can also use comments, such as the -- dash line.

Configuring Options

Camel components are configured on two separate levels:

  • component level

  • endpoint level

Configuring Component Options

The component level is the highest level which holds general and common configurations that are inherited by the endpoints. For example a component may have security settings, credentials for authentication, urls for network connection and so forth.

Some components have only a few options, and others may have many. Because components typically have preconfigured defaults that are commonly used, you may often need to configure only a few options on a component, or none at all.

Configuring components can be done with the Component DSL, in a configuration file (application.properties|yaml), or directly with Java code.

Configuring Endpoint Options

Most of your configuration happens on endpoints, as endpoints often have many options that let you configure what you need the endpoint to do. The options are also categorized by whether the endpoint is used as a consumer (from), as a producer (to), or for both.

Configuring endpoints is most often done directly in the endpoint URI as path and query parameters. You can also use the Endpoint DSL and DataFormat DSL as a type safe way of configuring endpoints and data formats in Java.

A good practice when configuring options is to use Property Placeholders, which let you avoid hardcoding URLs, port numbers, sensitive information, and other settings. In other words, placeholders allow you to externalize the configuration from your code, which gives more flexibility and reuse.

The following two sections list all the options, first for the component and then for the endpoint.

Component Options

The SQL component supports 6 options, which are listed below.

Name Description Default Type

dataSource (common)

Autowired Sets the DataSource to use to communicate with the database.

DataSource

bridgeErrorHandler (consumer)

Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored.

false

boolean

lazyStartProducer (producer)

Whether the producer should be started lazily (on the first message). Starting lazily allows CamelContext and routes to start up in situations where a producer may otherwise fail during startup and cause the route to fail to start. By deferring this startup, the failure can be handled while routing messages via Camel’s routing error handlers. Beware that when the first message is processed, creating and starting the producer may take a little time and prolong the total processing time.

false

boolean

autowiredEnabled (advanced)

Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc.

true

boolean

rowMapperFactory (advanced)

Autowired Factory for creating RowMapper.

RowMapperFactory

usePlaceholder (advanced)

Sets whether to use placeholders and replace all placeholder characters with the ? sign in the SQL queries. This option is true by default.

true

boolean

Endpoint Options

The SQL endpoint is configured using URI syntax:

sql:query

with the following path and query parameters:

Path Parameters (1 parameter)

Name Description Default Type

query (common)

Required Sets the SQL query to perform. You can externalize the query by using file: or classpath: as prefix and specify the location of the file.

String

Query Parameters (46 parameters)

Name Description Default Type

allowNamedParameters (common)

Whether to allow using named parameters in the queries.

true

boolean

dataSource (common)

Autowired Sets the DataSource to use to communicate with the database at endpoint level.

DataSource

outputClass (common)

Specify the full package and class name to use as conversion when outputType=SelectOne.

String

outputHeader (common)

Store the query result in a header instead of the message body. By default, outputHeader == null and the query result is stored in the message body; any existing content in the message body is discarded. If outputHeader is set, the value is used as the name of the header to store the query result and the original message body is preserved.

String

outputType (common)

Make the output of the consumer or producer either SelectList, a List of Map, or SelectOne, a single Java object, in the following way: a) If the query has only a single column, then that JDBC column object is returned (for example, SELECT COUNT(*) FROM PROJECT will return a Long object). b) If the query has more than one column, then it will return a Map of that result. c) If the outputClass is set, then it will convert the query result into a Java bean object by calling all the setters that match the column names. It will assume your class has a default constructor to create an instance with. d) If the query resulted in more than one row, it throws a non-unique result exception. StreamList streams the result of the query using an Iterator. This can be used with the Splitter EIP in streaming mode to process the ResultSet in a streaming fashion.

Enum values:

  • SelectOne

  • SelectList

  • StreamList

SelectList

SqlOutputType

separator (common)

The separator to use when parameter values are taken from the message body (if the body is a String type), to be inserted at # placeholders. Notice that if you use named parameters, then a Map type is used instead. The default value is comma.

,

char

breakBatchOnConsumeFail (consumer)

Sets whether to break batch if onConsume failed.

false

boolean

expectedUpdateCount (consumer)

Sets an expected update count to validate when using onConsume.

-1

int

maxMessagesPerPoll (consumer)

Sets the maximum number of messages to poll.

int

onConsume (consumer)

After processing each row, this query can be executed if the Exchange was processed successfully, for example to mark the row as processed. The query can have parameters.

String

onConsumeBatchComplete (consumer)

After processing the entire batch, this query can be executed to bulk update rows etc. The query cannot have parameters.

String

onConsumeFailed (consumer)

After processing each row, this query can be executed if the Exchange failed, for example to mark the row as failed. The query can have parameters.

String

routeEmptyResultSet (consumer)

Sets whether empty resultset should be allowed to be sent to the next hop. Defaults to false. So the empty resultset will be filtered out.

false

boolean

sendEmptyMessageWhenIdle (consumer)

If the polling consumer did not poll any messages, you can enable this option to send an empty message (no body) instead.

false

boolean

transacted (consumer)

Enables or disables transactions. If enabled, then when processing an exchange fails the consumer stops processing any further exchanges, to cause an eager rollback.

false

boolean

useIterator (consumer)

Sets how the result set should be delivered to the route: either as a list or as individual objects. Defaults to true.

true

boolean

bridgeErrorHandler (consumer (advanced))

Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored.

false

boolean

exceptionHandler (consumer (advanced))

To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored.

ExceptionHandler

exchangePattern (consumer (advanced))

Sets the exchange pattern when the consumer creates an exchange.

Enum values:

  • InOnly

  • InOut

  • InOptionalOut

ExchangePattern

pollStrategy (consumer (advanced))

A pluggable org.apache.camel.spi.PollingConsumerPollStrategy allowing you to provide your custom implementation to control error handling for errors that usually occur during the poll operation, before an Exchange has been created and routed in Camel.

PollingConsumerPollStrategy

processingStrategy (consumer (advanced))

Allows plugging in a custom org.apache.camel.component.sql.SqlProcessingStrategy to execute queries when the consumer has processed the rows/batch.

SqlProcessingStrategy

batch (producer)

Enables or disables batch mode.

false

boolean

noop (producer)

If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing.

false

boolean

useMessageBodyForSql (producer)

Whether to use the message body as the SQL and then headers for parameters. If this option is enabled then the SQL in the uri is not used. Note that query parameters in the message body are represented by a question mark instead of a # symbol.

false

boolean

lazyStartProducer (producer (advanced))

Whether the producer should be started lazily (on the first message). Starting lazily allows CamelContext and routes to start up in situations where a producer may otherwise fail during startup and cause the route to fail to start. By deferring this startup, the failure can be handled while routing messages via Camel’s routing error handlers. Beware that when the first message is processed, creating and starting the producer may take a little time and prolong the total processing time.

false

boolean

alwaysPopulateStatement (advanced)

If enabled, the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, even if there are no expected parameters to be prepared. When this is false, populateStatement is only invoked if there are one or more expected parameters to be set; for example, this avoids reading the message body/headers for SQL queries with no parameters.

false

boolean

parametersCount (advanced)

If set greater than zero, Camel will use this value as the number of parameters to replace, instead of querying the JDBC metadata API. This is useful if the JDBC driver cannot return the correct parameter count, in which case you can override it here.

int

placeholder (advanced)

Specifies a character that will be replaced with ? in the SQL query. Notice that this is a simple String.replaceAll() operation and no SQL parsing is involved (quoted strings will also change).

#

String

prepareStatementStrategy (advanced)

Allows plugging in a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement.

SqlPrepareStatementStrategy

rowMapperFactory (advanced)

Factory for creating RowMapper.

RowMapperFactory

templateOptions (advanced)

Configures the Spring JdbcTemplate with the key/values from the Map.

Map

usePlaceholder (advanced)

Sets whether to use placeholders and replace all placeholder characters with the ? sign in the SQL queries.

true

boolean

backoffErrorThreshold (scheduler)

The number of subsequent error polls (failed due to some error) that should happen before the backoffMultiplier should kick in.

int

backoffIdleThreshold (scheduler)

The number of subsequent idle polls that should happen before the backoffMultiplier should kick in.

int

backoffMultiplier (scheduler)

To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured.

int

delay (scheduler)

Milliseconds before the next poll.

500

long

greedy (scheduler)

If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages.

false

boolean

initialDelay (scheduler)

Milliseconds before the first poll starts.

1000

long

repeatCount (scheduler)

Specifies a maximum limit of number of fires. So if you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever.

0

long

runLoggingLevel (scheduler)

The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.

Enum values:

  • TRACE

  • DEBUG

  • INFO

  • WARN

  • ERROR

  • OFF

TRACE

LoggingLevel

scheduledExecutorService (scheduler)

Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool.

ScheduledExecutorService

scheduler (scheduler)

To use a cron scheduler from either camel-spring or camel-quartz component. Use value spring or quartz for built in scheduler.

none

Object

schedulerProperties (scheduler)

To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler.

Map

startScheduler (scheduler)

Whether the scheduler should be auto started.

true

boolean

timeUnit (scheduler)

Time unit for initialDelay and delay options.

Enum values:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

MILLISECONDS

TimeUnit

useFixedDelay (scheduler)

Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details.

true

boolean

Message Headers

The SQL component supports 8 message headers, which are listed below:

Name Description Default Type

CamelSqlQuery (producer)

Constant: SQL_QUERY

Query to execute. This query takes precedence over the query specified in the endpoint URI. Note that query parameters in the header are represented by a ? instead of a # symbol.

String

CamelSqlUpdateCount (producer)

Constant: SQL_UPDATE_COUNT

The number of rows updated for update operations, returned as an Integer object. This header is not provided when using outputType=StreamList.

Integer

CamelSqlRowCount (producer)

Constant: SQL_ROW_COUNT

The number of rows returned for select operations, returned as an Integer object. This header is not provided when using outputType=StreamList.

Integer

CamelSqlRetrieveGeneratedKeys (producer)

Constant: SQL_RETRIEVE_GENERATED_KEYS

Set its value to true to retrieve generated keys.

false

Boolean

CamelSqlGeneratedColumns (producer)

Constant: SQL_GENERATED_COLUMNS

Set it to specify the expected generated columns.

String[] or int[]

CamelSqlGeneratedKeysRowCount (producer)

Constant: SQL_GENERATED_KEYS_ROW_COUNT

The number of rows in the header that contains generated keys.

Integer

CamelSqlGeneratedKeyRows (producer)

Constant: SQL_GENERATED_KEYS_DATA

Rows that contain the generated keys (a list of maps of keys).

List

CamelSqlParameters (producer)

Constant: SQL_PARAMETERS

The SQL parameters when using the option useMessageBodyForSql.

Iterator

Treatment of the message body

The SQL component tries to convert the message body to an object of java.util.Iterator type and then uses this iterator to fill the query parameters (where each query parameter is represented by a # symbol (or configured placeholder) in the endpoint URI). If the message body is not an array or collection, the conversion results in an iterator that iterates over only one object, which is the body itself.

For example, if the message body is an instance of java.util.List, the first item in the list is substituted into the first occurrence of # in the SQL query, the second item in the list is substituted into the second occurrence of #, and so on.
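The conversion described above can be pictured with a short plain-Java sketch. It is an assumption-laden illustration rather than Camel's actual type conversion: the class and method names (BodyParameters, toIterator) are hypothetical, and the sketch deliberately ignores the separator option that applies to String bodies.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class BodyParameters {

    // Turns a message body into an iterator of positional parameter values.
    static Iterator<?> toIterator(Object body) {
        if (body instanceof Collection) {
            return ((Collection<?>) body).iterator();
        }
        if (body != null && body.getClass().isArray()) {
            // Copy the array (primitive or object) into a list of boxed values.
            List<Object> values = new ArrayList<>();
            for (int i = 0; i < Array.getLength(body); i++) {
                values.add(Array.get(body, i));
            }
            return values.iterator();
        }
        // Any other body is treated as a single parameter value.
        return Collections.singletonList(body).iterator();
    }
}
```

Each call to next() on the returned iterator would supply the value for the next # placeholder in the query.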

If batch is set to true, the interpretation of the inbound message body changes slightly: instead of an iterator of parameters, the component expects an iterator that contains the parameter iterators; the size of the outer iterator determines the batch size.

You can use the option useMessageBodyForSql to use the message body as the SQL statement; the SQL parameters must then be provided in a header with the key SqlConstants.SQL_PARAMETERS. This allows the SQL component to work more dynamically, as the SQL query comes from the message body. Use templating (such as Velocity or Freemarker) for conditional processing, e.g. to include or exclude where clauses depending on the presence of query parameters.
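As an alternative to a template engine, the conditional where clauses can also be assembled in plain Java before the message is sent. The following is a minimal sketch under that assumption; the class name DynamicQuerySketch, its fields and the projects method are hypothetical. The sql field would become the message body and parameters.iterator() would go into the CamelSqlParameters header. Note the use of ? rather than #, as described for useMessageBodyForSql.

```java
import java.util.ArrayList;
import java.util.List;

final class DynamicQuerySketch {

    final String sql;              // would become the message body
    final List<Object> parameters; // would go into the CamelSqlParameters header

    private DynamicQuerySketch(String sql, List<Object> parameters) {
        this.sql = sql;
        this.parameters = parameters;
    }

    // Adds a where clause only for the filters that are present (non-null).
    static DynamicQuerySketch projects(String license, Integer minId) {
        List<String> clauses = new ArrayList<>();
        List<Object> parameters = new ArrayList<>();
        if (license != null) {
            clauses.add("license = ?");
            parameters.add(license);
        }
        if (minId != null) {
            clauses.add("id > ?");
            parameters.add(minId);
        }
        String where = clauses.isEmpty() ? "" : " where " + String.join(" and ", clauses);
        return new DynamicQuerySketch("select * from projects" + where + " order by id", parameters);
    }
}
```

Keeping the values in a separate parameter list, instead of concatenating them into the SQL text, preserves the use of prepared statement parameters.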

Result of the query

For select operations, the result is an instance of List<Map<String, Object>> type, as returned by the JdbcTemplate.queryForList() method. For update operations, a NULL body is returned, as the update count is only set as a header and never as a body.

By default, the result is placed in the message body. If the outputHeader parameter is set, the result is placed in the header. This is an alternative to using a full message enrichment pattern to add headers; it provides a concise syntax for querying a sequence or some other small value into a header. It is convenient to use outputHeader and outputType together:

from("jms:order.inbox")
    .to("sql:select order_seq.nextval from dual?outputHeader=OrderId&outputType=SelectOne")
    .to("jms:order.booking");
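The way outputType=SelectOne reduces a row list to a single value, as described for the outputType option, can be sketched in plain Java. This is an illustration only, with hypothetical names (SelectOneSketch, selectOne); it leaves out the outputClass bean mapping and assumes that an empty result yields null.

```java
import java.util.List;
import java.util.Map;

final class SelectOneSketch {

    // Reduces a row list to one value following the SelectOne rules (a), (b) and (d);
    // the outputClass bean mapping of rule (c) is left out.
    static Object selectOne(List<Map<String, Object>> rows) {
        if (rows.isEmpty()) {
            return null; // assumption: no row yields no result
        }
        if (rows.size() > 1) {
            throw new IllegalStateException("Non-unique result: " + rows.size() + " rows");
        }
        Map<String, Object> row = rows.get(0);
        if (row.size() == 1) {
            return row.values().iterator().next(); // single column: the column value
        }
        return row; // several columns: the whole row as a Map
    }
}
```

In the sequence example above, the single row with its single column would be reduced to the sequence value itself, which is what makes the OrderId header directly usable.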

Using StreamList

The producer supports outputType=StreamList, which uses an iterator to stream the output of the query. This allows the data to be processed in a streaming fashion; for example, the Splitter EIP can process each row one at a time and load data from the database as needed.

from("direct:withSplitModel")
        .to("sql:select * from projects order by id?outputType=StreamList&outputClass=org.apache.camel.component.sql.ProjectModel")
        .to("log:stream")
        .split(body()).streaming()
            .to("log:row")
            .to("mock:result")
        .end();

Generated keys

Since Camel 2.12.4, 2.13.1 and 2.14

If you insert data using SQL INSERT, then the RDBMS may support auto generated keys. You can instruct the SQL producer to return the generated keys in headers.
To do that set the header CamelSqlRetrieveGeneratedKeys=true. Then the generated keys will be provided as headers with the keys listed in the table above.

To specify which generated columns should be retrieved, set the header CamelSqlGeneratedColumns to a String[] or int[], indicating the column names or indexes, respectively. Some databases require this, such as Oracle. It may also be necessary to use the parametersCount option if the driver cannot correctly determine the number of parameters.

You can see more details in this unit test.

DataSource

You can set a reference to a DataSource in the URI directly:

sql:select * from table where id=# order by name?dataSource=#myDS

Using named parameters

Since Camel 2.11

In the route below, we want to get all the projects from the projects table. Notice that the SQL query has two named parameters, :#lic and :#min.
Camel will then look up these parameters in the message body or message headers. Notice that in the example below we set two headers with constant values
for the named parameters:

   from("direct:projects")
     .setHeader("lic", constant("ASF"))
     .setHeader("min", constant(123))
     .to("sql:select * from projects where license = :#lic and id > :#min order by id")

Though if the message body is a java.util.Map then the named parameters will be taken from the body.

   from("direct:projects")
     .to("sql:select * from projects where license = :#lic and id > :#min order by id")

Using expression parameters in producers

Since Camel 2.14

In the route below, we want to get all the projects from the database. It uses the body of the exchange to define the license and uses the value of a property as the second parameter.

from("direct:projects")
  .setBody(constant("ASF"))
  .setProperty("min", constant(123))
  .to("sql:select * from projects where license = :#${body} and id > :#${exchangeProperty.min} order by id")

Using expression parameters in consumers

When using the SQL component as consumer, you can now also use expression parameters (simple language) to build dynamic query parameters, such as calling a method on a bean to retrieve an id, date or something.

For example in the sample below we call the nextId method on the bean myIdGenerator:

from("sql:select * from projects where id = :#${bean:myIdGenerator.nextId}")
    .to("mock:result");

And the bean has the following method:

public static class MyIdGenerator {

    private int id = 1;

    public int nextId() {
        return id++;
    }
}

Notice that there is no existing Exchange with message body and headers, so the simple expressions you can use in the consumer are mostly useful for calling bean methods, as in this example.

Using IN queries with dynamic values

The SQL producer allows you to use SQL queries with IN statements where the IN values are computed dynamically, for example from the message body or a header.

To use IN you need to:

  • prefix the parameter name with in:

  • add ( ) around the parameter

An example explains this better. The following query is used:

-- this is a comment
select *
from projects
where project in (:#in:names)
order by id

In the following route:

from("direct:query")
    .to("sql:classpath:sql/selectProjectsIn.sql")
    .to("log:query")
    .to("mock:query");

Then the IN query can use a header with the key names with the dynamic values such as:

// use an array
template.requestBodyAndHeader("direct:query", "Hi there!", "names", new String[]{"Camel", "AMQ"});

// use a list
List<String> names = new ArrayList<String>();
names.add("Camel");
names.add("AMQ");

template.requestBodyAndHeader("direct:query", "Hi there!", "names", names);

// use a string separated values with comma
template.requestBodyAndHeader("direct:query", "Hi there!", "names", "Camel,AMQ");

The query can also be specified in the endpoint instead of being externalized (notice that externalizing makes maintaining the SQL queries easier):

from("direct:query")
    .to("sql:select * from projects where project in (:#in:names) order by id")
    .to("log:query")
    .to("mock:query");
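To picture what the three accepted value styles (array, list, comma-separated String) have in common, the following plain-Java sketch normalizes each of them to a list of values and builds one JDBC ? marker per value. It is an illustration under simplifying assumptions, not Camel's implementation; the names InClauseSketch, values and markers are hypothetical.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class InClauseSketch {

    // Normalizes an array, a collection or a comma-separated String to a list of values.
    static List<Object> values(Object value) {
        List<Object> result = new ArrayList<>();
        if (value instanceof Iterable) {
            for (Object item : (Iterable<?>) value) {
                result.add(item);
            }
        } else if (value != null && value.getClass().isArray()) {
            for (int i = 0; i < Array.getLength(value); i++) {
                result.add(Array.get(value, i));
            }
        } else if (value instanceof String) {
            result.addAll(Arrays.asList(((String) value).split(",")));
        } else {
            result.add(value);
        }
        return result;
    }

    // Builds one JDBC '?' marker per value, for example "?, ?" for two values.
    static String markers(int count) {
        return String.join(", ", Collections.nCopies(count, "?"));
    }
}
```

All three header values from the example above normalize to the same two-element list, so the IN clause would receive two markers in each case.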

Using the JDBC based idempotent repository

In this section we will use the JDBC based idempotent repository.

Abstract class

There is an abstract class org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository that you can extend to build a custom JDBC idempotent repository.

First we have to create the database table which will be used by the idempotent repository. We use the following schema:

CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
messageId VARCHAR(100), createdAt TIMESTAMP, PRIMARY KEY (processorName, messageId) )

Note: The SQL Server TIMESTAMP type is a fixed-length binary-string type. It does not map to any of the JDBC time types: DATE, TIME, or TIMESTAMP.

The above SQL is consistent with most popular SQL vendors.

When working with concurrent consumers it is crucial to create a unique constraint on the column combination of processorName and messageId. This constraint will prevent multiple consumers from adding the same key to the repository and allow only one consumer to handle the message.

The SQL above includes the constraint by creating a primary key. If you prefer to use a different constraint, or your SQL server uses a different syntax for table creation, you can create the table yourself using the above schema as a starting point.

Customize the JDBC idempotency repository

You have a few options to tune the org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository for your needs:

Parameter Default Value Description

createTableIfNotExists

true

Defines whether or not Camel should try to create the table if it doesn’t exist.

tableName

CAMEL_MESSAGEPROCESSED

To use a custom table name instead of the default name: CAMEL_MESSAGEPROCESSED.

tableExistsString

SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0

This query is used to figure out whether the table already exists or not. It must throw an exception to indicate the table doesn’t exist.

createString

CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)

The statement which is used to create the table.

queryString

SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?

The query which is used to figure out whether the message already exists in the repository (the result is not equal to '0'). It takes two parameters. The first one is the processor name (String) and the second one is the message id (String).

insertString

INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)

The statement which is used to add the entry into the table. It takes three parameters. The first one is the processor name (String), the second one is the message id (String) and the third one is the timestamp (java.sql.Timestamp) when this entry was added to the repository.

deleteString

DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?

The statement which is used to delete the entry from the database. It takes two parameters. The first one is the processor name (String) and the second one is the message id (String).

The option tableName can be used to keep the default SQL queries but with a different table name. However, if you want to customize the SQL queries, you can configure each of them individually.

Orphan Lock aware Jdbc IdempotentRepository

One of the limitations of org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository is that it does not handle orphan locks resulting from a JVM crash or a non-graceful shutdown. This can result in unprocessed files/messages if this implementation is used with camel-file, camel-ftp, etc. If you need to handle orphan locks, use org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository. This repository keeps track of the locks held by an instance of the application. For each lock held, the application sends keep-alive signals to the lock repository, which updates the createdAt column with the current Timestamp. When an application instance tries to acquire a lock, there are three possibilities:

  • The lock entry does not exist. The lock is then provided using the base implementation of JdbcMessageIdRepository.

  • The lock already exists and createdAt > System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that an active instance holds the lock, and the lock is not provided to the new instance requesting it.

  • The lock already exists and createdAt <= System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that no active instance holds the lock, and the lock is provided to the requesting instance. The reasoning is that if the original instance that held the lock were still running, it would have updated the Timestamp on createdAt using its keep-alive mechanism.

This repository has two additional configuration parameters:

Parameter

Description

lockMaxAgeMillis

This refers to the duration after which the lock is considered orphaned i.e. if the currentTimestamp - createdAt >= lockMaxAgeMillis then lock is orphaned.

lockKeepAliveIntervalMillis

The frequency at which keep alive updates are done to createdAt Timestamp column.
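The orphan rule stated for lockMaxAgeMillis (a lock is orphaned when currentTimestamp - createdAt >= lockMaxAgeMillis) can be written down as a small plain-Java sketch. This only illustrates the decision, not the repository's actual SQL-based implementation; the names OrphanLockSketch, isOrphaned and canAcquire are hypothetical.

```java
final class OrphanLockSketch {

    // A lock is orphaned when its last keep-alive is at least lockMaxAgeMillis old.
    static boolean isOrphaned(long createdAtMillis, long nowMillis, long lockMaxAgeMillis) {
        return nowMillis - createdAtMillis >= lockMaxAgeMillis;
    }

    // Grants the lock when no entry exists (createdAtMillis == null) or the entry is orphaned.
    static boolean canAcquire(Long createdAtMillis, long nowMillis, long lockMaxAgeMillis) {
        return createdAtMillis == null || isOrphaned(createdAtMillis, nowMillis, lockMaxAgeMillis);
    }
}
```

For the rule to work, lockKeepAliveIntervalMillis presumably needs to be comfortably smaller than lockMaxAgeMillis, so that a live instance always refreshes createdAt before its lock looks orphaned.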

Caching Jdbc IdempotentRepository

Some SQL implementations are not fast on a per query basis. The JdbcMessageIdRepository implementation does its idempotent checks individually within SQL transactions. Checking a mere 100 keys can take minutes. The JdbcCachedMessageIdRepository preloads an in-memory cache on start with the entire list of keys. This cache is then checked first before passing through to the original implementation.

As with all cache implementations, there are considerations that should be made with regard to stale data and your specific usage.
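The cache-first check can be illustrated with a plain-Java sketch. It is not the JdbcCachedMessageIdRepository code: the class name CachedKeyCheck is hypothetical, the database lookup is represented by a Predicate, and remembering database hits in the cache is an assumption of this sketch.

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

final class CachedKeyCheck {

    private final Set<String> cache = ConcurrentHashMap.newKeySet();
    private final Predicate<String> databaseLookup; // stands in for the per-key SQL query

    CachedKeyCheck(Collection<String> preloadedKeys, Predicate<String> databaseLookup) {
        this.cache.addAll(preloadedKeys); // loaded once on start
        this.databaseLookup = databaseLookup;
    }

    // Answers from memory when possible and only then falls back to the database.
    boolean contains(String key) {
        if (cache.contains(key)) {
            return true;
        }
        boolean found = databaseLookup.test(key);
        if (found) {
            cache.add(key); // assumption of this sketch: remember hits for later checks
        }
        return found;
    }
}
```

The stale-data concern mentioned above shows up here directly: a key deleted from the table by another process would still be reported as present until the cache is reloaded.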

Using the JDBC based aggregation repository

JdbcAggregationRepository is an AggregationRepository which persists the aggregated messages on the fly. This ensures that you will not lose messages, as the default aggregator uses an in-memory only AggregationRepository. Together with Camel, the JdbcAggregationRepository provides persistent support for the Aggregator.

Only when an Exchange has been successfully processed will it be marked as complete, which happens when the confirm method is invoked on the AggregationRepository. This means that if the same Exchange fails again, it will be retried until it succeeds.

You can use option maximumRedeliveries to limit the maximum number of redelivery attempts for a given recovered Exchange. You must also set the deadLetterUri option so Camel knows where to send the Exchange when the maximumRedeliveries was hit.

You can see some examples in the unit tests of camel-sql, for example JdbcAggregateRecoverDeadLetterChannelTest.java

Database

To be operational, each aggregator uses two table: the aggregation and completed one. By convention the completed has the same name as the aggregation one suffixed with "_COMPLETED". The name must be configured in the Spring bean with the RepositoryName property. In the following example aggregation will be used.

The structure of both tables is identical: in both cases a String value is used as the key (id), whereas a Blob contains the exchange serialized as a byte array.
One difference should be remembered, however: the content of the id field depends on the table.
In the aggregation table, id holds the correlation id used by the component to aggregate the messages. In the completed table, id holds the id of the exchange stored in the corresponding blob field.

Here is the SQL used to create the tables; replace "aggregation" with your aggregator repository name.

CREATE TABLE aggregation (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 version BIGINT NOT NULL,
 constraint aggregation_pk PRIMARY KEY (id)
);
CREATE TABLE aggregation_completed (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 version BIGINT NOT NULL,
 constraint aggregation_completed_pk PRIMARY KEY (id)
);
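To make the different meaning of id in the two tables concrete, here is a small in-memory model in Java. It is a sketch under assumptions, not the repository's code: the class TwoTableModel and the method name complete are hypothetical, maps stand in for the tables, and a byte array stands in for the BLOB column.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** In-memory model of the aggregation and completed tables. */
final class TwoTableModel {
    // "aggregation" table: id = correlation id
    final Map<String, byte[]> aggregation = new LinkedHashMap<>();
    // "aggregation_completed" table: id = exchange id
    final Map<String, byte[]> completed = new LinkedHashMap<>();

    /** Stores or updates the in-progress aggregate for a correlation id. */
    void add(String correlationId, byte[] exchange) {
        aggregation.put(correlationId, exchange);
    }

    /** Aggregation finished: moves the row to the completed table under the exchange id. */
    void complete(String correlationId, String exchangeId) {
        byte[] exchange = aggregation.remove(correlationId);
        if (exchange == null) {
            throw new IllegalStateException("No aggregate for " + correlationId);
        }
        completed.put(exchangeId, exchange);
    }

    /** Downstream processing succeeded: the completed row is deleted. */
    void confirm(String exchangeId) {
        completed.remove(exchangeId);
    }

    /** Exchange ids still awaiting confirmation, i.e. candidates for recovery. */
    Set<String> scan() {
        return Set.copyOf(completed.keySet());
    }
}
```

A row therefore stays in the completed table, keyed by exchange id, until confirm is called, which is what allows a failed Exchange to be recovered and retried.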

Storing body and headers as text

You can configure the JdbcAggregationRepository to store the message body and selected headers as String in separate columns. For example, to store the body and the two headers companyName and accountName, use the following SQL:

CREATE TABLE aggregationRepo3 (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 version BIGINT NOT NULL,
 body varchar(1000),
 companyName varchar(1000),
 accountName varchar(1000),
 constraint aggregationRepo3_pk PRIMARY KEY (id)
);
CREATE TABLE aggregationRepo3_completed (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 version BIGINT NOT NULL,
 body varchar(1000),
 companyName varchar(1000),
 accountName varchar(1000),
 constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
);

And then configure the repository to enable this behavior as shown below:

<bean id="repo3"
  class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
  <property name="repositoryName" value="aggregationRepo3"/>
  <property name="transactionManager" ref="txManager3"/>
  <property name="dataSource" ref="dataSource3"/>
  <!-- configure to store the message body and following headers as text in the repo -->
  <property name="storeBodyAsText" value="true"/>
  <property name="headersToStoreAsText">
    <list>
      <value>companyName</value>
      <value>accountName</value>
    </list>
  </property>
</bean>

Codec (Serialization)

Since they can contain any type of payload, Exchanges are not serializable by design. An Exchange is therefore converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the JdbcCodec class. One detail of the code requires your attention: the ClassLoadingAwareObjectInputStream.

The ClassLoadingAwareObjectInputStream has been reused from the Apache ActiveMQ project. It wraps an ObjectInputStream and uses it with the ContextClassLoader rather than the currentThread one. The benefit is being able to load classes exposed by other bundles. This allows the exchange body and headers to reference objects of custom types.
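The general technique, resolving classes through an explicitly chosen class loader during deserialization, can be sketched with the JDK alone. The class below is a hypothetical illustration named LoaderAwareObjectInputStream, not the actual ClassLoadingAwareObjectInputStream; which loader to pass in is left to the caller.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

/** ObjectInputStream that resolves classes through a caller-supplied class loader. */
final class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            // Try the supplied loader first so classes it exposes become visible.
            return Class.forName(desc.getName(), false, loader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default lookup, which also handles primitive types.
            return super.resolveClass(desc);
        }
    }

    /** Serializes a value and reads it back through the given loader. */
    static Object roundTrip(Object value, ClassLoader loader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new LoaderAwareObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), loader)) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```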

When deserializing, note that the decode function and the unmarshallExchange method allow only the java packages and subpackages and the org.apache.camel packages and subpackages. All remaining classes are rejected. If you need to deserialize other classes, change the filter by setting the deserializationFilter field on the repository.
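The effect of such an allow-list can be tried out with the JDK's java.io.ObjectInputFilter (Java 9 and later). In the sketch below, the pattern string is an assumption about how the default rule described above would be spelled in the JDK filter syntax, and FilterDemo and Invoice are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class FilterDemo {
    /** Assumed spelling of the default rule: java.*, org.apache.camel.*, reject the rest. */
    static final String DEFAULT_PATTERN = "java.**;org.apache.camel.**;!*";

    /** Hypothetical application payload type outside the allowed packages. */
    static final class Invoice implements Serializable {
        private static final long serialVersionUID = 1L;
        final String number;

        Invoice(String number) {
            this.number = number;
        }
    }

    /** Default rule plus one extra class; patterns are evaluated left to right. */
    static String extendedPattern() {
        return "java.**;org.apache.camel.**;" + Invoice.class.getName() + ";!*";
    }

    static byte[] serialize(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if the bytes deserialize under the pattern, false if a class is rejected. */
    static boolean accepts(String pattern, byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            in.setObjectInputFilter(ObjectInputFilter.Config.createFilter(pattern));
            in.readObject();
            return true;
        } catch (InvalidClassException e) {
            return false; // the filter rejected a class in the stream
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With the assumed default pattern, a java.util.ArrayList of strings is accepted while an Invoice is rejected; appending the class name before the final `!*` lets the Invoice through.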

Transaction

A Spring PlatformTransactionManager is required to orchestrate transactions.

Service (Start/Stop)

The start method verifies the connection to the database and the presence of the required tables. If anything is wrong, startup fails.

Aggregator configuration

Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the corresponding pair of tables created in the database) and a data source. If the default lobHandler is not suited to your database system, a different one can be injected with the lobHandler property.

Here is the declaration for Oracle:

<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
  <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
</bean>
<bean id="nativeJdbcExtractor"
  class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
<bean id="repo"
  class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
  <property name="transactionManager" ref="transactionManager"/>
  <property name="repositoryName" value="aggregation"/>
  <property name="dataSource" ref="dataSource"/>
  <!-- Only with Oracle, else use default -->
  <property name="lobHandler" ref="lobHandler"/>
</bean>

Optimistic locking

You can turn on optimisticLocking and use this JDBC based aggregation repository in a clustered environment where multiple Camel applications share the same database for the aggregation repository. If there is a race condition, the JDBC driver throws a vendor-specific exception which the JdbcAggregationRepository can react upon. To determine which caused exceptions from the JDBC driver are regarded as optimistic locking errors, a mapper is needed. Therefore, org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper allows you to implement your custom logic if needed. There is a default implementation, org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper, which works as follows:

The following checks are done:

  • If the caused exception is an SQLException, then the SQLState is checked to see whether it starts with 23.

  • If the caused exception is a DataIntegrityViolationException.

  • If the caused exception class name has "ConstraintViolation" in its name.

  • Optional checking for FQN class name matches, if any class names have been configured.

In addition, you can add FQN class names; if any of the caused exceptions (or any nested exception) equals one of the FQN class names, then it is an optimistic locking error.

Here is an example that defines two extra FQN class names from the JDBC vendor.

<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
  <property name="transactionManager" ref="transactionManager"/>
  <property name="repositoryName" value="aggregation"/>
  <property name="dataSource" ref="dataSource"/>
  <property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/>
</bean>
<!-- use the default mapper with extraFQN class names from our JDBC driver -->
<bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
  <property name="classNames">
    <util:set>
      <value>com.foo.sql.MyViolationException</value>
      <value>com.foo.sql.MyOtherViolationException</value>
    </util:set>
  </property>
</bean>
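The checks listed for the default mapper can be sketched as a walk over the exception and its nested causes. This is an illustration only, not the source of DefaultJdbcOptimisticLockingExceptionMapper: OptimisticLockCheck is a hypothetical name, and the Spring exception is compared by class name (ignoring subclasses) so that the sketch needs nothing beyond the JDK.

```java
import java.sql.SQLException;
import java.util.Set;

/** Decides whether an exception chain looks like an optimistic locking error. */
final class OptimisticLockCheck {
    private final Set<String> extraClassNames;

    OptimisticLockCheck(Set<String> extraClassNames) {
        this.extraClassNames = Set.copyOf(extraClassNames);
    }

    boolean isOptimisticLocking(Throwable error) {
        // Inspect the exception itself and every nested cause.
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof SQLException) {
                String state = ((SQLException) t).getSQLState();
                if (state != null && state.startsWith("23")) {
                    return true; // SQLState class 23: integrity constraint violation
                }
            }
            String name = t.getClass().getName();
            // Simplification: name comparison instead of a type check against Spring.
            if (name.equals("org.springframework.dao.DataIntegrityViolationException")) {
                return true;
            }
            if (name.contains("ConstraintViolation")) {
                return true;
            }
            if (extraClassNames.contains(name)) {
                return true; // one of the optionally configured FQN class names
            }
        }
        return false;
    }
}
```

For example, a RuntimeException wrapping an SQLException with SQLState 23505 is classified as an optimistic locking error, while an SQLException with SQLState 08001 is not.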

Propagation behavior

JdbcAggregationRepository uses two distinct transaction templates from Spring-TX. One is read-only and one is used for read-write operations.

However, when using JdbcAggregationRepository within a route that itself uses <transacted /> and a common PlatformTransactionManager is used, you may need to configure the propagation behavior used by the transaction templates inside JdbcAggregationRepository.

Here’s a way to do it:

<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
  <property name="propagationBehaviorName" value="PROPAGATION_NESTED" />
</bean>

Propagation is specified by constants of the org.springframework.transaction.TransactionDefinition interface, so propagationBehaviorName is a convenient setter that lets you use the names of the constants.

Clustering

JdbcAggregationRepository does not provide recovery in a clustered environment.

You may use ClusteredJdbcAggregationRepository, which provides limited support for recovery in a clustered environment: each member of the cluster handles recovery separately, i.e. a member may only recover exchanges that it completed itself.

To enable this behaviour, the recoverByInstance property must be set to true, and the instanceId property must be set to a unique identifier (a string) for each member of the cluster.

In addition, the completed table must have an instance_id VARCHAR(255) column.

N.B.: since each member is solely responsible for the recovery of its completed exchanges, if a member is stopped, its completed exchanges will not be recovered until it is restarted, unless you update the completed table to assign them to another member (by changing instance_id for those completed exchanges).
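The per-instance recovery rule, and the manual hand-over mentioned in the note, can be modelled in a few lines of Java. This is a conceptual sketch only: InstanceRecoveryModel and its methods are hypothetical, and a list of rows stands in for the completed table with its instance_id column.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of a completed table that carries an instance_id column. */
final class InstanceRecoveryModel {
    private static final class Row {
        final String exchangeId;
        String instanceId;

        Row(String exchangeId, String instanceId) {
            this.exchangeId = exchangeId;
            this.instanceId = instanceId;
        }
    }

    private final List<Row> completed = new ArrayList<>();

    /** Records a completed exchange together with the member that completed it. */
    void complete(String exchangeId, String instanceId) {
        completed.add(new Row(exchangeId, instanceId));
    }

    /** Exchange ids that the given member is allowed to recover. */
    List<String> scan(String instanceId) {
        List<String> ids = new ArrayList<>();
        for (Row row : completed) {
            if (row.instanceId.equals(instanceId)) {
                ids.add(row.exchangeId);
            }
        }
        return ids;
    }

    /** Manual hand-over, comparable to updating instance_id for one member's rows. */
    int reassign(String fromInstance, String toInstance) {
        int updated = 0;
        for (Row row : completed) {
            if (row.instanceId.equals(fromInstance)) {
                row.instanceId = toInstance;
                updated++;
            }
        }
        return updated;
    }
}
```

Until reassign is called, exchanges completed by a stopped member stay invisible to every other member's scan.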

PostgreSQL case

PostgreSQL is a special case that may cause problems with the optimistic locking used by JdbcAggregationRepository. PostgreSQL marks the connection as invalid in case of a data integrity violation exception (the one with SQLState 23505). This makes the connection effectively unusable within a nested transaction. Details can be found in this document.

org.apache.camel.processor.aggregate.jdbc.PostgresAggregationRepository extends JdbcAggregationRepository and uses a special INSERT .. ON CONFLICT .. statement to provide optimistic locking behavior.

This statement is (with the default aggregation table definition):

INSERT INTO aggregation (id, exchange) values (?, ?) ON CONFLICT DO NOTHING

Details can be found in the PostgreSQL documentation.

When this clause is used, the java.sql.PreparedStatement.executeUpdate() call returns 0 instead of throwing an SQLException with SQLState=23505. Further handling is exactly the same as with the generic JdbcAggregationRepository, but without marking the PostgreSQL connection as invalid.

Camel Sql Starter

A starter module is available to spring-boot users. When using the starter, the DataSource can be directly configured using spring-boot properties.

# Example for a mysql datasource
spring.datasource.url=jdbc:mysql://localhost/test
spring.datasource.username=dbuser
spring.datasource.password=dbpass
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

To use this feature, add the following dependencies to your Spring Boot pom.xml file:

<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-sql-starter</artifactId>
    <version>${camel.version}</version> <!-- use the same version as your Camel core version -->
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>${spring-boot-version}</version>
</dependency>

You should also include the specific database driver, if needed.

Spring Boot Auto-Configuration

When using sql with Spring Boot, make sure to use the following Maven dependency to have support for auto configuration:

<dependency>
  <groupId>org.apache.camel.springboot</groupId>
  <artifactId>camel-sql-starter</artifactId>
  <version>x.x.x</version>
  <!-- use the same version as your Camel core version -->
</dependency>

The component supports 9 options, which are listed below.

Name Description Default Type

camel.component.sql-stored.autowired-enabled

Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc.

true

Boolean

camel.component.sql-stored.enabled

Whether to enable auto configuration of the sql-stored component. This is enabled by default.

Boolean

camel.component.sql-stored.lazy-start-producer

Whether the producer should be started lazily (on the first message). Starting lazily allows CamelContext and routes to start up in situations where a producer may otherwise fail during starting and cause the route to fail to start. By deferring this startup, the startup failure can be handled during message routing via Camel’s routing error handlers. Beware that when the first message is processed, creating and starting the producer may take a little time and prolong the total processing time.

false

Boolean

camel.component.sql.autowired-enabled

Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc.

true

Boolean

camel.component.sql.bridge-error-handler

Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored.

false

Boolean

camel.component.sql.enabled

Whether to enable auto configuration of the sql component. This is enabled by default.

Boolean

camel.component.sql.lazy-start-producer

Whether the producer should be started lazily (on the first message). Starting lazily allows CamelContext and routes to start up in situations where a producer may otherwise fail during starting and cause the route to fail to start. By deferring this startup, the startup failure can be handled during message routing via Camel’s routing error handlers. Beware that when the first message is processed, creating and starting the producer may take a little time and prolong the total processing time.

false

Boolean

camel.component.sql.row-mapper-factory

Factory for creating RowMapper. The option is a org.apache.camel.component.sql.RowMapperFactory type.

RowMapperFactory

camel.component.sql.use-placeholder

Sets whether to use a placeholder and replace all placeholder characters with the ? sign in the SQL queries. This option is true by default.

true

Boolean