elk & kafka 기반의 로그 수집 및 모니터링 구축 (2)

zookeeper 와 kafka 설치 및 구성

본 포스팅에서는 지난 포스팅 의 아키텍쳐 중에서 zookeeperkafka 의 간단한 소개 및 설치, 구성에 대한 내용을 기술한다.

zookeeper 소개

주키퍼는 분산 시스템의 코디네이션 작업을 가능하게 하는 아파치 프로젝트 오픈소스이다. 코디네이션 작업은 여러 프로세스들에 대한 것인데, 이러한 작업의 목적은 프로세스들 간의 협력이나 경합을 조절하는 것이다.

  • 협력이란 프로세스들이 작업을 함께 수행해야 하고 다른 프로세스가 진행될 수 있도록 어떤 일을 해야 하는 것을 의미한다. 예를 들어 마스터 워커 구조에서 워커는 자신이 일을 할 수 있는 상태임을 마스터에게 알리고, 이에 마스터는 워커에게 작업을 할당한다.
  • 경합은 두 프로세스가 동시에 작업을 진행할 수 없는 상황을 의미한다. 그래서 한 프로세스는 다른 프로세스가 끝날 때까지 기다려야 하는데, 예를 들어 읽기/쓰기 락 또는 글로벌 락과 같은 것이 있다.

또한, 프로세스 상호 간의 진행 상황을 전달하기 위한 설정(configuration) 과 같은 메타데이터를 주키퍼를 통하여 공유하기도 한다.

주키퍼는 기본 요소를 표현하기 위해 파일 시스템의 트리(tree) 처럼 계층적으로 구성된 znode 라고 부르는 작은 데이터 노드를 사용하는데, 주키퍼의 API 는 이를 조작하는 일련의 작업(operation) 이다.

신규 znode 를 생성할 때는 모드를 명시하여 하는데 각 모드는 znode 의 동작 방법을 결정한다.

  • 영구 znode (persistent): 생성 후 명시적으로 delete API 가 호출되어야 삭제되는 znode 이다. 마스터가 작업을 할당하고 장애가 발생해도 워커에게 할당된 작업들은 유지되어야 한다. 이런 경우 영구 znode 가 유용하다.
  • 임시 znode (ephemeral): 임시 znode 를 생성한 클라이언트가 주키퍼와 연결이 끊어지거나 클라이언트에 장애가 발생하면 삭제되는 znode 이다. 마스터 워커 예시에서 마스터 znode임시 znode 이다. 만약 마스터에 문제가 발생했을 때 반드시 znode 도 제거되어야 하고 이런 경우 임시 znode 가 유용하다.
  • 순차 znode (sequential): 순차 znode 는 유일하면서 점차 증가하는 정수로 할당된다. 예를 들어 클라이언트가 순차 znode/tasks/task- 경로에 생성하는 경우, 주키퍼는 1 이라는 순차 번호를 할당하고 경로의 끝에 덧붙인다. 순차 znode 는 유일한 이름의 znode 를 생성하는 편리한 방법을 제공한다. 또한 znode 의 생성 순서도 쉽게 확인이 가능하다.

> 주키퍼 ZooKeeper 23 ~ 45p 참고 (에이콘출판사)

zookeeper 설치 및 구성

아파치 카프카는 컨슈머 클라이언트와 카프카 클러스터에 관한 메타데이터를 저장하기 위해 주키퍼를 사용한다. 주키퍼는 카프카 배포판에 포함되어 있어서 이를 사용해도 되지만 여기서는 별도로 다운로드하여 설치를 진행한다.

설치 대상 서버에 아래와 같은 작업을 진행한다 (서버 1 ~ 5)

1
2
3
4
5
6
7
8
9
$ wget http://apache.mirror.cdnetworks.com/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
$ tar -xvf apache-zookeeper-3.5.5-bin.tar.gz
$ mv apache-zookeeper-3.5.5-bin /usr/local/zookeeper
$ mkdir -p /var/lib/zookeeper
$ cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
> tickTime=2000
> dataDir=/var/lib/zookeeper
> clientPort=2181
> EOF

