본문 바로가기
flutter/Package of the Week

Flutter[플러터] / async 패키지 사용법 (더 많은 비동기 함수, 라이브러리, 유틸리티, pub.dev, 어싱크, package, AsyncMemoizer, CancelableOperation, FutureGroup, StreamGroup) async (Flutter Package of the Week)

by ch5c 2025. 7. 10.
반응형

package:async

dart:async비동기 계산을 처리하는 스타일의 유틸리티 클래스를 포함합니다.

https://youtu.be/r0tHiCjW2w0


우리가 Dart로 코드를 짤 때 필연적으로 필요한 것이 바로 비동기 기능인데 우리는 그러한 기능을 사용할 때 외부 패키지가 아닌 Dart SDK에 내장된 비동기 지원 기능 라이브러리인 dart:async를 임포트 해서 사용하게 된다. 이렇게 임포트 해서 사용하면 async, await, Future, Stream등 다양한 비동기 클래스와 함수들을 사용할 수 있게 된다. 하지만 여기서 더 나아가서 더 많은 기능을 내포하고 있는 Dart의 공식 패키지(publisher:dart.dev)가 있다면 어떻게 될까? 바로 그 패키지가 async 패키지이다.

async 패키지는 Dart의 비동기 프로그래밍을 지원하는 핵심 라이브러리로 이 패키지는 Future, Stream, 그리고 이들과 상호작용하기 위한 다양한 유틸리티를 제공해 준다. 바로 알아보자.

주요 기능
기능 사용 예시 설명
AsyncMemoizer 화면 초기화 시 필요한 데이터 캐싱 비동기 작업을 단 한 번만 실행하고, 이후 호출 시 이전 결과를 재사용
CancelableOperation 긴 실행 중인 요청, 타임아웃 처리 Future를 취소 가능하게 감쌈. cancel() 호출 시 작업 중단
FutureGroup 배치 API 호출, 모든 작업 완료 기다림 여러 Future를 그룹화하고 모든 작업이 완료될 때까지 기다림
StreamGroup 여러 이벤트 소스를 하나로 합침 여러 Stream을 하나의 스트림으로 병합
LazyStream 무거운 스트림 생성 지연 .listen() 호출 시에만 스트림 생성. 초기 비용 최소화
NullStreamSink 이벤트를 무시하고 안정성 확보 StreamSink의 구현체로 이벤트를 모두 무시함
RestartableTimer 유휴 타이머 초기화, 타임아웃 제어 Timer에 reset 기능을 추가해 재시작 가능
DelegatingFuture / DelegatingStream Stream이나 Future에 로깅 등 확장 기능 추가 원래 객체의 기능을 유지하면서 기능을 덧붙일 수 있는 래퍼 클래스
StreamQueue Stream을 이벤트 단위로 pull Stream 이벤트를 하나씩 순차적으로 받아서 처리
StreamSplitter 동일 스트림 여러 위젯에서 구독 하나의 Stream을 여러 개로 복제해 독립적으로 구독 가능
StreamZip 여러 스트림의 동기화 이벤트 처리 여러 스트림에서 동시 발생한 값을 묶어서 전달
Result / ResultFuture / ErrorResult 에러와 성공 결과를 통합 처리 Future/Stream의 성공과 실패를 한 객체로 반환

 

자 일단은 package:async의 기능을 살펴보기 전에 기본 내장 패키지인 dart:async의 주요 클래스에 대해 살펴보고 넘어가겠다.

Future

'비동기' 하면 가장 먼저 생각나는 클래스로 미래에 완료될 값을 나타내주는 역할을 한다. 보통 비동기 작업인 API 호출, 파일 읽기 등을 처리할 때 사용된다.

Future<String> fetchData() async {
  await Future.delayed(Duration(seconds: 1));
  return 'Data loaded';
}
Stream

여러 개의 값을 시간에 따라 비동기적으로 제공해 주는 클래스로 보통 센서 데이터, WebSocket 메시지 등을 처리할 때 사용된다.

Stream<int> countStream() async* {
  for (int i = 0; i < 5; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i;
  }
}
Completer

이 클래스는 Future를 외부에서 수동으로 제어하고 완료시킬 수 있게 도와주는 클래스로 비동기 작업의 완료 시점을 직접 관리해야 할 때 사용된다.

Future<String> completeLater() {
  final completer = Completer<String>();
  Timer(Duration(seconds: 1), () {
    completer.complete('Done');
  });
  return completer.future;
}
Timer

