wanna be dev 🧑‍💻

Cool 하고 Sick한 개발자가 되고 싶은 uzun입니다

A.K.A. Kick-snare, hyjhyj0901, h_uz99 solvedac-logo

Android/Study

🌊 Flow Api 안드로이드 라이브러리 파헤치기

Kick_snare 2022. 8. 20. 20:37
728x90

Flow를 소개하기 전에

Undirectional Data Flow

https://uzun.dev/132

 

안드로이드 단방향 데이터 플로우 Undirectional Data Flow (UDF)

단방향 Data Flow란 안드로이드 앱에서 상태는 사용자 이벤트에 대한 응답으로 업데이트가 이루어진다. Event : 사용자 또는 프로그램의 다른 부분에 의해 생성됨 Update State : 이벤트 핸들러가 UI 에

uzun.dev

 

단방향 데이터 흐름 패턴을 모른다면 위 포스팅을 먼저 읽고 가면 도움이 될 것이다.

💡 Live Data (AAC)란?
View가 ViewModel을 관찰할 때, 그 관찰 대상이 되는 데이터 홀더 클래스 Activity, Fragment 및 Services와 같은 구성요소에서 관찰할 수 있는 데이터 집합을 보유 수명주기를 인식하여 활성화 시에만 데이터를 게시하여 메모리 누수를 방지

💡 LiveData VS Flow
LiveData 는 UI 에 밀접하게 연관되어 있기 때문에 Data Layer 에서 비동기 방식으로 데이터를 처리하기에 자연스러운 방법이 없다. 또한 LiveData 는 안드로이드 플랫폼에 속해 있기 때문에 순수 Java / Kotlin 을 사용해야 하는 Domain Layer 에서 사용하기에 적합하지 않다. 반면 Flow 는 스스로 안드로이드 생명주기에 대해 알지 못함. 그래서 라이프사이클에 따른 중지나 재개가 어렵다.

Flow란 무엇인가?

Flow란 무엇일까? Flow는 코루틴의 데이터 스트림이다.

코루틴 상에서 Reactive 프로그래밍을 지원하기 위한 구성요소이다.

💡 Reactive 리액티브 프로그래밍이란?
데이터가 변경될 때 이벤트를 발생시켜 데이터를 계속해서 전달한다.
명령형 : 사용자는 데이터를 요청하고 일회성으로 결과값을 수신데이터가 필요할 때 마다 결과값을 요청해야함 → 비효율적
반응형 : 데이터를 발행하는 주체가 있고 소비자는 구독을 함발행자는 새로운 데이터가 들어오면 소비자에서 지속적으로 발행

Flow

  • 코루틴 flow는 코루틴 상에서 리액티브 프로그래밍을 지원하기 위해 만들어진 구현체
  • 코루틴에서 데이터 스트림을 구현하기 위해서는 Flow를 사용해야한다

Flow의 사용을 알아보기 위해 데이터 스트림을 살펴보자. 위 데이터 스트림은 아래 세가지 구성요소를 가진다.

  • Producer (생산자)
  • Intermediary (중간 연산자)
  • Consumer (소비자)

데이터 스트림의 구성요소가 어떻게 작동하는 지 알아보면서 Flow의 사용법을 알아보자

DataStream : Producer

생산자는 데이터를 발행한다. 주로 Local 또는 Remote의 DataSource에서 데이터를 가져온다.

Flow에서의 Producer는 emit()을 통해 데이터를 생성한다.

class RemoteDataSource(
        private val remoteApi: RemoteApi
) {
        // 먼저 flow scope를 선언
        fun getObjectFlow(): Flow<List<Object>> = flow {
                while(true) {
                        val objs = remoteApi.fetchLastedObject() // remote 서버로 부터 데이터를 받아옴
                        emit(objs)   // emit으로 데이터를 발행
                        delay(60000) // 60초 마다 반복
                }
        }
}

DataStream : Intermediary

생산자가 데이터를 생성했으면 중간 연산자는 생성된 데이터를 수정한다.