서버 한 대에서 주키퍼 설치 후 아래를 실행하면 독립 실행 모드로 동작한다.

1
2
3
$ /usr/local/zookeeper/bin/zkServer.sh start

Starting zookeeper ... STARTED

개발 환경에서는 하나의 주키퍼와 카프카를 이용하여 개발을 진행하는 것이 가능하겠지만, 운영 환경에서는 주키퍼와 카프카의 클러스터를 구성하여 사용하는 것이 일반적이다.

주키퍼의 클러스터는 앙상블(ensemble) 이라는 별도의 명칭이 있고, 하나의 앙상블은 여러 개의 서버(노드) 를 맴버로 가질 수 있다.

앙상블은 홀수 개의 서버를 맴버로 가지고, 앙상블의 서버 중 과반수가 작동 가능하다면 언제든지 요청 처리(데이터 읽기, 쓰기) 가 가능하다. 앙상블의 서버가 다섯 대로 구성된다면, 서버 한 대가 장애 중이라도 앙상블에 대한 유지보수 작업이 가능하므로 가능하다면 다섯 대 이상의 노드를 맴버로 가지도록 하자.

주키퍼 서버를 앙상블로 구성하려면 각 서버가 공통된 구성 파일을 가져야 한다. 또한, 각 서버는 자신의 ID 번호를 지정한 myid 파일을 데이터 디렉토리에 갖고 있어야 한다.
예를 들어 앙상블에 속한 서버들의 호스트 이름이 zoo1.example.com, zoo2.example.com, zoo3.example.com … 이라면 구성 파일의 내역은 다음과 같이 될 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
$ vi /usr/local/zookeeper/conf/zoo.cfg 이 후 아래의 내용 반영

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=zoo1.example.com:2888:3888
server.2=zoo2.example.com:2888:3888
server.3=zoo3.example.com:2888:3888
server.4=zoo4.example.com:2888:3888
server.5=zoo5.example.com:2888:3888
  • X: 각 서버의 ID 번호이며 정수이다.
  • initLimit & tickTime: initLimit 는 팔로어가 리더에 접속할 수 있는 시간이며 tickTime 을 기준으로 설정된다. 여기서는 initLimit 가 20 * 2000(밀리초) = 40초가 된다.
  • clientPort: 클라이언트가 앙상블에 연결 시 사용할 port
  • syncLimit: 리더가 될 수 있는 팔로어의 최대 갯수를 나타냄
  • hostname: 각 서버의 호스트 이름이나 IP
  • peerPort: 앙상블의 서버들이 상호 통신하는 데 사용하는 TCP 포트 (default: 2888)
  • leaderPort: 리더를 선출하는 데 사용하는 TCP 포트 (default: 3888)

이 후 각 서버는 dataDir 에 지정된 디렉토리에 각각 myid 라는 이름의 파일을 가지고 있어야 한다.

1
2
3
4
5
zoo1.example.com 서버에서는 /var/lib/zookeeper/ 경로에 
myid 라는 파일이 존재해야 하고, myid 의 내용은 1 이 있어야 한다.

$ vi /var/lib/zookeeper/myid
1

여기까지 진행 후 5대의 서버에서 주키퍼를 각각 띄우면 앙상블 구성은 성공적으로 진행된 것이다.
(systemctl 를 이용하여 주키퍼를 서비스 등록 후 사용하는 것이 편리하다)

kafka 소개

카프카는 메시지 발행/구독 시스템이고 분산환경에 특화되어 설계되어 있다는 특징을 가지고 있다. 카프카의 데이터는 지속해서 저장하고 읽을 수 있고 시스템 장애에 대비하고 확장에 따른 성능 저하를 방지하기 위해 데이터가 분산 처리될 수 있다.

카프카의 데이터 기본 단위는 메시지(message)이고 이는 데이터베이스의 행(row)이나 레코드(record)에 비유될 수 있다. 카프카의 메시지는 바이트 배열의 데이터로 간주하므로 특정 형식이나 의미를 갖지 않는다.

