Backend/πŸ”— Kafka

[Kafka] Avro μ•Œμ•„λ³΄κΈ°

Hugehoo 2024. 3. 4. 20:11

 

 

μ„œλ‘ 

데이터λ₯Ό νŒŒμΌμ— μ“°κ±°λ‚˜ λ„€νŠΈμ›Œν¬λ₯Ό 톡해 μ „μ†‘ν•˜λ €λ©΄ λ°”μ΄νŠΈμ—΄ ν˜•νƒœλ‘œ encoding μž‘μ—…μ΄ ν•„μš”ν•˜λ‹€. μΈλ©”λͺ¨λ¦¬ ν‘œν˜„μ—μ„œ λ°”μ΄νŠΈμ—΄λ‘œ μ „ν™˜ν•˜λŠ” μž‘μ—…μ„ 인코딩(encoding, λΆ€ν˜Έν™”, serializing) 이라 ν•˜λ©°, κ·Έ λ°˜λŒ€λ₯Ό λ””μ½”λ”©(decoding, λ³΅ν˜Έν™”, deserializing) 이라 ν•œλ‹€. 

 

Binary Encoding

μ—¬λŸ¬ 인코딩 방법이 μ‘΄μž¬ν•˜μ§€λ§Œ μš°μ„  이진 λΆ€ν˜Έν™” 방식을 μ‚΄νŽ΄λ³΄μž. 이진 λΆ€ν˜Έν™”λŠ” 데이터λ₯Ό 이진 숫자(0, 1)둜 ν‘œν˜„ν•˜λŠ” 과정을 μ˜λ―Έν•œλ‹€. 데이터λ₯Ό 이진 ν˜•νƒœλ‘œ ν‘œν˜„ν•˜κΈ° λ•Œλ¬Έμ— JSON, XML 같은 λΆ€ν˜Έν™” 방법보닀 효율적인 데이터 μ €μž₯ 및 전솑이 κ°€λŠ₯ν•˜λ‹€.  가독성이 λ›°μ–΄λ‚œ JSON 방식보닀 μ½κΈ°λŠ” μ–΄λ ΅κ² μ§€λ§Œ λŒ€κ·œλͺ¨ 데이터셋을 μ €μž₯ν•΄μ•Ό ν•˜λŠ” 상황이라면 이진 λΆ€ν˜Έν™”κ°€ 쒋은 선택이 될 수 μžˆλ‹€. μž‘μ€ λ°μ΄ν„°μ…‹μ˜ κ²½μš°μ—” JSON 으둜 μ €μž₯ 방식에 λΉ„ν•΄ 크게 μ €μž₯ νš¨μœ¨μ„±μ΄ 높아지진 μ•ŠλŠ”λ‹€κ³  ν•œλ‹€.

 

Thrift and Protoco Buffers

μ•„νŒŒμΉ˜ μŠ€λ¦¬ν”„νŠΈμ™€ ν”„λ‘œν† μ½œ λ²„νΌλŠ” 같은 원리λ₯Ό 기반으둜 ν•œ 이진 λΆ€ν˜Έν™” 라이브러리둜, ν”„λ‘œν† μ½œ λ²„νΌλŠ” κ΅¬κΈ€μ—μ„œ, μŠ€λ¦¬ν”„νŠΈλŠ” νŽ˜μ΄μŠ€λΆμ—μ„œ κ°œλ°œν–ˆλ‹€. 두 방식 λͺ¨λ‘ λΆ€ν˜Έν™”ν•  데이터λ₯Ό μœ„ν•œ μŠ€ν‚€λ§ˆκ°€ ν•„μš”ν•˜λ‹€. 

protobuf μŠ€ν‚€λ§ˆ μ˜ˆμ‹œ

μœ„μ—μ„œ μ •μ˜ν•œ μŠ€ν‚€λ§ˆλŒ€λ‘œ Person 객체의 데이터λ₯Ό μ €μž₯ν•  수 μžˆλ‹€. λ¬Έμ œλŠ” μ‹œκ°„μ΄ μ§€λ‚˜κ³  μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ΄ 컀짐에 따라 μŠ€ν‚€λ§ˆ μ—­μ‹œ ν•„μ—°μ μœΌλ‘œ λ³€ν•˜κΈ° λ§ˆλ ¨μ΄λ‹€. 이λ₯Ό Schema Evolution 이라 ν•œλ‹€.

 

