ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Logstash] Input Kafka plugin, IllegalStateException: No entry found for connection xxx
    Troubleshooting 2021. 9. 30. 18:27
    반응형

    개요

    다음 메트릭 데이터 수집 파이프라인 구성 중에 발생한 문제이다.

    • CollectD(x3) -> logstash -> kafka(x3)

    운영 환경은 다음과 같다.

    • OS: Amazon Linux2 (x7, collectd x3, logstash, kafka x3)
    • instance type: m5.large
    • Kafka의 모든 노드는 kafka, Zookeeper가 설치되어 있으며 클러스터로 구성되어 있다.

    문제 상황

    Kafka에 제대로 된 데이터 수집이 안되고 있어서 Logstash의 로그를 확인해 보았다. (/var/log/logstash/input.log에서 CollectD에 데이터가 수집되는 것을 확인하였다.)

    $ tail -f /var/log/logstash/logstash-plain.log
    ...
    [2021-09-30T08:50:34,929][ERROR][org.apache.kafka.clients.producer.internals.Sender] [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread:
    java.lang.IllegalStateException: No entry found for connection 2
        at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330) ~[kafka-clients-2.1.0.jar:?]
        at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134) ~[kafka-clients-2.1.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:921) ~[kafka-clients-2.1.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) ~[kafka-clients-2.1.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:335) ~[kafka-clients-2.1.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308) ~[kafka-clients-2.1.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233) [kafka-clients-2.1.0.jar:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
    ...

     

    현재 문제가 되는 Logstash의 구성 파일은 다음과 같다.

     

    /etc/logstash/conf.d/collectd-to-kafka.conf

    input {
      udp {
         port => 25826
         buffer_size => 1452
         codec => collectd { }
      }
    }
    
    
    output {
      file {
        path => "/var/log/logstash/input.log"
        codec => rubydebug
      }
      kafka {
        bootstrap_servers => "10.0.0.10:9092,10.0.0.11:9092,10.0.0.12:9092" 
        codec => json
        topic_id => "system_metric"
      }
    }

    문제 해결

    다행히, 같은 문제에 대해서 원인을 찾고 해결한 문서를 찾을 수 있었다. 결론만 내리자면, Logstash가 사용하는 카프카 클라이언트 라이브러리는 Kafka의 노드들을 찾을 때, 구성했던 노드 이름으로 찾게 된다. 내가 구성했던 카프카 클러스터의 정보는 다음과 같다.

     

    /home/ec2-user/apps/kafka/config/server.properties

    broker.id=0
    listeners=PLAINTEXT://:9092
    advertised.listeners=PLAINTEXT://node1:9092
    zookeeper.connect=node1:2181,node2:2181,node3/test
    log.dirs=/tmp/kafka-logs-0

     

    이 때, node*/etc/hosts에 private ip와 매핑되게끔 설정해두었다. 따라서 Logstash의 호스트에서도 같은 작업을 진행하면 된다. /etc/hosts에 private ip와 Kafka 노드의 이름을 알맞게 매핑시켜준다.

     

    /etc/hosts

    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost6 localhost6.localdomain6
    
    10.0.0.11 node01
    10.0.0.12 node02
    10.0.0.13 node03

     

    그 후 구성 파일을 다음과 같이 수정한다.

     

    /etc/logstash/conf.d/collectd-to-kafka.conf

    input {
      udp {
         port => 25826
         buffer_size => 1452
         codec => collectd { }
      }
    }
    
    
    output {
      # 이 설정은 디버깅 용이다. 나중에 삭제하는 것이 좋다.
      file {
        path => "/var/log/logstash/input.log"
        codec => rubydebug
      }
      kafka {
        bootstrap_servers => "node1:9092,node2:9092,node3:9092" # kafka 클러스터의 모든 노드
        codec => json
        topic_id => "system_metric"
      }
    }

     

    그 후 Logstash를 재시작한다.

    $ sudo systemctl restart logstash

     

    이제 다시 로그를 확인해보면 에러 문구 없이 정상적으로 CollectD에서 수집한 데이터를 Kafka로 넘기는 것을 확인할 수 있다.

    $ tail -f /var/log/logstash/logstash-plain.log
    ....
    [2021-09-30T08:54:52,674][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.1.0
    [2021-09-30T08:54:52,675][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : eec43959745f444f
    [2021-09-30T08:54:52,728][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x2b737e75 run>"}
    [2021-09-30T08:54:52,814][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
    [2021-09-30T08:54:52,950][INFO ][logstash.inputs.udp      ] Starting UDP listener {:address=>"0.0.0.0:25826"}
    [2021-09-30T08:54:53,045][INFO ][logstash.inputs.udp      ] UDP listener started {:address=>"0.0.0.0:25826", :receive_buffer_bytes=>"106496", :queue_size=>"2000"}
    [2021-09-30T08:54:53,161][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
    [2021-09-30T08:54:54,743][INFO ][logstash.outputs.file    ] Opening file {:path=>"/var/log/logstash/input.log"}
    [2021-09-30T08:54:55,096][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: CSLOEmwKT72WIXFuiNNGBg

    참고

Designed by Tistory.