001 Kafka Cdc

tech blog 글 읽고 정리하기 #

CDC 너두 할 수 있어(feat. B2B 알림 서비스에 Kafka CDC 적용하기) #

B2B 알림 서비스에 CDC를 도입을 하게 되었다.

B2B 알림 서비스 #

B2B 알림 서비스 프로젝트가 무엇일까? 배민 B2C 고객 서비스에서는 알림센터라는 시스템을 통해 고객에 알림을 발송한다. 하지만, 사장님에게 발송되는 알림은 플랫폼이 부재한 관계로 카카오 알림톡으로 발송하고 있었다. 개선을 위해 알림센터를 활용해 사장님에게 전달되는 메시지를 내부 서비스를 통해 전달함으로써, 내부 서비스 활용도와 사용자 편의성을 향상시키고자 진행한 프로젝트가 ‘B2B 알림서비스’다. 여기서, 세일즈 매니저에 대한 알림톡을 알림센터 내 웹 푸시 알림으로 전환 하는 과정에서 CDC를 도입하게되었다. img.png

B2B 알림서비스에 CDC를 도입하게 된 이유 #

세일즈 매니저에게 알림이 발송되는 경우

  1. 세일즈 매니저 본인이 만든 업무 요청 건의 상태가 변경되는 경우
  2. 세일즈 매니저 본인이 해당 가게의 세일즈 매니저로 설정되어있는 업무 요청 건의 상태가 변경될 경우

기존에 알림 발송은 프론트 코드에 있던 상태였지만, 아래와 같은 문제 존재

  1. 네트워크 문제로 알림 발송 누락
  2. 알림 발송 이후 요청 건의 상태 변경에 실패하면 실제 데이터와 맞지 않음 위와 같은 사유로 백엔드에서 처리를 해야했다. 그래서 요청 건의 상태가 변경되면 DB에 반영되는 것에 착안하여 변경된 데이터를 감지하는 CDC를 선택하게되었다.

CDC (Change Data Capture) #

소스 시스템에서 데이터가 변경된 것을 감지하여, 타깃 시스템이 변경 작업에 대응하는 작업을 수행하도록 하는 프로세스

  • 소스 시스템 : DB
  • 타깃 시스템 : B2B 알림 서비스 CDC를 활용하면 데이터를 사용하는 모든 시스템에서 일관성을 유지할 수 있다는 장점이 있다.
  1. pull 방식 타깃 시스템의 주기적인 풀링으로 변경 사항이 있는지 확인 실시간성이 떨어진다. 구현은 쉽다.

  2. push 방식 소스 시스템이 변경될때마다 타깃 시스템에 알려준다. pull 방식에 비해 소스 시스템이 많은 작업을 해야하고, 타깃 시스템에 문제가 발생하면 변경 이벤트에 누락이 발생할 수 있다. 실시간성이 뛰어나다.

Kafka CDC

  • 위 방식 중에 Push 방식에서 이벤트 누락의 단점을 메시지 큐인 Kafka를 통해 해결하여, CDC 시스템을 만드는 것

Debezium MySQL Connector

  • DB로부터 데이터의 변경 이벤트를 감지해서 Kafka 이벤트를 발행해주는 것
  • Mysql의 binlog를 읽어 INSERT, UPDATE, DELETE 연산에 대한 변경 이벤트를 만들어 KAfka 토픽으로 이벤트를 전송
  • binlog를 기반으로 데이터를 수집하기 때문에, DB에서 수행된 모든 이벤트가 안정적으로 수집되고, 이벤트 발행시 정확한 순서 보장

Kafka CDC를 활용 방법 #

  1. 사전준비
  • Kafka
  • Kafka Connect
  • Debezium MySQL Connector
  1. Kafka CDC를 활용한 코드 작성하기
  • Kafka를 통해 넘어오는 이벤트 레코드를 변환해야 하는데, Debezium MySQL Connector는 Apache Avro를 지원한다.
  • 이를 사용하려면 스키마 레지스트리를 사용해야한다.
  • 스키마 레지스트리에 등록된 스키마를 받기 위해서, 프로젝트의 gradle에 설정을 추가해두면 편하다.