Avro μ•„λΈŒλ‘œ

 

μ•„λΈŒλ‘œλŠ” ν”„λ‘œν† μ½œ 버퍼와 μŠ€λ¦¬ν”„νŠΈμ™€λŠ” λ‹€λ₯Έ 또 ν•˜λ‚˜μ˜ 이진 λΆ€ν˜Έν™” λ°©λ²•μœΌλ‘œ ν•˜λ‘‘μ˜ ν•˜μœ„ ν”„λ‘œμ νŠΈμ—μ„œ μ‹œμž‘λλ‹€.

μ•„λΈŒλ‘œ μ—­μ‹œ λΆ€ν˜Έν™”ν•  데이터 ꡬ쑰λ₯Ό μ§€μ •ν•˜κΈ° μœ„ν•΄ μŠ€ν‚€λ§ˆλ₯Ό μ‚¬μš©ν•œλ‹€. 2가지 μŠ€ν‚€λ§ˆ μ–Έμ–΄κ°€ μ‘΄μž¬ν•˜λŠ”λ° μ‚¬λžŒμ΄ νŽΈμ§‘ν•  수 μžˆλŠ” 1) Avro IDL κ³Ό 기계가 더 μ‰½κ²Œ 읽을 수 μžˆλŠ” 2) JSON 기반 μ–Έμ–΄λ‹€. (사싀 μ‚¬λžŒλ„ JSON 기반 μ–Έμ–΄λ₯Ό 잘 읽을 수 μžˆμ§€ μ•Šλ‚˜?)

  • Avro IDL μ˜ˆμ‹œ
record Person {
    string               userName;
    union { null, long } favoriteNumber = null;
    array<string>        interests;
}
  • JSON 기반
{
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "userName", "type": "string"},
    {"name": "favoriteNumber", "type": ["null", "long"], "default": null},
    {"name": "interests", "type": {"type": "array", "items": "string"}}
  ]
}

 

Avro의 JSON 기반 μŠ€ν‚€λ§ˆλ₯Ό 보면 typeμ΄λ‚˜ name 같은 사전에 μ •μ˜λœ ν•„λ“œκ°€ λ‚˜μ˜¨λ‹€.

Avro μ—λŠ” null, boolean, int, long, float, double, bytes, string 의 μ›μ‹œνƒ€μž…κ³Ό record, enums, arrays, maps, unions λ“± 볡합 νƒ€μž…μ΄ μ‘΄μž¬ν•˜λŠ”λ° κ°€μž₯ 많이 μ‚¬μš©λ˜λŠ” record νƒ€μž…μ€ μ•„λž˜ μ˜ˆμ‹œ μ½”λ“œμ™€ 같은 κ΅¬μ„±μš”μ†Œλ₯Ό 가진닀.

  • name — λ¬Έμžμ—΄ : μŠ€ν‚€λ§ˆ 이름 (ν•„μˆ˜)
  • namespace — λ¬Έμžμ—΄ : νŒ¨ν‚€μ§€ (선택)
  • doc — λ¬Έμžμ—΄ : μŠ€ν‚€λ§ˆμ— λŒ€ν•œ μ„€λͺ… (선택)
  • aliases — λ°°μ—΄ : 이 λ ˆμ½”λ“œμ— λŒ€ν•œ λŒ€μ²΄ 이름 (선택)
    이름을 λ°”κΎΌ 경우 여기에 ꡬ버전 이름을 κΈ°μž…ν•˜μ—¬ μ‚¬μš©ν•  수 μžˆλ‹€.
  • fields — λ°°μ—΄ : ν•„λ“œμ— λŒ€ν•œ 리슀트 (ν•„μˆ˜)