카프카의 메시지 데이터는 토픽(topic)으로 분류된 파티션(partition)에 수록되는데, 이 때 데이터를 수록할 파티션을 결정하기 위해 일괸된 해시 값으로 키를 생성한다. 따라서 같은 키를 갖는 메시지는 항상 같은 파티션에 수록된다.

하나의 토픽은 여러 개의 파티션을 갖지만, 메시지 처리 순서는 토픽이 아닌 파티션별로 유지 관리된다. 또한 각 파티션은 서로 다른 서버에 분산되어 수평적인 확장이 가능하고 그렇기 때문에 단일 서버로 처리할 때보다 성능이 훨씬 우수하다.


카프카의 클라이언트는 기본적으로 프로듀서와 컨슈머라는 두 가지 형태가 있다.

  • 프로듀서는 새로운 메시지를 생성하고, 생성된 메시지는 특정 토픽의 파티션에 저장된다. 이 때 프로듀서는 메시지가 어떤 파티션에 수록되는지는 관여하지 않는다.
  • 컨슈머는 하나 이상의 토픽을 구독하여 메시지가 생성된 순서로 읽으며, 메시지의 오프셋(offset)을 유지하여 읽는 메시지의 위치를 알 수 있다. 그리고 주키퍼나 카프카에서는 각 파티션에서 마지막에 읽은 메시지의 오프셋을 저장하고 있으므로 컨슈머가 읽기를 중단했다 하더라도 다시 언제든 그 다음 메시지를 읽을 수 있다.

카프카의 장점은 다음과 같다

  • 다중 프로듀서: 여러 클라이언트가 많은 토픽을 사용하거나 같은 토픽을 같이 사용해도 카프카는 무리 없이 많은 프로듀서의 메시지를 처리할 수 있다. 즉, 여러 프로듀서가 하나의 토픽에 메시지 발급을 동시에 할 수 있다는 말이다.
  • 다중 컨슈머: 여러 컨슈머가 상호 간섭 없이 어떤 메시지 스트림도 읽을 수 있다. 이는 특정 메시지를 소비하면 다른 클라이언트에서 그 메시지를 사용할 수 없는 큐(queue) 시스템과는 다르다.
  • 디스크 기반의 보존: 카프카는 다중 컨슈머를 처리할 수 있을 뿐만 아니라 지속해서 메시지를 보존할 수도 있다. 따라서 컨슈머 애플리케이션이 항상 실시간으로 실행되지 않아도 된다. 메시지는 보존 옵션(보존 기간이나 토픽 크기)에 따라 디스크에 저장되어 보존된다. 또한 토픽별로 보존 옵션을 선택할 수도 있으므로 컨슈머의 요구에 맞게 메시지 보존 옵션을 가질 수 있다. 따라서 처리가 느리거나 접속 폭주로 인해 컨슈머가 메시지를 읽는 데 실패하더라도 데이터가 유실될 위험이 없다.
  • 확장성: 처음에는 검증의 목적으로 하나의 브로커로 시작하되 점차적으로 브로커의 수를 늘려 클러스터의 크기를 키울 수 있다. 확장 작업은 시스템 전체의 사용에 영향을 주지 않고 클러스터가 온라인 상태일때 도 수행될 수 있다.

> 카프카 핵심 가이드 4 ~ 11p 참고 (제이펍출판사)


kafka 설치 및 구성

설치 대상 서버에 아래와 같은 작업을 진행한다 (서버 6 ~ 8)

1
2
3
4
5
6
7
8
9
10
11
12
$ wget http://apache.mirror.cdnetworks.com/kafka/2.3.0/kafka_2.11-2.3.0.tgz 
$ tar -xvf kafka_2.11-2.3.0.tgz
$ mv kafka_2.11-2.3.0 /usr/local/kafka
$ mkdir /data/kafka-logs
$ vi /usr/local/kafka/config/server.properties