여기서 생성자가 A라는 객체의 데이터를 발행했지만 B라는 객체 데이터가 필요한 경우 Flow에서 지원하는 중간 연산자를 이용해 A객체를 B객체로 바꿀 수 있다.

  • map, filter ,onEach
class ObjectRepository(
        private val objectRemoteDataSorce: ObjectRemoteDataSorce
) {
        fun getObjectOfViewItem(locale : Locale) =
                objectRemoteDataSorce.getObjectFlow().map{ it.filter (this.prop == prop) }

View에 모든 처리가 완료된 가공된 데이터만을 전달하는 것이 좋다.

이를 위해 Intermediary에서 전달하기 위해 데이터를 가공한다.

DataStream : Consumer

중간연산자가 생산자가 생성한 데이터를 가공하여 소비자로 데이터를 전달한다.

안드로이드에서 소비자라 함은 UI 구성요소를 생각하면 된다.

Flow에서는 collect를 이용해 전달된 데이터를 소비할 수 있다.

class ObjectViewModel(
        private val objectRepository: ObjectRepository
) : ViewModel() {
        fun collectObjectOf(prop: Prop) =
                viewModelScope.launch {
                        dustRepository.getObjectFlow().collect { obj ->
                                text = obj.prop ...
                        }
                }
        }
}

받은 object 데이터를 이용하여 viewModel 에서 필요한 처리를 하고, View에서 사용하면 된다.

StateFlow

Flow 의 한계

Flow 는 데이터의 흐름이다. Flow는 데이터의 흐름을 발생시키지만, 데이터를 저장할 수 없다.

따라서 flow만을 이용해 UI state를 업데이트하기 위해서는 collect한 데이터를 viewModel에 저장해놓는 방법이 있을 것이다.

flow는 데이터를 구독하고 데이터 홀더 변수는 flow에서 마지막으로 발행한 데이터를 저장하고 있으면 된다. 따라서 UI는 flow에서 값을 발행하기 전에는 데이터 홀더 변수의 데이터를 사용하면 된다.

하지만 이는 보일러 플레이트 코드를 만들어내므로 가독성을 저해한다.

StateFlow

이를 위한 해결책 StateFlow

StateFlow는 flow의 데이터 홀더 역할을 하면서 flow의 데이터 스트림 역할 까지 하는 1+1 상품

  • 현재 상태와 새로운 상태 업데이트를 collector에 내보낼 수 있는 obseverable flow
  • value 프로퍼티로 현재 상태의 값을 읽어서 사용할 수 있다.

Recompose 이 일어날 때 UI는 StateFlow만 구독하고 있으면 OK!

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {
    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                _uiState.value = LatestNewsUiState.Success(favoriteNews)
            }
        }
    }
}

sealed class LatestNewsUiState {
    data class Success(news: List<ArticleHeadline>): LatestNewsUiState()
    data class Error(exception: Throwable): LatestNewsUiState()
}

MutableStateFlow 클래스에 새 값을 할당하여 초기화하여 사용가능하다.

StateFlow는 Hot flow이다.

  • flow에서 collect해도 생산자 코드가 트리거 되지 않음
  • 항상 활성 상태이고 메모리 내에 존재
💡
Cold Flow (Stream) : 소비할 때 마다 flow 블록이 재실행된다
 Hot Flow (Stream) : 소비하더라도 flow 블록이 호출되지 않는다
class LatestNewsActivity : AppCompatActivity() {
    private val latestNewsViewModel = getViewModel()

    override fun onCreate(savedInstanceState: Bundle?) {
        lifecycleScope.launch {
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                latestNewsViewModel.uiState.collect { uiState ->
                    when (uiState) {
                        is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
                        is LatestNewsUiState.Error -> showError(uiState.exception)
                    }
                }
            }
        }
    }
}

View는 viewModel에서 stateFlow를 받아와 수신 대기함

uiState의 value에 따라 UI를 업데이트한다!!

StateIn

우리는 Reactive 프로그래밍을 할 때 여러 데이터 흐름을 하나로 합쳐 하나의 흐름으로 만들어 낸다.

