kafka4m

package kafka4m

The high-level API space for kafka consumers

Linear Supertypes: AnyRef, Any

Package Members

  1. package admin
  2. package consumer
  3. package data
  4. package io
  5. package jmx
  6. package partitions
  7. package producer
  8. package util

Type Members

  1. type AckBytes = AckableRecord[ConsumerRecord[String, Array[Byte]]]
  2. type AckRecord[K, A] = AckableRecord[ConsumerRecord[K, A]]
  3. type Bytes = Array[Byte]
  4. type Key = String
  5. type KeyValue = (Key, Bytes)

Value Members

  1. def adminTopic(config: Config): String

    config: the root configuration
    returns: the admin topic as per the config

  2. def byteArrayProducer(config: Config): RichKafkaProducer[Key, Bytes]
  3. def closeConsumerOnComplete(config: Config): Boolean

    config: the kafka4m config
    returns: true if observables should be closed when complete

  4. def closeProducerOnComplete(config: Config): Boolean
  5. def consumerTopics(config: Config): Set[String]

    config: the root configuration
    returns: the consumer topics as per the config

  6. def continueOnError(config: Config): Boolean
  7. def ensureTopicBlocking(config: Config)(implicit ec: ExecutionContext): Option[String]

    Kafka Streams will fail if the topic does not yet exist. This way we can provide a means to 'getOrCreate' a topic if that's how it's configured.
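
    For example, a minimal sketch of the getOrCreate behaviour (assuming only an ExecutionContext in scope; the meaning of the Option[String] result is inferred from the signature):

      import com.typesafe.config.ConfigFactory
      import scala.concurrent.ExecutionContext.Implicits.global

      // assumed: a Some result names the topic that was created
      val created: Option[String] = kafka4m.ensureTopicBlocking(ConfigFactory.load())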

  8. def fireAndForget(config: Config): Boolean
  9. def kafkaProducer[K, V](config: Config): RichKafkaProducer[K, V]

    config: the kafka4m config
    returns: a RichKafkaProducer for the given config

  10. def loadBalance[A, B](config: Config = ConfigFactory.load(), parallelism: Int = Runtime.getRuntime.availableProcessors(), kafkaScheduler: Scheduler = FixedScheduler().scheduler)(compute: (A) => Task[B])(implicit arg0: ByteArrayDecoder[A], scheduler: Scheduler): Observable[ComputeResult[A, B]]

    Load balance the given compute, returning a tuple of the kafka data together with the completed request/response data.

    returns: an observable of a data structure which contains the local offset (the single-threaded sequential order of the read message), the kafka record, and the deserialized request/response
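
    A sketch of a call (the ByteArrayDecoder[String] instance is an assumption and is not shown):

      import com.typesafe.config.ConfigFactory
      import monix.eval.Task
      import monix.execution.Scheduler.Implicits.global

      // fan each decoded record out to 4 parallel workers
      val results = kafka4m.loadBalance[String, Int](parallelism = 4) { text =>
        Task(text.length) // the per-record compute
      }
      results.foreach(result => println(result))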

  11. def producerTopic(config: Config): String

    config: the root configuration
    returns: the producer topic as per the config
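
    The topic accessors all read from the same root configuration. A short sketch:

      import com.typesafe.config.{Config, ConfigFactory}

      val conf: Config = ConfigFactory.load()
      val readFrom: Set[String] = kafka4m.consumerTopics(conf)
      val writeTo: String = kafka4m.producerTopic(conf)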

  12. def read[A](config: Config = ConfigFactory.load(), kafkaScheduler: Scheduler = FixedScheduler().scheduler)(implicit arg0: ByteArrayDecoder[A], ioScheduler: Scheduler): Observable[AckableRecord[A]]

    config: the configuration
    kafkaScheduler: the single-threaded scheduler used when we have to talk to kafka
    returns: an Observable of acknowledgeable records decoded from kafka
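
    A minimal read sketch (the ByteArrayDecoder[String] instance is an assumption and is not shown):

      import com.typesafe.config.ConfigFactory
      import monix.execution.Scheduler

      implicit val ioScheduler: Scheduler = Scheduler.io()

      val records = kafka4m.read[String](ConfigFactory.load())
      records.take(10).foreach(record => println(record))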

  13. def readByteArray(config: Config = ConfigFactory.load(), kafkaScheduler: Scheduler = FixedScheduler().scheduler)(implicit ioScheduler: Scheduler): Observable[AckBytes]
  14. def readRecords[A](config: Config = ConfigFactory.load())(implicit arg0: ByteArrayDecoder[A], scheduler: Scheduler): Observable[A]

    config: the kafka4m configuration which contains the 'kafka4m.consumer' values
    returns: an Observable of data coming from kafka. The offsets, etc. will be controlled by the kafka4m.consumer configuration, which includes the default offset strategy, etc.
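
    For instance (again assuming an implicit ByteArrayDecoder[String]):

      import com.typesafe.config.ConfigFactory
      import monix.execution.Scheduler.Implicits.global

      // a plain stream of decoded values; offsets and commits are driven by config
      val values = kafka4m.readRecords[String](ConfigFactory.load())
      values.take(5).foreach(value => println(value))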

  15. def richAdmin(config: Config): RichKafkaAdmin
  16. def stream(config: Config = ConfigFactory.load(), kafkaScheduler: Scheduler = FixedScheduler().scheduler)(implicit scheduler: Scheduler): Task[KafkaStream[AckBytes]]

    returns: a ConcurrentStream of the records

  17. def write[A](config: Config = ConfigFactory.load())(implicit ev: Aux[A, Key, Bytes]): Consumer[A, Long]

    A: any type A which can be converted into a kafka ProducerRecord
    config: the kafka4m configuration
    returns: a consumer of 'A' values which yields the number of records written

  18. def writeKeyAndBytes(config: Config = ConfigFactory.load()): Consumer[(String, Array[Byte]), Long]

    config: the kafka4m configuration
    returns: a consumer which will write a stream of key/byte-array values into kafka and return the number written
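
    For example, a small sketch:

      import java.nio.charset.StandardCharsets.UTF_8
      import com.typesafe.config.ConfigFactory
      import monix.eval.Task
      import monix.reactive.Observable

      val pairs = Observable("a" -> "one", "b" -> "two").map {
        case (key, value) => (key, value.getBytes(UTF_8))
      }
      // the Task yields the number of records written once run
      val written: Task[Long] = pairs.consumeWith(kafka4m.writeKeyAndBytes(ConfigFactory.load()))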

  19. def writeText(config: Config = ConfigFactory.load()): Consumer[String, Long]

    config: the kafka4m configuration
    returns: a consumer which will consume raw text data and write it with null keys
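
    And the text variant, as a sketch:

      import com.typesafe.config.ConfigFactory
      import monix.eval.Task
      import monix.execution.Scheduler.Implicits.global
      import monix.reactive.Observable

      val lines = Observable("first", "second", "third")
      val written: Task[Long] = lines.consumeWith(kafka4m.writeText(ConfigFactory.load()))
      // running the task writes the records and yields the count
      written.runToFuture.foreach(count => println(s"wrote $count records"))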

  20. object Kafka4mApp extends ConfigApp with StrictLogging

    An ETL entry point to read data into or out of kafka