우리가 흔히 생각하는 모든 타이머와 관련된 일을 해주는 클래스로 일정 시간 후에 작업을 실행하거나 주기적으로 실행해 주는 기능을 갖고 있다. 생성자인 Timer.periodic을 사용하면 계속해서 호출되는 타이머를 만들 수도 있게 된다.

void runAfterDelay() {
  Timer(Duration(seconds: 2), () {
    print('Executed after delay');
  });
}
StreamController

커스텀 스트림을 생성하고 이벤트를 수동으로 추가할 수 있게 해주는 클래스로 실시간 데이터 스트림 처리 시 유용하게 사용된다.

final controller = StreamController<int>();

void startStream() {
  Timer.periodic(Duration(seconds: 1), (timer) {
    controller.sink.add(timer.tick);
    if (timer.tick >= 5) {
      controller.close();
      timer.cancel();
    }
  });
}

이제 어느 정도는 기본 비동기 처리 라이브러리 클래스를 알아보았으니 바로 async 패키지에 대해서 알아보겠다.

일단 아주 기본적인 것부터 시작하자.

먼저 펍데브(pub.dev)에서 async 패키지를 가져와 pubspec.yaml 파일에 넣어주자.

굳이 이렇게 하기 싫다면 async는 퍼블리셔가 dart.dev이기 때문에 그냥 터미널에서 바로 pup add 시켜도 된다.

flutter pub add async

암튼 어떻게든 간에 가져와서 pubsepc.yaml파일에 정상적으로 들어가 있다면 이제 사용해 줄 수 있다.

AsyncMemoizer

 

가장 먼저 사용해 볼 것은 AsyncMemoizer이다.

이 클래스는 비동기 작업을 단 1회만 실행시켜 주는 동작을 갖고 있는데 어디에 쓰이는지 감이 오는가? 그렇다. 보통 중복 호출을 방지하거나 초기화 작업을 캐싱(재사용)할 때 사용된다.

final _memorizer = AsyncMemoizer<String>();

지금 위의 코드처럼 인스턴스를 생성해 주면 바로 사용해 줄 수 있다.

하지만 조금 생략을 해서 작성해도 되는데 원래 AsyncMemoizer의 타입은 AsyncMemoizer<T>이기 때문에 위의 코드와 같이 제네릭을 확실하게 넣어주는 게 코드안정성 측면에선 좋긴 하다. 하지만 그냥 무시 까고  AsyncMemoizer(); 이런 식으로 사용해 줘도 문제는 없긴 하다. 암튼 이제 사용해 주면 된다.

Future<String> fetchOnce() {
  return _memorizer.runOnce(() async {
    await Future.delayed(Duration(seconds: 2));
    return 'Fetched once';
  },);
}

함수 자체는 비동기 문자열 결과를 리턴해주는 함수이다.

근데 이제 안에서 반환을 우리가 위에서 만든 _memorizer 인스턴스로 하고 있는데 그 상수로 runOnce를 사용해주고 있다. 여기서 runOnce가 핵심인데 이 함수는 내부 비동기 함수를 단 한 번만 실행하고 이후에는 그 결과를 재사용할 수 있도록 도와주는 함수이다.

이제 그런 후 다시 안에서 문자열을 반환시켜 주면 runOnce 콜백이 종료되며 위에서 말한 동작이 실행된다.

이게 좀 이해가 안 될 수 있는데 아래를 보면 이해가 편하게 될 것이다.

// 첫 호출
await fetchOnce(); // 실행됨

// 두 번째 호출
await fetchOnce(); // 실행 안 됨, 결과만 리턴됨

이렇게 사용해 주면 앱 초기 설정에서 딱 한 번만 로딩을 해줄 수 있고 비용이 큰 작업을 반복하는 것을 방지해 주면 API  요청 시 캐싱 효과를 누릴 수 있게 되는 것이다. 꽤 괜찮지 않은가? 아래의 다트 패드를 실행해 보자. 처음에는 2초를 기다리는 동작을 하지만 두 번째 실행부터는 바로 값을 반환시킬 것이다.

 

CancelableOperation

 

이 클래스는 작업이 오래 걸릴 경우 중간에 취소할 수 있게끔 해준다. 벌써 설명만 들었는데도 엄청 유용하게 보이는 마법의 클래스이다. 이것도 마찬가지로 인스턴스를 생성해 줄 것인데 이번에는 함수 안에서 만들어보자.

