운영 환경에서 PostgreSQL 데이터를 Elasticsearch로 전송하는 과정에서 Bulk Insert 작업이 어떻게 이루어지는지 궁금할 때가 있습니다. 데이터는 정상적으로 조회되지만 정작 인스턴스 내부에서는 명확한 실행 스크립트나 배치 파일이 보이지 않더라고요.🤔
이러한 경우, 데이터가 어떤 방식으로 처리되는지 파악하는 것이 중요하기 때문에 이번 기회에 로그스태시에서 기존 DB에 있는 데이터를 Elasticsearch에 대량 추가하는 방법을 알아보려고 합니다.
우선, /etc/logstach/conf.d/ 디렉터리를 확인하는 것이 가장 먼저 해야 할 일입니다. 로그스태시는 데이터를 수집하고 변환한 후 Elasticsearch로 전달하는 역할을 합니다. 이 디렉터리 내 .conf 파일을 살펴보면, input, filter, output 섹션이 정의되어 있으며, 특히 output 부분에서 elasticsearch 설정을 확인할 수 있어요.
PostgreSQL에서 데이터 가져오기 (input 설정)
Logstash에서 데이터를 PostgreSQL로부터 가져오기 위해서는 jdbc 플러그인을 사용해야 합니다. 이를 통해 SQL 쿼리를 작성하여 데이터베이스에서 필요한 데이터를 주기적으로 조회할 수 있습니다.
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://your-database-host:5432/your_database"
jdbc_user => "your_user"
jdbc_password => "your_password"
jdbc_driver_library => "/path/to/postgresql/driver.jar"
jdbc_driver_class => "org.postgresql.Driver"
statement => "SELECT * FROM your_table WHERE updated_at > :sql_last_value"
schedule => "*/5 * * * *" # 5분마다 실행
}
}
이 설정은 Logstash가 PostgreSQL에서 데이터를 주기적으로 조회하도록 구성합니다. statement 부분에서 updated_at > :sql_last_value 조건을 사용하여 마지막으로 동기화된 시간 이후의 데이터만 가져오도록 설정할 수 있습니다.
- jdbc_connection_string: PostgreSQL 데이터베이스에 연결하기 위한 JDBC URL을 지정합니다. your-database-host와 your_database는 실제 데이터베이스의 호스트와 이름으로 교체해야 합니다.
- jdbc_user와 jdbc_password: 데이터베이스에 접근할 수 있는 사용자 정보입니다.
- jdbc_driver_library: PostgreSQL JDBC 드라이버의 경로를 지정합니다. 이를 통해 Logstash가 PostgreSQL에 연결할 수 있도록 합니다.
- statement: SQL 쿼리를 작성하여 데이터를 추출합니다. :sql_last_value는 마지막 동기화 시점을 기준으로 데이터를 조회할 수 있도록 하며, Logstash는 이 값을 저장하고 다음 실행 시 사용합니다. 이렇게 하면 데이터를 중복으로 전송하지 않게 됩니다.
- schedule: 주기적인 실행을 설정합니다. */5 * * * *는 매 5분마다 실행되도록 합니다.
Elasticsearch로 데이터 전송하는 Output 설정
Logstash가 PostgreSQL에서 데이터를 조회한 후, 이 데이터를 Elasticsearch로 전송하기 위한 output 설정은 다음과 같습니다.
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "your_index"
document_id => "%{id}"
action => "index" # Bulk Insert가 기본값
}
}
위 설정에서 hosts는 Elasticsearch의 주소를 의미하며, index는 데이터를 저장할 인덱스를 지정합니다. document_id는 데이터의 고유한 ID를 설정하여 중복 삽입을 방지할 수 있습니다. action 값이 index이므로 기본적으로 Bulk Insert 방식으로 동작하게 됩니다.
- hosts: Elasticsearch의 주소를 지정합니다. 여기서는 로컬호스트를 사용하지만 실제 환경에서는 Elasticsearch 서버의 IP 또는 DNS를 지정해야 합니다.
- index: 데이터를 저장할 Elasticsearch의 인덱스를 지정합니다. 이 인덱스는 데이터를 검색할 때 중요한 역할을 합니다.
- document_id: Elasticsearch에서 문서의 고유 ID를 지정할 수 있습니다. "%{id}"처럼 Logstash 필드 값을 활용하여 동적으로 설정할 수 있습니다. 이를 통해 데이터 중복을 방지할 수 있습니다.
- action: 데이터 삽입 작업을 정의하는 파라미터입니다. 기본값인 index는 데이터를 Elasticsearch에 삽입할 때 Bulk Insert 방식으로 처리합니다. 즉, 여러 개의 데이터를 한 번에 효율적으로 처리합니다.
데이터 변환을 위한 Filter 설정
때때로, 데이터가 Elasticsearch에 적합한 형태로 바로 전달되지 않을 수 있습니다. 이럴 때는 filter를 사용하여 데이터를 변환할 수 있습니다. 예를 들어, 필드명을 변경하거나 데이터 타입을 변환할 수 있습니다.
filter {
mutate {
rename => { "old_field_name" => "new_field_name" }
convert => { "numeric_field" => "integer" }
}
}
위 설정을 통해 old_field_name을 new_field_name으로 변경하고, numeric_field 값을 정수형(integer)으로 변환할 수 있습니다.
설정 파일을 수정한 후에는 Logstash를 다시 시작해야 합니다.
- mutate: Logstash에서 필드를 변형할 때 사용합니다. rename을 통해 기존 필드명을 새로운 이름으로 변경하고, convert를 통해 데이터 타입을 변환합니다. 이 예시에서는 numeric_field 필드를 정수형(integer)으로 변환하는 설정을 보여줍니다.
- 데이터 변환을 통해 Elasticsearch에 적합한 형태로 데이터를 맞출 수 있습니다.
Logstash 재시작
sudo systemctl restart logstash
또한 설정이 정상적으로 적용되었는지 확인하려면 Logstash 로그 파일을 확인하는 것이 중요합니다.
tail -f /var/log/logstash/logstash-plain.log
이렇게 설정을 점검하고, 적절한 필터링 및 변환 과정을 거치면 PostgreSQL 데이터를 Elasticsearch로 효율적으로 전송할 수 있습니다.