001 Reactive Mongodb

Reactive MongoDB driver #

MongoDB driver #

  • MongoDB사에서 공식적인 2가지 java driver를 제공
    • Sync Driver
    • Reactive Streams Driver img.png

Sync driver #

  • 동기적으로 동작
  • 클라이언트 요청을 보내면 응답이 돌아오기 전까지 쓰레드가 blocking
  • 메서드가 응답 객체를 바로 반환 -> 직관적
  • 쓰레드 동시성 문제 발생 가능성 img_1.png

Reactive Streams driver #

  • 비동기적으로 동작
  • 클라이언트가 요청을 보내면 쓰레드는 non-blocking
  • 모든 응답이 publisher를 이용해서 전달되기 때문에 처리하기 어렵다.
  • Spring reactive stack과 함께 사용되어 높은 성능, 안정성 제공 img_2.png

Spring Data MongoDB Reactive, REactive Streams MongoDB Driver #

img_3.png

Mongo Reactive streams driver #

MongoCollection 획득 #

  • MongoDB의 MongoClient, MongoDatabase, MongoCollection
  1. MongoClient
  • MongoDB 클러스터를 가리키는 객체 (MongoDatabase factory 역할)
  1. MongoDatabase
  • Mongo의 Database를 가리킨다.
    • Codec, WriteConcern, ReadPreference 등의 정보를 포함
    • collection 이름을 인자로 받고 MongoCollection 제공
  1. MongoCollection
  • MongoDB의 Collection
    • mongodb commands를 실행 img_4.png

MongoCollection 획득 예제 #

  • ConnectionString을 이용해서 MongoDB 연결 정보를 String 형태로 제공
  • MongoClientSettings builder에 Connection 정보를 전달
  • MongoClientSettings로 MongoClient 생성
  • MongoClient로 MongoDatabase 접근
  • MongoDatabase로 MongoCollection 접근 img_5.png img_6.png

MongoCollection - count #

  • ClientSession을 통해서 multi document transaction 제공
    • 다양한 수행건들의 트랜잭션을 ClientSession을 사용해서 하나로 묶을 수 있음
  • Bson 구현체 (BsonDocument 등)로 filter 제공
  • CountOptions로 hint, limit, skip, maxTime, collation 등의 정보 제공 img_7.png img_8.png
// CountOptions를 사용하여 옵션 설정
CountOptions countOptions = new CountOptions();
countOptions.skip(10);  // 처음 10개 문서 제외하고 count
countOptions.limit(20); // 최대 20개의 문서만 count
countOptions.maxTime(5000); // 최대 수행 시간 (5초)

// countDocuments 메서드에 CountOptions 적용
long documentCount = collection.countDocuments(countOptions);
System.out.println("Total documents in the collection: " + documentCount);

MongoCollection - find #

  • Filters helper 클래스를 통해서 filter 설정 가능
    • eq, ne, gt, gte, lt, lte, in, nin, and, or, not, nor, exists, type, mod, regex, text 등의 기본 연산자 제공
    • geoWithin, geoWithinBox 등의 geo 연산자도 제공 img_9.png img_10.png
// Filters를 사용한 쿼리 조건 생성
// 예: age가 30 이상인 문서 검색
FindIterable<Document> result = collection.find(Filters.gte("age", 30));

// 결과 출력
for (Document document : result) {
    System.out.println(document.toJson());
}

aggregate #

  • pipeline을 생성하고 mongo shard 전체에 대해서 필터, 집계, 그룹 등의 연산을 수행 img_11.png img_12.png

MongoCollection - aggregate #

  • Aggregates helper 클래스를 통해서 aggregate pipeline 제공
    • addFields, set, bucket, bucketAuto
    • count, match, project, sort, sortByCount
    • skip, limit, lookup
    • facet, graphLookup, group, unionWith, unwind, out, merge, replaceRoot, replaceWith, sample 등
  • AggregatePublisher를 반환 img_13.png img_14.png
// MongoDB 연결 설정
AggregatePublisher<Document> publisher = MongoClients.create()
        .getDatabase("your_database_name")
        .getCollection("your_collection_name", Document.class)
        .aggregate(Arrays.asList(
                new Document("$group", new Document("_id", "$city").append("totalPopulation", new Document("$sum", "$population"))),
                new Document("$sort", new Document("totalPopulation", -1)),
                new Document("$limit", 5)
        ));

// 결과 처리
publisher.subscribe(
        document -> System.out.println(document.toJson()),
        throwable -> System.err.println("Error: " + throwable.getMessage()),
        () -> System.out.println("Aggregation completed")
);

MongoCollection - watch #

  • Aggregates helper 클래스를 통해서 aggregate pipeline 제공
    • addFields, match, project, replaceRoot, replaceWith, redact, set, unset 지원
  • ChangeStreamPublisher를 반환하고 해당 Publisher를 subscribe
    • ChangeStreamDocument를 onNext로 전달
    • resumeToken, 변경사항이 발생한 document 혹은 _id img_15.png
// ChangeStreamPublisher를 얻어옴
Publisher<ChangeStreamDocument<Document>> changeStreamPublisher = mongoCollection.watch();

// Publisher를 사용하여 Subscriber에게 변경 사항을 알림
changeStreamPublisher.subscribe(new ExampleSubscriber());

