티스토리 뷰

BACKEND

mongodb의 변경스트림(changeStream)

나를찾는아이 2024. 12. 13. 12:50
728x90
반응형

https://www.mongodb.com/ko-kr/docs/manual/changeStreams/

 

Change Streams - MongoDB 매뉴얼 v8.0

변경 스트림을 통해 애플리케이션은 사전에 복잡한 방식 및 수동으로 oplog를 테일링하는 위험 없이 실시간 데이터 변경에 액세스할 수 있습니다. 애플리케이션은 변경 스트림을 사용하여 단일

www.mongodb.com

 

 

mongodb 5.1 부터 변경스트림이 더 효율적으로 리소스를 사용하고 더 빠르게 동작한다고 합니다

 

변경스트림을 통해 어플리케이션이 수동으로 oplog를 테일링하지 않고도 실시간 데이터 변경에 접근할수 있습니다

 

단일 컬렉션이나 데이터베이스의 모든 데이터 변경사항을 구독하고, 즉시 대응할수 있습니다

 

여기서의 데이터 변경사항은 코드로 변경하거나, cli를 통해 변경하거나 gui를 통해 변경한 모든 데이터를 말합니다

 

당연히 트랜잭션의 경우에도 트랜잭션이 커밋될때 변경사항이 출력됩니다

 

개인적으로는 transactional-outbox에서 어떻게 사용할수 있을지 고민중이기도 합니다

 

 

 

 

변경스트림은 replica set 과 샤딩된 클러스터에서 사용할수 있습니다

 

mongodb가 단독실행되는 환경에서는 변경스트림을 사용할수 없습니다

 

그래서 로컬로 테스트하실때 만약 환경을 구성하기 어려우시다면

 

https://www.mongodb.com/ko-kr/cloud/atlas/register

 

MongoDB Atlas

MongoDB Atlas를 무료로 체험해보세요. 애플리케이션 데이터 플랫폼의 핵심 요소인 멀티 클라우드 데이터베이스 서비스를 통해 데이터를 사용하여 더욱 빠르고 손쉽게 구축할 수 있습니다.

www.mongodb.com

 

mongodb의 클라우드플랫폼인 atlas에서 무료 계정을 만들어 사용하는 방법도 있습니다

 

 

 

이런식으로 활용할 수 있습니다

db.collection.watch()
db.watch()

 

 

다음은 inventory 컬렉션의 변경 이벤트를 수신하는 예입니다

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
  // process next document
});

 

mongoose에서도 당연히 가능합니다

 

const Person = mongoose.model('Person', new mongoose.Schema({ name: String }));

// Create a change stream. The 'change' event gets emitted when there's a
// change in the database. Print what the change stream emits.
Person.watch().
  on('change', data => console.log(data));

// Insert a doc, will trigger the change stream handler above
await Person.create({ name: 'Axl Rose' });

 

 

방법은 어렵지 않습니다

 

changeStream은 EventEmitter를 확장한 방식입니다

 

 

 

변경스트림을 구성할때 파이프라인을 사용하여 변경스트림의 출력을 제어할수도 있습니다

const pipeline = [
  { $match: { 'fullDocument.username': 'alice' } },
  { $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
  // process next document
});

 

특정 조건을 만족하는 데이터만 출력하도록 하는것이지요

 

 

변경 이벤트 문서는 다음과 같은 형태입니다

 

{
   "_id": {
      "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
    },
    "operationType": "insert",
    "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
    "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
    "wallTime": ISODate("2022-10-19T15:37:04.604Z"),
    "fullDocument": {
       "_id": ObjectId("635019a078be67426d7cf4d2"'),
       "name": "Giovanni Verga"
    },
    "ns": {
       "db": "test",
       "coll": "names"
    },
    "documentKey": {
       "_id": ObjectId("635019a078be67426d7cf4d2")
    }
}

 

여기에 포함된 _id 값은 재개 토큰(resume token)이라고 합니다

 

이 값을 어떻게 사용하냐면 앱을 중지하고 다시 실행한다음에 중지한 부분부터 이후의 업데이트를 받기 위해서 사용됩니다

const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream.once('change', next => {
  const resumeToken = changeStream.resumeToken;
  changeStream.close();
  newChangeStream = collection.watch([], { resumeAfter: resumeToken });
  newChangeStream.on('change', next => {
    processChange(next);
  });
});

 

watch의 옵션으로 resumeAfter 값에 resumeToken을 전달하면

 

해당 지점 이후의 이벤트들을 다시 수신할 수 있습니다

 

코드를 재배포하게되어 앱을 중지하고 다시 실행해야한다거나 이럴때 사용할수 있겠습니다

 

다만 내부적으로 oplog를 테일링 하는 방식이기 때문에 oplog가 존재하지 않는다면 수신되지 않는다고 하네요

 

 

 

변경스트림을 이용해서 CDC에서 사용하는것과 같이 ETL이나, 도메인간 동기화, 협업기능, 알림등을 구현할때 유용해보입니다

 

실제 사용예를 더 가져오려고 했는데 생각보다 자료는 많지 않네요

728x90
반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
글 보관함