2024. 3. 4. 20:11γBackend/π Kafka
μλ‘
λ°μ΄ν°λ₯Ό νμΌμ μ°κ±°λ λ€νΈμν¬λ₯Ό ν΅ν΄ μ μ‘νλ €λ©΄ λ°μ΄νΈμ΄ ννλ‘ encoding μμ μ΄ νμνλ€. μΈλ©λͺ¨λ¦¬ ννμμ λ°μ΄νΈμ΄λ‘ μ ννλ μμ μ μΈμ½λ©(encoding, λΆνΈν, serializing) μ΄λΌ νλ©°, κ·Έ λ°λλ₯Ό λμ½λ©(decoding, 볡νΈν, deserializing) μ΄λΌ νλ€.
Binary Encoding
μ¬λ¬ μΈμ½λ© λ°©λ²μ΄ μ‘΄μ¬νμ§λ§ μ°μ μ΄μ§ λΆνΈν λ°©μμ μ΄ν΄λ³΄μ. μ΄μ§ λΆνΈνλ λ°μ΄ν°λ₯Ό μ΄μ§ μ«μ(0, 1)λ‘ νννλ κ³Όμ μ μλ―Ένλ€. λ°μ΄ν°λ₯Ό μ΄μ§ ννλ‘ νννκΈ° λλ¬Έμ JSON, XML κ°μ λΆνΈν λ°©λ²λ³΄λ€ ν¨μ¨μ μΈ λ°μ΄ν° μ μ₯ λ° μ μ‘μ΄ κ°λ₯νλ€. κ°λ μ±μ΄ λ°μ΄λ JSON λ°©μλ³΄λ€ μ½κΈ°λ μ΄λ ΅κ² μ§λ§ λκ·λͺ¨ λ°μ΄ν°μ μ μ μ₯ν΄μΌ νλ μν©μ΄λΌλ©΄ μ΄μ§ λΆνΈνκ° μ’μ μ νμ΄ λ μ μλ€. μμ λ°μ΄ν°μ μ κ²½μ°μ JSON μΌλ‘ μ μ₯ λ°©μμ λΉν΄ ν¬κ² μ μ₯ ν¨μ¨μ±μ΄ λμμ§μ§ μλλ€κ³ νλ€.
Thrift and Protoco Buffers
μνμΉ μ€λ¦¬ννΈμ νλ‘ν μ½ λ²νΌλ κ°μ μ리λ₯Ό κΈ°λ°μΌλ‘ ν μ΄μ§ λΆνΈν λΌμ΄λΈλ¬λ¦¬λ‘, νλ‘ν μ½ λ²νΌλ ꡬκΈμμ, μ€λ¦¬ννΈλ νμ΄μ€λΆμμ κ°λ°νλ€. λ λ°©μ λͺ¨λ λΆνΈνν λ°μ΄ν°λ₯Ό μν μ€ν€λ§κ° νμνλ€.
μμμ μ μν μ€ν€λ§λλ‘ Person κ°μ²΄μ λ°μ΄ν°λ₯Ό μ μ₯ν μ μλ€. λ¬Έμ λ μκ°μ΄ μ§λκ³ μ ν리μΌμ΄μ μ΄ μ»€μ§μ λ°λΌ μ€ν€λ§ μμ νμ°μ μΌλ‘ λ³νκΈ° λ§λ ¨μ΄λ€. μ΄λ₯Ό Schema Evolution μ΄λΌ νλ€.
Avro μλΈλ‘
μλΈλ‘λ νλ‘ν μ½ λ²νΌμ μ€λ¦¬ννΈμλ λ€λ₯Έ λ νλμ μ΄μ§ λΆνΈν λ°©λ²μΌλ‘ νλ‘μ νμ νλ‘μ νΈμμ μμλλ€.
μλΈλ‘ μμ λΆνΈνν λ°μ΄ν° ꡬ쑰λ₯Ό μ§μ νκΈ° μν΄ μ€ν€λ§λ₯Ό μ¬μ©νλ€. 2κ°μ§ μ€ν€λ§ μΈμ΄κ° μ‘΄μ¬νλλ° μ¬λμ΄ νΈμ§ν μ μλ 1) Avro IDL κ³Ό κΈ°κ³κ° λ μ½κ² μ½μ μ μλ 2) JSON κΈ°λ° μΈμ΄λ€. (μ¬μ€ μ¬λλ JSON κΈ°λ° μΈμ΄λ₯Ό μ μ½μ μ μμ§ μλ?)
- Avro IDL μμ
record Person {
string userName;
union { null, long } favoriteNumber = null;
array<string> interests;
}
- JSON κΈ°λ°
{
"type": "record",
"name": "Person",
"fields": [
{"name": "userName", "type": "string"},
{"name": "favoriteNumber", "type": ["null", "long"], "default": null},
{"name": "interests", "type": {"type": "array", "items": "string"}}
]
}
Avroμ JSON κΈ°λ° μ€ν€λ§λ₯Ό 보면 typeμ΄λ name κ°μ μ¬μ μ μ μλ νλκ° λμ¨λ€.
Avro μλ null, boolean, int, long, float, double, bytes, string μ μμνμ κ³Ό record, enums, arrays, maps, unions λ± λ³΅ν© νμ μ΄ μ‘΄μ¬νλλ° κ°μ₯ λ§μ΄ μ¬μ©λλ record νμ μ μλ μμ μ½λμ κ°μ ꡬμ±μμλ₯Ό κ°μ§λ€.
- name — λ¬Έμμ΄ : μ€ν€λ§ μ΄λ¦ (νμ)
- namespace — λ¬Έμμ΄ : ν¨ν€μ§ (μ ν)
- doc — λ¬Έμμ΄ : μ€ν€λ§μ λν μ€λͺ (μ ν)
- aliases — λ°°μ΄ : μ΄ λ μ½λμ λν λ체 μ΄λ¦ (μ ν)
μ΄λ¦μ λ°κΎΌ κ²½μ° μ¬κΈ°μ ꡬλ²μ μ΄λ¦μ κΈ°μ νμ¬ μ¬μ©ν μ μλ€. - fields — λ°°μ΄ : νλμ λν 리μ€νΈ (νμ)
fields νμ μμ
- name — λ¬Έμμ΄ : νλμ μ΄λ¦ (νμ)
- doc — λ¬Έμμ΄ : νλμ λν μ€λͺ (μ ν)
- type — μ€ν€λ§ : νλμ νμ , νμ νΉμ μ€ν€λ§λ₯Ό μ¬λ¬κ° κΈ°μ ν μ μλ€ (νμ)
- default : νλμ κΈ°λ³Έ κ° (μ ν)
- order : λ°μ΄ν°λ₯Ό μ λ ¬ν λ°©ν₯μ μ νλ€.
ascendingμ΄ κΈ°λ³Έμ΄κ³ , descending, ignore μ΅μ μ΄ μλ€. μ§λ ¬ν λ λ μ λ ¬μ΄ μνλλ€. (μ ν)
κ·Έλμ μ§κΈκΉμ§ μ΄ν΄λ³Έ Avro λ₯Ό μ 리νλ©΄ μ°μ νλ‘ν μ½ λ²νΌ κ°μ μ΄μ§ λΆνΈν λ°©λ² μ€ νλλ‘ μκ°ν μ μλ€.
μ΄ μ΄μ§ λΆνΈν λ°©λ²μ μ₯μ μ λ°μ΄ν°λ₯Ό ν¨μ¨μ μΌλ‘ μ μ₯, μ μ‘ν μ μμ λΏλ§ μλλΌ ν¨μ¨μ μΌλ‘ λ°μ΄ν° μ§λ ¬ν/μμ§λ ¬ν νλλ°λ μ¬μ©ν μλ¨ μ μ΄λ€.
κ·ΈλΌ Avro λ μΈμ μ¬μ©ν κΉ? λκ·λͺ¨ λ°μ΄ν° μ²λ¦¬ μμ€ν μμ μ¬μ©ν μλ μκ³ , λ©μμ§ ν μμ€ν μμ λ°μ΄ν°λ₯Ό μ μ‘νκ³ κ΅νν λλ μ¬μ©ν μ μλ€. λ°μ΄ν°λ₯Ό λΆνΈνν λμ μ₯μ μ μκ°νλ©΄ μ Avro κ° λκ·λͺ¨ λ°μ΄ν° μ²λ¦¬ μμ€ν μμ μ¬μ©λλμ§ μ μ μλ€. κ·ΈλΌ μμ μ κΉ μ€λͺ ν Thrift, Protoco Buffers μ λ¬λ¦¬ Avro λ§μ΄ κ°μ§λ νΉμ§μ λκΉ?
Avro μ νΉμ§
1. λμ μ½λ μμ±
Avroλ μ€ν€λ§λ₯Ό μ¬μ©νμ¬ λ°μ΄ν°λ₯Ό μ§λ ¬ννκ³ μμ§λ ¬ννλ λμ μ½λ μμ± κΈ°λ₯μ μ 곡νλ€. λ°νμ μμ μ€ν€λ§ μ 보λ₯Ό μ¬μ©νμ¬ μ½λλ₯Ό μμ±νλ―λ‘ κ°λ°μκ° μ§μ μ½λλ₯Ό μμ±ν νμκ° μλ€. μλ μ€μ΅μμλ λ³Ό μ μμ§λ§ JSON μΌλ‘ μ μν μ€ν€λ§λ₯Ό μλ° νμΌλ‘ λΉλνμ¬ μ¬μ©ν μ μμ΄ κ°λ°μ΄ μ©μ΄νλ€λ μ₯μ μ κ°λλ€.
2. μ€ν€λ§ μ§νμ μ©μ΄μ±
Avroλ μ€ν€λ§ μ§νλ₯Ό μ§μνμ¬ λ°μ΄ν° ꡬ쑰μ λ³κ²½μ μ½κ² μ²λ¦¬ν μ μλ€. μλ‘μ΄ νλλ₯Ό μΆκ°νκ±°λ κΈ°μ‘΄ νλλ₯Ό μμ μ΄ μ©μ΄νλ©°, μ΄λ λ°μ΄ν°μ μ μ°μ±κ³Ό μνΈ μ΄μ©μ±μ λμΌ μ μλ€.
μ€μ λ‘λ Thriftμ Protocol Buffersλ μ€ν€λ§ μ§νλ₯Ό μ§μνμ§λ§ Avro μ μ€ν€λ§ μ§νκ° μ‘°κΈ λ κ°νΈνκ³ μ μ°νκ² μ΄λ£¨μ΄μ§λ€.
Thriftμ Protocol Buffersλ μ΄κΈ° μ€ν€λ§λ₯Ό μ μν νμλ ν΄λΉ μ€ν€λ§λ₯Ό λ³κ²½νλ κ²μ΄ μ΄λ ΅μ§λ§, Avroλ μλ‘μ΄ νλλ₯Ό μΆκ°νκ±°λ κΈ°μ‘΄ νλλ₯Ό μμ νλ λ±μ μμ
μ λΉκ΅μ μ½κ² μ²λ¦¬ν μ μλ€. λν, Avroλ μ€ν€λ§μ λ²μ μ μΆμ νκ³ , λ€λ₯Έ λ²μ μ μ€ν€λ§λ‘ μ§λ ¬νλ λ°μ΄ν°λ₯Ό μμ§λ ¬νν μ μλ κΈ°λ₯μ μ 곡νλ€.
- μμ νΈνμ±μ μμ μ½λκ° μλ‘μ΄ μ½λλ‘ κΈ°λ‘λ λ μ½λλ₯Ό μ½μ μ μλ κ²
- νμ νΈνμ±μ μλ‘μ΄ μ½λκ° μμ λ°μ΄ν°λ₯Ό μ½μ μ μλ κ²
- Thrift, ProtoBufs λ μμ/νμ νΈνμ±μ μ μ§νλ©΄μ μ€ν€λ§λ₯Ό λ³κ²½ν μ μλ€.
3. JSON κΈ°λ° μ€ν€λ§ μ μ
Avroμ μ€ν€λ§λ JSON νμμΌλ‘ μ μλμ΄ μμ΄ μ¬λμ΄ μ½κ² μ½κ³ μ΄ν΄ν μ μλ€.
4. μ체 μμΆ κΈ°λ₯
Avroλ λ°μ΄ν°λ₯Ό μμΆνμ¬ μ μ₯νλ κΈ°λ₯μ μ 곡νλ€. λ°μ΄ν°μ μ μ‘ λ° μ μ₯ κ³Όμ μμ λ°μνλ λ€νΈμν¬ λμνκ³Ό λμ€ν¬ 곡κ°μ μ μ½ν μ μμ΅λλ€.
μμΌλ‘ λ°μ΄ν°μ μ§λ ¬ν/μμ§λ ¬ν μΈ‘λ©΄μμ Avro λ₯Ό λ μ΄ν΄λ³΄κ² λ€.
[μ€μ΅] Avro λ₯Ό μ¬μ©νμ¬ Kafka λ©μμ§ μ μ‘
μ€μ΅ νκ²½
- java 17
- kafka-avro-serializer 6.1.0
- avro 1.7.6
- confluent schema-registry
- kafka-clients 3.4.0
- docker
Kafka Avro Serializer λΌμ΄λΈλ¬λ¦¬μ μ§λ ¬ν κ³Όμ μ κ°λ΅μ μΌλ‘ μ΄ν΄λ³΄μ.
1. Producer λ Kafka Brokerμ λ©μμ§λ₯Ό μ μ‘νκΈ° μ μ λ©μμ§ μ§λ ¬ν κ³Όμ μ κ±°μ³μΌνλ€. μ€μ΅μμλ Confluent μμ μ 곡νλ AvroSerializer λ₯Ό μ¬μ©νκΈ°λ‘ νλ€.
2. KafkaAvroSerializer λ μ§μ λ SchemaRegistry μλ²μ μ€ν€λ§ μ 보λ₯Ό λ±λ‘νλ€. μ€μ΅μμλ Docker λ₯Ό μ¬μ©ν΄ Schema Registry Server λ₯Ό μ€ννλ€.
3. Schema Registry μ μ€ν€λ§κ° μ μ λ±λ‘λλ©΄ AvroSerializer λ Schema Id λ₯Ό λ¦¬ν΄ λ°λλ€.
4. AvroSerializer λ 리ν΄λ°μ Schema Id μ μ μ‘ν λ©μμ§ λ³Έλ¬Έμ ν¬ν¨νμ¬ λ°μ΄ν°λ₯Ό μ§λ ¬ν νλ€. Confluent Schema Registry λ Magic Byte(1byte) + Schema Id (4byte) + Message
λ‘ κ΅¬μ±λ λ°μ΄ν°λ₯Ό μ§λ ¬ννμ¬ μΉ΄νμΉ΄ λΈλ‘컀μ μ μ‘νλ€. (Schema Registry λΌμ΄λΈλ¬λ¦¬λ§λ€ λ
μμ μΈ λ°©μμΌλ‘ Schema Id λ₯Ό μ λ¬νλ μ μ μ°Έκ³ νμ)
build.gradle
avro λ₯Ό μ¬μ©ν Kafka Producer λ₯Ό λ§λ€κΈ° μν΄ build.gradle μ νμν μμ‘΄μ±μ μ€μΉνλ€.
kafka broker μ schema-registry server λ docker-compose λ₯Ό νμ©ν΄ μ€ννλ€.
docker-compose
Avro : λ©μμ§ μ€ν€λ§ μμ±
κ°λ¨ν μκ°νλ©΄ Avro λ 3κ°μ§ κΈ°λ₯ (λ©μμ§ μ€ν€λ§ μ 곡 / λ©μμ§ μ§λ ¬ν-μμ§λ ¬ν / Schema to Code Generation)μ μ 곡νλ μμ€ν μ΄λ€.
λ©μμ§μ μ€ν€λ§ μ 곡 κΈ°λ₯μ μ΄ν΄λ³΄μ. μλμ²λΌ Avro Schema λ₯Ό JSON μΌλ‘ μ½κ² μμ±ν μ μκ³ νμΌμ νμ₯μλ .avsc λ‘ μ§μ νλ€. μ°λ¦¬κ° μ§λ ¬ν/μμ§λ ¬ν ν λ°μ΄ν°μ κ·κ²©(schema)μ μ μν μ μ΄λ€.
μ€ν€λ§μ μ§μ ν λλ‘ λ©μμ§λ₯Ό μ΄μ§ λΆνΈν ν μ μκ³ νλ‘μ νΈλ₯Ό λΉλνλ©΄ ν΄λΉ μ€ν€λ§μ λ§λ μλ° μ½λλ₯Ό μμ±ν μλ μλ€.
μμ±λ μ½λλ build.gradle - generateAvroJava - outputDir
μ μ μν κ²½λ‘μ μμ±λλ€.
Producer
μμλ‘ μμ±ν avsc νμΌμ Customer λΌλ μ΄λ¦μ schema λ₯Ό μ μνλ€. μμ μΈκΈν Avro μ μ₯μ μΈ Schema to Code κΈ°λ₯μ νμ©νλ©΄ JSON μΌλ‘ μ μν μ€ν€λ§λ₯Ό μλ° μΈμ€ν΄μ€μ²λΌ μ¬μ©ν μ μλ€.
class AvroProducer {
private val name = listOf("μ±ν", "μμ°¬", "λμ", "νꡬ", "λμΈ", "μ¬ν", "μ±μ€", "μ ν", "μ μ", "μ£Όν")
private val color = listOf(
"RED", "BLUE", "BLACK", "PURPLE", "ORANGE", "NAVY", "SKYBLUE", "MINT", "YELLOW",
"WHITE"
)
fun producerAvro() {
val start = System.currentTimeMillis()
val iterate: MutableList<Int> = ArrayList()
val configs = properties
val producer = KafkaProducer<String, Customer>(configs)
for (iter in 0..19) {
val i = (Math.random() * 10).toInt()
val customer = Customer.newBuilder()
.setName(name[i])
.setFavoriteColor(color[i])
// .setFavoriteNumber(21) // default κ° μΈν
.build()
val producerRecord = ProducerRecord<String, Customer>(Constants.TOPIC_AVRO, customer)
producer.send(producerRecord)
iterate.add(i)
}
val end = System.currentTimeMillis()
logger.info("Duration : {}, size : {}", end - start, iterate.size)
producer.flush()
producer.close()
}
companion object {
private val logger = LoggerFactory.getLogger(KafkaApplication::class.java)
private val properties: Properties
get() {
val properties = Properties()
properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = Constants.BOOTSTRAP_SERVERS
properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.name
properties["schema.registry.url"] = Constants.SCHEMA_REGISTRY_URL
return properties
}
}
}
Consumer
Avro 컨μλ¨Έλ Avro ν¬λ§·μΌλ‘ μ§λ ¬νλ λ°μ΄ν°λ₯Ό μμ νκ³ μ΄ λ°μ΄ν°λ₯Ό μμ§λ ¬ννλ€. 컨μλ¨Έλ κ° λ©μμ§μ μμ λΆλΆμ μλ μ€ν€λ§ IDλ₯Ό μ¬μ©νμ¬ ν΄λΉ λ©μμ§μ Avro μ€ν€λ§λ₯Ό μλ³νκ³ μ€ν€λ§ λ μ§μ€νΈλ¦¬μμ ν΄λΉ IDμ λν μ€μ μ€ν€λ§λ₯Ό κ°μ Έμ¨λ€. μ¦ Kafka λ©μμ§μ μ§μ μ μΌλ‘ μ€ν€λ§κ° ν¬ν¨λμ΄ μμ§ μλλΌλ Avro μ€ν€λ§ μ 보λ₯Ό μ¬μ©ν μ μλ€. λ§€λ² μΉ΄νμΉ΄λ‘ λ©μμ§λ₯Ό μ μ‘ν λλ§λ€ Schema μ 보λ₯Ό λ΄μ νμκ° μμΌλ―λ‘ λ°μ΄ν° μ μ‘μ ν¨μ¨μ±μ΄ λμμ§λ€.
컨μλ¨Έλ λ©μμ§μ μ€ν€λ§λ₯Ό μ¬μ©νμ¬ λ©μμ§λ₯Ό λμ½λ©νκ³ νμν νλλ₯Ό μ½μ΄μ¨λ€. μ΄λ κ² νλ©΄ λ°μ΄ν°μ μ€ν€λ§ λ³κ²½μλ μ μ°νκ² λμν μ μλ€. μλ‘μ΄ μ€ν€λ§κ° λ±λ‘λλ©΄ 컨μλ¨Έλ μ€ν€λ§ λ μ§μ€νΈλ¦¬μμ ν΄λΉ μ€ν€λ§λ₯Ό κ°μ Έμ μλ‘μ΄ λ°μ΄ν°μ λν΄ μ¬λ°λ₯΄κ² μμ§λ ¬ν ν μ μλ€.
class AvroConsumer {
companion object {
private val logger: Logger = LoggerFactory.getLogger(AvroConsumer::class.java)
}
fun consume() {
val configs = properties()
val consumer = KafkaConsumer<String, Customer>(configs)
consumer.subscribe(Collections.singletonList(TOPIC_AVRO))
while (true) {
for (record: ConsumerRecord<String, Customer> in consumer.poll(Duration.ofSeconds(1))) {
logger.info("value | {}", record.value())
}
}
}
private fun properties(): Properties {
val configs = Properties()
configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVERS
configs[ConsumerConfig.GROUP_ID_CONFIG] = GROUP_ID
configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java.name
configs[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = true
configs["schema.registry.url"] = SCHEMA_REGISTRY_URL
configs[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
return configs
}
}
Consumer μμ μμ ν λ°μ΄ν°λ₯Ό νμΈνλ©΄ μ μμ μΌλ‘ μμ§λ ¬νλλ κ²μ νμΈν μ μλ€.
ref
- https://elsboo.tistory.com/50
- https://medium.com/@gaemi/kafka-%EC%99%80-confluent-schema-registry-%EB%A5%BC-%EC%82%AC%EC%9A%A9%ED%95%9C-%EC%8A%A4%ED%82%A4%EB%A7%88-%EA%B4%80%EB%A6%AC-1-cdf8c99d2c5c
- https://medium.com/@stephane.maarek/introduction-to-schemas-in-apache-kafka-with-the-confluent-schema-registry-3bf55e401321
'Backend > π Kafka' μΉ΄ν κ³ λ¦¬μ λ€λ₯Έ κΈ
[kafka] 리밸λ°μ± μ’ λ₯μ 컨μλ¨Έ νν°μ ν λΉ μ λ΅ (0) | 2024.05.04 |
---|---|
[Kafka μλ¬] LEADER_NOT_AVAILABLE (0) | 2024.01.04 |