Usage

The Kafka configuration is based on the kafka4m Typesafe configuration. In practice the intent is to achieve similar things to what Kafka Streams gives you (but without relying on the Kafka Streams API).
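
For example, the topic-copy example below assumes consumer and producer topics configured along these lines (the key names are taken from the comments in the example; the topic values are placeholders):

kafka4m.consumer.topic = originalTopic
kafka4m.producer.topic = someNewTopic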

Copy A Kafka Topic (as Mirror Maker does):

  import com.typesafe.config.ConfigFactory
  import monix.eval.Task
  import monix.reactive.{Consumer, Observable}
  import org.apache.kafka.clients.consumer.ConsumerRecord

  val config = ConfigFactory.load()

  // write data to kafka (assumes a configuration akin to kafka4m.producer.topic = someNewTopic)
  val kafkaWriter: Consumer[(String, Array[Byte]), Long] = kafka4m.writeKeyAndBytes(config)

  // read data from kafka (assumes a configuration akin to kafka4m.consumer.topic = originalTopic)
  val kafkaData: Observable[ConsumerRecord[String, Array[Byte]]] = kafka4m.readRecords(config)

  // then we'd write it back into kafka like this:
  val task: Task[Long] = kafkaData.map(r => (r.key, r.value)).consumeWith(kafkaWriter)

That provides the base primitives – getting data into and out of Kafka.
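
To actually run the copy you execute the Task on a monix Scheduler; a minimal sketch using the standard global scheduler:

  import monix.execution.CancelableFuture
  import monix.execution.Scheduler.Implicits.global

  // kicks off the copy; the future completes with the number of records written
  // when (and if) the source stream completes
  val running: CancelableFuture[Long] = task.runToFuture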

Manually Acking

By default the Kafka consumer auto-commits partition offsets periodically. To give client code more control over that, kafka4m sets the default of ‘enable.auto.commit’ to false.

It does this via:

kafka4m.consumer.enable.auto.commit : false

which you can of course override.
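
If you do want auto-commit back on, one option (a sketch using the standard Typesafe config API) is to layer an override on top of the loaded configuration:

    import com.typesafe.config.ConfigFactory

    // an in-memory override layered on top of application.conf / reference.conf
    val config = ConfigFactory
      .parseString("kafka4m.consumer.enable.auto.commit = true")
      .withFallback(ConfigFactory.load())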

The idea, though, is that ‘kafka4m.read’ returns a stream of ‘AckableRecords’: wrappers around the ConsumerRecord which expose a ‘commitPosition’ function for persisting the current partition offsets back to Kafka.

A very basic example would be something like this:

    import kafka4m.consumer.AckableRecord // assuming AckableRecord lives in kafka4m's consumer package
    import monix.reactive.Observable
    import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata}
    import org.apache.kafka.common.TopicPartition
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future

    // our custom data type
    case class MyData(id: String, x: Int)

    // a means to unmarshal it from a record
    def unmarshal(bytes: Array[Byte]): MyData = ???

    // some method to persist our data
    def writeToDB(value: MyData): Future[Boolean] = ???

    val dataStream: Observable[MyData] = kafka4m.read().map { ackable: AckableRecord[ConsumerRecord[String, Array[Byte]]] =>
      val data = unmarshal(ackable.record.value())
      // commit the offsets back to Kafka once the write has succeeded
      val commitFuture: Future[Map[TopicPartition, OffsetAndMetadata]] = writeToDB(data).flatMap(_ => ackable.commitPosition())
      commitFuture.onComplete(result => println(s"Committed: $result"))
      data
    }
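
Note that the map above only describes the pipeline; nothing happens until the Observable is consumed. A minimal sketch of running it on the standard global scheduler:

    import monix.execution.Scheduler.Implicits.global

    // subscribe to the stream; the DB write and offset commit are triggered
    // inside the map above, so here we simply drain the elements
    val running = dataStream.foreachL(_ => ()).runToFuture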

ETL

On top of that, kafka4m provides some basic conveniences for getting data from the filesystem into Kafka and writing data from Kafka back to the filesystem.

Kafka4mApp

The ‘Kafka4mApp’ serves as the entry point for the ETL jobs and uses the args4c library. That simply means the first argument should be either ‘read’ or ‘write’ (as in read data from Kafka or write data to Kafka), and the subsequent arguments are either key=value pairs or the location of a configuration file.
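
For example, assuming kafka4m is packaged as a runnable jar (the jar name and main-class path here are placeholders), an invocation mixing both argument styles might look like:

java -cp kafka4m.jar kafka4m.Kafka4mApp read \
  kafka4m.etl.fromKafka.dataDir=./dataOut \
  overrides.conf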

As a Docker image

Aside from being able to extend it in your own project, it also works out of the box, so we’ve published a Docker image to do just that:

Write some data into kafka:

echo "example" > ./dataIn/example.txt
docker run kafka4m:latest write kafka4m.etl.intoKafka.dataDir=./dataIn kafka4m.topic=foo
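
Note that the container only sees its own filesystem, so in practice the host directory has to be mounted into the container; a hypothetical invocation (the /data/in mount point is just an example):

docker run -v $PWD/dataIn:/data/in kafka4m:latest write \
  kafka4m.etl.intoKafka.dataDir=/data/in \
  kafka4m.topic=foo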

Write a lot of data into kafka:

docker run kafka4m:latest write \
  kafka4m.etl.intoKafka.dataDir=./dataIn \
  kafka4m.topic=foo \
  kafka4m.etl.intoKafka.repeat=true

Read the data out:

docker run kafka4m:latest read \
  kafka4m.etl.fromKafka.dataDir=./dataOut \
  kafka4m.topic=foo

There are many more configuration options (caching, limits, rate-limiting, etc.) for the ETL work. See the reference.conf for the full list of options.