[server.properties]
broker.id=1
log.dirs=/data/kafka-logs
zookeeper.connect=zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181,zoo4.example.com:2181,zoo5.example.com:2181/kafka
num.recovery.threads.per.data.dir=4
log.retention.hours=672
  • broker.id: 모든 카프카 브로커는 broker.id 에 설정하는 정수로 된 번호(식별자)를 가져야 한다. 단, 하나의 카프카 클러스터 내에서는 고유한 값이어야 한다.
  • log.dirs: 카프카는 모든 메시지를 로그 세그먼트(segment) 파일을 모아서 디스크에 저장한다. 이 때 쉼표를 이용하여 여러 경로를 지정할 수 있다.
  • zookeeper.connect: 브로커의 메타데이터를 저장하기 위해 사용되는 주키퍼의 위치를 나타낸다. 기본 구성값은 localhost:2181 이고, 호스트이름:포트/경로 의 형식으로 지정할 수 있다. 예시의 뒷 부분에 /kafka 가 있음을 유의하자
  • num.recovery.threads.per.data.dir: 브로커의 시작 또는 종료 시의 로그를 복구할 때 사용할 스레드의 갯수를 말한다. 이 때 log.dirs 에 지정된 경로가 3 이고 num.recovery.threads.per.data.dir 가 8 이라면 전체 스레드의 갯수는 24가 된다.
  • log.retention.hours: 카프카가 얼마 동안 메시지를 보존할지를 설정한다.

이후 각 서버에서 카프카를 실행하면 3대의 브로커로 이루어지는 카프카 클러스터가 형성된다고 볼 수 있다.

1
2
$ /usr/local/kafka/bin/kafka-server-start.sh \
-daemon /usr/local/kafka/config/server.properties

정상적인 구성의 확인을 위해 카프카 클러스터에 토픽을 생성하고 메시지를 읽고 쓰는 테스트를 진행하면 다음과 같다

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
- 1. 토픽 생성
$ /usr/local/kafka/kafka-topics.sh --create \
--zookeeper zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181,zoo4.example.com:2181,zoo5.example.com:2181/kafka \
--replication-factor 1 --partitions 1 --topic test

- 2. 토픽 확인
$ /usr/local/kafka/kafka-topics.sh \
--zookeeper zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181,zoo4.example.com:2181,zoo5.example.com:2181/kafka \
--describe --topic test

Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2 Isr: 2

- 3. 메시지 저장
$ /usr/local/kafka/kafka-console-producer.sh \
--broker-list kafka-server1:9092,kafka-server2:9092,kafka-server3:9092 \
--topic test

Test Message1
Test Message2
^C

- 4. 메시지 읽기
$ /usr/local/kafka/kafka-console-consumer.sh
--bootstrap-server kafka-server1:9092,kafka-server2:9092,kafka-server3:9092 \
--topic test --from-beginning

Test Message 1
Test Message 2

여기까지 문제 없이 진행되었다면 3대의 브로커로 이루어진 카프카 클러스터가 정상적으로 설치되었다고 볼 수 있다.
(systemctl 를 이용하여 카프카를 서비스 등록 후 사용하는 것이 편리하다)


logback 과 kafka 를 활용한 로그 수집

이제 kafka 를 메시지 브로커로 활용하여 서비스의 로그 및 메트릭 수집을 진행하도록 하자

우선 해당 포스팅에서는

  1. 로깅용 kafka topic 을 생성한 후에
  2. logback 의 kafka appender 를 활용하여 kafka 에 로그를 저장

하는 과정을 진행한다.

이렇게 저장된 로그는 추후 logstash 를 통하여 elasticsearch 쪽으로 전달되고 kibana 를 통하여 visualizing 된 로그를 보게 될 것이다.

로깅용 kafka topic 생성

카프카 클러스터 내에서 토픽의 크기가 확장되는 방법이 파티션이다. 따라서 브로커가 추가될 때 클러스터 전체에 걸쳐 메시지가 고르게 저장되도록 파티션 갯수를 설정하는 것이 중요한데, 보통 클러스터의 브로커 수와 같게 하거나 배수로 토픽의 파티션 개수를 설정한다. 이렇게 하면 브로커마다 파티션이 고르게 분산될 수 있으며, 저장 메시지도 고르게 분산될 것이다.

대개 프로듀서는 컨슈머보다 훨씬 빠르게 처리되므로 처리량을 조사하지 않아도 무방하고, 파티션 하나는 항상 한 컨슈머가 소비한다. 따라서 컨슈머의 처리 속도와 목표 처리량을 산정하여 파티션 개수를 산정해야 한다.

