RxJS Operator 01 - Creation, Filtering

RxJS Operator 01 - Creation, Filtering

RxJS에 대한 개념과 동작 방식 등을 간단하게 알아봤고, RxJS의 핵심이라고 볼 수 있는, Operator 사용에 대해서 몇 가지만 간단하게 뽑아서 정리해보려고 한다. 이번 글에서는 Creation Operator, Filtering Operator를 확인 보려고 한다.

Creation Operator"rxjs/operators"에서 가져오지 않고, "rxjs"에서 가져와서, 그냥 생성 함수라고 설명하는 경우도 있는 것 같다.

Creation Operator

from

옵저버블로 변환이 가능한 객체를 옵저버블로 바꿔주는 오퍼레이터이다. 가능한 객체 타입 종류는 다음과 같다.

  • Observable
  • Array
  • Promise
  • Iterable
  • String
1
2
3
4
5
6
7
8
9
10
11
12
import { from } from "rxjs";

from([1, 2, 3, 4]).subscribe({
next: console.log,
complete: () => console.log("complete"),
});

// 1
// 2
// 3
// 4
// complete

아래는 이터러블을 사용한 예시이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import { from } from 'rxjs';
import { take } from 'rxjs/operators';

function* generateDoubles(seed: number) {
let i = seed;
while (true) {
yield i;
i = 2 * i;
}
}

const iterator = generateDoubles(3);
const observable = from(iterator).pipe(take(10));

observable.subscribe(console.log);

// Logs:
// 3
// 6
// 12
// 24
// 48
// 96
// 192
// 384
// 768
// 1536

Promise의 경우에 resolve 되는 값을 next처럼 동작하게 하고, reject 되는 값을 error 쪽에서 동작하게 만든다. 비동기로 동작하게 된다. 함수 스케줄러를 두 번째 인자 값으로 넣어서 비동기로 동작하게 만들 수도 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { from, asyncScheduler } from 'rxjs';

console.log('start');
const array = [10, 20, 30];
const result = from(array, asyncScheduler);
result.subscribe(console.log);

console.log('end');

// Logs:
// start
// end
// 10
// 20
// 30

fromEvent

EventEmitter 객체 또는 브라우저 이벤트를 Observable로 바꿔야 하는 경우에 사용한다.

1
2
3
4
5
6
7
8
import { fromEvent } from "rxjs";

fromEvent(document.querySelector("btn")!, "click").subscribe({
next: (event) => {
// ... 이벤트 처리
},
// ... observer 내용
});

of

나열된 인자 값들을 순서대로 발행하도록 옵저버블을 만들어준다. 간단한 경우에는 new Observable을 사용하는 것 보다 훨씬 간단하게 사용할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { of } from "rxjs";

const complete = () => console.log("complete");

of(1, 2, "a", "b", 3, 4, ["string", 10]).subscribe({
next: console.log,
complete
});

// 1
// 2
// a
// b
// 3
// 4
// [ 'string', 10 ]
// complete

range

일정 범위 안에 있는 숫자 값을 next 값으로 발행하는 Observable을 만든다. 반복문을 실행하는 것과 같은 형태이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { range } from "rxjs";

range(1, 5).subscribe(console.log);
// 1
// 2
// 3
// 4
// 5

range(2, 5).subscribe(console.log);
// 2
// 3
// 4
// 5
// 6

range(5, 1).subscribe(console.log);
// 5

헷갈릴 수 있는 부분은 range(start, end) 형태가 아니라, range(start, count) 형태인 점이다. 즉, 시작 지점과 몇 번 동작하게 되는지를 인자 값으로 넣는 것이다.

다른 Creation Operator 처럼 스케줄러를 마지막 인자값으로 넣어줄 수 있다. 즉, range(start, count, scheduler?) 형태를 인자로 받는다.

interval

ms 단위의 값을 인자 값으로 넣으면, 그 텀마다 (첫 시작도 기다린 후 첫 값을 내보냄) 음이 아닌 정수를 차례대로 반환하는 함수이다.

1
2
3
4
import { interval } from "rxjs";

interval(1000).subscribe(console.log);
// 1초 뒤에 0, 그 이후로 1초마다 1씩 늘어난 값을 내보낸다.

Creation Operator 역시, 스케줄러를 두 번째 인자 값으로 넣을 수 있다.

timer

지정한 시간이 지난 다음 값을 한 개 내보내는 함수이다. 두 번째 값으로 그 다음 값에 대한 주기를 줄 수 있다.

1
2
3
4
5
6
7
8
9
import { timer } from "rxjs";

timer(1000).subscribe(console.log);
// 0

timer(1000, 500).subscribe(console.log);
// 0
// 1
// 2 ...

첫 번째 값은 number | Date를 입력 받을 수 있다. 어느 정도 시간 만큼 기다릴 지 설정하는 것이고, 두 번째는 첫 값 발행 후 어느 정도 시간 간격을 둘지이다. 만약 인자를 하나만 넣으면 값은 한 개만 나오게 된다.

throwError

값을 발행 하다가, 특정 에러를 발생시키고 종료 해야 하는 상황에 사용할 수 있는 함수이다. 다른 생성 함수 처럼, scheduler를 두 번째 인자 값으로 넣을 수 있다.

