In this article I show how to implement a data pipelining solution using Dockerized Kafka Connect and the Snowflake Sink Connector to pull events out of Apache Kafka and into the Snowflake Cloud Data Warehouse. To demonstrate this I use a Snowflake trial account along with a Docker Compose environment composed of Confluent Community Edition Docker images for Apache Kafka, Confluent Schema Registry, and Kafka Connect.
The complete code for this project can be found on my GitHub account.
1) Create a private key (use a passphrase like Develop3r) for authenticating to Snowflake.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
Assign the private key's contents to a shell variable named SNOWFLAKE_PRIVATE_KEY. This is everything between the header line "-----BEGIN ENCRYPTED PRIVATE KEY-----" and the footer line "-----END ENCRYPTED PRIVATE KEY-----", collapsed onto a single line.
SNOWFLAKE_PRIVATE_KEY=$(echo `sed -e '2,$!d' -e '$d' -e 's/\n/ /g' rsa_key.p8`|tr -d ' ')
2) Generate the matching public key for authenticating to Snowflake (you will be prompted for the private key's passphrase, e.g. Develop3r).
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Assign the public key's contents to a shell variable named SNOWFLAKE_PUBLIC_KEY. This is everything between the header line "-----BEGIN PUBLIC KEY-----" and the footer line "-----END PUBLIC KEY-----", collapsed onto a single line.
SNOWFLAKE_PUBLIC_KEY=$(echo `sed -e '2,$!d' -e '$d' -e 's/\n/ /g' rsa_key.pub`|tr -d ' ')
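At this point both variables should each hold a single unbroken line of base64 text with no header, footer, or whitespace. A quick optional sanity check in plain bash is to print the first few characters of each:
# print the first 40 characters of each key variable to confirm they are set
echo "${SNOWFLAKE_PRIVATE_KEY:0:40}"
echo "${SNOWFLAKE_PUBLIC_KEY:0:40}"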
3) Create Snowflake Objects
In this step I create the Snowflake objects used to ingest the order events stored in Kafka into Snowflake via Kafka Connect.
I am creating everything from scratch here, which may not be what you want when working with existing tables and stages. In that case you would likely grant the role ownership of the table along with read and write privileges on the stage, as sketched below.
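As a rough sketch, assuming a pre-existing table named ORDERS and an internal stage named ORDERS_STAGE (hypothetical names, adjust to your own objects), those grants might look something like this:
-- hypothetical existing objects; COPY CURRENT GRANTS preserves any grants already on the table
GRANT OWNERSHIP ON TABLE ORDERS_DB.ORDERS_SCHEMA.ORDERS TO ROLE KAFKA_CONNECT_ORDERS_ROLE COPY CURRENT GRANTS;
GRANT READ, WRITE ON STAGE ORDERS_DB.ORDERS_SCHEMA.ORDERS_STAGE TO ROLE KAFKA_CONNECT_ORDERS_ROLE;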
In a new worksheet within the Snowflake console execute the following SQL statements.
USE ROLE SYSADMIN;
CREATE WAREHOUSE KAFKA_CONNECT WITH WAREHOUSE_SIZE = 'XSMALL' WAREHOUSE_TYPE = 'STANDARD' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE;
USE WAREHOUSE KAFKA_CONNECT;
CREATE DATABASE ORDERS_DB;
USE DATABASE ORDERS_DB;
CREATE SCHEMA ORDERS_SCHEMA;
USE ROLE SECURITYADMIN;
CREATE ROLE KAFKA_CONNECT_ORDERS_ROLE;
GRANT USAGE ON WAREHOUSE KAFKA_CONNECT TO ROLE KAFKA_CONNECT_ORDERS_ROLE;
GRANT USAGE ON DATABASE ORDERS_DB TO ROLE KAFKA_CONNECT_ORDERS_ROLE;
USE ROLE SYSADMIN;
USE WAREHOUSE KAFKA_CONNECT;
USE DATABASE ORDERS_DB;
GRANT USAGE ON SCHEMA ORDERS_SCHEMA TO ROLE KAFKA_CONNECT_ORDERS_ROLE;
GRANT CREATE TABLE ON SCHEMA ORDERS_SCHEMA TO ROLE KAFKA_CONNECT_ORDERS_ROLE;
GRANT CREATE STAGE ON SCHEMA ORDERS_SCHEMA TO ROLE KAFKA_CONNECT_ORDERS_ROLE;
GRANT CREATE PIPE ON SCHEMA ORDERS_SCHEMA TO ROLE KAFKA_CONNECT_ORDERS_ROLE;
USE ROLE SECURITYADMIN;
CREATE USER ORDERS_CONNECT_USER
PASSWORD = 'Develop3r'
DEFAULT_WAREHOUSE = 'KAFKA_CONNECT'
DEFAULT_NAMESPACE = 'ORDERS_DB.ORDERS_SCHEMA'
RSA_PUBLIC_KEY = 'REPLACE WITH THE VALUE OF SNOWFLAKE_PUBLIC_KEY VARIABLE'
DEFAULT_ROLE = 'KAFKA_CONNECT_ORDERS_ROLE';
GRANT ROLE KAFKA_CONNECT_ORDERS_ROLE TO USER ORDERS_CONNECT_USER;
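As an optional verification that the privileges landed where expected, the grants can be inspected from the same worksheet:
SHOW GRANTS TO ROLE KAFKA_CONNECT_ORDERS_ROLE;
SHOW GRANTS TO USER ORDERS_CONNECT_USER;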
Use the confluentinc/cp-kafka-connect-base:6.1.4 Kafka Connect image from Confluent in a Dockerfile within a new directory named kafka-connect-integration.
mkdir kafka-connect-integration
cd kafka-connect-integration
Then add the following Dockerfile, which installs the snowflakeinc/snowflake-kafka-connector:1.7.0 connector via the confluent-hub CLI tool provided in the Docker image.
FROM confluentinc/cp-kafka-connect-base:6.1.4
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars"
RUN confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:1.7.0
Then update the docker-compose.yml file to include a new service definition named kafka-connect that points to this Dockerfile.
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:6.2.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  kafka-connect:
    build: kafka-connect-integration
    container_name: kafka-connect
    hostname: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
Launch the Docker Compose services.
docker-compose up
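The Kafka Connect worker can take a minute or two to finish starting. A simple way to confirm its REST API is up on the mapped port 8083 (shown here with curl, but any HTTP client works) is:
curl -s http://localhost:8083/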
The Snowflake Sink Connector is an integration tool used to pipeline events from a Kafka cluster into the Snowflake data warehouse. So before going much further I'll want some events present in Kafka to pipeline into Snowflake. To accomplish this I can use the Orders Avro producer I presented in the preceding article Kafka Clients in Java with Avro Serialization and Confluent Schema Registry. Since the Docker Compose services are already running I can simply launch the producer using Gradle like so.
./gradlew run --args="producer"
This will generate fake order events conforming to the Avro schema shown below.
{
"namespace": "com.thecodinginterface.avro.orders",
"type": "record",
"name": "OrderValue",
"fields": [
{ "name": "id", "type": "string"},
{ "name": "amount", "type": "int"},
{ "name": "created",
"type": {
"type": "long",
"logicalType": "local-timestamp-millis"
}
},
{"name": "customer", "type": "string"},
{"name": "creditcard", "type": "string"}
]
}
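Before wiring up the connector it can be worth a spot check that the producer actually wrote Avro records to the orders-avro topic. One sketch of doing this uses the kafka-avro-console-consumer tool bundled in the schema-registry container to read a few records and exit:
# consume three Avro records from the orders-avro topic and print them as JSON
docker exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --topic orders-avro \
  --from-beginning \
  --max-messages 3 \
  --property schema.registry.url=http://localhost:8081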
First I verify that the SnowflakeSinkConnector plugin is present by querying the Kafka Connect REST API's connector-plugins endpoint (I use the HTTPie client here, but curl works just as well).
http http://localhost:8083/connector-plugins
Output.
HTTP/1.1 200 OK
Content-Length: 393
Content-Type: application/json
Date: Tue, 18 Jan 2022 04:04:01 GMT
Server: Jetty(9.4.43.v20210629)
[
{
"class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"type": "sink",
"version": "1.7.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
Next I create the Snowflake Sink Connector configuration file named snowflake-connector.json, whose properties are detailed in the Snowflake docs for launching the connector in distributed mode.
{
"name":"OrdersAvroConnector",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"orders-avro",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"somealphanumeric.us-east-2.aws.snowflakecomputing.com:443",
"snowflake.user.name":"ORDERS_CONNECT_USER",
"snowflake.private.key":"REPLACE WITH THE VALUE OF SNOWFLAKE_PRIVATE_KEY VARIABLE",
"snowflake.private.key.passphrase":"Develop3r",
"snowflake.database.name":"ORDERS_DB",
"snowflake.schema.name":"ORDERS_SCHEMA",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}
}
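The snowflake.url.name and snowflake.private.key values above are placeholders. Since the private key is already sitting in the SNOWFLAKE_PRIVATE_KEY shell variable from step 1, one way to splice it in without copy/pasting is a quick sed substitution (GNU sed shown; macOS/BSD sed needs -i ''):
# replace the private key placeholder with the variable's value in place
sed -i "s|REPLACE WITH THE VALUE OF SNOWFLAKE_PRIVATE_KEY VARIABLE|${SNOWFLAKE_PRIVATE_KEY}|" snowflake-connector.json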
Lastly, I launch the Snowflake Sink Connector by issuing a POST request to the Kafka Connect REST API endpoint as shown below.
http POST http://localhost:8083/connectors @snowflake-connector.json
Output.
HTTP/1.1 201 Created
Content-Length: 2549
Content-Type: application/json
Date: Tue, 18 Jan 2022 04:44:38 GMT
Location: http://localhost:8083/connectors/OrdersAvroConnector
Server: Jetty(9.4.43.v20210629)
{
"config": {
"buffer.count.records": "10000",
"buffer.flush.time": "60",
"buffer.size.bytes": "5000000",
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"name": "OrdersAvroConnector",
"snowflake.database.name": "ORDERS_DB",
"snowflake.private.key": "RETURNS WITH THE VALUE OF SNOWFLAKE_PRIVATE_KEY VARIABLE",
"snowflake.private.key.passphrase": "Develop3r",
"snowflake.schema.name": "ORDERS_SCHEMA",
"snowflake.url.name": "somealphanumeric.us-east-2.aws.snowflakecomputing.com:443",
"snowflake.user.name": "ORDERS_CONNECT_USER",
"tasks.max": "8",
"topics": "orders-avro",
"value.converter": "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
},
"name": "OrdersAvroConnector",
"tasks": [],
"type": "sink"
}
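The empty tasks array simply means task assignment had not yet completed when the create request returned. The connector's health can be checked at any point via the status endpoint:
http http://localhost:8083/connectors/OrdersAvroConnector/status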
I can now log in to the Snowflake instance using the snowsql CLI tool with the ORDERS_CONNECT_USER user and the private key like so.
snowsql -a somealphanumeric.us-east-2.aws -u ORDERS_CONNECT_USER --private-key-path=rsa_key.p8
Then list the tables to see the name of the table that was created by the connector.
ORDERS_CONNECT_USER#KAFKA_CONNECT@ORDERS_DB.ORDERS_SCHEMA>SHOW TABLES;
+-------------------------------+------------------------+---------------+---------------+-------+---------+------------+------+--------+---------------------------+----------------+----------------------+-----------------+-------------+
| created_on | name | database_name | schema_name | kind | comment | cluster_by | rows | bytes | owner | retention_time | automatic_clustering | change_tracking | is_external |
|-------------------------------+------------------------+---------------+---------------+-------+---------+------------+------+--------+---------------------------+----------------+----------------------+-----------------+-------------|
| 2022-01-17 20:44:42.567 -0800 | ORDERS_AVRO_1814321382 | ORDERS_DB | ORDERS_SCHEMA | TABLE | | | 2243 | 203264 | KAFKA_CONNECT_ORDERS_ROLE | 1 | OFF | OFF | N |
+-------------------------------+------------------------+---------------+---------------+-------+---------+------------+------+--------+---------------------------+----------------+----------------------+-----------------+-------------+
Now if I query the ORDERS_AVRO_1814321382 table that was generated by the connector I get the following, proving that data is flowing from Kafka into Snowflake.
ORDERS_CONNECT_USER#KAFKA_CONNECT@ORDERS_DB.ORDERS_SCHEMA>select * from ORDERS_AVRO_1814321382 limit 3;
+--------------------------------------------------+------------------------------------------------+
| RECORD_METADATA | RECORD_CONTENT |
|--------------------------------------------------+------------------------------------------------|
| { | { |
| "CreateTime": 1642479864337, | "amount": 6996, |
| "key": "78b91d3a-f09b-456c-a6e3-e742d098f194", | "created": 1642458263853, |
| "offset": 0, | "creditcard": "1428", |
| "partition": 0, | "customer": "Mindi Schultz", |
| "schema_id": 1, | "id": "78b91d3a-f09b-456c-a6e3-e742d098f194" |
| "topic": "orders-avro" | } |
| } | |
| { | { |
| "CreateTime": 1642479865974, | "amount": 9855, |
| "key": "4c25e284-94fc-47f1-8cd9-994e387bfcad", | "created": 1642458265973, |
| "offset": 1, | "creditcard": "3325", |
| "partition": 0, | "customer": "Wendy Carroll Sr.", |
| "schema_id": 1, | "id": "4c25e284-94fc-47f1-8cd9-994e387bfcad" |
| "topic": "orders-avro" | } |
| } | |
| { | { |
| "CreateTime": 1642479866075, | "amount": 8856, |
| "key": "5cc3520c-f9a3-453e-a429-0914053dbc32", | "created": 1642458266075, |
| "offset": 2, | "creditcard": "4448", |
| "partition": 0, | "customer": "Kim Klocko", |
| "schema_id": 1, | "id": "5cc3520c-f9a3-453e-a429-0914053dbc32" |
| "topic": "orders-avro" | } |
| } | |
+--------------------------------------------------+------------------------------------------------+
3 Row(s) produced. Time Elapsed: 0.388s
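The RECORD_METADATA and RECORD_CONTENT columns are Snowflake VARIANT types, so individual order fields can be pulled out with Snowflake's semi-structured path syntax. A quick sketch (the generated table name will differ in your account):
-- adjust the table name to the one created by the connector in your account
SELECT
  RECORD_CONTENT:id::string       AS id,
  RECORD_CONTENT:customer::string AS customer,
  RECORD_CONTENT:amount::int      AS amount,
  RECORD_METADATA:topic::string   AS kafka_topic
FROM ORDERS_AVRO_1814321382
LIMIT 3;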
In this article I gave a practical example of how one can use the Snowflake Sink Connector for Kafka to pipeline Avro-based Kafka events into Snowflake in a Docker Compose environment.
As always, I thank you for reading and please feel free to ask questions or critique in the comments section below.