fields ν•˜μœ„ μš”μ†Œ

  • name — λ¬Έμžμ—΄ : ν•„λ“œμ˜ 이름 (ν•„μˆ˜)
  • doc — λ¬Έμžμ—΄ : ν•„λ“œμ— λŒ€ν•œ μ„€λͺ… (선택)
  • type — μŠ€ν‚€λ§ˆ : ν•„λ“œμ˜ νƒ€μž…, νƒ€μž… ν˜Ήμ€ μŠ€ν‚€λ§ˆλ₯Ό μ—¬λŸ¬κ°œ κΈ°μž…ν•  수 μžˆλ‹€ (ν•„μˆ˜)
  • default : ν•„λ“œμ˜ κΈ°λ³Έ κ°’ (선택)
  • order : 데이터λ₯Ό μ •λ ¬ν•  λ°©ν–₯을 μ •ν•œλ‹€.
    ascending이 기본이고, descending, ignore μ˜΅μ…˜μ΄ μžˆλ‹€. 직렬화 될 λ•Œ 정렬이 μˆ˜ν–‰λœλ‹€. (선택)

 

κ·Έλž˜μ„œ μ§€κΈˆκΉŒμ§€ μ‚΄νŽ΄λ³Έ Avro λ₯Ό μ •λ¦¬ν•˜λ©΄ μš°μ„  ν”„λ‘œν† μ½œ 버퍼 같은 이진 λΆ€ν˜Έν™” 방법 쀑 ν•˜λ‚˜λ‘œ 생각할 수 μžˆλ‹€.

이 이진 λΆ€ν˜Έν™” λ°©λ²•μ˜ μž₯점은 데이터λ₯Ό 효율적으둜 μ €μž₯, 전솑할 수 μžˆμ„ 뿐만 μ•„λ‹ˆλΌ 효율적으둜 데이터 직렬화/역직렬화 ν•˜λŠ”λ°λ„ μ‚¬μš©ν•  μžˆλ‹¨ 점이닀.

그럼 Avro λŠ” μ–Έμ œ μ‚¬μš©ν• κΉŒ? λŒ€κ·œλͺ¨ 데이터 처리 μ‹œμŠ€ν…œμ—μ„œ μ‚¬μš©ν•  μˆ˜λ„ 있고, λ©”μ‹œμ§€ 큐 μ‹œμŠ€ν…œμ—μ„œ 데이터λ₯Ό μ „μ†‘ν•˜κ³  κ΅ν™˜ν•  λ•Œλ„ μ‚¬μš©ν•  수 μžˆλ‹€. 데이터λ₯Ό λΆ€ν˜Έν™”ν•  λ•Œμ˜ μž₯점을 μƒκ°ν•˜λ©΄ μ™œ Avro κ°€ λŒ€κ·œλͺ¨ 데이터 처리 μ‹œμŠ€ν…œμ—μ„œ μ‚¬μš©λ˜λŠ”μ§€ μ•Œ 수 μžˆλ‹€. 그럼 μ•žμ„œ 잠깐 μ„€λͺ…ν•œ Thrift, Protoco Buffers 와 달리 Avro 만이 κ°€μ§€λŠ” νŠΉμ§•μ€ 뭘까?

 

 

 

Avro 의 νŠΉμ§•

1. 동적 μ½”λ“œ 생성

AvroλŠ” μŠ€ν‚€λ§ˆλ₯Ό μ‚¬μš©ν•˜μ—¬ 데이터λ₯Ό μ§λ ¬ν™”ν•˜κ³  μ—­μ§λ ¬ν™”ν•˜λŠ” 동적 μ½”λ“œ 생성 κΈ°λŠ₯을 μ œκ³΅ν•œλ‹€. λŸ°νƒ€μž„ μ‹œμ— μŠ€ν‚€λ§ˆ 정보λ₯Ό μ‚¬μš©ν•˜μ—¬ μ½”λ“œλ₯Ό μƒμ„±ν•˜λ―€λ‘œ κ°œλ°œμžκ°€ 직접 μ½”λ“œλ₯Ό μž‘μ„±ν•  ν•„μš”κ°€ μ—†λ‹€. μ•„λž˜ μ‹€μŠ΅μ—μ„œλ„ λ³Ό 수 μžˆμ§€λ§Œ JSON 으둜 μ •μ˜ν•œ μŠ€ν‚€λ§ˆλ₯Ό μžλ°” 파일둜 λΉŒλ“œν•˜μ—¬ μ‚¬μš©ν•  수 μžˆμ–΄ 개발이 μš©μ΄ν•˜λ‹€λŠ” μž₯점을 κ°–λŠ”λ‹€.

2. μŠ€ν‚€λ§ˆ μ§„ν™”μ˜ μš©μ΄μ„±

