Try it out locally
Run Kafka
Install and run a local Kafka instance by following the Apache Kafka quickstart guide. This usually boils down to:
export KAFKA_HOME=<your kafka install dir>
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic mytopic
This quickstart relies on the plugin.path property, so you’ll have to add a path for your connectors. Open the configuration file located at $KAFKA_HOME/config/connect-standalone.properties and add the property at the end:
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/connectors
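As a minimal sketch, the edit above could be scripted so that it stays safe to re-run. The helper name set_plugin_path is hypothetical, and /home/connectors is simply the example directory used in this guide.

```shell
# Hypothetical helper: make sure a properties file ends up with exactly one
# active plugin.path entry that points at the given directory.
set_plugin_path() {
  props_file=$1
  plugin_dir=$2
  if [ ! -f "$props_file" ]; then
    echo "no such file: $props_file" >&2
    return 1
  fi
  tmp=$(mktemp) || return 1
  # Keep every line except active (uncommented) plugin.path assignments;
  # the commented documentation lines are left untouched.
  awk '!/^[ \t]*plugin\.path[ \t]*=/' "$props_file" > "$tmp" || return 1
  printf 'plugin.path=%s\n' "$plugin_dir" >> "$tmp"
  # Write back through cat so the original file keeps its permissions.
  cat "$tmp" > "$props_file" && rm -f "$tmp"
}

# Usage:
#   set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/connectors
```

Running the helper twice leaves a single plugin.path line, which avoids ambiguity about which value Kafka Connect picks up.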
At this point you’re able to run the connectors quickstart.
Next, run a Camel Kafka connector source and/or sink.
You can use these Kafka utilities to consume from or produce to a Kafka topic:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
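The two utilities can be combined into a quick round-trip check before any connector is started. This is only a sketch: the function name kafka_round_trip is hypothetical, and it assumes the broker on localhost:9092 and the mytopic topic created earlier.

```shell
# Hypothetical smoke test: send one message to a topic, then read one back.
# Assumes a broker listening on localhost:9092.
kafka_round_trip() {
  topic=${1:-mytopic}
  if [ -z "${KAFKA_HOME:-}" ]; then
    echo "KAFKA_HOME is not set" >&2
    return 1
  fi
  # Produce a single line read from stdin.
  echo "hello from the quickstart" |
    "$KAFKA_HOME/bin/kafka-console-producer.sh" \
      --broker-list localhost:9092 --topic "$topic" || return 1
  # Read one message and stop; give up after 10 seconds without data.
  # With --from-beginning the message printed may be an older one
  # if the topic already holds data.
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" \
    --bootstrap-server localhost:9092 --topic "$topic" \
    --from-beginning --max-messages 1 --timeout-ms 10000
}

# Usage:
#   kafka_round_trip mytopic
```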
Try some examples
For the following examples you need to fetch the camel-kafka-connector project and build it locally. You can either build the entire project with ./mvnw package from the root directory (which may take some time), or build just the connectors you are interested in by running a command like the following from within the connectors directory:
> cd connectors/
> mvn package -pl camel-timer-kafka-connector -am
Look into the config and docs/examples directories for the configuration files (*.properties) of the examples showcased here. There is also a comprehensive set of examples with instructions on how to run them in this repository.
Simple logger (sink)
Unzip or untar the camel-log-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-log-kafka-connector/target/ should contain a .zip file named camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-log-kafka-connector/target/camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelLogSinkConnector.properties
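The copy and unzip steps above recur for every connector in this guide, so they could be wrapped in a small helper. This is a sketch under assumptions: the function name install_connector is hypothetical, and the default target directory is the /home/connectors example used for plugin.path.

```shell
# Hypothetical helper: unpack a connector package into the plugin.path directory.
install_connector() {
  archive=$1
  plugin_dir=${2:-/home/connectors}
  # Fail early, before touching the plugin directory, if the build output is missing.
  if [ ! -f "$archive" ]; then
    echo "archive not found: $archive" >&2
    return 1
  fi
  mkdir -p "$plugin_dir" || return 1
  # -o overwrites files left by a previous build without prompting, -q keeps it quiet.
  unzip -o -q "$archive" -d "$plugin_dir"
}

# Usage, from the root of the camel-kafka-connector checkout:
#   install_connector connectors/camel-log-kafka-connector/target/camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip
```

Passing the archive path explicitly avoids depending on the current directory, which matters because the archive lives in the project checkout while the target is the plugin.path location.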
Timer (source)
Unzip or untar the camel-timer-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-timer-kafka-connector/target/ should contain a .zip file named camel-timer-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-timer-kafka-connector/target/camel-timer-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-timer-kafka-connector-0.6.0-SNAPSHOT-package.zip
This is an example of a source that produces a message every second to mytopic.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelTimerSourceConnector.properties
AWS Kinesis (source)
Unzip or untar the camel-aws-kinesis-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-aws-kinesis-kafka-connector/target/ should contain a .zip file named camel-aws-kinesis-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-aws-kinesis-kafka-connector/target/camel-aws-kinesis-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-kinesis-kafka-connector-0.6.0-SNAPSHOT-package.zip
This example consumes from an AWS Kinesis data stream and transfers the payload to the mytopic topic in Kafka.
Adjust the properties in docs/examples/CamelAWSKinesisSourceConnector.properties for your environment. You need to configure the access key, secret key and region by setting camel.component.aws-kinesis.configuration.access-key=youraccesskey, camel.component.aws-kinesis.configuration.secret-key=yoursecretkey and camel.component.aws-kinesis.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSKinesisSourceConnector.properties
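One way to avoid typing credentials into the example file by hand is to fill them in from environment variables. The sketch below uses the three property names given above; the helper names set_prop and configure_kinesis_credentials are hypothetical, and reading the values from AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION is an assumption about how your shell is set up.

```shell
# Hypothetical helper: set KEY=VALUE in a properties file, replacing any
# existing assignment of the same key.
set_prop() {
  file=$1 key=$2 value=$3
  tmp=$(mktemp) || return 1
  # Keep every line that does not start with "KEY=".
  awk -v k="$key=" 'index($0, k) != 1' "$file" > "$tmp" || return 1
  # printf '%s' writes the value literally, so characters like / or + are safe.
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  cat "$tmp" > "$file" && rm -f "$tmp"
}

# Hypothetical wrapper: copy the three Kinesis settings from the environment
# (variable names are an assumption, not something the connector requires).
configure_kinesis_credentials() {
  props=$1
  prefix=camel.component.aws-kinesis.configuration
  set_prop "$props" "$prefix.access-key" "$AWS_ACCESS_KEY_ID" &&
    set_prop "$props" "$prefix.secret-key" "$AWS_SECRET_ACCESS_KEY" &&
    set_prop "$props" "$prefix.region" "$AWS_REGION"
}

# Usage:
#   configure_kinesis_credentials docs/examples/CamelAWSKinesisSourceConnector.properties
```

The same set_prop helper would apply to the aws-sqs, aws-sns and aws-s3 examples by changing the property prefix.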
AWS SQS (sink)
Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-aws-sqs-kafka-connector/target/ should contain a .zip file named camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-aws-sqs-kafka-connector/target/camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip
This example consumes from the Kafka topic mytopic and transfers the payload to AWS SQS.
Adjust the properties in docs/examples/CamelAWSSQSSinkConnector.properties for your environment. You need to configure the access key, secret key and region by setting camel.component.aws-sqs.configuration.access-key=youraccesskey, camel.component.aws-sqs.configuration.secret-key=yoursecretkey and camel.component.aws-sqs.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSSQSSinkConnector.properties
AWS SQS (source)
Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-aws-sqs-kafka-connector/target/ should contain a .zip file named camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-aws-sqs-kafka-connector/target/camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip
This example consumes from the AWS SQS queue mysqs and transfers the payload to the mytopic topic in Kafka.
Adjust the properties in docs/examples/CamelAWSSQSSourceConnector.properties for your environment. You need to configure the access key, secret key and region by setting camel.component.aws-sqs.configuration.access-key=youraccesskey, camel.component.aws-sqs.configuration.secret-key=yoursecretkey and camel.component.aws-sqs.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSSQSSourceConnector.properties
AWS SNS (sink)
Unzip or untar the camel-aws-sns-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-aws-sns-kafka-connector/target/ should contain a .zip file named camel-aws-sns-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-aws-sns-kafka-connector/target/camel-aws-sns-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-sns-kafka-connector-0.6.0-SNAPSHOT-package.zip
This example consumes from the mytopic Kafka topic and transfers the payload to the AWS SNS topic named topic.
Adjust the properties in docs/examples/CamelAWSSNSSinkConnector.properties for your environment. You need to configure the access key, secret key and region by setting camel.component.aws-sns.configuration.access-key=youraccesskey, camel.component.aws-sns.configuration.secret-key=yoursecretkey and camel.component.aws-sns.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSSNSSinkConnector.properties
AWS S3 (source)
Unzip or untar the camel-aws-s3-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-aws-s3-kafka-connector/target/ should contain a .zip file named camel-aws-s3-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-aws-s3-kafka-connector/target/camel-aws-s3-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-s3-kafka-connector-0.6.0-SNAPSHOT-package.zip
This example fetches objects from the camel-kafka-connector bucket in AWS S3 and transfers the payload to the mytopic Kafka topic. It also shows how to implement a custom converter that converts the bytes received from S3 to Kafka’s SchemaAndValue.
Adjust the properties in docs/examples/CamelAWSS3SourceConnector.properties for your environment. You need to configure the access key, secret key and region by adding camel.component.aws-s3.configuration.access-key=youraccesskey, camel.component.aws-s3.configuration.secret-key=yoursecretkey and camel.component.aws-s3.configuration.region=yourregion.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSS3SourceConnector.properties
Apache Cassandra
Unzip or untar the camel-cql-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-cql-kafka-connector/target/ should contain a .zip file named camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-cql-kafka-connector/target/camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip
These examples require a running Cassandra instance. For simplicity, the steps below show how to start Cassandra using Docker. First, run a Cassandra instance:
docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra
Next, check and make sure Cassandra is running:
docker exec -ti master_node /opt/cassandra/bin/nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 172.17.0.2 251.32 KiB 256 100.0% 5126aaad-f143-43e9-920a-0f9540a93967 rack1
To populate the database using the cqlsh tool, you’ll need a local installation of Cassandra. Download and extract the Apache Cassandra distribution to a directory, referred to below as LOCAL_CASSANDRA_HOME. Here we use version 3.11.4 to connect to the Cassandra instance we started using Docker.
<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)
Next, execute the following script to create the keyspace test and the table users, and to insert one row into it.
create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
use test;
create table users ( id int primary key, name text );
insert into users (id,name) values (1, 'oscerd');
quit;
The configuration .properties files used below need the IP address of the Cassandra master node. Replace the value 172.17.0.2 in the camel.source.url configuration property, or localhost in camel.sink.url, with the IP of the master node obtained from Docker. Each example uses its own .properties file, shown in the command line that runs the example. To obtain the IP, run:
docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node
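The replacement described above could be scripted along these lines. This is a sketch only: the helper name replace_host is hypothetical, and it does a plain text substitution, so it also rewrites any other occurrence of the old host in the file.

```shell
# Hypothetical helper: replace every occurrence of OLD_HOST with NEW_HOST in a
# properties file, for example inside the camel.source.url value.
# NEW_HOST is expected to be a plain IP or hostname (no "/" or "&").
replace_host() {
  file=$1 old_host=$2 new_host=$3
  tmp=$(mktemp) || return 1
  # Escape dots so that "172.17.0.2" is matched literally by sed.
  old_pattern=$(printf '%s' "$old_host" | sed 's/\./\\./g')
  sed "s/$old_pattern/$new_host/g" "$file" > "$tmp" || return 1
  cat "$tmp" > "$file" && rm -f "$tmp"
}

# Usage, with the IP reported by Docker:
#   ip=$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)
#   replace_host docs/examples/CamelCassandraQLSourceConnector.properties 172.17.0.2 "$ip"
#   replace_host docs/examples/CamelCassandraQLSinkConnector.properties localhost "$ip"
```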
Apache Cassandra (source)
This example polls Cassandra via CQL (select * from users) in the test keyspace and transfers the result to the mytopic Kafka topic.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelCassandraQLSourceConnector.properties
Apache Cassandra (sink)
First, unzip or untar the camel-cql-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-cql-kafka-connector/target/ should contain a .zip file named camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-cql-kafka-connector/target/camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip
This example adds data to the users table in Cassandra from the data consumed from the mytopic Kafka topic. Notice how the CQL command insert into users… populates the name column from the Kafka message.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelCassandraQLSinkConnector.properties
Elasticsearch (sink)
First, unzip or untar the camel-elasticsearch-rest-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-elasticsearch-rest-kafka-connector/target/ should contain a .zip file named camel-elasticsearch-rest-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-elasticsearch-rest-kafka-connector/target/camel-elasticsearch-rest-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-elasticsearch-rest-kafka-connector-0.6.0-SNAPSHOT-package.zip
This example passes data from the mytopic Kafka topic to the sampleIndexName index in Elasticsearch. Adjust the properties in docs/examples/CamelElasticSearchSinkConnector.properties to reflect your environment, for example change hostAddresses to the hostname and port of a valid Elasticsearch instance.
For the index operation, it might be necessary to provide or implement a transformer. A sample configuration would be similar to the one below:
transforms=ElasticSearchTransformer
This is the sample Transformer used in the integration test code that transforms Kafka’s ConnectRecord to a Map:
transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.elasticsearch.sink.transforms.ConnectRecordValueToMapTransformer
This is a configuration for the sample transformer that defines the key used in the map:
transforms.ElasticSearchTransformer.key=MyKey
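Taken together, the three transformer settings above could be appended to a connector properties file in one step. The sketch below is illustrative: the function name add_elasticsearch_transformer is hypothetical, and it appends unconditionally, so it should be run only once per file.

```shell
# Hypothetical helper: append the sample transformer settings to a connector
# properties file. The second argument is the key used in the resulting map.
add_elasticsearch_transformer() {
  file=$1
  map_key=${2:-MyKey}
  cat >> "$file" <<EOF
transforms=ElasticSearchTransformer
transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.elasticsearch.sink.transforms.ConnectRecordValueToMapTransformer
transforms.ElasticSearchTransformer.key=$map_key
EOF
}

# Usage:
#   add_elasticsearch_transformer docs/examples/CamelElasticSearchSinkConnector.properties MyKey
```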
When the configuration is ready run the sink with:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelElasticSearchSinkConnector.properties
File (sink)
First, unzip or untar the camel-file-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-file-kafka-connector/target/ should contain a .zip file named camel-file-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-file-kafka-connector/target/camel-file-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-file-kafka-connector-0.6.0-SNAPSHOT-package.zip
This example appends data from the mytopic Kafka topic to the file /tmp/kafkaconnect.txt.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelFileSinkConnector.properties
HTTP (sink)
First, unzip or untar the camel-http-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-http-kafka-connector/target/ should contain a .zip file named camel-http-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-http-kafka-connector/target/camel-http-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-http-kafka-connector-0.6.0-SNAPSHOT-package.zip
This example sends data from the mytopic Kafka topic to an HTTP service. Adjust the properties in docs/examples/CamelHttpSinkConnector.properties for your environment, for example by configuring camel.sink.url.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelHttpSinkConnector.properties
JMS (source)
First, unzip or untar the camel-sjms2-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-sjms2-kafka-connector/target/ should contain a .zip file named camel-sjms2-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-sjms2-kafka-connector/target/camel-sjms2-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-sjms2-kafka-connector-0.6.0-SNAPSHOT-package.zip
This installs the basic connector. For camel-sjms2 there are several provided dependencies that need to be added to the path, so run the following commands (this is not needed from 0.7.0 onward for the ActiveMQ and Artemis JMS clients, as their dependencies are packaged along with the SJMS2 connector):
> cd /home/connectors/camel-sjms2-kafka-connector
> wget https://repo1.maven.org/maven2/org/apache/activemq/activemq-client/5.15.11/activemq-client-5.15.11.jar
> wget https://repo1.maven.org/maven2/org/apache/geronimo/specs/geronimo-jms_2.0_spec/1.0-alpha-2/geronimo-jms_2.0_spec-1.0-alpha-2.jar
> wget https://repo1.maven.org/maven2/org/apache/geronimo/specs/geronimo-annotation_1.0_spec/1.1.1/geronimo-annotation_1.0_spec-1.1.1.jar
> wget https://repo1.maven.org/maven2/javax/management/j2ee/management-api/1.1-rev-1/management-api-1.1-rev-1.jar
> wget https://repo1.maven.org/maven2/org/fusesource/hawtbuf/hawtbuf/1.11/hawtbuf-1.11.jar
This example receives messages from a JMS queue named myqueue and transfers them to the mytopic Kafka topic. ActiveMQ is used here, and the connector is configured to connect to the broker running on localhost:61616. Adjust the properties in docs/examples/CamelJmsSourceConnector.properties for your environment, for example configure a username and password by setting camel.component.sjms2.connection-factory.userName=yourusername and camel.component.sjms2.connection-factory.password=yourpassword, or change camel.component.sjms2.connection-factory and camel.component.sjms2.connection-factory.brokerURL to reflect your JMS implementation and URL.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelJmsSourceConnector.properties
JMS (sink)
This example receives messages from the mytopic Kafka topic and transfers them to a JMS queue named myqueue. ActiveMQ is used here, and the connector is configured to connect to the broker running on localhost:61616. Adjust the properties in docs/examples/CamelJmsSinkConnector.properties for your environment, for example configure a username and password by adding camel.component.sjms2.connection-factory.userName=yourusername and camel.component.sjms2.connection-factory.password=yourpassword, or change camel.component.sjms2.connection-factory and camel.component.sjms2.connection-factory.brokerURL to reflect your JMS implementation and URL.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelJmsSinkConnector.properties
Telegram (source)
First, unzip or untar the camel-telegram-kafka-connector archive in the plugin.path location. After building the project, connectors/camel-telegram-kafka-connector/target/ should contain a .zip file named camel-telegram-kafka-connector-0.6.0-SNAPSHOT-package.zip:
> cd /home/connectors/
> cp connectors/camel-telegram-kafka-connector/target/camel-telegram-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-telegram-kafka-connector-0.6.0-SNAPSHOT-package.zip
This example transfers messages sent to a Telegram bot to the mytopic Kafka topic. Set the Telegram bot token in docs/examples/CamelTelegramSourceConnector.properties to your bot’s token.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelTelegramSourceConnector.properties