val schemaRegistry = "http://localhost:8081" // 스키마 레지스트리 주소
val downloadInputs = listOf(
    "schema.data-key",
    "schema.data-value"
)
val avroDestination = "org/main/avro" //avro 스키마가 저장될 프로젝트상의 위치
schemaRegistry {
    url.set(schemaRegistry)
    download {
        // 패턴에 해당하는 서브젝트(스키마)를 다운로드
        downloadInputs.forEach {
            subjectPattern(
                inputPattern = it,
                file = avroDestination
            )
        }
    }
}
  • arvo 스키마
{
    "type": "record",
    "name": "Envelope",
    "namespace": "schema.data",
    "fields": [
        {
            "name": "before",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "Value",
                    "fields": [
                        {
                            "name": "id",
                            "type": "long"
                        }
                        // ...
                    ],
                    "connect.name": "schema.data"
                }
            ],
            "default": null
        },
        {
            "name": "after",
            "type": [
                "null",
                "Value"
            ],
            "default": null
        },
        {
            "name": "source",
            "type": {
                "type": "record",
                "name": "Source",
                "namespace": "io.debezium.connector.mysql",
                "fields": [
                    {
                        "name": "version",
                        "type": "string"
                    }
                    // ...
                ],
                "connect.name": "io.debezium.connector.mysql.Source"
            }
        },
        {
            "name": "op",
            "type": "string"
        },
        {
            "name": "ts_ms",
            "type": [
                "null",
                "long"
            ],
            "default": null
        },
        {
            "name": "transaction",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "ConnectDefault",
                    "namespace": "io.confluent.connect.avro",
                    "fields": [
                        {
                            "name": "id",
                            "type": "string"
                        },
                        {
                            "name": "total_order",
                            "type": "long"
                        },
                        {
                            "name": "data_collection_order",
                            "type": "long"
                        }
                    ]
                }
            ],
            "default": null
        }
    ],
    "connect.name": "schema.Envelope"
}
  • 빌드 후 생성되는 클래스
fun Envelop.toBefore(): CdcRecord? {
    val before = this.getBefore() ?: return null

    return CdcRecord(
        //...
    )
}
  • ConsumerConfig 설정
@Configuration
class CdcConsumerConfig {
    @Bean(CDC_CONTAINER_FACTORY)
    fun cdcListenerContainerFactory(
        properties: CdcConsumerProperties,
        @Value("\${spring.kafka.bootstrap-servers}") bootstrapServers: String
    ): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Envelope>> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Envelope>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(
            mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to properties.keyDeserializerClass,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to properties.valueDeserializerClass,
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to properties.enableAutoCommit,
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG to properties.maxPollRecords,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to properties.autoOffsetReset,
                // Schema Registry, Avro 관련 설정 필수
                KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG to properties.schemaRegistryUrl,
                KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG to properties.specificAvroReader,
            )
        )
        // ...
    }
}
  • 이벤트 리스너 생성
class CdcEventListener(
    private val cdcEventProcessor: List<CdcEventProcessor>,
) {
    private val sinkQueue = Queues.get<List<Envelope>>(4096).get()
    private val sinks = Sinks.many()
        .unicast()
        .onBackpressureBuffer(sinkQueue)
    private lateinit var disposable: Disposable

    @KafkaListener(
        topics = ["\${kafka.cdc.topic}"],
        groupId = "\${kafka.cdc.groupId}",
        containerFactory = CdcConsumerConfig.CDC_CONTAINER_FACTORY,
    )
    fun listen(
        @Payload payloads: List<Envelope?>,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) partition: Int,
        @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) ts: Long,
        acknowledgment: Acknowledgment,
    ) {
        // ... 
    }
}

조건 : CDC 이벤트의 종류에 따라서 다른 알림을 보내야 한다. 서로 다른 이벤트의 종류를 받을 이벤트 프로세서 + 각 이벤트 내에서 어떤 알림을 보낼지 결정하는 이벤트 핸들러

  • 요청 건에 대한 어떤 이벤트인가? : 가게 로고 수정, 가게 소개 수정 등

  • 어떤 이벤트인가? : 요청이 승인됨, 요청이 반려됨 등 img_1.png

  • 각 이벤트 프로세서에 이벤트를 전달하는 코드