AvroλŠ” μŠ€ν‚€λ§ˆ 진화λ₯Ό μ§€μ›ν•˜μ—¬ 데이터 ꡬ쑰의 변경을 μ‰½κ²Œ μ²˜λ¦¬ν•  수 μžˆλ‹€. μƒˆλ‘œμš΄ ν•„λ“œλ₯Ό μΆ”κ°€ν•˜κ±°λ‚˜ κΈ°μ‘΄ ν•„λ“œλ₯Ό μˆ˜μ •μ΄ μš©μ΄ν•˜λ©°, μ΄λŠ” λ°μ΄ν„°μ˜ μœ μ—°μ„±κ³Ό μƒν˜Έ μš΄μš©μ„±μ„ 높일 수 μžˆλ‹€. 

μ‹€μ œλ‘œλŠ” Thrift와 Protocol Buffers도 μŠ€ν‚€λ§ˆ 진화λ₯Ό μ§€μ›ν•˜μ§€λ§Œ Avro 의 μŠ€ν‚€λ§ˆ 진화가 쑰금 더 κ°„νŽΈν•˜κ³  μœ μ—°ν•˜κ²Œ 이루어진닀.
Thrift와 Protocol BuffersλŠ” 초기 μŠ€ν‚€λ§ˆλ₯Ό μ •μ˜ν•œ ν›„μ—λŠ” ν•΄λ‹Ή μŠ€ν‚€λ§ˆλ₯Ό λ³€κ²½ν•˜λŠ” 것이 μ–΄λ ΅μ§€λ§Œ, AvroλŠ” μƒˆλ‘œμš΄ ν•„λ“œλ₯Ό μΆ”κ°€ν•˜κ±°λ‚˜ κΈ°μ‘΄ ν•„λ“œλ₯Ό μˆ˜μ •ν•˜λŠ” λ“±μ˜ μž‘μ—…μ„ 비ꡐ적 μ‰½κ²Œ μ²˜λ¦¬ν•  수 μžˆλ‹€. λ˜ν•œ, AvroλŠ” μŠ€ν‚€λ§ˆμ˜ 버전을 μΆ”μ ν•˜κ³ , λ‹€λ₯Έ λ²„μ „μ˜ μŠ€ν‚€λ§ˆλ‘œ μ§λ ¬ν™”λœ 데이터λ₯Ό 역직렬화할 수 μžˆλŠ” κΈ°λŠ₯을 μ œκ³΅ν•œλ‹€.

  • μƒμœ„ ν˜Έν™˜μ„±μ€ μ˜ˆμ „ μ½”λ“œκ°€ μƒˆλ‘œμš΄ μ½”λ“œλ‘œ 기둝된 λ ˆμ½”λ“œλ₯Ό 읽을 수 μžˆλŠ” 것
  • ν•˜μœ„ ν˜Έν™˜μ„±μ€ μƒˆλ‘œμš΄ μ½”λ“œκ°€ μ˜ˆμ „ 데이터λ₯Ό 읽을 수 μžˆλŠ” 것
  • Thrift, ProtoBufs 도 μƒμœ„/ν•˜μœ„ ν˜Έν™˜μ„±μ„ μœ μ§€ν•˜λ©΄μ„œ μŠ€ν‚€λ§ˆλ₯Ό λ³€κ²½ν•  수 μžˆλ‹€.


3. JSON 기반 μŠ€ν‚€λ§ˆ μ •μ˜

Avro의 μŠ€ν‚€λ§ˆλŠ” JSON ν˜•μ‹μœΌλ‘œ μ •μ˜λ˜μ–΄ μžˆμ–΄ μ‚¬λžŒμ΄ μ‰½κ²Œ 읽고 이해할 수 μžˆλ‹€.

4. 자체 μ••μΆ• κΈ°λŠ₯

AvroλŠ” 데이터λ₯Ό μ••μΆ•ν•˜μ—¬ μ €μž₯ν•˜λŠ” κΈ°λŠ₯을 제곡ν•œλ‹€. λ°μ΄ν„°μ˜ 전솑 및 μ €μž₯ κ³Όμ •μ—μ„œ λ°œμƒν•˜λŠ” λ„€νŠΈμ›Œν¬ λŒ€μ—­ν­κ³Ό λ””μŠ€ν¬ 곡간을 μ ˆμ•½ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

 

