Reactive MongoDB driver #
MongoDB driver #
- MongoDB사에서 공식적인 2가지 java driver를 제공
- Sync Driver
- Reactive Streams Driver
Sync driver #
- 동기적으로 동작
- 클라이언트 요청을 보내면 응답이 돌아오기 전까지 쓰레드가 blocking
- 메서드가 응답 객체를 바로 반환 -> 직관적
- 쓰레드 동시성 문제 발생 가능성
Reactive Streams driver #
- 비동기적으로 동작
- 클라이언트가 요청을 보내면 쓰레드는 non-blocking
- 모든 응답이 publisher를 이용해서 전달되기 때문에 처리하기 어렵다.
- Spring reactive stack과 함께 사용되어 높은 성능, 안정성 제공
Spring Data MongoDB Reactive, REactive Streams MongoDB Driver #
Mongo Reactive streams driver #
MongoCollection 획득 #
- MongoDB의 MongoClient, MongoDatabase, MongoCollection
- MongoClient
- MongoDB 클러스터를 가리키는 객체 (MongoDatabase factory 역할)
- MongoDatabase
- Mongo의 Database를 가리킨다.
- Codec, WriteConcern, ReadPreference 등의 정보를 포함
- collection 이름을 인자로 받고 MongoCollection 제공
- MongoCollection
- MongoDB의 Collection
- mongodb commands를 실행
MongoCollection 획득 예제 #
- ConnectionString을 이용해서 MongoDB 연결 정보를 String 형태로 제공
- MongoClientSettings builder에 Connection 정보를 전달
- MongoClientSettings로 MongoClient 생성
- MongoClient로 MongoDatabase 접근
- MongoDatabase로 MongoCollection 접근
MongoCollection - count #
- ClientSession을 통해서 multi document transaction 제공
- 다양한 수행건들의 트랜잭션을 ClientSession을 사용해서 하나로 묶을 수 있음
- Bson 구현체 (BsonDocument 등)로 filter 제공
- CountOptions로 hint, limit, skip, maxTime, collation 등의 정보 제공
// CountOptions를 사용하여 옵션 설정
CountOptions countOptions = new CountOptions();
countOptions.skip(10); // 처음 10개 문서 제외하고 count
countOptions.limit(20); // 최대 20개의 문서만 count
countOptions.maxTime(5000); // 최대 수행 시간 (5초)
// countDocuments 메서드에 CountOptions 적용
long documentCount = collection.countDocuments(countOptions);
System.out.println("Total documents in the collection: " + documentCount);
MongoCollection - find #
- Filters helper 클래스를 통해서 filter 설정 가능
- eq, ne, gt, gte, lt, lte, in, nin, and, or, not, nor, exists, type, mod, regex, text 등의 기본 연산자 제공
- geoWithin, geoWithinBox 등의 geo 연산자도 제공
// Filters를 사용한 쿼리 조건 생성
// 예: age가 30 이상인 문서 검색
FindIterable<Document> result = collection.find(Filters.gte("age", 30));
// 결과 출력
for (Document document : result) {
System.out.println(document.toJson());
}
aggregate #
- pipeline을 생성하고 mongo shard 전체에 대해서 필터, 집계, 그룹 등의 연산을 수행
MongoCollection - aggregate #
- Aggregates helper 클래스를 통해서 aggregate pipeline 제공
- addFields, set, bucket, bucketAuto
- count, match, project, sort, sortByCount
- skip, limit, lookup
- facet, graphLookup, group, unionWith, unwind, out, merge, replaceRoot, replaceWith, sample 등
- AggregatePublisher를 반환
// MongoDB 연결 설정
AggregatePublisher<Document> publisher = MongoClients.create()
.getDatabase("your_database_name")
.getCollection("your_collection_name", Document.class)
.aggregate(Arrays.asList(
new Document("$group", new Document("_id", "$city").append("totalPopulation", new Document("$sum", "$population"))),
new Document("$sort", new Document("totalPopulation", -1)),
new Document("$limit", 5)
));
// 결과 처리
publisher.subscribe(
document -> System.out.println(document.toJson()),
throwable -> System.err.println("Error: " + throwable.getMessage()),
() -> System.out.println("Aggregation completed")
);
MongoCollection - watch #
- Aggregates helper 클래스를 통해서 aggregate pipeline 제공
- addFields, match, project, replaceRoot, replaceWith, redact, set, unset 지원
- ChangeStreamPublisher를 반환하고 해당 Publisher를 subscribe
- ChangeStreamDocument를 onNext로 전달
- resumeToken, 변경사항이 발생한 document 혹은 _id
// ChangeStreamPublisher를 얻어옴
Publisher<ChangeStreamDocument<Document>> changeStreamPublisher = mongoCollection.watch();
// Publisher를 사용하여 Subscriber에게 변경 사항을 알림
changeStreamPublisher.subscribe(new ExampleSubscriber());
MongoCollection - bulkWrite #
- Delete, Insert, Replace, Update 등을 모아서 한번에 실행하는 operation
- WriteModel
- DeleteManyModel: 조건을 만족하는 document를 모두 삭제
- DeleteOneModel: 조건을 만족하는 document를 최대 1개만 삭제
- InsertOneModel: 하나의 document를 추가
- ReplaceOneModel: 조건을 만족하는 document를 최대 1개만 대체
- UpdateManyModel: 조건을 만족하는 document를 모두 수정
- UpdateOneModel: 조건을 만족하는 document를 최대 1개만 수정
// BulkWrite 작업 생성
List<WriteModel<Document>> bulkOperations = new ArrayList<>();
// 삽입 작업
Document document1 = new Document("_id", 1).append("name", "John");
InsertOneModel<Document> insertOneModel1 = new InsertOneModel<>(document1);
bulkOperations.add(insertOneModel1);
// 업데이트 작업
Document filter = new Document("_id", 2);
Document update = new Document("$set", new Document("age", 25));
UpdateOneModel<Document> updateOneModel = new UpdateOneModel<>(filter, update);
bulkOperations.add(updateOneModel);
// 삭제 작업
DeleteOneModel<Document> deleteOneModel = new DeleteOneModel<>(Filters.eq("_id", 3));
bulkOperations.add(deleteOneModel);
// BulkWrite 수행
BulkWriteResult result = mongoCollection.bulkWrite(bulkOperations);
MongoCollection - insert #
- 하나 혹은 여러 document를 추가하는 operation
- InsertOneOptions, InsertManyOptions
- validation 우회 여부를 결정
- InsertManyOptions라면 insert의 순서를 보장할지 결정
- InsertOneResult, InsertManyResult
- wasAcknowledged() : write 성공 여부
- getInsertedIds() : write된 id들을 제공
// 삽입할 문서 생성
Document document = new Document("_id", 1)
.append("name", "John")
.append("age", 30)
.append("city", "New York");
// 검사 우회 옵션 추가
Document options = new Document("bypassDocumentValidation", true);
// 컬렉션에 문서 삽입 (검사 우회)
mongoCollection.insertOne(document, options);
InsertOneOptions
// InsertOneOptions를 사용한 단일 문서 삽입 옵션 설정
InsertOneOptions insertOneOptions = new InsertOneOptions();
insertOneOptions.bypassDocumentValidation(true); // 데이터 유효성 검사 우회
// 단일 문서 삽입
Document document = new Document("_id", 1)
.append("name", "John")
.append("age", 30);
mongoCollection.insertOne(document, insertOneOptions);
InsertManyOptions
// InsertManyOptions를 사용한 다수의 문서 삽입 옵션 설정
InsertManyOptions insertManyOptions = new InsertManyOptions();
insertManyOptions.ordered(false); // 순서 무시
// 다수의 문서 삽입
List<Document> documents = Arrays.asList(
new Document("_id", 2).append("name", "Jane").append("age", 25),
new Document("_id", 3).append("name", "Bob").append("age", 35)
);
mongoCollection.insertMany(documents, insertManyOptions);
MongoCollection - update #
- 하나 혹은 여러 document를 수정하는 operation
- Filters helper 클래스를 통해서 filter 설정 가능
- Updates helper 클래스를 통해 update 설정 가능
- UpdateOptions를 통해서 upsert, hint, collation, variables 등 제공
// 업데이트 옵션 설정
UpdateOptions options = new UpdateOptions();
options.upsert(true); // 일치하는 문서가 없을 경우 삽입 (upsert)
// 업데이트 작업 수행
mongoCollection.updateOne(eq("name", "John"), set("age", 31), options);
MongoCollection - atomic #
- findOneAndDelete, findOneAndReplace, findOneAndUpdate 등 find와 write를 묶어서 atomic한 operation 제공
- 트랜잭션 내에서 수행되는 각 작업은 clientSession을 통해 수행
MongoCollection - index #
-
Collection에서 특정 필드들에 대한 index 생성, 조회, 삭제 가능
-
Indexes helper 클래스를 통해서 다양한 index 제공
-
IndexModel과 IndexOptions를 통해서 어떤 필드들에 대해서 어떻게 Index를 적용할 것인지 설정 가능
- background: index의 생성을 background에서 진행할지 여부
- unique: unique index를 생성할지 여부
- name: index에 name 설정
- partialFilterExpression: 특별한 조건을 충족한 경우에만 index를 걸고 싶은 경우 설정
// IndexModel을 사용하여 색인 정의
IndexModel indexModel = new IndexModel(
new Document("field1", 1), // 1은 오름차순, -1은 내림차순
new Document("field2", -1)
);
// IndexOptions를 사용하여 색인 옵션 설정
IndexOptions indexOptions = new IndexOptions();
indexOptions.name("custom_index_name"); // 색인의 이름 설정
indexOptions.unique(true); // 고유 색인 설정
indexOptions.background(true); // 비동기적으로 색인 생성
// 색인 생성
mongoCollection.createIndex(indexModel, indexOptions);
- 강의 : Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지_