Kafka 101
unpublished draft
Kafka, C#, .NET
Overview
At the time of writing, the latest Confluent Kafka version is 7.0.0, which is what these notes use.
- Intro to Kafka - Topics and partitions
- Confluent Apache Kafka series
- The Power of Kafka Partitions: How to Get the Most out of Your Kafka Cluster
- Some basic simplified math: How many partitions are needed?
- Confluent license details
- Confluent Platform can be costly in production; read the license first.
- Kafka configuration using the Confluent image
- Docker configuration reference
- .NET specific:
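For the "how many partitions" question in the list above, a commonly cited rule of thumb is to take the target throughput `t`, the measured per-partition producer throughput `p` and the measured per-partition consumer throughput `c`, and use at least `max(t/p, t/c)` partitions. The sketch below only illustrates that arithmetic; the function name and the sample numbers are assumptions, not measurements from a real cluster.

```python
import math


def partitions_needed(target_mb_s: float, producer_mb_s: float, consumer_mb_s: float) -> int:
    """Rule-of-thumb partition count: max(t/p, t/c), rounded up.

    target_mb_s   -- throughput the topic should sustain (t)
    producer_mb_s -- measured producer throughput on ONE partition (p)
    consumer_mb_s -- measured consumer throughput on ONE partition (c)
    """
    if min(target_mb_s, producer_mb_s, consumer_mb_s) <= 0:
        raise ValueError("all throughputs must be positive")
    for_producers = math.ceil(target_mb_s / producer_mb_s)
    for_consumers = math.ceil(target_mb_s / consumer_mb_s)
    return max(for_producers, for_consumers)


if __name__ == "__main__":
    # Hypothetical numbers: 100 MB/s target, 10 MB/s per partition when
    # producing, 20 MB/s per partition when consuming.
    print(partitions_needed(100, 10, 20))
```

Treat the result as a lower bound to start from; the per-partition numbers have to come from your own benchmarks.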
Docker setup
Using separate images:
# If network is not yet created
docker network create -d bridge kafka-net
# Get Zookeeper
docker run -d --net=kafka-net --name=zookeeper --hostname=zookeeper -p 2181:2181 -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -e ZOOKEEPER_SYNC_LIMIT=2 confluentinc/cp-zookeeper
# Get Kafka broker
# bootstrap.servers is localhost:9092 when connecting from outside the Docker network.
docker run -d --net=kafka-net --name=kafka --hostname=kafka-broker -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:29092,PLAINTEXT_HOST://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -e KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL=http://schema-registry:8081 -e CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS=kafka-broker:29092 -e CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS=1 -e CONFLUENT_METRICS_ENABLE=true -e CONFLUENT_SUPPORT_CUSTOMER_ID=anonymous confluentinc/cp-kafka
# Get Schema-registry
docker run -d --net=kafka-net --name=kafka-schema-registry --hostname=schema-registry -p 9501:8081 -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka-broker:29092 -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 -e SCHEMA_REGISTRY_HOST_NAME=schema-registry confluentinc/cp-schema-registry
# Get Kafka-ui
# The UI is available at localhost:9500
docker run -d --net=kafka-net -p 9500:8080 --name=kafka-ui -e KAFKA_CLUSTERS_0_NAME=local -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-broker:29092 -e KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181 -e KAFKA_CLUSTERS_0_SCHEMAREGISTRY=schema-registry:8081 provectuslabs/kafka-ui:latest
Using Docker Compose:
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka-broker
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # kafka-broker:29092 is used inside the Docker network, localhost:9092 from the host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - '9501:8081'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka-broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - '9500:8080'
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-broker:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: schema-registry:8081
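Once the containers are up, a quick smoke test of the Schema Registry could look like the sketch below. It assumes the registry is published on host port 9501 as in the setup above, and it calls the `/subjects` and `/schemas` endpoints; the helper names `registry_url` and `get_json` are made up for this example.

```python
import json
import urllib.request

# Host port 9501 is mapped to the registry's 8081 in the Docker setup above.
REGISTRY = "http://localhost:9501"


def registry_url(path: str) -> str:
    """Build a full Schema Registry URL from a relative path."""
    return f"{REGISTRY}/{path.lstrip('/')}"


def get_json(path: str, timeout: float = 3.0):
    """GET a registry endpoint and decode the JSON response body."""
    with urllib.request.urlopen(registry_url(path), timeout=timeout) as resp:
        return json.load(resp)


if __name__ == "__main__":
    for path in ("subjects", "schemas"):
        try:
            print(path, "->", get_json(path))
        except (OSError, ValueError) as exc:
            # Connection refused, timeout, HTTP error or a non-JSON body.
            print(f"{path}: registry not reachable or unexpected reply ({exc})")
```

On a fresh stack both calls should return an empty JSON list until the first schema is registered.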
Some notes
- The Schema Registry records a new message type the first time a message of that type is published to a topic.
- If auto commit is enabled, the offset is still committed even when an exception occurs.
- The namespace and name in the parsed Avro schema affect deserialization. The order of the properties in the project must match as well; otherwise an exception is thrown.
- For schema evolution that removes a property, changes a data type, renames a property, and so on, the best way to keep working with other languages is to append the version to the namespace, copy the code into a new class, and publish to a new topic with a new consumer. Once the new setup works, remove everything related to the old consumer. Remember: order in the .avsc matters!
- Testing the Schema Registry: GET http://localhost:9501/subjects or GET http://localhost:9501/schemas
- Schema format
- Avro data types: null, boolean, int, long, float, double, bytes, string, record, enumeration, array, map, union, fixed, error, logical
- Allowed logical types:
- date
- decimal
- time-micros (TimeMicrosecond: time of day, in microseconds)
- time-millis (TimeMillisecond: time of day, in milliseconds)
- timestamp-micros (TimestampMicrosecond: date and time, in microseconds)
- timestamp-millis (TimestampMillisecond: date and time, in milliseconds)
- uuid
- Avro doc
- Avro schema
- Avro apache specs
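To make the notes on versioned namespaces, field order and logical types concrete, here is a minimal sketch that builds an Avro record schema as plain JSON. The namespace prefix and record name are borrowed from the `dotnet avro create` example in these notes; the `.V<n>` suffix, the field names, and the `precision`/`scale` values are illustrative assumptions. Each logical type annotates an underlying Avro type as defined in the Avro specification (`uuid` on `string`, `timestamp-millis` on `long`, `decimal` on `bytes`, `date` on `int`).

```python
import json


def versioned_schema(version: int) -> dict:
    """Return an Avro record schema whose namespace carries the schema version."""
    return {
        "type": "record",
        # Hypothetical convention: append the version to the namespace.
        "namespace": f"Confluent.Kafka.Examples.AvroSpecific.V{version}",
        "name": "MyTestingClass",
        # Field order matters: keep it identical to the property order in the class.
        "fields": [
            {"name": "Id", "type": {"type": "string", "logicalType": "uuid"}},
            {"name": "CreatedAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
            {"name": "Price", "type": {"type": "bytes", "logicalType": "decimal",
                                       "precision": 10, "scale": 2}},
            {"name": "BirthDate", "type": {"type": "int", "logicalType": "date"}},
        ],
    }


def field_order(schema: dict) -> list:
    """List field names in declaration order, handy for comparing two versions."""
    return [field["name"] for field in schema["fields"]]


if __name__ == "__main__":
    schema = versioned_schema(2)
    print(json.dumps(schema, indent=2))  # content for a .avsc file
    print(field_order(schema))
```

Comparing `field_order` of the old and new schema before publishing is a cheap way to catch an accidental reordering.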
Useful documentation
- Quick tutorial on Schema Registry, Kafka and Avro messages
- Tool that generates Avro from C#
- For a custom deserializer
- Avrogen.exe, used like: avrogen.exe -s .\avro\test.avsc .
- Notable Kafka UI
- Confluent REST APIs
- .NET-specific implementation
Example of using the dotnet-avro tool
# Make sure the Chr.Avro.Cli tool is installed
dotnet tool install Chr.Avro.Cli --global
# For classes that use inheritance, you may have to provide more assemblies; don't be surprised if generation fails for the class!
# The tool doesn't guarantee property order.
# Sample command; current directory: the folder that contains AvroSpecific.dll
dotnet avro create -a AvroSpecific.dll -t Confluent.Kafka.Examples.AvroSpecific.MyTestingClass