Implementation of Reactive Extensions, supporting multiple platforms, schedulers, and interoperability with coroutines and RxJava. Offers `Observable`, `Maybe`, `Single`, `Completable`, various subjects, and sophisticated subscription management through `DisposableScope`.
A CoroutineContext can be converted to a Scheduler, and a Scheduler to a CoroutineDispatcher:
val defaultScheduler = Dispatchers.Default.asScheduler()
val computationDispatcher = computationScheduler.asCoroutineDispatcher()
Subscription management with DisposableScope
Reaktive provides an easy way to manage subscriptions: DisposableScope.
Take a look at the following examples:
val scope =
    disposableScope {
        observable.subscribeScoped(...) // Subscription will be disposed when the scope is disposed

        doOnDispose {
            // Will be called when the scope is disposed
        }

        someDisposable.scope() // `someDisposable` will be disposed when the scope is disposed
    }

// At some point later
scope.dispose()
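To make the lifecycle above more concrete, here is a minimal, library-free sketch of the idea behind a disposable scope. `SimpleDisposable` and `SimpleScope` are hypothetical names invented for this illustration; they are not Reaktive types, and the sketch ignores thread safety.

```kotlin
// Hypothetical stand-in for a disposable resource (not a Reaktive type).
fun interface SimpleDisposable {
    fun dispose()
}

// Hypothetical scope: collects children and disposes them all together.
class SimpleScope : SimpleDisposable {
    private val children = mutableListOf<SimpleDisposable>()

    var isDisposed = false
        private set

    // Registers a child; if the scope is already disposed, the child is disposed immediately.
    fun add(child: SimpleDisposable) {
        if (isDisposed) child.dispose() else children += child
    }

    // Registers a callback to run when the scope is disposed.
    fun doOnDispose(action: () -> Unit) {
        add(SimpleDisposable { action() })
    }

    // Disposes every registered child exactly once; repeated calls do nothing.
    override fun dispose() {
        if (isDisposed) return
        isDisposed = true
        children.forEach { it.dispose() }
        children.clear()
    }
}

fun main() {
    val log = mutableListOf<String>()
    val scope = SimpleScope()

    scope.add(SimpleDisposable { log += "subscription disposed" })
    scope.doOnDispose { log += "callback invoked" }

    scope.dispose()
    scope.dispose() // No effect: the scope is already disposed

    // Added after disposal, so it is disposed right away
    scope.add(SimpleDisposable { log += "late child disposed" })

    println(log) // prints [subscription disposed, callback invoked, late child disposed]
}
```

The point of the pattern is that callers only need to remember one `dispose()` call, no matter how many subscriptions or callbacks were attached along the way.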
Reaktive provides a Plugin API, similar to RxJava plugins. The Plugin API provides a way to decorate Reaktive sources. A plugin must implement the `ReaktivePlugin` interface; it can be registered with the `registerReaktivePlugin` function and unregistered with the `unregisterReaktivePlugin` function.
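The "decorate sources" idea can be illustrated with a small, library-free sketch. Everything below (`Source`, `SourcePlugin`, `PluginRegistry`, `LoggingPlugin`, `sourceOf`) is a hypothetical stand-in invented for this example; it does not reproduce the `ReaktivePlugin` interface or its actual method signatures, and only shows the general register/decorate/unregister flow.

```kotlin
// Hypothetical stand-in for a reactive source: invoking it with a callback "subscribes".
typealias Source<T> = (onNext: (T) -> Unit) -> Unit

// Hypothetical plugin contract: receives each assembled source and may wrap it.
interface SourcePlugin {
    fun <T> onAssemble(source: Source<T>): Source<T>
}

// Hypothetical global registry of plugins.
object PluginRegistry {
    private val plugins = mutableListOf<SourcePlugin>()

    fun register(plugin: SourcePlugin) {
        plugins += plugin
    }

    fun unregister(plugin: SourcePlugin) {
        plugins -= plugin
    }

    // Passes the source through every registered plugin, in registration order.
    fun <T> decorate(source: Source<T>): Source<T> =
        plugins.fold(source) { acc, plugin -> plugin.onAssemble(acc) }
}

// Factory that routes every newly created source through the registry.
fun <T> sourceOf(vararg values: T): Source<T> =
    PluginRegistry.decorate { onNext -> values.forEach(onNext) }

// Example plugin: records every emitted value before passing it downstream.
class LoggingPlugin(private val log: MutableList<String>) : SourcePlugin {
    override fun <T> onAssemble(source: Source<T>): Source<T> = { onNext ->
        source { value ->
            log += "onNext: $value"
            onNext(value)
        }
    }
}

fun main() {
    val log = mutableListOf<String>()
    val received = mutableListOf<Int>()
    val plugin = LoggingPlugin(log)

    PluginRegistry.register(plugin)
    sourceOf(1, 2).invoke { received += it } // Decorated: values are logged

    PluginRegistry.unregister(plugin)
    sourceOf(3).invoke { received += it } // Not decorated: nothing is logged

    println(log) // prints [onNext: 1, onNext: 2]
    println(received) // prints [1, 2, 3]
}
```

Note that in this sketch decoration happens when a source is created, so unregistering a plugin only affects sources assembled afterwards.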
Interoperability with coroutines
Convert suspend functions to/from Single, Maybe and Completable
Convert Flow to/from Observable
Convert CoroutineContext to Scheduler
Convert Scheduler to CoroutineDispatcher
Interoperability with RxJava2 and RxJava3
Conversion of sources and schedulers between Reaktive and RxJava
Reaktive 1.x and the old (strict) memory model
The old (strict) Kotlin Native memory model imposes unusual concurrency rules. In general, shared mutable state between threads is not allowed.
Since Reaktive supports multithreading in Kotlin Native, please read the following documents before using it:
Object detachment is relatively difficult to achieve and is very error-prone when objects are created outside the library and are not fully managed by it. This is why Reaktive prefers frozen state. Here are some hints:
Any callback (and any captured objects) submitted to a Scheduler will be frozen.
subscribeOn freezes both its upstream source and its downstream observer; all the Disposables (upstream's and downstream's) are frozen as well; values (including errors) are not frozen by the operator.
observeOn freezes only its downstream observer and all the values (including errors) passed through it, plus all the Disposables; the upstream source is not frozen by the operator.
Other operators that use a scheduler (like debounce, timer, delay, etc.) behave the same as observeOn in most cases.
Thread local tricks to avoid freezing
Sometimes freezing is not acceptable, e.g. we might want to load some data in the background and then update the UI.
Obviously, the UI cannot be frozen. Reaktive offers two ways to achieve this:
Use the threadLocal operator:
val values = mutableListOf<Any>()
var isFinished = false

observable<Any> { emitter ->
    // Background job
}
    .subscribeOn(ioScheduler)
    .observeOn(mainScheduler)
    .threadLocal()
    .doOnBeforeNext { values += it } // Callback is not frozen, we can update the mutable list
    .doOnBeforeFinally { isFinished = true } // Callback is not frozen, we can change the flag
    .subscribe()
Set the isThreadLocal flag to true in the subscribe operator:
val values = mutableListOf<Any>()
var isComplete = false

observable<Any> { emitter ->
    // Background job
}
    .subscribeOn(ioScheduler)
    .observeOn(mainScheduler)
    .subscribe(
        isThreadLocal = true,
        onNext = { values += it }, // Callback is not frozen, we can update the mutable list
        onComplete = { isComplete = true } // Callback is not frozen, we can change the flag
    )
In both cases, the subscription (the subscribe call) must be performed on the main thread.
DisposableScope can also be implemented through class delegation, as in the following presenter and Android activity:
class MyPresenter(
    private val view: MyView,
    private val longRunningAction: Completable
) : DisposableScope by DisposableScope() {

    init {
        doOnDispose {
            // Will be called when the presenter is disposed
        }
    }

    fun load() {
        view.showProgressBar()

        // Subscription will be disposed when the presenter is disposed
        longRunningAction.subscribeScoped(onComplete = view::hideProgressBar)
    }
}

class MyActivity : AppCompatActivity(), DisposableScope by DisposableScope() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // The presenter will be disposed when the activity's scope is disposed
        MyPresenter(...).scope()
    }

    override fun onDestroy() {
        dispose()

        super.onDestroy()
    }
}