package consumer
- Alphabetic
- Public
- All
Type Members
- final case class AckableRecord[A] extends ConsumerAccess with Product with Serializable
Represents a message with the components needed to commit the offsets/partitions to Kafka
- trait BytesDecoder[A] extends AnyRef
If we have a BytesDecoder, then we can have an RecordDecoder.
If we have a BytesDecoder, then we can have an RecordDecoder. This shit chains.
- A
the result type
- case class ComputeResult[A, B](localConsumedOffset: ZipOffset, kafkaRecord: AckableRecord[A], taskResult: B) extends ConsumerAccess with Product with Serializable
A tuple of all the information at hand from a ConcurrentStream computation
- case class ConcurrentStream[A](kafkaData: Observable[A], asyncScheduler: Scheduler, kafkaFacade: KafkaFacade, minCommitFrequency: Int, awaitJobTimeout: FiniteDuration = 10.seconds, retryDuration: FiniteDuration = 50.milliseconds)(implicit hasOffset: HasOffset[A]) extends StrictLogging with AutoCloseable with Product with Serializable
Kafka consumer access is enforced to be single-threaded, and you can understand why.
Kafka consumer access is enforced to be single-threaded, and you can understand why.
Suppose a consumer was to read ten messages from kafka and send off ten async requests.
If the tenth request happened to come back first and commit its offset, then what about the other nine which might fail?
On the flip-side, if we were to block on each async call for every message, that would be a performance killer, and unnecessary if the calls are idempotent.
To enable async handling/commits, we just need to ensure we cater for this case:
msg1 -------------->+ | msg2 ----->+ | | !bang! ok <-----+ | | <- onFailure --+
we shouldn't commit the offset for msg2, even though it succeeded first.
The way we handle this is by having the futures drive a ConcurrentSubject of offsets zipped with the messages we receive from Kafka.
msg1 --------------> ??? msg2 --------------> ??? msg3 --------------> ??? msg4 --------------> ??? msg5 --------------> ??? msg6 --------------> ??? ... some mixed order - just ensuring we do get either a failure or a success for each result msg6 <--------- ??? msg2 <--------- ??? msg5 <--------- ??? msg1 <--------- ??? // here we can commit up to offset 2 as 1 and 2 have returned
- A
the messages in the kafka feed (typically AckableRecords)
- kafkaData
the data coming from kafka
- asyncScheduler
the scheduler to use in running tasks
- kafkaFacade
our means of committing offsets to kafka
- minCommitFrequency
how frequently we'll try and commit offsets to kafka. Set to 0 to commit as soon as tasks complete successfully
- awaitJobTimeout
the amount of time to wait for the last job to complete when trying to commit the last position back to kafka
- retryDuration
the "poll time" when checking for the result of the final task
- trait ConsumerAccess extends AnyRef
Access to Kafka is enforced to be single-threaded.
Access to Kafka is enforced to be single-threaded.
'ConsumerAccess' will expose access to our RichKafkaConsumer on the thread on which it was created.
Have fun, but Be Careful!
- trait ContiguousOrdering[A] extends AnyRef
A means of ordering 'A' assuming an incrementing long value.
A means of ordering 'A' assuming an incrementing long value.
This was created to ensure we can piece together a specific contiguous ordering after asynchronously dispatching a zipped observable to different tasks
- trait HasOffset[A] extends AnyRef
the functions required by ConcurrentStream to commit offsets back to kafka
- trait HasRecord[A] extends AnyRef
Represents something from which we can obtain a ConsumerRecord
Represents something from which we can obtain a ConsumerRecord
- A
the input value type (typically a tuple, list, etc)
- class KafkaStream[A] extends ConsumerAccess
Combines our concurrent stream and kafka access
- trait RecordDecoder[K, A, R] extends AnyRef
A means of decoding kafka records
A means of decoding kafka records
- K
the key type
- A
the value type
- R
the result type
- final class RichKafkaConsumer[K, V] extends AutoCloseable with ConsumerAccess with StrictLogging
A means of driving a kafka-stream using the consumer (not kafka streaming) API
- final case class TopicStatus(topic: String, partitions: Seq[KafkaPartitionInfo]) extends Product with Serializable
Value Members
- object AckableRecord extends Serializable
- object BytesDecoder
- object ConcurrentStream extends Serializable
- object ContiguousOrdering
- object HasOffset
- object HasRecord
- object KafkaStream
- object RebalanceListener extends ConsumerRebalanceListener with StrictLogging with Product with Serializable
- object RecordDecoder
- object RichKafkaConsumer extends StrictLogging