MongoCollection - bulkWrite #

  • Delete, Insert, Replace, Update 등을 모아서 한번에 실행하는 operation
  • WriteModel
    • DeleteManyModel: 조건을 만족하는 document를 모두 삭제
    • DeleteOneModel: 조건을 만족하는 document를 최대 1개만 삭제
    • InsertOneModel: 하나의 document를 추가
    • ReplaceOneModel: 조건을 만족하는 document를 최대 1개만 대체
    • UpdateManyModel: 조건을 만족하는 document를 모두 수정
    • UpdateOneModel: 조건을 만족하는 document를 최대 1개만 수정 img_16.png
// BulkWrite 작업 생성
List<WriteModel<Document>> bulkOperations = new ArrayList<>();

// 삽입 작업
Document document1 = new Document("_id", 1).append("name", "John");
InsertOneModel<Document> insertOneModel1 = new InsertOneModel<>(document1);
bulkOperations.add(insertOneModel1);

// 업데이트 작업
Document filter = new Document("_id", 2);
Document update = new Document("$set", new Document("age", 25));
UpdateOneModel<Document> updateOneModel = new UpdateOneModel<>(filter, update);
bulkOperations.add(updateOneModel);

// 삭제 작업
DeleteOneModel<Document> deleteOneModel = new DeleteOneModel<>(Filters.eq("_id", 3));
bulkOperations.add(deleteOneModel);

// BulkWrite 수행
BulkWriteResult result = mongoCollection.bulkWrite(bulkOperations);

MongoCollection - insert #

  • 하나 혹은 여러 document를 추가하는 operation
  • InsertOneOptions, InsertManyOptions
    • validation 우회 여부를 결정
    • InsertManyOptions라면 insert의 순서를 보장할지 결정
  • InsertOneResult, InsertManyResult
    • wasAcknowledged() : write 성공 여부
    • getInsertedIds() : write된 id들을 제공 img_17.png
// 삽입할 문서 생성
Document document = new Document("_id", 1)
        .append("name", "John")
        .append("age", 30)
        .append("city", "New York");

// 검사 우회 옵션 추가
Document options = new Document("bypassDocumentValidation", true);

// 컬렉션에 문서 삽입 (검사 우회)
mongoCollection.insertOne(document, options);

InsertOneOptions

// InsertOneOptions를 사용한 단일 문서 삽입 옵션 설정
InsertOneOptions insertOneOptions = new InsertOneOptions();
insertOneOptions.bypassDocumentValidation(true); // 데이터 유효성 검사 우회

// 단일 문서 삽입
Document document = new Document("_id", 1)
        .append("name", "John")
        .append("age", 30);
mongoCollection.insertOne(document, insertOneOptions);

InsertManyOptions

// InsertManyOptions를 사용한 다수의 문서 삽입 옵션 설정
        InsertManyOptions insertManyOptions = new InsertManyOptions();
        insertManyOptions.ordered(false); // 순서 무시
        
// 다수의 문서 삽입
List<Document> documents = Arrays.asList(
        new Document("_id", 2).append("name", "Jane").append("age", 25),
        new Document("_id", 3).append("name", "Bob").append("age", 35)
);
mongoCollection.insertMany(documents, insertManyOptions);

MongoCollection - update #

  • 하나 혹은 여러 document를 수정하는 operation
  • Filters helper 클래스를 통해서 filter 설정 가능
  • Updates helper 클래스를 통해 update 설정 가능
  • UpdateOptions를 통해서 upsert, hint, collation, variables 등 제공 img_18.png img_19.png
// 업데이트 옵션 설정
UpdateOptions options = new UpdateOptions();
options.upsert(true);  // 일치하는 문서가 없을 경우 삽입 (upsert)

// 업데이트 작업 수행
mongoCollection.updateOne(eq("name", "John"), set("age", 31), options);

MongoCollection - atomic #

  • findOneAndDelete, findOneAndReplace, findOneAndUpdate 등 find와 write를 묶어서 atomic한 operation 제공
  • 트랜잭션 내에서 수행되는 각 작업은 clientSession을 통해 수행 img_20.png

MongoCollection - index #

  • Collection에서 특정 필드들에 대한 index 생성, 조회, 삭제 가능

  • Indexes helper 클래스를 통해서 다양한 index 제공 img_21.png

  • IndexModel과 IndexOptions를 통해서 어떤 필드들에 대해서 어떻게 Index를 적용할 것인지 설정 가능

    • background: index의 생성을 background에서 진행할지 여부
    • unique: unique index를 생성할지 여부
    • name: index에 name 설정
    • partialFilterExpression: 특별한 조건을 충족한 경우에만 index를 걸고 싶은 경우 설정 img_22.png
// IndexModel을 사용하여 색인 정의
IndexModel indexModel = new IndexModel(
        new Document("field1", 1),  // 1은 오름차순, -1은 내림차순
        new Document("field2", -1)
);

// IndexOptions를 사용하여 색인 옵션 설정
IndexOptions indexOptions = new IndexOptions();
indexOptions.name("custom_index_name");  // 색인의 이름 설정
indexOptions.unique(true);  // 고유 색인 설정
indexOptions.background(true);  // 비동기적으로 색인 생성

// 색인 생성
mongoCollection.createIndex(indexModel, indexOptions);

  1. 강의 : Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지_