[Logstash] Input Kafka plugin, IllegalStateException: No entry found for connection xxx

개요
다음 메트릭 데이터 수집 파이프라인 구성 중에 발생한 문제이다.
- CollectD(x3) -> logstash -> kafka(x3)
운영 환경은 다음과 같다.
- OS: Amazon Linux2 (x7, collectd x3, logstash, kafka x3)
- instance type: m5.large
- Kafka의 모든 노드는 kafka, Zookeeper가 설치되어 있으며 클러스터로 구성되어 있다.
문제 상황
Kafka
에 제대로 된 데이터 수집이 안되고 있어서 Logstash
의 로그를 확인해 보았다. (/var/log/logstash/input.log
에서 CollectD
에 데이터가 수집되는 것을 확인하였다.)
$ tail -f /var/log/logstash/logstash-plain.log
...
[2021-09-30T08:50:34,929][ERROR][org.apache.kafka.clients.producer.internals.Sender] [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for connection 2
at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330) ~[kafka-clients-2.1.0.jar:?]
at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134) ~[kafka-clients-2.1.0.jar:?]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:921) ~[kafka-clients-2.1.0.jar:?]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) ~[kafka-clients-2.1.0.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:335) ~[kafka-clients-2.1.0.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308) ~[kafka-clients-2.1.0.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233) [kafka-clients-2.1.0.jar:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
...
현재 문제가 되는 Logstash
의 구성 파일은 다음과 같다.
/etc/logstash/conf.d/collectd-to-kafka.conf
input {
udp {
port => 25826
buffer_size => 1452
codec => collectd { }
}
}
output {
file {
path => "/var/log/logstash/input.log"
codec => rubydebug
}
kafka {
bootstrap_servers => "10.0.0.10:9092,10.0.0.11:9092,10.0.0.12:9092"
codec => json
topic_id => "system_metric"
}
}
문제 해결
다행히, 같은 문제에 대해서 원인을 찾고 해결한 문서를 찾을 수 있었다. 결론만 내리자면, Logstash
가 사용하는 카프카 클라이언트 라이브러리는 Kafka
의 노드들을 찾을 때, 구성했던 노드 이름으로 찾게 된다. 내가 구성했던 카프카 클러스터의 정보는 다음과 같다.
/home/ec2-user/apps/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node1:9092
zookeeper.connect=node1:2181,node2:2181,node3/test
log.dirs=/tmp/kafka-logs-0
이 때, node*
은 /etc/hosts
에 private ip와 매핑되게끔 설정해두었다. 따라서 Logstash
의 호스트에서도 같은 작업을 진행하면 된다. /etc/hosts
에 private ip와 Kafka
노드의 이름을 알맞게 매핑시켜준다.
/etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost6 localhost6.localdomain6
10.0.0.11 node01
10.0.0.12 node02
10.0.0.13 node03
그 후 구성 파일을 다음과 같이 수정한다.
/etc/logstash/conf.d/collectd-to-kafka.conf
input {
udp {
port => 25826
buffer_size => 1452
codec => collectd { }
}
}
output {
# 이 설정은 디버깅 용이다. 나중에 삭제하는 것이 좋다.
file {
path => "/var/log/logstash/input.log"
codec => rubydebug
}
kafka {
bootstrap_servers => "node1:9092,node2:9092,node3:9092" # kafka 클러스터의 모든 노드
codec => json
topic_id => "system_metric"
}
}
그 후 Logstash
를 재시작한다.
$ sudo systemctl restart logstash
이제 다시 로그를 확인해보면 에러 문구 없이 정상적으로 CollectD
에서 수집한 데이터를 Kafka
로 넘기는 것을 확인할 수 있다.
$ tail -f /var/log/logstash/logstash-plain.log
....
[2021-09-30T08:54:52,674][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.1.0
[2021-09-30T08:54:52,675][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : eec43959745f444f
[2021-09-30T08:54:52,728][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x2b737e75 run>"}
[2021-09-30T08:54:52,814][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-09-30T08:54:52,950][INFO ][logstash.inputs.udp ] Starting UDP listener {:address=>"0.0.0.0:25826"}
[2021-09-30T08:54:53,045][INFO ][logstash.inputs.udp ] UDP listener started {:address=>"0.0.0.0:25826", :receive_buffer_bytes=>"106496", :queue_size=>"2000"}
[2021-09-30T08:54:53,161][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2021-09-30T08:54:54,743][INFO ][logstash.outputs.file ] Opening file {:path=>"/var/log/logstash/input.log"}
[2021-09-30T08:54:55,096][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: CSLOEmwKT72WIXFuiNNGBg