1
2
3
4
5
6
7
8
9
import { throwError } from "rxjs";

throwError(new Error("error")).subscribe({
next: console.log,
error: (err) => console.error(`error: ${err.message}`),
complete: () => console.log("complete"),
});

// error: error

error 상황을 만들어내는 것을 알 수 있다.

Filtering Operator

Observable이 값을 발행할 때, 필터링을 해주는 작업을 하는 연산자를 Filtering Operator라고 한다.

filter

배열의 filter 함수처럼, 조건을 통과 하면 값을 발행하도록 만든다.

1
2
3
4
5
6
7
8
9
import { range } from "rxjs";
import { filter } from "rxjs/operators";

range(1, 5)
.pipe(filter((x) => x % 2 === 0))
.subscribe(console.log);

// 2
// 4

필터링의 인자값으로 사용되는 함수를 predicate 함수라고 한다.

last

마지막 값 한 개만 내보내는 Filtering Operator이다. next로 내보내지는 값을 모아두다가, complete이 호출되기 바로 전의 값을 내보낸다. 다만, complete이 없는 Observable에서는, 값을 내보내지 않는 상태가 된다.

last 함수도 인자로 predicate를 받을 수 있다. 해당 조건을 만족하는 값을 내부적으로 최신 값으로 유지하다가, 마지막에 최신 값을 내보내는 방식으로 동작한다.

1
2
3
4
5
6
7
8
9
10
import { range } from "rxjs";
import { last } from "rxjs/operators";

range(1, 10).pipe(last()).subscribe(console.log);
// 10

range(1, 10)
.pipe(last((x) => x <= 3))
.subscribe(console.log);
// 3

take

정해진 개수만큼구독하고 구독을 해제하게 해준다. 별도로 unsubscribe를 동작하지 않아도 되기 때문에, 코드가 간결해지고, 동작 파악이 비교적 쉽다.

1
2
3
4
5
6
7
8
9
import { interval } from "rxjs";
import { take } from "rxjs/operators";

interval(1000).pipe(take(5)).subscribe(console.log);
// 1
// 2
// 3
// 4
// 5

interval은 무한히 실행될 수 있는 연산자인데, take(5) 파이핑을 통해서 5개 까지만 한정하도록 하는 모습이다.

takeUntil

take은 개수 제한을 두는 형태로 동작하지만, takeUntil은 조건 제한을 두는 형태이다. 특정 조건이 만족하면, unsubscribe 한다. 주의할 점은 인자 값으로 받는 타입이 Observable이다.

1
2
3
4
5
6
7
import { fromEvent, interval } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source = interval(1000);
const clicks = fromEvent(document, 'click');
const result = source.pipe(takeUntil(clicks));
result.subscribe(console.log);

위 코드는 interval로 값이 내보내지는 것에서, 페이지를 클릭하게 되면, 구독을 멈추게 된다.

takeWhile

이 연산자는 filter 연산자처럼, predicate를 인자로 받는다. 만족하는 동안은 구독을 하고 있다가, 만족하지 않게 되면, unsubscribe 하는 구조이다.

1
2
3
4
5
6
7
8
import { interval } from "rxjs";
import { takeWhile } from "rxjs/operators";

interval(1000)
.pipe(
takeWhile((x) => x <= 10)
)
.subscribe(console.log);

1부터 차례대로 값을 내보내다가 10을 초과하게 되면 구독을 해제한다.

skip

원하는 만큼 내보내지는 값을 생략하고 그 다음 값부터 내보내지도록 한다.

1
2
3
4
5
6
7
8
interval(250).pipe(skip(3)).subscribe(console.log);

// 3
// 4
// 5
// 6
// 7
// ...

0, 1, 2 값은 생략하고 그 다음 값부터 내보낸다.

skipUntil

takeUntil에서처럼, 인자 값으로 Observable을 받고, 인자로 받은 Observable이 구독 시작되는 조건일 때부터 값을 내보낸다.

1
2
3
4
5
6
7
8
9
10
11
12
13
import { interval, fromEvent } from 'rxjs';
import { skipUntil } from 'rxjs/operators';

const intervalObservable = interval(1000);
const click = fromEvent(document, 'click');

const emitAfterClick = intervalObservable.pipe(
skipUntil(click)
);

const subscribe = emitAfterClick.subscribe(value => console.log(value));
// clicked at 4.6s. output: 5...6...7...8........ or
// clicked at 7.3s. output: 8...9...10..11.......

클릭이 일어난 시점 이후부터 값을 내보내기 시작한다.

skipWile

predicate로 들어가는 인자 함수가 만족하면 값을 건너 뛴다.

1
2
3
4
5
6
7
8
9
10
11
import { interval } from "rxjs";
import { skipWhile } from "rxjs/operators";

interval(300)
.pipe(skipWhile((x) => x < 4))
.subscribe(console.log);
// 4
// 5
// 6
// 7
// ...

4 보다 작은 수는 생략하고, 그 다음 값부터 내보내는 결과를 확인할 수 있다.

Reference

#

댓글

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×