μ•žμœΌλ‘œ λ°μ΄ν„°μ˜ 직렬화/역직렬화 μΈ‘λ©΄μ—μ„œ Avro λ₯Ό 더 μ‚΄νŽ΄λ³΄κ² λ‹€.

 

 

[μ‹€μŠ΅] Avro λ₯Ό μ‚¬μš©ν•˜μ—¬ Kafka λ©”μ‹œμ§€ 전솑 

μ‹€μŠ΅ ν™˜κ²½

- java 17

- kafka-avro-serializer 6.1.0

- avro 1.7.6

- confluent schema-registry

- kafka-clients 3.4.0

- docker

Kafka Avro Serializer 라이브러리의 직렬화 과정을 개랡적으둜 μ‚΄νŽ΄λ³΄μž. 

 

1. Producer λŠ” Kafka Broker에 λ©”μ‹œμ§€λ₯Ό μ „μ†‘ν•˜κΈ° 전에 λ©”μ‹œμ§€ 직렬화 과정을 κ±°μ³μ•Όν•œλ‹€. μ‹€μŠ΅μ—μ„œλŠ” Confluent μ—μ„œ μ œκ³΅ν•˜λŠ” AvroSerializer λ₯Ό μ‚¬μš©ν•˜κΈ°λ‘œ ν•œλ‹€. 

 

2. KafkaAvroSerializer λŠ” μ§€μ •λœ SchemaRegistry μ„œλ²„μ— μŠ€ν‚€λ§ˆ 정보λ₯Ό λ“±λ‘ν•œλ‹€. μ‹€μŠ΅μ—μ„œλŠ” Docker λ₯Ό μ‚¬μš©ν•΄ Schema Registry Server λ₯Ό μ‹€ν–‰ν–ˆλ‹€.

3. Schema Registry 에 μŠ€ν‚€λ§ˆκ°€ 정상 λ“±λ‘λ˜λ©΄ AvroSerializer λŠ” Schema Id λ₯Ό 리턴 λ°›λŠ”λ‹€.

4. AvroSerializer λŠ” 리턴받은 Schema Id 와 전솑할 λ©”μ‹œμ§€ 본문을 ν¬ν•¨ν•˜μ—¬ 데이터λ₯Ό 직렬화 ν•œλ‹€. Confluent Schema Registry λŠ” Magic Byte(1byte) + Schema Id (4byte) + Message 둜 κ΅¬μ„±λœ 데이터λ₯Ό μ§λ ¬ν™”ν•˜μ—¬ μΉ΄ν”„μΉ΄ λΈŒλ‘œμ»€μ— μ „μ†‘ν•œλ‹€. (Schema Registry λΌμ΄λΈŒλŸ¬λ¦¬λ§ˆλ‹€ λ…μžμ μΈ λ°©μ‹μœΌλ‘œ Schema Id λ₯Ό μ „λ‹¬ν•˜λŠ” 점을 μ°Έκ³ ν•˜μž)

 

build.gradle

avro λ₯Ό μ‚¬μš©ν•œ Kafka Producer λ₯Ό λ§Œλ“€κΈ° μœ„ν•΄ build.gradle 에 ν•„μš”ν•œ μ˜μ‘΄μ„±μ„ μ„€μΉ˜ν•œλ‹€.

kafka broker 와 schema-registry server λŠ” docker-compose λ₯Ό ν™œμš©ν•΄ μ‹€ν–‰ν–ˆλ‹€.

 

docker-compose

 

 

 

Avro : λ©”μ‹œμ§€ μŠ€ν‚€λ§ˆ μž‘μ„±

κ°„λ‹¨νžˆ μƒκ°ν•˜λ©΄ Avro λŠ” 3가지 κΈ°λŠ₯ (λ©”μ‹œμ§€ μŠ€ν‚€λ§ˆ 제곡 / λ©”μ‹œμ§€ 직렬화-역직렬화 / Schema to Code Generation)을 μ œκ³΅ν•˜λŠ” μ‹œμŠ€ν…œμ΄λ‹€.

