본문 바로가기

DevOps/Elastic Stack

[Logstash] Logstash를 활용한 PostgreSQL 데이터 Elasticsearch에 Bulk Insert 작업 설정 - 컴도리돌이

728x90
728x90

운영 환경에서 PostgreSQL 데이터를 Elasticsearch로 전송하는 과정에서 Bulk Insert 작업이 어떻게 이루어지는지 궁금할 때가 있습니다. 데이터는 정상적으로 조회되지만 정작 인스턴스 내부에서는 명확한 실행 스크립트나 배치 파일이 보이지 않더라고요.🤔

이러한 경우, 데이터가 어떤 방식으로 처리되는지 파악하는 것이 중요하기 때문에 이번 기회에 로그스태시에서 기존 DB에 있는 데이터를 Elasticsearch에 대량 추가하는 방법을 알아보려고 합니다. 

 

우선, /etc/logstach/conf.d/ 디렉터리를 확인하는 것이 가장 먼저 해야 할 일입니다. 로그스태시는 데이터를 수집하고 변환한 후 Elasticsearch로 전달하는 역할을 합니다. 이 디렉터리 내 .conf 파일을 살펴보면, input, filter, output 섹션이 정의되어 있으며, 특히 output 부분에서 elasticsearch 설정을 확인할 수 있어요. 

PostgreSQL에서 데이터 가져오기 (input 설정)

Logstash에서 데이터를 PostgreSQL로부터 가져오기 위해서는 jdbc 플러그인을 사용해야 합니다. 이를 통해 SQL 쿼리를 작성하여 데이터베이스에서 필요한 데이터를 주기적으로 조회할 수 있습니다.

input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://your-database-host:5432/your_database"
    jdbc_user => "your_user"
    jdbc_password => "your_password"
    jdbc_driver_library => "/path/to/postgresql/driver.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    statement => "SELECT * FROM your_table WHERE updated_at > :sql_last_value"
    schedule => "*/5 * * * *"  # 5분마다 실행
  }
}

 

이 설정은 Logstash가 PostgreSQL에서 데이터를 주기적으로 조회하도록 구성합니다. statement 부분에서 updated_at > :sql_last_value 조건을 사용하여 마지막으로 동기화된 시간 이후의 데이터만 가져오도록 설정할 수 있습니다.

  • jdbc_connection_string: PostgreSQL 데이터베이스에 연결하기 위한 JDBC URL을 지정합니다. your-database-host와 your_database는 실제 데이터베이스의 호스트와 이름으로 교체해야 합니다.
  • jdbc_userjdbc_password: 데이터베이스에 접근할 수 있는 사용자 정보입니다.
  • jdbc_driver_library: PostgreSQL JDBC 드라이버의 경로를 지정합니다. 이를 통해 Logstash가 PostgreSQL에 연결할 수 있도록 합니다.
  • statement: SQL 쿼리를 작성하여 데이터를 추출합니다. :sql_last_value는 마지막 동기화 시점을 기준으로 데이터를 조회할 수 있도록 하며, Logstash는 이 값을 저장하고 다음 실행 시 사용합니다. 이렇게 하면 데이터를 중복으로 전송하지 않게 됩니다.
  • schedule: 주기적인 실행을 설정합니다. */5 * * * *는 매 5분마다 실행되도록 합니다.

Elasticsearch로 데이터 전송하는 Output 설정

Logstash가 PostgreSQL에서 데이터를 조회한 후, 이 데이터를 Elasticsearch로 전송하기 위한 output 설정은 다음과 같습니다.

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "your_index"
    document_id => "%{id}"
    action => "index"  # Bulk Insert가 기본값
  }
}

 

위 설정에서 hosts는 Elasticsearch의 주소를 의미하며, index는 데이터를 저장할 인덱스를 지정합니다. document_id는 데이터의 고유한 ID를 설정하여 중복 삽입을 방지할 수 있습니다. action 값이 index이므로 기본적으로 Bulk Insert 방식으로 동작하게 됩니다.

  • hosts: Elasticsearch의 주소를 지정합니다. 여기서는 로컬호스트를 사용하지만 실제 환경에서는 Elasticsearch 서버의 IP 또는 DNS를 지정해야 합니다.
  • index: 데이터를 저장할 Elasticsearch의 인덱스를 지정합니다. 이 인덱스는 데이터를 검색할 때 중요한 역할을 합니다.
  • document_id: Elasticsearch에서 문서의 고유 ID를 지정할 수 있습니다. "%{id}"처럼 Logstash 필드 값을 활용하여 동적으로 설정할 수 있습니다. 이를 통해 데이터 중복을 방지할 수 있습니다.
  • action: 데이터 삽입 작업을 정의하는 파라미터입니다. 기본값인 index는 데이터를 Elasticsearch에 삽입할 때 Bulk Insert 방식으로 처리합니다. 즉, 여러 개의 데이터를 한 번에 효율적으로 처리합니다.

데이터 변환을 위한 Filter 설정

때때로, 데이터가 Elasticsearch에 적합한 형태로 바로 전달되지 않을 수 있습니다. 이럴 때는 filter를 사용하여 데이터를 변환할 수 있습니다. 예를 들어, 필드명을 변경하거나 데이터 타입을 변환할 수 있습니다.

filter {
  mutate {
    rename => { "old_field_name" => "new_field_name" }
    convert => { "numeric_field" => "integer" }
  }
}

 

위 설정을 통해 old_field_namenew_field_name으로 변경하고, numeric_field 값을 정수형(integer)으로 변환할 수 있습니다.

설정 파일을 수정한 후에는 Logstash를 다시 시작해야 합니다. 

  • mutate: Logstash에서 필드를 변형할 때 사용합니다. rename을 통해 기존 필드명을 새로운 이름으로 변경하고, convert를 통해 데이터 타입을 변환합니다. 이 예시에서는 numeric_field 필드를 정수형(integer)으로 변환하는 설정을 보여줍니다.
  • 데이터 변환을 통해 Elasticsearch에 적합한 형태로 데이터를 맞출 수 있습니다.

Logstash 재시작

sudo systemctl restart logstash

 

또한 설정이 정상적으로 적용되었는지 확인하려면 Logstash 로그 파일을 확인하는 것이 중요합니다. 

tail -f /var/log/logstash/logstash-plain.log

 

이렇게 설정을 점검하고, 적절한 필터링 및 변환 과정을 거치면 PostgreSQL 데이터를 Elasticsearch로 효율적으로 전송할 수 있습니다. 


728x90
728x90