EX) 영화 평점 앱에서 영화정보, 사용자정보, 사용자영화평점을 가져와 하나의 객체로 flow 해야함

하나로 만들어진 Flow는 UI에서 사용되기 위해 StateFlow로 변환되어야 한다.

그렇다면 이 UI에서는 항상 최신 data를 발행 받게 된다.

  • Flow를 StateFlow로 변환
  • StateFlow를 항상 구독하고 있으면 메모리 누수가 생김
    • 이 StateFlow가 살아있어야 하는 CoroutineScope를 명시

How?

StateIn()

fun <T> Flow<T>.stateIn(scope: CoroutineScope,
                        started: SharingStarted,
                        initialValue: T): StateFlow<T>

를 이용하면 Flow를 StateFlow로 변환할 수 있다.

  • scope : StateFlow가 Flow로부터 데이터를 구독받을 CoroutineScope를 명시
  • started : Flow로부터 언제부터 구독을 할지 명시
  • initialValue : StateFlow에 저장될 초기값을 설정
// 1초 마다 string을 발행하는 simple Flow
val stringFlow: Flow<String> = flow {
        for(i in 0..1000) {
                emit("integer: $i")
                delay(1000)
        }
}

// stringFlow를 stateIn 함수를 통해 StateFlow로 변환
val stateFlow = stringFlow.stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubScribed(5000), // collector가 없어지고 5초에 멈춤
        initialValue = "integer 0"
)

started의 WhileSubscribed 는 collecter가 없어졌을 때 지정된 시간 이후 StateFlow가 동작을 멈추도록 만드는 값이다.

stateFlow 는 초기값이 “interger 0”이고 구독 후 5초 후 에 처음 발행 받으며 ViewModel의 생명주기만큼 구독을 지속하는 StateFlow임

SharedFlow

Event

어플리케이션은 정의된 상태들을 전환해 가면서 실행된다. 이러한 상태와 함께 개발할 때 고려해야하는 다른 한가지는 사용자 이벤트이다.

상태는 늘 기본 값을 가지지만 이벤트는 기본 값없이 특정 상황이 발생하였을 때 구독자들에서 이벤트가 발생한 상황을 전달한다. 이벤트는 상태와 달리 가장 최근 값이 아니라, 구독 이후 발생한 값을 받아야한다.

Codes

@HiltViewModel
class MainViewModel(
    @Assisted private val savedStateHandle: SavedStateHandle,
) : ViewModel() {
    private val _systemEvent: MutableSharedFlow<Unit> =
        MutableSharedFlow(
                        replay = 0, 
                        extraBufferCapacity = 1, 
                        onBufferOverflow = BufferOverflow.DROP_OLDEST
                )
    val systemEvent = _systemEvent.asSharedFlow()

    init {
        viewModelScope.launch {
            systemEvent.collect { systemEvent ->
                when(systemEvent) {
                    is SystemEvent.MemoryWarning -> { showMemoryWarning() }
                    is SystemEvent.StorageWarning -> { showStorageWarning() }
                    else -> { showElse() }
                }
            }
        }
    }

    fun reportSystemEvent(systemEvent: SystemEvent) {
        _systemEvent.emit(systemEvent)
    }
}

전체적인 구조는 StateFlow를 사용할 때와 유사하다.

MutableSharedFlow 생성 시 몇 가지 파라미터를 전달하여 동작을 재정의 한다.

  • replay : 새로운 구독자에게 이전 이벤트를 몇 개 전달할 것 인지
  • extraBufferCapacity : 추가 버퍼를 몇 개 생성할 것 인지
  • onBufferOverflow : 버퍼가 가득 찼을 때의 프로토콜

Cases

CleanArchitecture, MVVM, JetPackCompose, Dagger Hilt 등을 이용한 NoteApp의 코드를 살펴보자

How to Make a Clean Architecture Note App (MVVM / CRUD / Jetpack Compose) - Android Studio Tutorial

AddEditNote는 새로운 노트를 추가하는 뷰이다. 이 중에서 노트를 edit하고 저장하는 이벤트인 AddEditNoteEvent.SaveNote 의 경우를 처리하는 부분을 보자

