Dsquares Kafka Library

March, 2023

Dsquares Kafka NuGet Package

V5.6.0

Document versions

Date | Version | Produced by | Reviewed by | Comments
Apr 4, 2023 | v5.6.0 | EP Team | | First Draft

Table of Contents

Document versions

Introduction

1-What is Kafka?

2-What is event streaming?

3-Apache Kafka is an event streaming platform

4-What is a Kafka topic?

5-Kafka Producers

6-Kafka Consumers

Dsquares.KafkaLibrary

1-LibProducerConfig

2-LibConsumerConfig

3-BaseEvent

4-KafkaProducer

5-KafkaConsumer

Introduction

  • What is Kafka?

Kafka is an open-source distributed event streaming platform used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

  • What is event streaming?

Event streaming is the practice of capturing data in real time from event sources such as databases, mobile devices, cloud services, and software applications in the form of streams of events; storing these event streams durably for later retrieval; and manipulating, processing, and reacting to the event streams in real time.

  • Apache Kafka is an event streaming platform. What does that mean?

Kafka combines three key capabilities so you can implement your event streaming use cases end-to-end with a single battle-tested solution:

1-To publish (write) and subscribe to (read) streams of events, including continuous import and export of your data from other systems.

2-To store streams of events durably and reliably.

3-To process streams of events.

  • What is a Kafka topic?

Similar to how databases use tables to organize and segment datasets, Kafka uses topics to organize related messages. For example, we may have a topic called logs that contains log messages and another topic called purchases that contains purchase data.

  • Kafka Producers

A Kafka producer sends messages to a topic, and messages are distributed to partitions according to a mechanism such as key hashing.

Each event message contains an optional key and a value.

If no key is specified (key == null), messages are distributed evenly across the partitions of the topic, typically in a round-robin fashion (the exact strategy depends on the client's partitioner).

If a key is specified (key != null), all messages that share the same key are always sent to and stored in the same Kafka partition.
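The two cases can be illustrated with a small broker-free sketch. It is written in Python purely for illustration; the partition count, the function name choose_partition, and the use of CRC32 as the hash are assumptions, not what any particular Kafka client does internally.

```python
import itertools
import zlib

NUM_PARTITIONS = 3  # hypothetical partition count for the topic

# Endless 0, 1, 2, 0, 1, 2, ... sequence used for messages without a key.
_round_robin = itertools.cycle(range(NUM_PARTITIONS))


def choose_partition(key):
    """Return the partition index for a message key (None means no key)."""
    if key is None:
        # No key: spread messages evenly by cycling through the partitions.
        return next(_round_robin)
    # Key present: a stable hash maps the same key to the same partition.
    # CRC32 is only a stand-in for the client's real hash function.
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


keyless = [choose_partition(None) for _ in range(6)]
keyed = [choose_partition("customer-42") for _ in range(3)]
print(keyless)  # partitions cycle evenly
print(keyed)    # the same partition three times
```

Because messages with the same key land in the same partition, their relative order is preserved for consumers of that partition.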

  • Kafka Consumers

Consumers can read from one or more partitions at a time in Apache Kafka, and data is read in order within each partition. A consumer always reads data from a lower offset to a higher offset and cannot read data backwards.
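As a rough mental model, a partition behaves like an append-only list in which the list index is the offset. The sketch below is a simplified Python illustration with made-up messages and a hypothetical read_from helper; it is not how a real consumer is implemented.

```python
# One partition modeled as an append-only list; the index is the offset.
partition_log = ["created", "paid", "shipped", "delivered"]


def read_from(log, start_offset):
    """Yield (offset, message) pairs in increasing offset order."""
    for offset in range(start_offset, len(log)):
        yield offset, log[offset]


# Reading from offset 1 only ever moves forward through the log.
records = list(read_from(partition_log, 1))
print(records)
```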

The data sent by Kafka producers is serialized. This means that the data received by Kafka consumers must be deserialized using the same format it was serialized in.
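A minimal sketch of why both sides must agree on the format, assuming JSON over UTF-8 as the wire format (an assumption made for illustration; the field names are invented):

```python
import json


def serialize(event: dict) -> bytes:
    """Producer side: turn the event into bytes."""
    return json.dumps(event).encode("utf-8")


def deserialize(payload: bytes) -> dict:
    """Consumer side: turn bytes back into an event."""
    return json.loads(payload.decode("utf-8"))


event = {"order_id": 17, "amount": 500}
round_trip = deserialize(serialize(event))  # same format on both sides

# A payload written in a different format cannot be read by the JSON deserializer.
mismatched_payload = str(event).encode("utf-8")  # Python repr, not JSON
try:
    deserialize(mismatched_payload)
    mismatch_detected = False
except json.JSONDecodeError:
    mismatch_detected = True
```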

Dsquares.KafkaLibrary

This is a NuGet package we created to simplify working with Kafka and to avoid repeating the same producer and consumer code. It only needs the configuration and handles the producer and consumer logic itself.

It consists of four main classes: LibProducerConfig, LibConsumerConfig, KafkaProducer, and KafkaConsumer. The messages themselves are represented by the shared BaseEvent class.

  • 1-LibProducerConfig

Properties

The configuration for the producer. It consists of the following fields:

Field | Description
BootstrapServers | A list of host/port pairs that the producer uses to establish initial connections to the Kafka cluster, for example "localhost:9092".

  • 2-LibConsumerConfig

Properties

The configuration for the consumer. It consists of the following fields:

Field | Description
BootstrapServers | A list of host/port pairs that the consumer uses to establish initial connections to the Kafka cluster, for example "localhost:9092".
GroupId | The unique identifier of the consumer group that this consumer belongs to.
EnableAutoCommit | Whether the consumer should automatically commit its offsets to Kafka.
AutoOffsetReset | The action to take when the consumer has no initial offset or the current offset is out of range, for example "Earliest".
AllowAutoCreateTopics | When true, a topic that does not exist yet is automatically created by the broker with the default configuration settings.

With these two classes we can configure the producer and the consumer.
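For orientation, the fields above correspond to the standard Kafka client settings shown below. This is a sketch only: it assumes the library passes these values through to a Confluent/librdkafka-style client, and the group name and values are made up.

```python
# Hypothetical values; the keys use the librdkafka-style names of the same settings.
producer_settings = {
    "bootstrap.servers": "localhost:9092",       # BootstrapServers
}

consumer_settings = {
    "bootstrap.servers": "localhost:9092",       # BootstrapServers
    "group.id": "example-consumer-group",        # GroupId (made-up name)
    "enable.auto.commit": False,                 # EnableAutoCommit
    "auto.offset.reset": "earliest",             # AutoOffsetReset
    "allow.auto.create.topics": True,            # AllowAutoCreateTopics
}

# A consumer that subscribes to topics needs at least these two settings.
REQUIRED_CONSUMER_KEYS = {"bootstrap.servers", "group.id"}
missing = REQUIRED_CONSUMER_KEYS - set(consumer_settings)
print(missing)  # empty when the configuration is complete
```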

  • 3-BaseEvent

It’s a shared class that represents the message produced to a Kafka topic (queue) and consumed from the same topic. It contains the shared properties that other programs need, and it is kept synchronized between all programs. If we need to add more information to the message in the future, we can inherit from it and synchronize the new event with the gamification team.

It consists of:

Property | Description
Type (int) | The type of trigger that gamification listens to, such as Earned Points, Redeemed Points, Total Points Balance, Redeemed Vouchers Count, and Subscribed Vouchers Count. It can be an enum that is synchronized with the gamification team.
CustomerMSISDN (string) | The customer number.
Value (decimal) | The value of the trigger, for example 500 points or the count of burned vouchers.
SerializedResponse (object) | The response of the API, for example the response of SubcribeToOffer.
ClientName (string) | The client name, for example CIB.
DateCreated (DateTime) | The date the message was created. It defaults to DateTime.Now.
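The shape of the message can be sketched as follows. This is a Python illustration of the fields listed above, not the library's actual class: the name BaseEventSketch, the enum values, the sample MSISDN, and the JSON encoding are all assumptions.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime
from decimal import Decimal
from enum import IntEnum
from typing import Any


class TriggerType(IntEnum):
    # Hypothetical numeric values; the real ones must be agreed with the gamification team.
    EARNED_POINTS = 1
    REDEEMED_POINTS = 2


@dataclass
class BaseEventSketch:
    Type: int
    CustomerMSISDN: str
    Value: Decimal
    ClientName: str
    SerializedResponse: Any = None
    # Mirrors the documented default: the creation time is filled in automatically.
    DateCreated: datetime = field(default_factory=datetime.now)


event = BaseEventSketch(
    Type=TriggerType.EARNED_POINTS,
    CustomerMSISDN="201000000000",  # made-up number
    Value=Decimal("500"),
    ClientName="CIB",
)

# default=str converts the Decimal and datetime values to strings for JSON.
payload = json.dumps(asdict(event), default=str)
print(payload)
```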

  • 4-KafkaProducer

It’s a generic class used to produce messages to a topic. It accepts an object of type LibProducerConfig in its constructor and contains a single method, ProduceAsync.

Methods

ProduceAsync(string topic, T @event, CancellationToken cancellationToken = default)

1-It creates a ProducerBuilder using the LibProducerConfig passed to the constructor.

2-It creates a new message from the value passed to ProduceAsync.

3-It produces the message using the builder and returns the delivery result.

Inputs:

1-topic: string (the name of the topic to produce to).

2-@event: generic, but should be of type BaseEvent; it represents the message to produce.

Outputs:

1-deliveryResult: DeliveryResult<string, string> (indicates whether sending the message succeeded or failed).

  • How to use KafkaProducer?

1-Install Dsquares.KafkaLibrary version 5.6.0 (the latest stable version).

2-Create an interface IKafkaService.

3-Create a KafkaService class that implements IKafkaService.

The steps for KafkaService:

1-Create an object of type LibProducerConfig that contains the producer configuration. For now we only need the BootstrapServers property, which defines the address of our broker, for example 34.140.244.151:9092.

2-Create a new object of type KafkaProducer<T>, the generic producer (for example KafkaProducer<BaseEvent>), and pass the LibProducerConfig created in step 1 to its constructor.

3-Use the producer to produce a new message by calling ProduceAsync(string topic, T @event, CancellationToken cancellationToken = default).
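The flow of the three KafkaService steps can be sketched without a broker. The Python below uses in-memory stand-ins only: ProducerConfigSketch, InMemoryProducer, DeliveryResultSketch, and KafkaServiceSketch are hypothetical names that mimic the roles of LibProducerConfig, KafkaProducer<T>, DeliveryResult, and KafkaService, and none of this is the package's real API.

```python
import asyncio
import json
from dataclasses import dataclass


@dataclass
class ProducerConfigSketch:
    bootstrap_servers: str


@dataclass
class DeliveryResultSketch:
    topic: str
    offset: int
    value: str


class InMemoryProducer:
    """Stand-in producer: appends to in-memory lists instead of a broker."""

    def __init__(self, config):
        self.config = config
        self.topics = {}

    async def produce_async(self, topic, event):
        value = json.dumps(event)                # build the message value
        log = self.topics.setdefault(topic, [])
        log.append(value)                        # "send" the message
        return DeliveryResultSketch(topic, len(log) - 1, value)


class KafkaServiceSketch:
    def __init__(self, bootstrap_servers):
        config = ProducerConfigSketch(bootstrap_servers)  # step 1: configuration
        self._producer = InMemoryProducer(config)         # step 2: producer

    async def publish(self, topic, event):
        # step 3: produce the message and hand back the delivery result
        return await self._producer.produce_async(topic, event)


service = KafkaServiceSketch("localhost:9092")
result = asyncio.run(service.publish("purchases", {"ClientName": "CIB", "Value": 500}))
print(result.topic, result.offset)
```

Keeping the producer behind a small service class means callers depend only on the service, so the configuration and producer construction live in one place.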

  • 5-KafkaConsumer

It’s the class used to consume messages from a topic. It accepts an object of type LibConsumerConfig in its constructor and contains the following methods:

Methods

Subscribe(string topic)

It creates a ConsumerBuilder using the LibConsumerConfig passed to the constructor and uses this builder to subscribe to a topic.

Inputs:

1-topic: string (the name of the topic to consume from).

Outputs: returns void.

Consume()

It creates a ConsumerBuilder using the LibConsumerConfig passed to the constructor and uses this builder to consume from a topic. It should be called after Subscribe().

Inputs: takes no parameters.

Outputs:

consumerResult: ConsumeResult<string, string> (the message that was consumed).

Commit()

It creates a ConsumerBuilder using the LibConsumerConfig passed to the constructor and uses this builder to commit, that is, to move the offset past the message that was just consumed. It should be called after Consume().

Inputs: takes no parameters.

Outputs: returns void.

ConsumeAsync(string topic, bool AutoCommit, CancellationToken cancellationToken = default)

It performs the three steps above (Subscribe, Consume, and Commit) in a single method.

Inputs:

1-topic: string (the name of the topic to consume from).

2-AutoCommit: bool (enables auto commit).

Outputs:

consumerResult: ConsumeResult<string, string> (the message that was consumed).
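The relationship between subscribing, consuming, and committing can be sketched with an in-memory stand-in. This Python model is an illustration under stated assumptions: InMemoryConsumer and consume_once are hypothetical names, a single partition is assumed, re-subscribing is modeled as resuming from the last committed offset, and how the library actually handles AutoCommit internally is not known from this document.

```python
class InMemoryConsumer:
    """Stand-in consumer over one in-memory partition; not the library's API."""

    def __init__(self, topics):
        self._topics = topics
        self._log = None
        self._position = 0   # next offset to read
        self.committed = 0   # offset a re-subscribed consumer resumes from

    def subscribe(self, topic):
        self._log = self._topics[topic]
        self._position = self.committed

    def consume(self):
        if self._log is None:
            raise RuntimeError("subscribe() must be called before consume()")
        if self._position >= len(self._log):
            return None  # nothing new to read
        message = (self._position, self._log[self._position])
        self._position += 1
        return message

    def commit(self):
        # Record that everything before the current position has been processed.
        self.committed = self._position

    def consume_once(self, topic, auto_commit):
        """Subscribe, consume one message, and optionally commit, in one call."""
        self.subscribe(topic)
        message = self.consume()
        if auto_commit and message is not None:
            self.commit()
        return message


topics = {"purchases": ["order-1", "order-2"]}
consumer = InMemoryConsumer(topics)

first = consumer.consume_once("purchases", auto_commit=False)
again = consumer.consume_once("purchases", auto_commit=False)   # not committed, so read again
second_run = consumer.consume_once("purchases", auto_commit=True)
third = consumer.consume_once("purchases", auto_commit=True)    # moves on after the commit
```

The sketch shows why committing matters: without a commit the same message is delivered again after a re-subscribe, while a commit lets the consumer continue with the next message.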