먼저 실행시킬 함수를 만들어준다.

Future<void> cancellableExample() async {}

여기 안에서 사용해 줄 것이다.

final operation = CancelableOperation.fromFuture(
  Future.delayed(Duration(seconds: 5), () => print('Done')),
);

이렇게 operation 인스턴스를 만들어줬다. 이제 이걸 사용해 주면 되겠다.

await Future.delayed(Duration(seconds: 2));
operation.cancel(); // 2초 후 작업 취소

이렇게 사용을 해줬다면 이제 동작이 완성이 됐다. 현재 실행되는 동작은 이렇게 된다.

먼저 5초 후 'Done'이 출력되는 Future가 있는데 그것을 CancelableOperation으로 감싸줬다.

그런 후 2초가 지났는데도 아직 반환이 되지 않았다면 operation.cancel();가 호출되면서 취소를 시도하는 것이다.

보이는 것처럼 CancelableOperation은 Future를 감싸서 중간에 취소할 수 있도록 도움을 주는데 여기서 중요한 점이 하나 있다.

바로 기본적으로 Future본질적으로 취소할 수 없는 구조이기 때문에 CancelableOperation은 콜백 실행을 막을 수 있는 경우에만 효과적이게 되는 것이다.

근데 지금 위의 예제에서는 Future.delayed 내부의 print('Done')은 취소되지 않는다. 그 이유로는 Future.delayed 자체가 시작과 동시에 예약되기 때문인데 이걸 진짜로 사용하려면 아래와 같이 onCancel 취소 콜백을 설정해줘야 한다.

final operation = CancelableOperation.fromFuture(
  longRunningTask(),
  onCancel: () {
    print('Cancelled!');
  },
);

이것만 보면 이해가 안 될 수 있으니 아래의 다트 패드의 동작을 한번 직접 실행해 보길 바란다.

 

 

FutureGroup

 

세 번째로 알아볼 클래스는 FutureGroup이다. 이 클래스는 그래도 위 클래스들에 비해 비교적 동작이 쉬운데 그 동작은 바로 여러 비동기 작업이 모두 완료될 때까지 기다려주는 동작이다. 근데 이 동작.. 어디서 많이 봤을 것이다. 바로 기본 Future의 생성자인 Future.wait의 동작과 비슷하다. 그렇다면 이 FutureGroup을 왜 사용해줘야 할까? 그 이유는 명확하다.

FutureGroup은 단순 Future.wait()보다 조금 더 유연하고 안전한 컨트롤을 제공하기 때문으로 FutureGroup은 개별 Future를 하나씩 추가 가능하고 동작으로 추가하고 닫기(close)도 가능하다. 거기에 에러 처리 제어도 유연하게 가능하고 사용 시점도 Future.wait()은 즉시 실행하는데 반해 제어 가능하다. 암튼 봐보자.

Future<void> runAll() async {}

일단 이것도 마찬가지로 함수 내에서 사용해 주면 된다.

또한 마찬가지로 인스턴스를 생성해 주면 되는데 여기서 FutureGroup의 타입은 FutureGroup<T>이기 때문에 <String>을 넣어줬다.

final group = FutureGroup<String>();

이제 여기에서 비동기 작업들을 그룹에 추가해 주겠다.

각각 1초, 2초 후에 'A', 'B'를 반환하게 되겠다.

group.add(Future.delayed(Duration(seconds: 1), () => 'A',));
group.add(Future.delayed(Duration(seconds: 2), () => 'B',));

이렇게 add 함수로 Future를 추가가 가능하다.

근데 이제 여기서 반드시 호출해야 하는 함수가 있는데 바로 close 함수이다.

group.close(); // 더 이상 Future 추가 못하게 닫음

.close를 호출해야 group.futre가 resolve 되게 된다. (안 하면 대기 상태가 유지된다.)

그다음에 그룹에 있는 모든 Future가 완료될 때까지 기다려주고 전체 완료가 된다면 출력해 주면 동작이 완성이 되겠다,

final result = await group.future; // 모든 Future가 완료될 때까지 기다림
print(result);

이제 실행해 보면 2초 후에 result가 찍힐 것이다.

