elasticsearch & spring boot & webflux
elasticsearch 6.3.0
elasticsearch 가 6.3.0 으로 올라갔다. 여러가지 기능들이 추가되었는데, 흥미로운 부분은 내부 client 로 SQL 문법을 지원한다는것이다. 생각보다 놀랍게 동작한다! 메인 버전이 올라가면서 java client 버전도 함께 올라갔는데 드디어 java rest high level client 가 쓸만한 모듈이 되었다는 점이다. 예전까지는 java client 만 사용해왔었는데 이젠 슬슬 넘어갈 수 있을 것같아서 후딱 한번 사용해보았다.
RestHighLevelClient
이전 포스트 에서 살짝 훑어만 봤었는데 elasticsearch java client 가 6.3.0 으로 업데이트 되면서 대부분의 기능들을 사용할 수 있게 되었다. 언제나 릴리즈되나 눈빠지게 기다리고 있었는데 6.3.0 이 릴리즈 되자마자 한번 적용해보았다.
async
RestHighLevelClient 에서는 대부분의 메소드에 async 를 지원한다. spring webFlux 를 공부하면서 elasticsearch 가 async 를 지원하지 않아서 반쪽짜리로 사용하고 있었는데, 함께 사용해보니 궁합이 꽤나 잘맞았다.
오늘의 목표
이번 포스팅은 webFlux 와 elasticsearch 가 짬뽕이 되었다. 두 꼭지에서 꼭 알고 가야하는 부분만 짚고 가보자. 사용할 제원(?)은 다음과 같다.
- spring boot 2.0.3
- spring boot webFlux
- elasticsearch RestHighLevelClient (6.3.0)
shut up and code
setting
build.gradle
buildscript {
ext {
springBootVersion = '2.0.3.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
...
dependencies {
// for webFlux
compile('org.springframework.boot:spring-boot-starter')
compile('org.springframework.boot:spring-boot-starter-webflux') // 1
compileOnly("org.springframework.boot:spring-boot-configuration-processor")
// for elasticsearch
compile('org.elasticsearch.client:elasticsearch-rest-high-level-client:6.3.0')
compile('org.apache.logging.log4j:log4j-core:2.9.1')
compile('org.elasticsearch:elasticsearch:6.3.0') //2
}
프로젝트는 gradle 로 구성했다. 개인적으로는 maven 보다 가독성이 좋고 문법이 간결하다고 생각한다. xml 을 그닥 좋아하지 않는 경향도 있고, 뎁스가 깊어지거나 설정자체가 길어지면 한눈에 어떤 dependency 가 걸려있는지 파악하기가 쉽지 않기 때문이다. 위의 설정에서 짚고 넘어갈 부분이 있다.
- spring boot starter 에는 web 과 webflux 가 있다. web 은
org.springframework.boot:spring-boot-starter-web
이렇게 사용하는데 web 은 기본적으로 tomcat 으로 동작하고, webflux 는 netty 로 was 가 동작한다. 여기에서 주의할 점은 web과 wenflux 를 함께 넣으면 기본적으로 web 으로 동작하고 webflux 는 무시된다. 해당 내용은 여기 에서 확인할 수 있다. org.elasticsearch.client:elasticsearch-rest-high-level-client:6.3.0
만 추가하면 기본적으로 elasticsearch 5.6.x 버전의 client 가 dependency 로 따라온다. 기본적으로 사용함에 있어서는 큰 문제는 없지만 일부 기능들이 호환이 잘 안되서 엉뚱하게 동작하는 경우가 있다. 이는 rest client 로 사용했을 때에도 비슷한 상황이 있었는데 명시적으로 elasticsearch client 버전을 명시해주면 원하는 버전의 client 를 사용할 수 있다. 되도록 rest high level client 와 동일한 버전을 사용하는걸 추천한다.
ElasticsearchConfig.java
@Configuration
public class ElasticsearchConfig {
@Bean
public RestHighLevelClient client(ElasticsearchProperties properties) {
return new RestHighLevelClient(
RestClient.builder(properties.hosts())
);
}
}
configuration 의 기본 설정은 rest client 와 기본 골격은 비슷하게 생겼다. (설정이나 다른 부분들이 몇가지 있지만) 크게 다른 점을 하나 꼽자면 rest high level client 부터는 tcp connection 이 아닌 http connection 을 맺는다. 그래서 기본 (elasticsearch port 인) 9200 을 사용한다. rest client 에서는 InetSocketTransportAddress
를 생성해서 builder 에 전달을 했었지만 restHighLevelClient 부터는 HttpHost
를 전달해서 생성하는 부분도 눈여겨 볼만한 부분이다. elasticsearch 설정 관련 클래스는 ElasticsearchProperties 로 묶었다. @Value
나 ConfigurationProperties
annotation 으로 configuration 에서 직접 설정도 가능하지만, 특정 설정값을 service 에서 사용하기 편하기 위해 클래스를 분리했다.
ElasticsearchProperties.java
@Component
@Setter
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticsearchProperties {
private List<String> hosts;
public HttpHost[] hosts() {
return hosts.stream().map(HttpHost::create).toArray(HttpHost[]::new);
}
}
@Value
annotation 으로 설정해주어도 되지만, 이름이 완전히 같을 때만 설정값을 읽어들인다. @ConfigurationProperties
annotation 은 Relaxed Binding 이라는 느슨한(?) 룰이 있어 각자 입맛에 맞는 포멧으로 써도 properties class 에서 찰떡같이 알아 먹는다. (자신만의 스타일을 고집하는 팀에서는 나름 유용하게 사용될지는 모르겠지만 남용은 하지 말자. 버그가 생길 수 있는 포인트이다.)
application.yml
elasticsearch:
hosts: http://localhost:9200
설정파일은 이전보다 심플해졌다. 심지어 cluster name 을 묻지도 따지지도 않는다. 연결을 맺을 때 굳이 너의 이름을 몰라도 상관없다는 뜻이였을까.
service
document 의 생성과 조회를 만들어 보자. (update 와 delete 는 index 와 get/match 와 비슷하기 때문에 생략한다.)
index
public Mono<Void> index(String index, String type, String userName, String message) {
Gson gson = GsonUtil.gson();
User user = new User();
user.setUser(userName);
user.setMessage(message);
user.setPostDate(new Date());
IndexRequest indexRequest = new IndexRequest(index, type);
indexRequest.source(gson.toJson(user), XContentType.JSON);
return Mono.create(sink -> { //1
restHighLevelClient.indexAsync(indexRequest, new ActionListener<IndexResponse>() { //2
@Override
public void onResponse(IndexResponse indexResponse) {
log.info("index success : "+indexResponse.toString());
sink.success();
}
@Override
public void onFailure(Exception e) {
log.error("index error ", e);
sink.error(e);
}
});
});
}
index 메소드는 특정한 반환값을 필요로 하지 않기 때문에 Mono<Void>
를 사용한다. client 에서는 기존의 index 메소드와 indexAsync 를 제공한다. (대부분의 메소드들이 동일하게 제공) 위의 동작은 공식문서의 index 부분의 동작을 코드로 작성한 부분이다. url 로 풀어 쓰면 다음과 같다.
$ curl -X PUT "localhost:9200/twitter/_doc/1" -H 'Content-Type: application/json' -d'
{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}
'
indexAsync 메소드를 사용했고 이를 MonoSink 로 감싸주었다. Mono 는 0..1 의 상태이므로 하나의 index 결과에 대한 성공여부를 publish 해준다. controller 에서는 다음과 같이 받아 처리해준다.
controller.java
@RequestMapping(value = "/index/{index}/{type}", method = {RequestMethod.POST})
public Mono<Void> index(@PathVariable("index") String index,
@PathVariable("type") String type,
@RequestParam(value = "user_name") String userName,
@RequestParam(value = "message") String message) {
return elasticsearchService.index(index, type, userName, message)
.onErrorResume(error -> {
log.error("index error ", error);
return Mono.empty();
});
}
index 동작 이후 특정한 값을 반환하고자 한다면 Mono 안의 타입을 원하는 클래스를 정의해주자.
get
document 를 넣었으면 값이 잘 들어갔는지 확인이 필요하겠다. get api 를 추가해보자.
$ curl -X GET "localhost:9200/twitter/_doc/0"
이 api 를 rest high level client 로 작성하면 아래와 같다.
public Mono<User> getUser(String index, String type, String id) {
final Gson gson = GsonUtil.gson();
GetRequest getRequest = new GetRequest(index, type, id);
return Mono.create(sink -> {
restHighLevelClient.getAsync(getRequest, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse documentFields) {
User user = gson.fromJson(documentFields.getSourceAsString(), User.class);
sink.success(user);
}
@Override
public void onFailure(Exception e) {
e.printStackTrace();
sink.error(e);
}
});
});
}
index 에서와 같이 Mono 로 감싸서 반환을 하였고 이번에는 필요한 값인 User 객체를 사용하였다.
@GetMapping("get/{index}/{type}/{id}")
public Mono<User> getAsync(@PathVariable("index") String index,
@PathVariable("type") String type,
@PathVariable("id") String id) {
return elasticsearchService.getUser(index, type, id)
.onErrorResume(error -> {
User defaultUser = new User();
defaultUser.setUser("default");
defaultUser.setPostDate(new Date());
defaultUser.setMessage("default message");
return Mono.just(defaultUser);
})
.defaultIfEmpty(new User());
}
service 에서 Mono<User>
를 받아 반환을 해준다. onErrorResume
이나 defaultIfEmpty
는 service 에서 정상동작을 하지 않았을 경우의 후처리 이다. 여기에서 짚고 넘어갈 부분이라면, 우리는 restful Api 를 만들고 있기 때문에 controller 가 @RestController
이거나 mapping method 에 @ResponseBody
annotation 이 걸려있으면 위와 같이 바로 Mono
를 반환해도 된다.
하지만 @Controller
annotation 을 사용하고 있다면 http status 값이 넘어가지 않기 때문에 mapping method 에 @ResponseBody
를 걸어주거나 ResponseEntity
객체로 한번 더 감싸서 반환해주어야 한다. 다음과 같이 감싸줄 수 있겠다.
@GetMapping("get/{index}/{type}/{id}")
public Mono<ResponseEntity<User>> getAsync2(@PathVariable("index") String index,
@PathVariable("type") String type,
@PathVariable("id") String id) {
return elasticsearchService.getUser(index, type, id)
.map(ResponseEntity::ok)
.onErrorResume(error -> Mono.just(ResponseEntity.badRequest().build()))
.defaultIfEmpty(ResponseEntity.status(HttpStatus.OK).body(new User()));
}
취향과 상황에 맞게 선택하도록 하자.
match all
match all api 도 추가해보자.
$ curl -X GET "localhost:9200/_search" -H 'Content-Type: application/json' -d'
{
"query": {
"match_all": {}
}
}
'
코드로 작성하면 다음과 같다.
public Flux<User> matchAll(String index) {
final Gson gson = GsonUtil.gson();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.source(searchSourceBuilder);
return Flux.create((FluxSink<User> sink) -> {
restHighLevelClient.searchAsync(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
searchResponse.getHits().forEach(item -> {
User user = gson.fromJson(item.getSourceAsString(), User.class);
sink.next(user);
});
sink.complete();
}
@Override
public void onFailure(Exception e) {
log.error("matchAll error ", e);
sink.error(e);
}
});
});
}
@GetMapping("/match_all/{index}")
public Flux<User> matchAll(@PathVariable("index") String index) {
return elasticsearchService.matchAll(index).onErrorResume((Throwable error) -> {
log.error("err", error);
User user = new User();
user.setPostDate(new Date());
user.setUser("default User");
user.setMessage("default message");
return Flux.just(user);
});
}
복수개의 값을 반환할 수 있으므로 Flux 로 반환해준다. 물론 List 로 묶어서 Mono 로 반환해야할 수도 있다. MonoSink 와의 차이점은 next 로 데이터를 넘기다가 모든 데이터가 넘어가면 complete 를 호출 해주는 부분이다.
결론
webflux 와 restHighLevelClient 사용하여 비동기 api 를 제공해줄 수 있다. webflux 는 꽤나 멋지게 동작하지만 아직까지는 모든 서드파티들이 지원을 해주고 있지 않아 하드하게 사용하기에는 주저되는 부분이 있다. (jdbc 를 사용하지 않는다면 추천!) restHighLevelClient 로 넘어오면서 각 클래스들의 역할이 명확해진 느낌이고 선언과 사용도 좀 더 명확해져서 편하다는 인상을 많이 받았다. 위의 예제코드는 여기 에서 확인할 수 있다.
여담
elasticsearch 에 kibana 를 얹으면 devtool 을 이용해서 (무려 자동완성이 제공되는) 쿼리를 날려볼 수 있다. 지금껏 sense 나 다른 툴들을 조금은 불편하게 사용해왔었는데, 반길만한 부분이다. 다만 개인적으로는 redis-cli 처럼 CLI tool 이 제공되면 좋았을 거라는 생각에 elasticsearch-cli 를 만들어 보는 중이다. 물론 elastic 진영에서는 curator 을 제공해주고 있긴 하지만 모니터링 툴이라 간단한 match query 나 analyze 등을 사용할 수는 없다. 나름 일하면서 편하게 사용하는중이다. ㅎㅎ