[Kafka] Avro 알아보기

2024. 3. 4. 20:11☕️ Java

 

 

서론

데이터를 파일에 쓰거나 네트워크를 통해 전송하려면 바이트열 형태로 encoding 작업이 필요하다. 인메모리 표현에서 바이트열로 전환하는 작업을 인코딩(encoding, 부호화, serializing) 이라 하며, 그 반대를 디코딩(decoding, 복호화, deserializing) 이라 한다. 

 

Binary Encoding

여러 인코딩 방법이 존재하지만 우선 이진 부호화 방식을 살펴보자. 이진 부호화는 데이터를 이진 숫자(0, 1)로 표현하는 과정을 의미한다. 데이터를 이진 형태로 표현하기 때문에 JSON, XML 같은 부호화 방법보다 효율적인 데이터 저장 및 전송이 가능하다.  가독성이 뛰어난 JSON 방식보다 읽기는 어렵겠지만 대규모 데이터셋을 저장해야 하는 상황이라면 이진 부호화가 좋은 선택이 될 수 있다. 작은 데이터셋의 경우엔 JSON 으로 저장 방식에 비해 크게 저장 효율성이 높아지진 않는다고 한다.

 

Thrift and Protoco Buffers

아파치 스리프트와 프로토콜 버퍼는 같은 원리를 기반으로 한 이진 부호화 라이브러리로, 프로토콜 버퍼는 구글에서, 스리프트는 페이스북에서 개발했다. 두 방식 모두 부호화할 데이터를 위한 스키마가 필요하다. 

protobuf 스키마 예시

위에서 정의한 스키마대로 Person 객체의 데이터를 저장할 수 있다. 문제는 시간이 지나고 애플리케이션이 커짐에 따라 스키마 역시 필연적으로 변하기 마련이다. 이를 Schema Evolution 이라 한다.

 

Avro 아브로

 

아브로는 프로토콜 버퍼와 스리프트와는 다른 또 하나의 이진 부호화 방법으로 하둡의 하위 프로젝트에서 시작됐다.

아브로 역시 부호화할 데이터 구조를 지정하기 위해 스키마를 사용한다. 2가지 스키마 언어가 존재하는데 사람이 편집할 수 있는 1) Avro IDL 과 기계가 더 쉽게 읽을 수 있는 2) JSON 기반 언어다. (사실 사람도 JSON 기반 언어를 잘 읽을 수 있지 않나?)

  • Avro IDL 예시
record Person {
    string               userName;
    union { null, long } favoriteNumber = null;
    array<string>        interests;
}
  • JSON 기반
{
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "userName", "type": "string"},
    {"name": "favoriteNumber", "type": ["null", "long"], "default": null},
    {"name": "interests", "type": {"type": "array", "items": "string"}}
  ]
}

 

Avro의 JSON 기반 스키마를 보면 type이나 name 같은 사전에 정의된 필드가 나온다.

Avro 에는 null, boolean, int, long, float, double, bytes, string 의 원시타입과 record, enums, arrays, maps, unions 등 복합 타입이 존재하는데 가장 많이 사용되는 record 타입은 아래 예시 코드와 같은 구성요소를 가진다.

  • name — 문자열 : 스키마 이름 (필수)
  • namespace — 문자열 : 패키지 (선택)
  • doc — 문자열 : 스키마에 대한 설명 (선택)
  • aliases — 배열 : 이 레코드에 대한 대체 이름 (선택)
    이름을 바꾼 경우 여기에 구버전 이름을 기입하여 사용할 수 있다.
  • fields — 배열 : 필드에 대한 리스트 (필수)

fields 하위 요소

  • name — 문자열 : 필드의 이름 (필수)
  • doc — 문자열 : 필드에 대한 설명 (선택)
  • type — 스키마 : 필드의 타입, 타입 혹은 스키마를 여러개 기입할 수 있다 (필수)
  • default : 필드의 기본 값 (선택)
  • order : 데이터를 정렬할 방향을 정한다.
    ascending이 기본이고, descending, ignore 옵션이 있다. 직렬화 될 때 정렬이 수행된다. (선택)

 

그래서 지금까지 살펴본 Avro 를 정리하면 우선 프로토콜 버퍼 같은 이진 부호화 방법 중 하나로 생각할 수 있다.

이 이진 부호화 방법의 장점은 데이터를 효율적으로 저장, 전송할 수 있을 뿐만 아니라 효율적으로 데이터 직렬화/역직렬화 하는데도 사용할 있단 점이다.

