package kafka4m.consumer

Type Members

  1. final case class AckableRecord[A] extends ConsumerAccess with Product with Serializable

    Represents a message with the components needed to commit the offsets/partitions to Kafka

  2. trait BytesDecoder[A] extends AnyRef

    If we have a BytesDecoder, then we can have a RecordDecoder: decoders chain together (a minimal sketch appears in the Examples below).

    A: the result type

  3. case class ComputeResult[A, B](localConsumedOffset: ZipOffset, kafkaRecord: AckableRecord[A], taskResult: B) extends ConsumerAccess with Product with Serializable

    A tuple of all the information at hand from a ConcurrentStream computation

  4. case class ConcurrentStream[A](kafkaData: Observable[A], asyncScheduler: Scheduler, kafkaFacade: KafkaFacade, minCommitFrequency: Int, awaitJobTimeout: FiniteDuration = 10.seconds, retryDuration: FiniteDuration = 50.milliseconds)(implicit hasOffset: HasOffset[A]) extends StrictLogging with AutoCloseable with Product with Serializable

    Kafka consumer access is enforced to be single-threaded, and you can understand why.

    Suppose a consumer was to read ten messages from kafka and send off ten async requests.

    If the tenth request happened to come back first and commit its offset, then what about the other nine which might fail?

    On the flip-side, if we were to block on each async call for every message, that would be a performance killer, and unnecessary if the calls are idempotent.

    To enable async handling/commits, we just need to ensure we cater for this case:

    msg1 -------------->+
                        |
    msg2 ----->+        |
               |      !bang!
     ok  <-----+        |
                        |
         <- onFailure --+

    We shouldn't commit the offset for msg2, even though it succeeded first.

    The way we handle this is by having the futures drive a ConcurrentSubject of offsets zipped with the messages we receive from Kafka.

    msg1 --------------> ???
    msg2 --------------> ???
    msg3 --------------> ???
    msg4 --------------> ???
    msg5 --------------> ???
    msg6 --------------> ???
    
    ... some mixed order - just ensuring we do get either a failure or a success for each result
    
    msg6 <--------- ???
    msg2 <--------- ???
    msg5 <--------- ???
    msg1 <--------- ??? // here we can commit up to offset 2 as 1 and 2 have returned

    (A sketch of this contiguous-commit bookkeeping appears in the Examples below.)

    A: the messages in the kafka feed (typically AckableRecords)

    kafkaData: the data coming from kafka

    asyncScheduler: the scheduler to use when running tasks

    kafkaFacade: our means of committing offsets to kafka

    minCommitFrequency: how frequently we'll try to commit offsets to kafka; set to 0 to commit as soon as tasks complete successfully

    awaitJobTimeout: the amount of time to wait for the last job to complete when trying to commit the last position back to kafka

    retryDuration: the "poll time" when checking for the result of the final task

  5. trait ConsumerAccess extends AnyRef

    Access to Kafka is enforced to be single-threaded.

    'ConsumerAccess' will expose access to our RichKafkaConsumer on the thread on which it was created.

    Have fun, but Be Careful!

  6. trait ContiguousOrdering[A] extends AnyRef

    A means of ordering 'A' assuming an incrementing long value.

    This was created to ensure we can piece together a specific contiguous ordering after asynchronously dispatching a zipped observable to different tasks

  7. trait HasOffset[A] extends AnyRef

    The functions required by ConcurrentStream to commit offsets back to kafka

  8. trait HasRecord[A] extends AnyRef

    Represents something from which we can obtain a ConsumerRecord

    A: the input value type (typically a tuple, list, etc)

  9. class KafkaStream[A] extends ConsumerAccess

    Combines our concurrent stream and kafka access

  10. trait RecordDecoder[K, A, R] extends AnyRef

    A means of decoding kafka records

    K: the key type

    A: the value type

    R: the result type

  11. final class RichKafkaConsumer[K, V] extends AutoCloseable with ConsumerAccess with StrictLogging

    A means of driving a kafka-stream using the consumer (not kafka streaming) API; a bare-bones poll-loop example appears in the Examples below

  12. final case class TopicStatus(topic: String, partitions: Seq[KafkaPartitionInfo]) extends Product with Serializable
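
Examples

The sketches below are illustrations only: the type and method names are assumptions made for this page and are not necessarily the real kafka4m signatures.

To show how a BytesDecoder chains into a RecordDecoder, here is a minimal sketch assuming a bytes decoder exposes a single decode(bytes) method and a record decoder decodes a whole ConsumerRecord. Given a decoder for the raw value bytes, a decoder for records whose values are byte arrays follows mechanically:

    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Hypothetical shapes for illustration; the kafka4m traits may differ.
    trait ValueBytesDecoder[A] {
      def decode(bytes: Array[Byte]): A
    }

    trait KafkaRecordDecoder[K, V, R] {
      def decode(record: ConsumerRecord[K, V]): R
    }

    object KafkaRecordDecoder {
      // A decoder for the value bytes gives us a decoder for the whole record.
      def fromBytes[K, R](bytes: ValueBytesDecoder[R]): KafkaRecordDecoder[K, Array[Byte], R] =
        new KafkaRecordDecoder[K, Array[Byte], R] {
          def decode(record: ConsumerRecord[K, Array[Byte]]): R = bytes.decode(record.value())
        }
    }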
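
The ConcurrentStream documentation above describes committing an offset only once every earlier message has also completed. A small sketch (not kafka4m code) of that contiguous-commit bookkeeping, assuming each message carries an incrementing local offset:

    // Tracks out-of-order completions and reports when the commit point can advance.
    final class ContiguousCommitTracker(firstOffset: Long) {
      private var nextToCommit = firstOffset
      private val completed    = scala.collection.mutable.SortedSet.empty[Long]

      /** Record a finished task; returns Some(highest contiguous offset) when we can commit further. */
      def complete(offset: Long): Option[Long] = {
        completed += offset
        var advanced = false
        while (completed.contains(nextToCommit)) {
          completed -= nextToCommit
          nextToCommit += 1
          advanced = true
        }
        if (advanced) Some(nextToCommit - 1) else None
      }
    }

    // With completions arriving as 4, 2, 1, 3 (cf. the msg1/msg2 diagram above):
    //   complete(4) == None, complete(2) == None   -- nothing contiguous yet
    //   complete(1) == Some(2)                     -- 1 and 2 have returned, commit up to 2
    //   complete(3) == Some(4)                     -- now 3 and 4 are contiguous too

In ConcurrentStream itself, something like this pairs with the ConcurrentSubject described above, with the resulting positions committed back to kafka via the KafkaFacade.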
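
RichKafkaConsumer wraps the plain consumer (not kafka-streams) API. As a point of reference, this is the bare-bones poll/commit loop that sits underneath any such wrapper; the topic, group and bootstrap settings are placeholders:

    import java.time.Duration
    import java.util.Properties
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.serialization.ByteArrayDeserializer
    import scala.jdk.CollectionConverters._

    object PollLoopExample {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](
          props, new ByteArrayDeserializer, new ByteArrayDeserializer)
        consumer.subscribe(java.util.Collections.singletonList("example-topic"))

        try {
          while (true) {
            // poll and commit on the same thread: the consumer is not thread-safe,
            // which is why ConsumerAccess above enforces single-threaded access
            val records = consumer.poll(Duration.ofSeconds(1))
            records.asScala.foreach(r => println(s"${r.topic}-${r.partition}@${r.offset}"))
            consumer.commitSync()
          }
        } finally consumer.close()
      }
    }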

Value Members

  1. object AckableRecord extends Serializable
  2. object BytesDecoder
  3. object ConcurrentStream extends Serializable
  4. object ContiguousOrdering
  5. object HasOffset
  6. object HasRecord
  7. object KafkaStream
  8. object RebalanceListener extends ConsumerRebalanceListener with StrictLogging with Product with Serializable
  9. object RecordDecoder
  10. object RichKafkaConsumer extends StrictLogging