λ©”μ‹œμ§€μ˜ μŠ€ν‚€λ§ˆ 제곡 κΈ°λŠ₯을 μ‚΄νŽ΄λ³΄μž. μ•„λž˜μ²˜λŸΌ Avro Schema λ₯Ό JSON 으둜 μ‰½κ²Œ μž‘μ„±ν•  수 있고 파일의 ν™•μž₯μžλŠ” .avsc 둜 μ§€μ •ν–ˆλ‹€. μš°λ¦¬κ°€ 직렬화/역직렬화 ν•  λ°μ΄ν„°μ˜ 규격(schema)을 μ •μ˜ν•œ μ…ˆμ΄λ‹€.

 

μŠ€ν‚€λ§ˆμ— μ§€μ •ν•œ λŒ€λ‘œ λ©”μ‹œμ§€λ₯Ό 이진 λΆ€ν˜Έν™” ν•  수 있고 ν”„λ‘œμ νŠΈλ₯Ό λΉŒλ“œν•˜λ©΄ ν•΄λ‹Ή μŠ€ν‚€λ§ˆμ— λ§žλŠ” μžλ°” μ½”λ“œλ₯Ό 생성할 μˆ˜λ„ μžˆλ‹€.

μƒμ„±λœ μ½”λ“œλŠ” build.gradle - generateAvroJava - outputDir 에 μ •μ˜ν•œ κ²½λ‘œμ— μƒμ„±λœλ‹€.

μƒμ„±λœ Customer μžλ°” 객체

 

 

Producer

μ˜ˆμ‹œλ‘œ μž‘μ„±ν•œ avsc νŒŒμΌμ€ Customer λΌλŠ” μ΄λ¦„μ˜ schema λ₯Ό μ •μ˜ν•œλ‹€. μ•žμ„œ μ–ΈκΈ‰ν•œ Avro 의 μž₯점인 Schema to Code κΈ°λŠ₯을 ν™œμš©ν•˜λ©΄ JSON 으둜 μ •μ˜ν•œ μŠ€ν‚€λ§ˆλ₯Ό μžλ°” μΈμŠ€ν„΄μŠ€μ²˜λŸΌ μ‚¬μš©ν•  수 μžˆλ‹€.

class AvroProducer {
    private val name = listOf("μ„±ν›„", "은찬", "λ‚˜μ˜", "ν˜„κ΅¬", "동인", "재휘", "μ„±μœ€", "μ •ν˜„", "μ •μˆ˜", "μ£Όν˜•")
    private val color = listOf(
        "RED", "BLUE", "BLACK", "PURPLE", "ORANGE", "NAVY", "SKYBLUE", "MINT", "YELLOW",
        "WHITE"
    )

    fun producerAvro() {
        val start = System.currentTimeMillis()
        val iterate: MutableList<Int> = ArrayList()
        val configs = properties
        val producer = KafkaProducer<String, Customer>(configs)

        for (iter in 0..19) {
            val i = (Math.random() * 10).toInt()
            val customer = Customer.newBuilder()
                .setName(name[i])
                .setFavoriteColor(color[i])
//                .setFavoriteNumber(21) // default κ°’ μ„ΈνŒ…
                .build()
            val producerRecord = ProducerRecord<String, Customer>(Constants.TOPIC_AVRO, customer)
            producer.send(producerRecord)
            iterate.add(i)
        }
        val end = System.currentTimeMillis()
        logger.info("Duration : {}, size : {}", end - start, iterate.size)
        producer.flush()
        producer.close()
    }

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaApplication::class.java)
        private val properties: Properties
            get() {
                val properties = Properties()
                properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = Constants.BOOTSTRAP_SERVERS
                properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
                properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.name
                properties["schema.registry.url"] = Constants.SCHEMA_REGISTRY_URL
                return properties
            }
    }
}

 

 

 

Consumer

