kafka4m
package kafka4m
The high-level API for kafka consumers and producers
Type Members
- type AckBytes = AckableRecord[ConsumerRecord[String, Array[Byte]]]
- type AckRecord[K, A] = AckableRecord[ConsumerRecord[K, A]]
- type Bytes = Array[Byte]
- type Key = String
- type KeyValue = (Key, Bytes)
Value Members
- def adminTopic(config: Config): String
- config
the root configuration
- returns
the admin topic as per the config
- def byteArrayProducer(config: Config): RichKafkaProducer[Key, Bytes]
- def closeConsumerOnComplete(config: Config): Boolean
- config
the kafka4m config
- returns
true if observables should be closed when complete
- def closeProducerOnComplete(config: Config): Boolean
- def consumerTopics(config: Config): Set[String]
- config
the root configuration
- returns
the consumer topics as per the config
- def continueOnError(config: Config): Boolean
- def ensureTopicBlocking(config: Config)(implicit ec: ExecutionContext): Option[String]
Kafka Streams will fail if the topic does not yet exist. This provides a means to 'getOrCreate' a topic if that's how it's configured.
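A minimal sketch of calling this at startup. The config key `kafka4m.consumer.topic` and the meaning of the returned `Option` (the name of a newly created topic) are assumptions based on the 'getOrCreate' description above, not confirmed API details.

```scala
import scala.concurrent.ExecutionContext
import com.typesafe.config.{Config, ConfigFactory}

object EnsureTopicExample {
  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Override the topic to get-or-create; the key name is an assumption
    // based on the 'kafka4m.consumer' config block used elsewhere in this API.
    val config: Config = ConfigFactory
      .parseString("kafka4m.consumer.topic = example-topic")
      .withFallback(ConfigFactory.load())

    // Blocks until the topic exists; assumed to return the name of a newly
    // created topic, or None if nothing needed to be created.
    val created: Option[String] = kafka4m.ensureTopicBlocking(config)
    println(created)
  }
}
```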
- def fireAndForget(config: Config): Boolean
- def kafkaProducer[K, V](config: Config): RichKafkaProducer[K, V]
- config
the kafka4m config
- returns
a RichKafkaProducer for the given config
- def loadBalance[A, B](config: Config = ConfigFactory.load(), parallelism: Int = Runtime.getRuntime.availableProcessors(), kafkaScheduler: Scheduler = FixedScheduler().scheduler)(compute: (A) => Task[B])(implicit arg0: ByteArrayDecoder[A], scheduler: Scheduler): Observable[ComputeResult[A, B]]
Load-balances the given compute, returning a tuple of the kafka data together with the completed request/response data.
- returns
an observable of a data structure which contains the local offset (the single-threaded sequential order of the read message), the kafka record, and the deserialized request/response
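A hedged sketch of `loadBalance` where the compute simply measures each message. It assumes an implicit `ByteArrayDecoder[String]` (e.g. UTF-8) is available from kafka4m's codecs; that instance, and a running broker, are assumptions, not shown by the listing above.

```scala
import com.typesafe.config.ConfigFactory
import monix.eval.Task
import monix.execution.Scheduler

object LoadBalanceExample {
  // The pure 'compute' fanned out across the configured parallelism
  def messageLength(text: String): Int = text.length

  def main(args: Array[String]): Unit = {
    implicit val scheduler: Scheduler = Scheduler.io()

    // Assumes an implicit ByteArrayDecoder[String] is in scope (kafka4m is
    // expected to provide one for UTF-8 text; this is an assumption).
    val results = kafka4m.loadBalance[String, Int](ConfigFactory.load(), parallelism = 4) { text =>
      Task(messageLength(text))
    }

    // Each ComputeResult pairs the local offset and kafka record with the
    // deserialized request/response, so downstream code can ack in order.
    results.take(10).foreach(println)
  }
}
```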
- def producerTopic(config: Config): String
- config
the root configuration
- returns
the producer topic as per the config
- def read[A](config: Config = ConfigFactory.load(), kafkaScheduler: Scheduler = FixedScheduler().scheduler)(implicit arg0: ByteArrayDecoder[A], ioScheduler: Scheduler): Observable[AckableRecord[A]]
- config
the configuration
- kafkaScheduler
the single-threaded scheduler used when talking to kafka
- def readByteArray(config: Config = ConfigFactory.load(), kafkaScheduler: Scheduler = FixedScheduler().scheduler)(implicit ioScheduler: Scheduler): Observable[AckBytes]
- def readRecords[A](config: Config = ConfigFactory.load())(implicit arg0: ByteArrayDecoder[A], scheduler: Scheduler): Observable[A]
- config
the kafka4m configuration which contains the 'kafka4m.consumer' values
- returns
an Observable of data coming from kafka. The offsets and related consumer behaviour are controlled by the kafka4m.consumer configuration, which includes the default offset strategy.
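A sketch of reading raw records. It assumes kafka4m provides an identity `ByteArrayDecoder[Array[Byte]]`, and the config key `kafka4m.consumer.topic` is an assumption based on the 'kafka4m.consumer' block described above.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import monix.execution.Scheduler

object ReadRecordsExample {
  def main(args: Array[String]): Unit = {
    implicit val scheduler: Scheduler = Scheduler.io()

    // Point the consumer at a topic; the key name is an assumption.
    val config: Config = ConfigFactory
      .parseString("kafka4m.consumer.topic = example-topic")
      .withFallback(ConfigFactory.load())

    // Assumes an identity ByteArrayDecoder[Array[Byte]] is in implicit scope.
    kafka4m
      .readRecords[Array[Byte]](config)
      .take(5)
      .foreach(bytes => println(new String(bytes, "UTF-8")))
  }
}
```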
- def richAdmin(config: Config): RichKafkaAdmin
- def stream(config: Config = ConfigFactory.load(), kafkaScheduler: Scheduler = FixedScheduler().scheduler)(implicit scheduler: Scheduler): Task[KafkaStream[AckBytes]]
- returns
a KafkaStream of the ackable records
- def write[A](config: Config = ConfigFactory.load())(implicit ev: Aux[A, Key, Bytes]): Consumer[A, Long]
- A
any type A which can be converted into a kafka ProducerRecord
- config
the kafka4m configuration
- returns
a consumer which writes the 'A' values to kafka and produces the number written
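A sketch of `write` driven by an Observable of key/byte-array pairs. The `Aux` evidence converting tuples into `ProducerRecord`s is assumed to be provided by kafka4m (the `writeKeyAndBytes` variant suggests it exists), and the `kafka4m.producer.topic` key name is an assumption.

```scala
import com.typesafe.config.ConfigFactory
import monix.execution.Scheduler
import monix.reactive.Observable

object WriteExample {
  def main(args: Array[String]): Unit = {
    implicit val scheduler: Scheduler = Scheduler.io()
    val config = ConfigFactory
      .parseString("kafka4m.producer.topic = example-topic") // assumed key name
      .withFallback(ConfigFactory.load())

    val data: Observable[(String, Array[Byte])] = Observable(
      "k1" -> "one".getBytes("UTF-8"),
      "k2" -> "two".getBytes("UTF-8")
    )

    // Assumes evidence that (Key, Bytes) tuples convert to ProducerRecords
    // is in implicit scope; blocks until the upload completes.
    val written: Long = data
      .consumeWith(kafka4m.write[(String, Array[Byte])](config))
      .runSyncUnsafe()
    println(s"wrote $written records")
  }
}
```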
- def writeKeyAndBytes(config: Config = ConfigFactory.load()): Consumer[(String, Array[Byte]), Long]
- config
the kafka4m configuration
- returns
a consumer which will consume a stream of key/byte-array values into kafka and return the number written
- def writeText(config: Config = ConfigFactory.load()): Consumer[String, Long]
- config
the kafka4m configuration
- returns
a consumer which will consume raw text data and write it with null keys
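A sketch of the text variant, which writes each string as a record with a null key. The `kafka4m.producer.topic` key name is an assumption; a running broker is required for this to complete.

```scala
import com.typesafe.config.ConfigFactory
import monix.execution.Scheduler
import monix.reactive.Observable

object WriteTextExample {
  def main(args: Array[String]): Unit = {
    implicit val scheduler: Scheduler = Scheduler.io()
    val config = ConfigFactory
      .parseString("kafka4m.producer.topic = example-topic") // assumed key name
      .withFallback(ConfigFactory.load())

    // Each string becomes a record with a null key on the producer topic.
    val written: Long = Observable("alpha", "beta", "gamma")
      .consumeWith(kafka4m.writeText(config))
      .runSyncUnsafe()

    println(s"wrote $written lines") // 3 against a running broker
  }
}
```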
- object Kafka4mApp extends ConfigApp with StrictLogging
An ETL entry point for moving data into or out of kafka