Dsquares Kafka Library
Last updated
March, 2023
Dsquares Kafka NuGet Package
Document versions
Introduction:
1-What is Kafka?
2-What is event streaming?
3-Apache Kafka is an event streaming platform
4-What is Kafka Topic?
5-Kafka Producers
6-Kafka Consumers
Dsquares.KafkaLibrary
1-LibProducerConfig
2-LibConsumerConfig
3-BaseEvent
4-KafkaProducer
5-KafkaConsumer
What is Kafka?
Kafka is an open-source distributed event streaming platform used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
What is event streaming?
Event streaming is the practice of capturing data in real time from event sources such as databases, mobile devices, cloud services, and software applications in the form of streams of events; storing these event streams durably for later retrieval; and manipulating, processing, and reacting to the event streams in real time.
Apache Kafka is an event streaming platform. What does that mean?
Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution:
1-To publish (write) and subscribe to (read) streams of events, including continuous import and export of your data from other systems.
2-To store streams of events durably and reliably.
3-To process streams of events.
What is Kafka Topic?
Similar to how databases have tables to organize and segment datasets, Kafka uses the concept of topics to organize related messages. For example, we may have a topic called logs that contains log messages, and another topic called purchases that contains purchase data.
Kafka Producers
A Kafka producer sends messages to a topic, and messages are distributed to partitions according to a mechanism such as key hashing.
Each event message contains an optional key and a value.
If no key is specified (key == null), messages are distributed evenly across the partitions of the topic, for example in a round-robin fashion.
If a key is specified (key != null), all messages that share the same key are always sent to and stored in the same Kafka partition.
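The routing rule described above can be pictured with a small, self-contained C# sketch. It is only an illustration and not Kafka's actual partitioner: real clients use their own hash functions, and how messages without a key are spread depends on the client and its configuration. The names PartitionSketch and ChoosePartition, and the choice of the FNV-1a hash, are assumptions made for this example.

```csharp
#nullable enable
using System;
using System.Text;

// Illustrative only: mimics the routing rule described above, not Kafka's real partitioner.
public sealed class PartitionSketch
{
    private readonly int _partitionCount;
    private int _nextRoundRobin; // next partition to use for messages without a key

    public PartitionSketch(int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));
        _partitionCount = partitionCount;
    }

    // Returns the partition index for a message with the given (optional) key.
    public int ChoosePartition(string? key)
    {
        if (key == null)
        {
            // No key: rotate through the partitions so messages are spread evenly.
            int partition = _nextRoundRobin;
            _nextRoundRobin = (_nextRoundRobin + 1) % _partitionCount;
            return partition;
        }

        // Key present: a deterministic hash keeps equal keys on the same partition.
        return (int)(Fnv1a(key) % (uint)_partitionCount);
    }

    // FNV-1a, a simple stable hash. string.GetHashCode() is randomized per process
    // in .NET, so it would not give a stable key-to-partition mapping.
    private static uint Fnv1a(string text)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in Encoding.UTF8.GetBytes(text))
            {
                hash ^= b;
                hash *= 16777619;
            }
            return hash;
        }
    }
}
```

With three partitions, consecutive calls to ChoosePartition(null) cycle through 0, 1, 2 and back to 0, while repeated calls with the same key always return the same partition index.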
Kafka Consumers
Consumers can read from one or more partitions at a time in Apache Kafka, and data is read in order within each partition. A consumer always reads data from a lower offset to a higher offset and cannot read data backwards.
The data sent by Kafka producers is serialized. This means that the data received by Kafka consumers must be deserialized using the same format in which it was serialized.
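A minimal sketch of this round trip is shown below. It assumes JSON encoded as UTF-8, using System.Text.Json; this document does not state which format the library uses, so the format is an assumption chosen only for illustration. PurchaseMessage and JsonRoundTrip are hypothetical names.

```csharp
using System;
using System.Text.Json;

// Hypothetical payload type used only for this illustration.
public sealed record PurchaseMessage(string Customer, decimal Amount);

public static class JsonRoundTrip
{
    // Producer side: object -> UTF-8 encoded JSON bytes (what travels through Kafka).
    public static byte[] Serialize(PurchaseMessage message) =>
        JsonSerializer.SerializeToUtf8Bytes(message);

    // Consumer side: must use the same format (UTF-8 JSON) to get the object back.
    public static PurchaseMessage Deserialize(byte[] payload) =>
        JsonSerializer.Deserialize<PurchaseMessage>(payload)
        ?? throw new InvalidOperationException("Payload was JSON null.");
}
```

Calling Deserialize on the output of Serialize yields a record equal to the original. Reading the same bytes with a different format or a mismatched type would fail or produce wrong data, which is why both sides must agree on the format.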
Dsquares.KafkaLibrary
This is a NuGet package we created to simplify the use of Kafka and to avoid repeating the same producer and consumer code in every project. It only needs the configuration and handles the producer and consumer logic itself.
It consists of four main classes, described below together with the shared BaseEvent message class:
1-LibProducerConfig
2-LibConsumerConfig
3-KafkaProducer
4-KafkaConsumer
1-LibProducerConfig
The configuration for the producer. Its fields are listed with their descriptions at the end of this document.
2-LibConsumerConfig
The configuration for the consumer. Its fields are listed with their descriptions at the end of this document.
With these two classes, we can configure the producer and the consumer.
3-BaseEvent
It’s a shared class that represents the message that is produced to a Kafka topic (queue) and consumed from the same topic. It contains the shared properties that other programs need. It’s kept in sync across all programs; if we need to add more information to the message in the future, we can inherit from it and synchronize the new event with the gamification team.
Its properties are listed with their descriptions at the end of this document.
4-KafkaProducer
It’s a generic class used to produce messages to a topic. It accepts an object of type LibProducerConfig in its constructor and contains only one method, ProduceAsync.
Methods
The methods of the producer are described at the end of this document.
How to use KafkaProducer?
1-Install Dsquares.KafkaLibrary version 5.6.0, the latest stable version.
2-Create the interface IKafkaService.
3-Create KafkaService, which implements IKafkaService.
The steps for KafkaService:
1-Create an object of type LibProducerConfig that contains the producer configuration. For now we only need the BootstrapServers property, which defines the URL of our broker, for example 34.140.244.151:9092.
2-Create a new object of type KafkaProducer<T>, which is a generic producer (for example KafkaProducer<BaseEvent>), and pass the LibProducerConfig created in step 1 to the producer constructor.
3-Use the producer to produce a new message. The producer has a method called ProduceAsync(string topic, T @event, CancellationToken cancellationToken = default).
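The steps above could be sketched roughly as follows. This is not the team's actual KafkaService: the member name PublishAsync and the constructor parameter are illustrative, since this document does not prescribe them. So that the sketch compiles on its own, it ends with tiny stand-in versions of BaseEvent, LibProducerConfig and KafkaProducer<T>, shaped after the descriptions in this document; in a real project those stand-ins would be deleted and the types would come from the Dsquares.KafkaLibrary package.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// A small interface for the rest of the application.
// The member name PublishAsync is illustrative; the document does not prescribe it.
public interface IKafkaService
{
    Task PublishAsync(string topic, BaseEvent @event, CancellationToken cancellationToken = default);
}

// The implementation that owns the producer.
public sealed class KafkaService : IKafkaService
{
    private readonly KafkaProducer<BaseEvent> _producer;

    public KafkaService(string bootstrapServers)
    {
        // Step 1: only BootstrapServers is needed for now, e.g. "34.140.244.151:9092".
        var config = new LibProducerConfig { BootstrapServers = bootstrapServers };

        // Step 2: create the generic producer and pass it the configuration.
        _producer = new KafkaProducer<BaseEvent>(config);
    }

    public async Task PublishAsync(string topic, BaseEvent @event, CancellationToken cancellationToken = default)
    {
        // Step 3: produce the message. The result is ignored here;
        // real code would typically inspect or log the delivery result.
        await _producer.ProduceAsync(topic, @event, cancellationToken);
    }
}

// ---- Stand-ins so the sketch compiles without the package (assumptions). ----
// In a real project, delete these and reference Dsquares.KafkaLibrary instead.
public class BaseEvent { public string ClientName { get; set; } = ""; }
public class LibProducerConfig { public string BootstrapServers { get; set; } = ""; }
public class KafkaProducer<T>
{
    public KafkaProducer(LibProducerConfig config) { }

    // The real method sends the message to Kafka and returns a delivery result.
    public Task ProduceAsync(string topic, T @event, CancellationToken cancellationToken = default)
        => Task.CompletedTask;
}
```

Hiding the producer behind IKafkaService keeps the Kafka dependency in one place, so callers only need a topic name and an event.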
5-KafkaConsumer
It’s the class used to consume messages from a topic. It accepts an object of type LibConsumerConfig in its constructor.
Methods
The methods of the consumer are described at the end of this document.
Document versions
Date: Apr 4, 2023
Version: v5.6.0
Produced by: EP Team
Reviewed by:
Comments: First Draft
LibProducerConfig fields
BootstrapServers: A list of host/port pairs that the producer uses to establish its initial connections to the Kafka cluster, for example "localhost:9092".
LibConsumerConfig fields
BootstrapServers: A list of host/port pairs that the consumer uses to establish its initial connections to the Kafka cluster, for example "localhost:9092".
GroupId: The unique identifier of the consumer group that this consumer belongs to.
EnableAutoCommit: Whether the consumer should automatically commit its offsets to Kafka.
AutoOffsetReset: The action to take when the consumer has no initial offset or the current offset is out of range, for example "Earliest".
AllowAutoCreateTopics: When true, the broker automatically creates the topic with the default configuration settings when it receives the first message from the producer.
BaseEvent properties
Type (int): The type of trigger that gamification listens to, such as Earned Points, Redeemed Points, Total Points Balance, Redeemed Vouchers Count, or Subscribed Vouchers Count. It can be an enum that is kept in sync with the gamification team.
CustomerMSISDN (string): The customer number.
Value (decimal): The value of the trigger, for example 500 points or the count of burned vouchers.
SerializedResponse (object): The response of the API, for example the response of SubcribeToOffer.
ClientName (string): The client name, for example CIB.
DateCreated (DateTime): The date the message was created. It defaults to DateTime.Now.
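Read as C#, the properties above suggest a class along the following lines. This is a reconstruction from the property descriptions only; the class shipped in the package may differ, for example in namespace, attributes or nullability. The empty-string defaults and the derived VoucherEvent with its VoucherCode property are assumptions added to illustrate the inheritance described in the BaseEvent section.

```csharp
#nullable enable
using System;

// Sketch reconstructed from the property descriptions; the real class may differ.
public class BaseEvent
{
    // Type of trigger that gamification listens to (for example Earned Points).
    public int Type { get; set; }

    // The customer number.
    public string CustomerMSISDN { get; set; } = string.Empty; // default is an assumption

    // Value of the trigger, for example 500 points.
    public decimal Value { get; set; }

    // Response of the API call that caused the event.
    public object? SerializedResponse { get; set; }

    // The client name, for example CIB.
    public string ClientName { get; set; } = string.Empty; // default is an assumption

    // Creation time of the message; defaults to DateTime.Now as described above.
    public DateTime DateCreated { get; set; } = DateTime.Now;
}

// Hypothetical extension: inherit from BaseEvent to carry extra information.
public class VoucherEvent : BaseEvent
{
    public string VoucherCode { get; set; } = string.Empty; // illustrative extra field
}
```

Because VoucherEvent derives from BaseEvent, code written against BaseEvent keeps working when new event types are introduced.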
KafkaProducer methods
ProduceAsync(string topic, T @event, CancellationToken cancellationToken = default)
Description:
1-It creates a ProducerBuilder using the LibProducerConfig passed to the constructor.
2-It creates a new message from the value passed to ProduceAsync.
3-It produces the message using the builder and returns the delivery result.
Inputs:
1-topic: string (the name of the topic to produce to).
2-@event: generic, but should be of type BaseEvent; it represents the message to produce.
Outputs:
1-deliveryResult: DeliveryResult<string, string> (indicates whether sending the message succeeded or failed).
KafkaConsumer methods
Subscribe(string topic)
It creates a ConsumerBuilder using the LibConsumerConfig passed to the constructor and uses it to subscribe to a topic.
Inputs:
1-topic: string (the name of the topic to consume from).
Outputs: returns void.
Consume()
It creates a ConsumerBuilder using the LibConsumerConfig passed to the constructor and uses it to consume from a topic. It should be called after subscribing.
Inputs: takes no parameters.
Outputs:
consumerResult: ConsumeResult<string, string> (the message that was consumed).
Commit()
It creates a ConsumerBuilder using the LibConsumerConfig passed to the constructor and uses it to commit, that is, to move the offset past the message after consuming it. It should be called after Consume().
Inputs: takes no parameters.
Outputs: returns void.
ConsumeAsync(string topic, bool AutoCommit, CancellationToken cancellationToken = default)
It performs the three steps above (Subscribe, Consume, and Commit) in a single method.
Inputs:
1-topic: string (the name of the topic to consume from).
2-AutoCommit: bool (enables auto commit).
Outputs:
consumerResult: ConsumeResult<string, string> (the message that was consumed).
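To see why the order Subscribe, Consume, Commit matters, the following in-memory model may help. It is not the library's KafkaConsumer and does not talk to Kafka: ConsumerModel is a hypothetical class that models a single partition as a list, where the list index plays the role of the offset. It only illustrates that a message consumed but not committed is read again after a restart.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// In-memory model of one partition and one consumer group; not the library's KafkaConsumer.
public sealed class ConsumerModel
{
    private readonly IReadOnlyList<string> _log; // messages; list index == offset
    private int _committed;                      // first offset not yet committed
    private int _position;                       // next offset this consumer will read
    private bool _subscribed;

    public ConsumerModel(IReadOnlyList<string> log, int committedOffset = 0)
    {
        _log = log;
        _committed = committedOffset;
    }

    public int CommittedOffset => _committed;

    // Start reading from the last committed offset.
    public void Subscribe()
    {
        _position = _committed;
        _subscribed = true;
    }

    // Return the next message in offset order, or null when the log is exhausted.
    public string? Consume()
    {
        if (!_subscribed)
            throw new InvalidOperationException("Call Subscribe() before Consume().");
        return _position < _log.Count ? _log[_position++] : null;
    }

    // Record that everything read so far has been processed.
    public void Commit() => _committed = _position;
}
```

In this model, a consumer that reads two messages but commits only after the first will, after a restart from the committed offset, receive the second message again. That is the reason Commit should follow each successfully processed Consume when auto commit is disabled.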