Avro μ»¨μŠˆλ¨ΈλŠ” Avro 포맷으둜 μ§λ ¬ν™”λœ 데이터λ₯Ό μˆ˜μ‹ ν•˜κ³  이 데이터λ₯Ό μ—­μ§λ ¬ν™”ν•œλ‹€. μ»¨μŠˆλ¨ΈλŠ” 각 λ©”μ‹œμ§€μ˜ μ‹œμž‘ 뢀뢄에 μžˆλŠ” μŠ€ν‚€λ§ˆ IDλ₯Ό μ‚¬μš©ν•˜μ—¬ ν•΄λ‹Ή λ©”μ‹œμ§€μ˜ Avro μŠ€ν‚€λ§ˆλ₯Ό μ‹λ³„ν•˜κ³  μŠ€ν‚€λ§ˆ λ ˆμ§€μŠ€νŠΈλ¦¬μ—μ„œ ν•΄λ‹Ή ID에 λŒ€ν•œ μ‹€μ œ μŠ€ν‚€λ§ˆλ₯Ό κ°€μ Έμ˜¨λ‹€. 즉 Kafka λ©”μ‹œμ§€μ— μ§μ ‘μ μœΌλ‘œ μŠ€ν‚€λ§ˆκ°€ ν¬ν•¨λ˜μ–΄ μžˆμ§€ μ•Šλ”λΌλ„ Avro μŠ€ν‚€λ§ˆ 정보λ₯Ό μ‚¬μš©ν•  수 μžˆλ‹€. 맀번 μΉ΄ν”„μΉ΄λ‘œ λ©”μ‹œμ§€λ₯Ό 전솑할 λ•Œλ§ˆλ‹€ Schema 정보λ₯Ό 담을 ν•„μš”κ°€ μ—†μœΌλ―€λ‘œ 데이터 μ „μ†‘μ˜ νš¨μœ¨μ„±μ΄ 높아진닀.

μ»¨μŠˆλ¨ΈλŠ” λ©”μ‹œμ§€μ˜ μŠ€ν‚€λ§ˆλ₯Ό μ‚¬μš©ν•˜μ—¬ λ©”μ‹œμ§€λ₯Ό λ””μ½”λ”©ν•˜κ³  ν•„μš”ν•œ ν•„λ“œλ₯Ό μ½μ–΄μ˜¨λ‹€. μ΄λ ‡κ²Œ ν•˜λ©΄ λ°μ΄ν„°μ˜ μŠ€ν‚€λ§ˆ 변경에도 μœ μ—°ν•˜κ²Œ λŒ€μ‘ν•  수 μžˆλ‹€. μƒˆλ‘œμš΄ μŠ€ν‚€λ§ˆκ°€ λ“±λ‘λ˜λ©΄ μ»¨μŠˆλ¨ΈλŠ” μŠ€ν‚€λ§ˆ λ ˆμ§€μŠ€νŠΈλ¦¬μ—μ„œ ν•΄λ‹Ή μŠ€ν‚€λ§ˆλ₯Ό 가져와 μƒˆλ‘œμš΄ 데이터에 λŒ€ν•΄ μ˜¬λ°”λ₯΄κ²Œ 역직렬화 ν•  수 μžˆλ‹€.

class AvroConsumer {
    companion object {
        private val logger: Logger = LoggerFactory.getLogger(AvroConsumer::class.java)
    }

    fun consume() {
        val configs = properties()
        val consumer = KafkaConsumer<String, Customer>(configs)
        consumer.subscribe(Collections.singletonList(TOPIC_AVRO))
        while (true) {
            for (record: ConsumerRecord<String, Customer> in consumer.poll(Duration.ofSeconds(1))) {
                logger.info("value | {}", record.value())
            }
        }
    }

    private fun properties(): Properties {
        val configs = Properties()
        configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVERS
        configs[ConsumerConfig.GROUP_ID_CONFIG] = GROUP_ID
        configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java.name
        configs[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = true
        configs["schema.registry.url"] = SCHEMA_REGISTRY_URL
        configs[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
        return configs
    }
}

 

 

Consumer μ—μ„œ μˆ˜μ‹ ν•œ 데이터λ₯Ό ν™•μΈν•˜λ©΄ μ •μƒμ μœΌλ‘œ μ—­μ§λ ¬ν™”λ˜λŠ” 것을 확인할 수 μžˆλ‹€.

 

 

 

ref
- https://elsboo.tistory.com/50

- https://medium.com/@gaemi/kafka-%EC%99%80-confluent-schema-registry-%EB%A5%BC-%EC%82%AC%EC%9A%A9%ED%95%9C-%EC%8A%A4%ED%82%A4%EB%A7%88-%EA%B4%80%EB%A6%AC-1-cdf8c99d2c5c

- https://medium.com/@stephane.maarek/introduction-to-schemas-in-apache-kafka-with-the-confluent-schema-registry-3bf55e401321