필자는 우선 브로커 3개에서 토픽당 파티션 3개, 복제 팩터도 3을 설정하여 로깅용 토픽을 구성하였다. 지난 포스팅에서 기술했듯이 초당 800여건 / 분당 5만여건의 메시지를 무리없이 처리하는 것을 확인하였고, 메트릭 수집을 통하여 지속적으로 확인할 것이기에 현재 구성에서 큰 이슈는 없을 것으로 판단하고 있다.

아래의 명령어로 로깅용 토픽을 생성한다.

1
2
3
$ /usr/local/kafka/kafka-topics.sh --create \
--zookeeper zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181,zoo4.example.com:2181,zoo5.example.com:2181/kafka \
--replication-factor 3 --partitions 3 --topic log-v1

logback 의 kafka appender 를 활용한 로그 저장

필자가 주로 사용하는 개발 환경은 java + spring boot 라서 logback 설정으로 kafka 에 로그를 저장하는 내용을 기술하지만, 카프카에 메시지를 저장하는 방법은 다양할 것이므로… 알아서 하실 것으로 기대한다.
도움이 되실 내용만 확인하셔도 된다.

logback 에서 사용 가능한 custom appender 를 구현하려면 AppenderBase 를 상속하여 자신만의 appender 를 구현하는 방법이 간단하지만, 이미 검증되어 충분히 사용되고 있는 누군가의 구현체를 활용하는 것도 아주 좋은 방법이다. (나도 언젠가는 오픈소스 생태계에 약간이나마 공헌할 날이 오겠지)

필자의 시스템에서는 logback 을 통하여 kafka 에 메시지를 저장하는 logback-kafka-appender 를 활용한다.

우선 build.gradle 에 아래와 같이 선언한다.

1
2
3
4
5
dependencies {
implementation(group = "com.github.danielwegener", name = "logback-kafka-appender", version = "0.1.0")
implementation(group = "net.logstash.logback", name = "logstash-logback-encoder", version = "6.2")
...
}

이후 logback.xml 에서 아래와 같이 선언한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<!-- logback 예시  -->
<appender name="LOG-KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>
{
"@timestamp":"%d{"yyyy-MM-dd'T'HH:mm:ss.SSS'+09:00'"}",
"service":"example-server",
"phase":"live",
"level":"%level",
"thread":"%thread",
"eventId":"%mdc{key-event-id:--}",
"src":"%logger{0}.%M\(%L\)",
"message":"%msg",
"exception": "%ex{200}",
"host": "%contextName"
}
</pattern>
</layout>
</encoder>
<topic>log-v1</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/>
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
<producerConfig>compression.type=snappy</producerConfig>
<!-- <producerConfig>linger.ms=100</producerConfig>-->
<!-- <producerConfig>batch.size=327680</producerConfig>-->
<producerConfig>retries=1</producerConfig>
<producerConfig>max.block.ms=1000</producerConfig>
<producerConfig>bootstrap.servers=kafka1.server:9092,kafka2.server:9092,kafka3.server:9092</producerConfig>
</appender>

...

<logger name="com.example.service" level="INFO" additivity="false">
<appender-ref ref="LOG-KAFKA"/>
</logger>

logback 설정에서 topic 은 kafka 에서 로깅 목적으로 생성한 토픽명을 기술하고, producerConfig 중 bootstrap.servers 는 kafka 브로커의 주소를 기술한다.
그 외 자세한 설정은 다음의 링크를 참고한다.

logback 설정 이후 서버를 실행하여 지정된 로거로 로깅이 될 때, KafkaAppender 를 통하여 카프카 클러스터에 메시지가 전달되고, 추후 logstash 를 통하여 kafka 에서 메시지를 꺼낼 수 있게 된다. 이렇게 획득한 로그 데이터를 elasticsearch 에 실시간으로 저장한다면, 목적하는 로깅 및 모니터링이 가능한 기반이 시작된다고 할 수 있다.


다음 포스팅부터는 elasticsearch 및 logstash, kibana 에 대해 이야기하겠다.

참고자료 및 사이트