@HiltViewModel
class AddEditNoteViewModel @Inject constructor(
    private val noteUseCases: NoteUseCases,
    savedStateHandle: SavedStateHandle
) : ViewModel() {

        **private val _eventFlow = MutableSharedFlow<UiEvent>()
            val eventFlow = _eventFlow.asSharedFlow()**

        fun onEvent(event: AddEditNoteEvent) {
        when(event) {
            is AddEditNoteEvent.SaveNote -> {
                viewModelScope.launch {
                    try {
                        noteUseCases.addNote(
                            Note(
                                title = noteTitle.value.text,
                                content = noteContent.value.text,
                                timestamp = System.currentTimeMillis(),
                                color = noteColor.value,
                                id = currentNoteId
                            )
                        )
                        **_eventFlow.emit(**UiEvent.SaveNote**)**

                    } catch (e: InvalidNoteException) {
                        **_eventFlow.emit(**
                            UiEvent.ShowSnackbar(
                                message = e.message ?: "Couldn't save note"
                            )
                        **)**
                    }
                }
            }
        }
    }

    sealed class UiEvent {
        data class ShowSnackbar(val message: String): UiEvent()
        object SaveNote: UiEvent()
    }
}

ViewModel에서 eventFlow를 sharedFlow로 만들어 발행하는 모습을 볼 수 있다.

UIEvent에서 처리 결과에 따라 에러 메세지 스낵바를 보여주는 이벤트 또는 정상 처리된 이벤트가 발행된다.

다음 코드는 AddEditNote의 composable UI 부분이다.

LaunchedEffect(key1 = true) {
        viewModel.eventFlow.collectLatest { event ->
            when(event) {
                is AddEditNoteViewModel.UiEvent.ShowSnackbar -> {
                    scaffoldState.snackbarHostState.showSnackbar(
                        message = event.message
                    )
                }
                is AddEditNoteViewModel.UiEvent.SaveNote -> {
                    navController.navigateUp()
                }
            }
        }
    }

collectLastest 라는 소비방법으로 발행된 이벤트에 따라 다른 결과를 처리하고 있다!!!

GOOOOOOOD

SharedIn

마찬가지로 Flow를 SharedFlow로 변환할 수 있다.

StateIn()

fun <T> Flow<T>.shareIn(scope: CoroutineScope,
                        started: SharingStarted,
                        replay: Int = 0): SharedFlow<T>

Flow, SharedFlow, StateFlow

지금 까지 살펴본 Flow들은 다음과 같은 계층 구조를 가진다

Flow ← SharedFlow ← StateFlow

StateFlow 를 상속하는 StateFlow는 추가로 기본 값을 가지고 있으며, replaceCache는 가장 최근 값 하나를 갖는 리스트로 재정의 한다.

이러한 Hot Flow들 중에서 주로 이벤트의 경우 SharedFlow, 상태의 경우 StateFlow를 사용한다

Collect

Flow는 데이터를 발행시키고,

이는 collect의 action 파라미터에 의해 소비된다.

public suspend inline fun<T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = 
        collect(object : FlowCollector<T> {
                override suspend fun emit(value: T) = action(value)
        }
)

action은 flow에서 발행된 데이터를 순차적으로 받아 suspend fun을 수행한다.

💡 suspend funtion 이 뭐더라?
  • 시작하고, 멈추고, 다시 시작할 수 있는 함수
  • suspend란 비동기 실행을 위한 중단 지점
  • thread를 block하지 않고 실행 → 하나의 thread에서 여러 개의 코루틴을 실행 가능

하지만 무지성으로 flow emit, collect만을 사용한다면 여러가지 애로사항이 발생한다.

이를 해결하기 위한 여러가지 방법들을 알아보자

solution : Buffer

flow의 collect를 사용하면 하나의 코루틴에서 발행과 소비가 같이 일어난다.

데이터가 발행된 후 소비가 끝나고 나서 다시 다음 데이터가 발행된다. 즉 발행과 소비가 순차적으로 일어남

