티스토리 뷰
https://www.mongodb.com/ko-kr/docs/manual/changeStreams/
mongodb 5.1 부터 변경스트림이 더 효율적으로 리소스를 사용하고 더 빠르게 동작한다고 합니다
변경스트림을 통해 어플리케이션이 수동으로 oplog를 테일링하지 않고도 실시간 데이터 변경에 접근할수 있습니다
단일 컬렉션이나 데이터베이스의 모든 데이터 변경사항을 구독하고, 즉시 대응할수 있습니다
여기서의 데이터 변경사항은 코드로 변경하거나, cli를 통해 변경하거나 gui를 통해 변경한 모든 데이터를 말합니다
당연히 트랜잭션의 경우에도 트랜잭션이 커밋될때 변경사항이 출력됩니다
개인적으로는 transactional-outbox에서 어떻게 사용할수 있을지 고민중이기도 합니다
변경스트림은 replica set 과 샤딩된 클러스터에서 사용할수 있습니다
mongodb가 단독실행되는 환경에서는 변경스트림을 사용할수 없습니다
그래서 로컬로 테스트하실때 만약 환경을 구성하기 어려우시다면
https://www.mongodb.com/ko-kr/cloud/atlas/register
mongodb의 클라우드플랫폼인 atlas에서 무료 계정을 만들어 사용하는 방법도 있습니다
이런식으로 활용할 수 있습니다
db.collection.watch()
db.watch()
다음은 inventory 컬렉션의 변경 이벤트를 수신하는 예입니다
const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
// process next document
});
mongoose에서도 당연히 가능합니다
const Person = mongoose.model('Person', new mongoose.Schema({ name: String }));
// Create a change stream. The 'change' event gets emitted when there's a
// change in the database. Print what the change stream emits.
Person.watch().
on('change', data => console.log(data));
// Insert a doc, will trigger the change stream handler above
await Person.create({ name: 'Axl Rose' });
방법은 어렵지 않습니다
changeStream은 EventEmitter를 확장한 방식입니다
변경스트림을 구성할때 파이프라인을 사용하여 변경스트림의 출력을 제어할수도 있습니다
const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
// process next document
});
특정 조건을 만족하는 데이터만 출력하도록 하는것이지요
변경 이벤트 문서는 다음과 같은 형태입니다
{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2022-10-19T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}
여기에 포함된 _id 값은 재개 토큰(resume token)이라고 합니다
이 값을 어떻게 사용하냐면 앱을 중지하고 다시 실행한다음에 중지한 부분부터 이후의 업데이트를 받기 위해서 사용됩니다
const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();
newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream.on('change', next => {
processChange(next);
});
});
watch의 옵션으로 resumeAfter 값에 resumeToken을 전달하면
해당 지점 이후의 이벤트들을 다시 수신할 수 있습니다
코드를 재배포하게되어 앱을 중지하고 다시 실행해야한다거나 이럴때 사용할수 있겠습니다
다만 내부적으로 oplog를 테일링 하는 방식이기 때문에 oplog가 존재하지 않는다면 수신되지 않는다고 하네요
변경스트림을 이용해서 CDC에서 사용하는것과 같이 ETL이나, 도메인간 동기화, 협업기능, 알림등을 구현할때 유용해보입니다
실제 사용예를 더 가져오려고 했는데 생각보다 자료는 많지 않네요
'BACKEND' 카테고리의 다른 글
transactional outbox 패턴 (1) | 2024.12.17 |
---|---|
querystring을 통해 배열도 받을수 있나요? (0) | 2024.08.13 |
cloudfront에 일괄 cache-control 적용하기 (0) | 2024.05.03 |
nestjs swagger가 아니라 redoc 적용하기 (0) | 2024.03.11 |
POST 메서드 apache bench(ab) 사용하기 (0) | 2024.02.07 |
- Total
- Today
- Yesterday
- 자바스크립트
- 앱
- android
- 벤처
- 안드로이드
- 스마트폰
- 공모전
- 대학생
- 경진대회
- 앱스토어
- 애플
- 소프트웨어
- AWS
- 창업
- CSS
- 아이폰
- Apple
- 어플리케이션
- iPhone
- php
- 아이디어
- 구글
- 트위터
- 모바일
- 웹표준
- 게임
- 네이버
- JavaScript
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |