We need to use kafka-connect-cassandra, which Tuplejump publishes on Maven Central.
It can be declared as a dependency in the build file. For example, with SBT:
libraryDependencies += "com.tuplejump" %% "kafka-connect-cassandra" % "0.0.7"
The connector polls Cassandra with a specified query. Data can be fetched from Cassandra in two modes:
- Bulk
- Timestamp based
The mode is selected automatically based on the query. For example:
Bulk:
SELECT * FROM userlog;
Timestamp based:
SELECT * FROM userlog WHERE ts > previousTime();
SELECT * FROM userlog WHERE ts = currentTime();
SELECT * FROM userlog WHERE ts >= previousTime() AND ts <= currentTime();
Here, previousTime() and currentTime() are placeholders that are replaced with actual values before the data is fetched.
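The mode selection and placeholder replacement described above can be sketched roughly as follows. This is an illustration in Python, not the connector's own code: the function names `query_mode` and `bind_times` are hypothetical, and the sketch assumes the placeholders are replaced with epoch-millisecond integers, which CQL accepts as timestamp literals. How the connector actually tracks the previous poll time is not described here.

```python
# Placeholder names as they appear in the example queries above.
PLACEHOLDERS = ("previousTime()", "currentTime()")


def query_mode(query: str) -> str:
    """Return "timestamp" if the query uses a time placeholder, else "bulk"."""
    return "timestamp" if any(p in query for p in PLACEHOLDERS) else "bulk"


def bind_times(query: str, previous_ms: int, current_ms: int) -> str:
    """Replace both placeholders with epoch-millisecond literals (assumed format)."""
    return (
        query
        .replace("previousTime()", str(previous_ms))
        .replace("currentTime()", str(current_ms))
    )


if __name__ == "__main__":
    q = "SELECT * FROM userlog WHERE ts >= previousTime() AND ts <= currentTime();"
    print(query_mode(q))
    print(bind_times(q, previous_ms=1000, current_ms=2000))
```

A bulk query such as `SELECT * FROM userlog;` contains no placeholder, so `bind_times` returns it unchanged.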
| CQL Type | Schema Type |
|-----------|-------------|
| ASCII | STRING |
| VARCHAR | STRING |
| TEXT | STRING |
| BIGINT | INT64 |
| COUNTER | INT64 |
| BOOLEAN | BOOLEAN |
| DECIMAL | FLOAT64 |
| DOUBLE | FLOAT64 |
| FLOAT | FLOAT32 |
| TIMESTAMP | TIMESTAMP |
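The type mapping listed above can be read as a plain lookup. Below is a minimal sketch in Python, for illustration only; `CQL_TO_SCHEMA_TYPE` and `schema_type` are hypothetical names, and the sketch covers only the rows listed here. It assumes that any other CQL type should be rejected, which may not match the connector's behavior.

```python
# CQL type -> schema type, copied from the rows listed above.
CQL_TO_SCHEMA_TYPE = {
    "ASCII": "STRING",
    "VARCHAR": "STRING",
    "TEXT": "STRING",
    "BIGINT": "INT64",
    "COUNTER": "INT64",
    "BOOLEAN": "BOOLEAN",
    "DECIMAL": "FLOAT64",
    "DOUBLE": "FLOAT64",
    "FLOAT": "FLOAT32",
    "TIMESTAMP": "TIMESTAMP",
}


def schema_type(cql_type: str) -> str:
    """Look up the schema type for a CQL type name, ignoring case and padding."""
    key = cql_type.strip().upper()
    if key not in CQL_TO_SCHEMA_TYPE:
        # Assumption: types outside the listed rows are treated as unsupported.
        raise KeyError(f"CQL type not in the listed mapping: {cql_type}")
    return CQL_TO_SCHEMA_TYPE[key]


if __name__ == "__main__":
    print(schema_type("bigint"))
    print(schema_type("text"))
```

Note that DECIMAL and DOUBLE both map to FLOAT64, so a DECIMAL column may lose precision when converted.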
Table...