자 그렇다면 이제 이것을 어디다가 쓰냐가 문제인데 바로 사용자가 고른 항목들마다 API 요청을 보내야 할 때 사용하거나 동적으로 여러 작업을 조건에 따라 모아두고, 끝나면 한꺼번에 처리할 때 유용하게 사용되게 된다.

아래는 실시간 선택 기반으로 Future를 추가하는 동작의 예제이다.

FutureGroup.add()를 사용하여 사용자가 고른 작업만 동적으로 추가하고 위에서 언급했듯이 추가 완료 후 반드시 group.close()를 호출해야 .future가 완료된다.

 

StreamGroup

여러 스트림을 하나로 합쳐서 다룰 수 있게 해주는 클래스로 StreamGroup<T>는 여러 개의 Stream<T>를 병합해서 하나의 스트림으로 만들어주는 동작을 해준다. 이렇게 병합된 스트림은 각 스트림의 이벤트를 동시에 받을 수 있게 되는데 이러한 동작으로 병렬로 발생하는 스트림 이벤트를 한 채널에서 수신할 수 있게 된다.

바로 사용해 볼 것인데 먼저 1초마다 0,1을 내보내는 스트림을 하나 만들어 주겠다.

Stream<int> stream1() async* {
  yield* Stream.periodic(Duration(seconds: 1), (x) => x).take(2);
}

코드 설명을 하자면 Stream.periodic()을 사용하여 현재 코드에서 1초마다 값을 만들어 내고 있는데 어떠한 값을 만들고 있느냐? (x) => x가 0부터 시작하는 증가값을 생성하게 해주고 있고 .take(2)가 앞의 2개 값만 방출하게 해주고 있다. 마지막으로 yield*가 이 스트림을 위임해서 내보내주고 있다. 이게 stream1()이고 하나 더 만들어주겠다.

Stream<int> stream2() async* {
  yield* Stream.periodic(Duration(milliseconds: 700), (x) => x + 100).take(3);
}

아까랑 동작은 같다. 단지 값이 다를 뿐. 0.7초마다 100, 100, 102를 내보내는 스트림이고 (x) => x + 100으로 인하여 시작 숫자가 100부터 된 모습이다. 그 후 .take(3)가 처음 3개를 방출하게 해주고 있다.

이제 그 밑에서 StreamGroup을 사용해 줄 것이다. 먼저 여러 개의 스트림을 하나로 합칠 수 있는 그룹 함수를 생성해 준다.

Stream<int> combinedStream() {}

그 후 여타 다른 클래스들과 마찬가지로 StreamGroup도 인스턴스화 해준다.

final group = StreamGroup<int>();

그런 후 첫 번째 스트림과 두 번째 스트림을 추가해 준다.

group.add(stream1()); // 첫번 째 스트림 (0, 1)
group.add(stream2()); // 두번 째 스트림 (100, 101, 102)

이제 위에서 알아봤던 FutureGroup과 마찬가지로 .close()를 사용하여 닫아준다.

group.close();

그리고 마지막으로 병합된 스트림을 반환해 준다.

return group.stream;

이렇게 하면 5개의 값이 나올 것인데 이제 만든 이 combinedStream의 타입은 Stream<int>이기 때문에 이걸 프린트하고 싶다면 listen을 사용해서 구독해 주면 되겠다.

ElevatedButton(
  onPressed: () {
    final stream = combinedStream();
    stream.listen((event) {
      print('받은 값: $event');
    },);
  },
  child: Text('실행'),
)

이렇게 실행해 보면 두 스트림은 병렬로 실행되므로 아래와 같은 순서로 값이 나오는 것을 알 수 있다.

시간 (초0 이벤트 발생
0.7 100 from stream2
1.0 0 from stream1
1.4 101 from stream2
2.0 1 from stream1
2.1 102 from stream2

이제 이 StreamGroup을 사용하여 실시간으로 받아오는 값을 UI에 보여주는 예제를 봐보자.

StreamBuilder를 사용해서 스트림 데이터가 도착할 때마다 화면에 반영되게 하였다.

 

 

이렇게 package:async의 클래스들에 대해 간단히 알아보았다. 계속 말하지만 이 패키지 퍼블리셔가 dart.dev이다. 솔직히 이런 기능들 담고 있으면 내장 패키지로 만들어 놓지 왜 굳이 따로 빼놓은 건지 모르겠다;; 이번엔 클래스를 4개를 다뤘는데 유용한 것들 많으니 한번 공부해 보길 바란다. 도움이 되었길 바라며 마치겠다.

반응형