발행의 지연이 소비에 지연을 일으킨다 ⇒ 비효율적

소비와 발행의 코루틴을 분리함으로써 이를 해결

class MainViewModel: ViewModel() {
        val flow = flow<Int> {
                for(i in 0..10) {
                        emit(i)
                        delay(1000)
                }
        }

        init {
                viewModelScope.launch {
                        flow.onEach { 
                                println("number >> emit $it") 
                        }
                        .buffer().collect {
                                delay(3000)
                                println("number >> consume $it")
                        }
                }
        }
}
/* 
[ console ]
--------------------------------
number >> emit 0
number >> emit 1
number >> emit 2
number >> consume 0
number >> emit 3
number >> emit 4
number >> emit 5
number >> consume 1
...
--------------------------------
*/

flow의 발행부와 소비부 양쪽에 지연이 생길 때 buffer을 이용하여 최적화 시킬 수 있다.

solution : CollectLast

flow를 사용할 때 잘못하면 데이터 처리에 문제가 생길 수 있다.

flow에서 특정 데이터를 처리하는데 많은 시간이 걸리는 경우 위 그림의 데이터 3이 UI에 표시되는데 지연이 발생

해결법 중 한 가지로 최신 데이터가 들어오면 이전 데이터를 수행하던 suspend fun를 취소하고 새로운 데이터로 수행하도록 함. 이것이 바로 collectLastest

collectLast 는 항상 최신 데이터를 소비한다. collect도중 지연되더라도 새로운 데이터가 발행되면 이를 취소하고 새롭게 최신 데이터를 받아온다. UI를 구성하는데는 GOOOOD

하지만 collectLast 또한 한계가 존재한다.

만약 suspend fun이 수행하는 시간이 데이터 발행 간격시간 보다 길다면?

→ 새로 데이터가 발행되는 동안은 중간 데이터를 하나도 받지 못한다. 계속해서 취소하고 새로운 데이터를 처리하기 때문. 마지막 데이터만을 소비하게 될 것이다.

solution : Confate

collectLast 의 문제점을 해결하는 방법은 한번 시작된 데이터 소비는 끝날 때 까지 하고, 끝난 시점에서의 가장 최신의 데이터를 다시 소비하는 것.

conflate는 한 번 시작된 suspend fun이 최소되지 않고 끝까지 수행되며, 데이터 소비 완료 시점에서 가장 최신 데이터가 다시 소비되도록 한다.

class MainViewModel: ViewModel() {
        val flow = flow<Int> {
                for(i in 0..10) {
                        emit(i)
                        delay(1000)
                }
        }

        init {
                viewModelScope.launch {
                        flow.onEach { 
                                println("number >> emit $it") 
                        }
                        .conflate().collect {
                                delay(3000)
                                println("number >> consume $it")
                        }
                }
        }
}
/* 
[ console ]
--------------------------------
number >> emit 0
number >> emit 1
number >> consume 0
number >> emit 2
number >> emit 3
number >> emit 4
number >> consume 3
number >> emit 5
...
--------------------------------
*/

중간에 발행된 데이터 1, 2는 건너뛰어지고 3이 소비되는 것을 확인 할 수 있다.

이러한 방법을 이용하면 suspend fun 이 수행하는 시간이 오래 잡아먹어도 계속해서 최신 데이터를 발행받을 수 있다.

Flattening Operator

코드 상에서 데이터 파이프라인은 그 자체로 사용되는 경우는 거의 없으며 보통 다른 데이터 파이프라인들과 합쳐져 하나의 파이프라인을 완성하는 경우가 많다. flow 또한 여러 flow가 합쳐져 하나의 flow로 만들어지기 위한 연산자를 제공한다.

이를 Flattening Operator라고 한다. 밑에서 여러 종류의 연산자를 살펴보도록 하자.

FlatMapConcat

flatMapConcat은 여러 flow를 연결하는 연산자이다.

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

이 연산자는 두 가지 과정을 통해 수행된다.

  1. transform 변수에 대한 map을 수행하여 새로운 flow를 만들어 낸다.
  2. flattenConcat 을 통해 하나의 flow로 합쳐진다