class CdcEventListener(
    private val cdcEventProcessor: List<CdcEventProcessor>,
) {
    // ...
    @PostConstruct
    protected fun init() {
        disposable = sinks.asFlux()
            // ...
            // 이벤트를 처리 하는 과정에서 doAlarm으로 알림 발송
            .doOnNext(::doAlarm)
            // ...
    }

    private fun doAlarm(CdcRecords: List<Envelope> = emptyList()) {
        Flux.fromIterable(CdcRecords)
            .flatMap {
                Mono.fromCallable {
                    // 각각의 이벤트 프로세서에게 이벤트를 처리 하도록 지시
                    cdcEventProcessor.forEach { processor ->
                        try {
                            processor.process(
                                before = it.toBefore(),
                                after = it.toAfter(),
                            )
                        } catch (e: Exception) {
                            log.warn("[CdcEventProcessor] occured exception", e)
                        }
                    }
                }.subscribeOn(Schedulers.boundedElastic())
            }.subscribe()
    }
}
  • 이벤트를 처리하는 이벤트 프로세서
@Service
class NotificationCenterAlarmFacade(
    private val notificationHandlers: List<NotificationHandler>
) : CdcEventProcessor {

    override fun process(before: CdcRecord?, after: CdcRecord?) {
        log.debug("[알림서비스] process 진입 before = `{}`, after = `{}`", before, after)

        if (after == null) {
            log.info("[알림서비스] 데이터 삭제건에 대해서는 알림서비스 발송처리를 하지 않습니다. before: `{}`", before)
            return
        }

        notificationHandlers.find { it.accept(before = before, after = after) }
            ?.send(record = after)
    }
}

이벤트 프로세서에서는 자신이 전달받은 이벤트에 대해서 처리할 수 있는 이벤트인지 확인한 후, 처리할 수 있는 이벤트의 종류라면 자신이 가지고 있는 이벤트 핸들러들에게 처리를 위임한다.

  • 핸들러는 본인이 처리할 수 있는 이벤트인지 확인하고 알림을 보낸다. -> B2B 알림 서비스 동작
@Component
class CompleteHandler : NotificationHandler {
    override fun send(record: CdcRecord) {
        // 알림 발송 로직
    }

    override fun accept(before: CdcRecord?, after: CdcRecord?): Boolean {
        // 완료 이벤트에 대한 알림을 발송하는 핸들러이기 때문에, 완료 이벤트인지 확인하는 조건
        if (before == null || after == null ||
            before.status == Complete || after.status != Complete
        ) {
            return false
        }
        log.info("[알림서비스] 완료 이벤트 감지 `{}`", after.id)
        return true
    }
}

이제 특정 요청 건들의 상태가 변경되면 자동으로 알림을 받을 수 있다.

주의할점 #

  1. AWS Aurora 환경에서 쓰기 부하가 많은 경우 Debezium MySQL Connector를 연동하면 binlog dump thread가 Aurora MySQL 클러스터 스토리지의 binlog를 읽는데, 이때 락을 건다. Aurora MySQL 2.10.2 미만의 버전에서는 쓰기 부하가 많은 경우 부하가 심해질 수 있다. binlog dump thread의 부하가 심해지는 경우 INSERT, UPDATE, DELETE, COMMIT 등 DML 관련 레이턴시가 증가하게 되고, 이에 따라 장애가 발생할 수 있다.

  2. 중복 메시지 발생의 가능성 여러 가지 경우로 Kakfa 메시지는 중복될 수 있다.

해결 방법 : Redis Cache

class CdcEventListener(
    private val cdcEventProcessor: List<CdcEventProcessor>,
) {
    // ...
    @PostConstruct
    protected fun init() {
        disposable = sinks.asFlux()
            // ...
            // 이벤트를 처리 하는 과정에서 doCheckDuplicationPrevent 으로 중복 확인 
            .flatMap(::doCheckDuplicationPrevent)
            // ...
    }

    private fun doCheckDuplicationPrevent(cdcRecords: List<Envelope>): Mono<List<Envelope>> {
        return Mono.fromCallable {
            cdcRecords.filter {
                // HashCode를 이용한 RedisKey 생성
                val key = RedisCacheType.DUPLICATION_PREVENT.addPostfix(name = "${it.getAfter().getId()}:${it.hashCode()}")
                // 해당 Key가 이미 존재하는지 확인
                val existKey = redisTemplate.opsForValue().existKey(
                    key = key
                )
                log.debug(
                    "[CDC][EventEmitterSinks] Check Duplication Prevent. Key = `{}`, Value = `{}`",
                    key, existKey
                )
                (!existKey)
            }
        }.subscribeOn(Schedulers.boundedElastic())
    }
}

정리 #

CDC 키워드로 보게된 기술 블로그 포스팅이였는데, CDC에 대한 감을 잡는데에 도움이 된것같다. CDC, Kafka CDC, Debezium MySQL Connector 키워드에 대해서 좀더 공부해야할 필요를 느꼈다.