그럼 Avro 는 언제 사용할까? 대규모 데이터 처리 시스템에서 사용할 수도 있고, 메시지 큐 시스템에서 데이터를 전송하고 교환할 때도 사용할 수 있다. 데이터를 부호화할 때의 장점을 생각하면 왜 Avro 가 대규모 데이터 처리 시스템에서 사용되는지 알 수 있다. 그럼 앞서 잠깐 설명한 Thrift, Protoco Buffers 와 달리 Avro 만이 가지는 특징은 뭘까?

 

 

 

Avro 의 특징

1. 동적 코드 생성

Avro는 스키마를 사용하여 데이터를 직렬화하고 역직렬화하는 동적 코드 생성 기능을 제공한다. 런타임 시에 스키마 정보를 사용하여 코드를 생성하므로 개발자가 직접 코드를 작성할 필요가 없다. 아래 실습에서도 볼 수 있지만 JSON 으로 정의한 스키마를 자바 파일로 빌드하여 사용할 수 있어 개발이 용이하다는 장점을 갖는다.

2. 스키마 진화의 용이성

Avro는 스키마 진화를 지원하여 데이터 구조의 변경을 쉽게 처리할 수 있다. 새로운 필드를 추가하거나 기존 필드를 수정이 용이하며, 이는 데이터의 유연성과 상호 운용성을 높일 수 있다. 

실제로는 Thrift와 Protocol Buffers도 스키마 진화를 지원하지만 Avro 의 스키마 진화가 조금 더 간편하고 유연하게 이루어진다.
Thrift와 Protocol Buffers는 초기 스키마를 정의한 후에는 해당 스키마를 변경하는 것이 어렵지만, Avro는 새로운 필드를 추가하거나 기존 필드를 수정하는 등의 작업을 비교적 쉽게 처리할 수 있다. 또한, Avro는 스키마의 버전을 추적하고, 다른 버전의 스키마로 직렬화된 데이터를 역직렬화할 수 있는 기능을 제공한다.

  • 상위 호환성은 예전 코드가 새로운 코드로 기록된 레코드를 읽을 수 있는 것
  • 하위 호환성은 새로운 코드가 예전 데이터를 읽을 수 있는 것
  • Thrift, ProtoBufs 도 상위/하위 호환성을 유지하면서 스키마를 변경할 수 있다.


3. JSON 기반 스키마 정의

Avro의 스키마는 JSON 형식으로 정의되어 있어 사람이 쉽게 읽고 이해할 수 있다.

4. 자체 압축 기능

Avro는 데이터를 압축하여 저장하는 기능을 제공한다. 데이터의 전송 및 저장 과정에서 발생하는 네트워크 대역폭과 디스크 공간을 절약할 수 있습니다.

 

앞으로 데이터의 직렬화/역직렬화 측면에서 Avro 를 더 살펴보겠다.

 

 

[실습] Avro 를 사용하여 Kafka 메시지 전송 

실습 환경

- java 17

- kafka-avro-serializer 6.1.0

- avro 1.7.6

- confluent schema-registry

- kafka-clients 3.4.0

- docker

Kafka Avro Serializer 라이브러리의 직렬화 과정을 개략적으로 살펴보자. 

 

1. Producer 는 Kafka Broker에 메시지를 전송하기 전에 메시지 직렬화 과정을 거쳐야한다. 실습에서는 Confluent 에서 제공하는 AvroSerializer 를 사용하기로 한다. 

 

2. KafkaAvroSerializer 는 지정된 SchemaRegistry 서버에 스키마 정보를 등록한다. 실습에서는 Docker 를 사용해 Schema Registry Server 를 실행했다.

3. Schema Registry 에 스키마가 정상 등록되면 AvroSerializer 는 Schema Id 를 리턴 받는다.

4. AvroSerializer 는 리턴받은 Schema Id 와 전송할 메시지 본문을 포함하여 데이터를 직렬화 한다. Confluent Schema Registry 는 Magic Byte(1byte) + Schema Id (4byte) + Message 로 구성된 데이터를 직렬화하여 카프카 브로커에 전송한다. (Schema Registry 라이브러리마다 독자적인 방식으로 Schema Id 를 전달하는 점을 참고하자)

 

build.gradle

avro 를 사용한 Kafka Producer 를 만들기 위해 build.gradle 에 필요한 의존성을 설치한다.

kafka broker 와 schema-registry server 는 docker-compose 를 활용해 실행했다.

 

docker-compose

 

 

 

Avro : 메시지 스키마 작성

간단히 생각하면 Avro 는 3가지 기능 (메시지 스키마 제공 / 메시지 직렬화-역직렬화 / Schema to Code Generation)을 제공하는 시스템이다.

메시지의 스키마 제공 기능을 살펴보자. 아래처럼 Avro Schema 를 JSON 으로 쉽게 작성할 수 있고 파일의 확장자는 .avsc 로 지정했다. 우리가 직렬화/역직렬화 할 데이터의 규격(schema)을 정의한 셈이다.

 