val _flow = flow {
    emit(1)
    emit(5)
}

fun increaseStock() {
    lifecycleScope.launch {
        _flow.flatMapConcat { value -> 
                        flow { 
                                emit(value * 2)
                                emit(value * 3) 
                        }
                }.collect { println("number: $it") }
    }
}
// number: 2
// number: 3
// number: 10
// number: 15

_flow에서 발행된 value로 만들어진 새로운 두개의 flow가 하나로 합쳐진 것을 확인할 수 있다.

flatMapConcat은 순차적으로 처리되어 새로운 flow를 만들어 낸다. 이로인해 오래 걸리는 연산이 변환 값으로 들어올 경우 발행 시점과 변환되기 전 데이터의 발행 시점 사이에 큰 갭이 발생할 수 있다.

FlatMapLastest

flatMapLastest은 flow의 최신 데이터만을 이용해 새로운 flow로 변환한다.

mapping도중 새롭게 발행된 데이터가 들어오면 로직을 취소하고 최신의 데이터로 변환을 수행한다.

collectLastest 와 유사한 개념.

FlatMapMerge

flatMapMerge은 앞선 두 방법과 다르게 변환을 병렬로 수행한다.

들어오는 데이터들을 동시에 수집한 후 수집한 값들이 가능한 빨리 방출될 수 있도록 병렬로 처리되어야 할 때가 있다.

EX) 비용 계산을 위해 수십개의 지출 데이터를 취합하여 합치는 작업을 할 경우 병렬로 계산되어도 됨

val _flow = flow {
    emit(1)
    emit(5)
}

fun increaseStock() {
    lifecycleScope.launch {
        _flow.flatMapMerge { value -> 
                        flow { 
                                emit(value * 2)
                                delay(1000)
                                emit(value * 3) 
                        }
                }.collect { println("number: $it") }
    }
}
// number: 2
// number: 10
// number: 3
// number: 15

두 번째 발행 데이터인 5는 첫 번째 데이터인 1의 변환이 완료되기 전에 변환을 시작한다.

순차 처리는 리소스를 최대한 활용하기 어려워 시간이 오래걸린다.

순서가 중요하지 않은 변환에서는 flatMapMerge를 사용하면 연산속도를 빠르게 할 수 있다.

참고

'Coroutines/Flow' 카테고리의 글 목록

첨부 된 이미지는 출처는 kotlin world 님의 블로그입니다.

 

'Coroutines/Flow' 카테고리의 글 목록

Kotlin World는 Kotlin, Android, Spring 등을 다루는 기술 블로그입니다.

kotlinworld.com

Composing suspending functions | Kotlin

 

Composing suspending functions | Kotlin

 

kotlinlang.org

shareIn

 

shareIn

Converts a coldFlow into a hotSharedFlow that is started in the given coroutine scope, sharing emissions from a single running instance of the upstream flow with multiple downstream subscribers, and replaying a specified number of replay values to new subs

kotlinlang.org

StateFlow 와 SharedFlow

 

StateFlow 와 SharedFlow

코루틴 공식 가이드 읽기 Part 9 — Dive1

myungpyo.medium.com

[Coroutine] State flow vs Shared flow with case study

 

[Coroutine] State flow vs Shared flow with case study

Flow를 사용하면서 유용하게 사용할 수 있는 state flow와 shared flow가 다른 점과 각각 어떤 상황에서 적합한지를 알기 위하여 두 개의 특성을 비교하려고 합니다. Flow builder로 생성한 flow들은 기본적

tourspace.tistory.com

flatMapConcat을 활용하여 Flow를 다른 Flow로 변환

 

flatMapConcat을 활용하여 Flow를 다른 Flow로 변환

아래 내용은 모두 해당 원글을 기반으로 요약정리한 내용입니다. 그림은 저자가 직접 제작한 것임을 알려드립니다. (출처: Kt World) Flow는 여러 Flow를 합쳐 하나의 Flow로 변환할 수 있도록 돕는 F

juyeop.tistory.com

 

 

728x90