스키마에 지정한 대로 메시지를 이진 부호화 할 수 있고 프로젝트를 빌드하면 해당 스키마에 맞는 자바 코드를 생성할 수도 있다.

생성된 코드는 build.gradle - generateAvroJava - outputDir 에 정의한 경로에 생성된다.

생성된 Customer 자바 객체

 

 

Producer

예시로 작성한 avsc 파일은 Customer 라는 이름의 schema 를 정의한다. 앞서 언급한 Avro 의 장점인 Schema to Code 기능을 활용하면 JSON 으로 정의한 스키마를 자바 인스턴스처럼 사용할 수 있다.

class AvroProducer {
    private val name = listOf("성후", "은찬", "나영", "현구", "동인", "재휘", "성윤", "정현", "정수", "주형")
    private val color = listOf(
        "RED", "BLUE", "BLACK", "PURPLE", "ORANGE", "NAVY", "SKYBLUE", "MINT", "YELLOW",
        "WHITE"
    )

    fun producerAvro() {
        val start = System.currentTimeMillis()
        val iterate: MutableList<Int> = ArrayList()
        val configs = properties
        val producer = KafkaProducer<String, Customer>(configs)

        for (iter in 0..19) {
            val i = (Math.random() * 10).toInt()
            val customer = Customer.newBuilder()
                .setName(name[i])
                .setFavoriteColor(color[i])
//                .setFavoriteNumber(21) // default 값 세팅
                .build()
            val producerRecord = ProducerRecord<String, Customer>(Constants.TOPIC_AVRO, customer)
            producer.send(producerRecord)
            iterate.add(i)
        }
        val end = System.currentTimeMillis()
        logger.info("Duration : {}, size : {}", end - start, iterate.size)
        producer.flush()
        producer.close()
    }

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaApplication::class.java)
        private val properties: Properties
            get() {
                val properties = Properties()
                properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = Constants.BOOTSTRAP_SERVERS
                properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
                properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.name
                properties["schema.registry.url"] = Constants.SCHEMA_REGISTRY_URL
                return properties
            }
    }
}

 

 

 

Consumer

Avro 컨슈머는 Avro 포맷으로 직렬화된 데이터를 수신하고 이 데이터를 역직렬화한다. 컨슈머는 각 메시지의 시작 부분에 있는 스키마 ID를 사용하여 해당 메시지의 Avro 스키마를 식별하고 스키마 레지스트리에서 해당 ID에 대한 실제 스키마를 가져온다. 즉 Kafka 메시지에 직접적으로 스키마가 포함되어 있지 않더라도 Avro 스키마 정보를 사용할 수 있다. 매번 카프카로 메시지를 전송할 때마다 Schema 정보를 담을 필요가 없으므로 데이터 전송의 효율성이 높아진다.

컨슈머는 메시지의 스키마를 사용하여 메시지를 디코딩하고 필요한 필드를 읽어온다. 이렇게 하면 데이터의 스키마 변경에도 유연하게 대응할 수 있다. 새로운 스키마가 등록되면 컨슈머는 스키마 레지스트리에서 해당 스키마를 가져와 새로운 데이터에 대해 올바르게 역직렬화 할 수 있다.

class AvroConsumer {
    companion object {
        private val logger: Logger = LoggerFactory.getLogger(AvroConsumer::class.java)
    }

    fun consume() {
        val configs = properties()
        val consumer = KafkaConsumer<String, Customer>(configs)
        consumer.subscribe(Collections.singletonList(TOPIC_AVRO))
        while (true) {
            for (record: ConsumerRecord<String, Customer> in consumer.poll(Duration.ofSeconds(1))) {
                logger.info("value | {}", record.value())
            }
        }
    }

    private fun properties(): Properties {
        val configs = Properties()
        configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVERS
        configs[ConsumerConfig.GROUP_ID_CONFIG] = GROUP_ID
        configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java.name
        configs[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = true
        configs["schema.registry.url"] = SCHEMA_REGISTRY_URL
        configs[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
        return configs
    }
}

 

 

Consumer 에서 수신한 데이터를 확인하면 정상적으로 역직렬화되는 것을 확인할 수 있다.

 

 

 

ref
- https://elsboo.tistory.com/50

- https://medium.com/@gaemi/kafka-%EC%99%80-confluent-schema-registry-%EB%A5%BC-%EC%82%AC%EC%9A%A9%ED%95%9C-%EC%8A%A4%ED%82%A4%EB%A7%88-%EA%B4%80%EB%A6%AC-1-cdf8c99d2c5c

- https://medium.com/@stephane.maarek/introduction-to-schemas-in-apache-kafka-with-the-confluent-schema-registry-3bf55e401321