netflow
0.6.0 · indexed
Lightweight, flexible network library offering a clean, intuitive API for handling network requests with support for LiveData, Flow, object deserialization, customizable headers, and local data integration.
Lightweight, flexible network library offering a clean, intuitive API for handling network requests with support for LiveData, Flow, object deserialization, customizable headers, and local data integration.
A lightweight networking library for Kotlin Multiplatform that provides a simple API for Flow and direct suspending calls with optional Jetpack Paging 3 support.
dependencies {
implementation("io.github.kmpbits:netflow-core:<latest_version>")
}
Adds responsePaginated with Jetpack Paging 3 support.
dependencies {
implementation("io.github.kmpbits:netflow-core:<latest_version>")
implementation("io.github.kmpbits:netflow-paging:<latest_version>")
}
Check the latest versions on Maven Central.
// Shared client: every call made through it uses this base URL and these default headers.
val client = netFlowClient {
    baseUrl = "https://api.example.com"
    header(Header(HttpHeader.custom("custom-header"), "value"))
    // Header takes (name, value): the value belongs inside Header(...), not as a second argument to header().
    header(Header(HttpHeader.CONTENT_TYPE, "application/json"))
}
val response = client.call {
path = "/users"
method = HttpMethod.Get
}.response()
val user: User = client.call {
path = "/users/1"
}.responseToModel<User>()
When your DTO and domain model are the same type, pass only one type parameter:
val flow = client.call {
path = "/users/1"
}.responseFlow<UserDto>()
When ApiType and DisplayType differ, pass transform as the first argument. The compiler enforces this — forgetting it is a build error, not a runtime crash.
val flow = client.call {
path = "/users/1"
}.responseFlow<UserDto, User>(transform = { it.toModel() })
val usersFlow = client.call {
path = "/users"
method = HttpMethod.Get
}.responseFlow<UserDto, User>(transform = { it.toModel() }) {
onNetworkSuccess { dto ->
queries.insertUser(dto.toEntity())
}
local({ observe { queries.getUser() } }, transform = { it.toModel() })
}
The transform inside local() maps from the database entity type directly to DisplayType — it drives what gets shown while the network call is in flight. The transform on the function maps the network ApiType to DisplayType once the response arrives.
local({
onlyLocalCall = true
call { queries.getAllUsers() }
}, transform = { it.toModel() })
For APIs that return { "data": { ... } } instead of a plain object:
// Same type
responseWrappedFlow<UserDto>()
// Different types
responseWrappedFlow<UserDto, User>(transform = { it.toModel() })
Or set wrappedResponse = true inside the builder when using responseFlow:
responseFlow<UserDto, User>(transform = { it.toModel() }) {
wrappedResponse = true
}
// Same type
responseListFlow<UserDto>()
responseWrappedListFlow<UserDto>()
// Different types
responseListFlow<UserDto, User>(transform = { it.toModel() })
responseWrappedListFlow<UserDto, User>(transform = { it.toModel() })
lifecycleScope.launch {
usersFlow.collectLatest { state ->
when (state) {
is ResultState.Loading -> showLoading()
is ResultState.Success -> showUsers(state.data)
is ResultState.Error -> showError(state.error.message)
}
}
}
For one-shot suspending calls (no observation needed).
/**
 * Sends a DELETE request for `users/{id}` as a one-shot suspending call.
 *
 * When the network call succeeds, the matching row is also removed locally
 * through `queries.deleteUser(id)`.
 *
 * @param id identifier of the user to delete.
 * @return the [AsyncState] produced by `responseAsync`.
 */
suspend fun deleteUser(id: Int): AsyncState<Unit> =
    client
        .call {
            method = HttpMethod.Delete
            path = "users/$id"
        }
        .responseAsync<Unit> {
            onNetworkSuccess { queries.deleteUser(id) }
        }
/**
 * Requests `users/{id}` as a one-shot suspending call and maps the
 * network `UserDto` to the `User` display model.
 *
 * @param id identifier of the user to load.
 * @return the [AsyncState] produced by `responseAsync`, carrying a `User`.
 */
suspend fun getUser(id: Int): AsyncState<User> =
    client
        .call { path = "users/$id" }
        .responseAsync<UserDto, User>(transform = { dto -> dto.toModel() })
// Same type
responseListAsync<UserDto>()
responseWrappedListAsync<UserDto>()
// Different types
responseListAsync<UserDto, User>(transform = { it.toModel() })
responseWrappedListAsync<UserDto, User>(transform = { it.toModel() })
// Same type
responseWrappedAsync<UserDto>()
// Different types
responseWrappedAsync<UserDto, User>(transform = { it.toModel() })
responsePaginated integrates Jetpack Paging 3, supporting both network-only and remote+local strategies.
Your API response model must extend PagingModel:
/**
 * API response model for a post, usable with `responsePaginated`.
 *
 * Extends `PagingModel` and overrides its `page` and `lastUpdatedTimestamp`
 * properties, defaulting them to `0` and `0L`.
 * NOTE(review): presumably the library fills these in while paging — confirm.
 */
@Serializable
data class PostDto(
val id: Int,
val title: String,
override var page: Int = 0,
override var lastUpdatedTimestamp: Long = 0L
) : PagingModel()
// Network-only paging: requests "/posts" and exposes the pages as Flow<PagingData<Post>>.
fun getPosts(): Flow<PagingData<Post>> = client.call {
path = "/posts"
}.responsePaginated<PostDto, Post> {
// No local database is involved when onlyApiCall is true.
onlyApiCall = true
// Maps each network PostDto to the Post display model.
networkTransform { it.toModel() }
}
There are two ways to configure the local data source.
Option A — localQuery (recommended, no custom PagingSource needed): Pass countQuery, itemsQuery, and an invalidation flow. The library creates and manages the PagingSource internally. The invalidation flow triggers a reload whenever the underlying data changes — SQLDelight users pass query.asFlow(), Room users pass their Flow<List<T>>.
// Remote + local paging using localQuery: the library builds the PagingSource from the queries below.
fun getPosts(): Flow<PagingData<Post>> = client.call {
path = "/posts"
}.responsePaginated<PostDto, Post> {
localQuery(
// Total number of locally stored posts.
countQuery = { database.postQueries.countPosts().executeAsOne() },
// One page of local rows for the given limit/offset.
itemsQuery = { limit, offset -> database.postQueries.selectPosts(limit, offset).executeAsList() },
// Emits when the underlying data changes, which triggers a reload.
invalidation = database.postQueries.selectAllPosts().asFlow(),
// Maps a local row to the Post display model.
transform = { it.toModel() }
)
// Set to false because the delete is handled inside insertAll below.
deleteOnRefresh = false
// Converts each network DTO to an entity, then replaces the table contents in one transaction.
insertAll(transform = { it.toEntity() }) { posts ->
database.postQueries.transaction {
database.postQueries.deleteAll()
posts.forEach { database.postQueries.insertPost(it) }
}
}
// Supplies the first stored post (or null) and how to read its timestamp.
// NOTE(review): presumably used with cacheTimeout to decide whether cached data is stale — confirm.
firstItemDatabase(
itemDatabase = { database.postQueries.getFirstPost().executeAsOneOrNull() },
timestamp = { it.lastUpdatedTimestamp }
)
}
Option B — localSource / localSourceLong (custom PagingSource): Use this when you need full control over how data is loaded locally. You provide your own PagingSource<Int, E> (or PagingSource<Long, E> via localSourceLong).
Then wire it up:
// Remote + local paging using a caller-supplied PagingSource.
fun getPosts(): Flow<PagingData<Post>> = client.call {
path = "/posts"
}.responsePaginated<PostDto, Post> {
localSource(
// Factory lambda: constructs a new PostPagingSource each time it is invoked.
pagingSource = { PostPagingSource(database) },
// Maps a local row to the Post display model.
transform = { it.toModel() }
)
// ...
}
For SQLDelight sources that use Long keys (e.g. QueryPagingSource), use localSourceLong instead — keys are bridged to Int internally.
When using a custom PagingSource (Option B), it is critical to register a listener on your database query to trigger invalidation. Without this, the UI will not update when data changes (e.g., after a network refresh or a local deletion).
If you are using SQLDelight, follow the pattern in the example above (and in the sample app's TodoPagingSource):
Create a Query.Listener that calls invalidate() and removes itself, and register it in init. This ensures that whenever the underlying data changes, the PagingSource is marked as invalid, and the Pager will create a new one and reload the data.
val posts = repository.getPosts().cachedIn(viewModelScope)
val posts = viewModel.posts.collectAsLazyPagingItems()
LazyColumn {
items(count = posts.itemCount, key = posts.itemKey { it.id }) { index ->
posts[index]?.let { PostItem(it) }
}
}
netflow-paging ships PagingCollectionViewController — a KMP class that bridges paging data to Swift. It is designed to be used with SKIE for async sequence support.
ViewModel (Swift)
View (SwiftUI)
MockNetFlowClient implements NetFlowClient and intercepts all requests without making any real network calls. It supports response delays, request recording, and assertion helpers.
// Mock client: the lambda receives each request and returns the canned response for it.
val mockClient = MockNetFlowClient { request ->
when {
// GET posts -> success with a one-element JSON array body.
request.path == "posts" && request.method == HttpMethod.Get ->
NetFlowMockResponse.success("""[{"id":1,"title":"Hello","completed":false}]""")
// DELETE posts/... -> success without a body.
request.path.startsWith("posts/") && request.method == HttpMethod.Delete ->
NetFlowMockResponse.success()
// POST posts -> success with a single post as the JSON body.
request.path == "posts" && request.method == HttpMethod.Post ->
NetFlowMockResponse.success("""{"id":101,"title":"New Post","completed":false}""")
// Any other request -> not found.
else -> NetFlowMockResponse.notFound()
}
}
NetFlowMockResponse.success(
body = """[...]""",
delay = 2.seconds // simulates slow network
)
NetFlowMockResponse.error(code = 401, errorBody = "Unauthorized")
NetFlowMockResponse.serverError("Something went wrong")
NetFlowMockResponse.notFound()
// Called at least once
mockClient.assertCalled("posts", HttpMethod.Get)
// Called exactly N times
mockClient.assertCalledTimes("posts/1", HttpMethod.Delete, times = 1)
// Never called
mockClient.assertNotCalled("posts", HttpMethod.Post)
val request = mockClient.recordedRequests.first()
assertEquals(HttpMethod.Post, request.method)
assertEquals(mapOf("title" to "New Post"), request.body)
mockClient.clearRecordedRequests()
// Repository test against the mock client: no real network call is made.
@Test
fun `delete removes item from local database`() = runTest {
// Every request is answered with an empty success response.
val mockClient = MockNetFlowClient { _ -> NetFlowMockResponse.success() }
val repository = PostRepositoryImpl(mockClient, database)
repository.deletePost(id = 1)
// Verifies the repository issued DELETE posts/1 at least once.
mockClient.assertCalled("posts/1", HttpMethod.Delete)
}
All helpers accept an optional delay: Duration parameter.
client.call {
path = "/secure-endpoint"
header(Header(HttpHeader.custom("Authorization"), "Bearer $token"))
}.responseFlow<SecureDataDto, SecureData>(transform = { it.toModel() })
client.call {
path = "/users"
parameter("role" to "admin")
parameter("active" to true)
}.responseFlow<UserDto, User>(transform = { it.toModel() })
client.call {
path = "/unstable-endpoint"
retry {
times = RetryTimes.THREE
delay = 1.seconds
retryOn = { it is IOException }
}
}.responseFlow<DataDto, Data>(transform = { it.toModel() })
responseToModel is the only extension that throws — all other extensions return a sealed state.
// responseToModel throws on failure, so the call is wrapped in try/catch.
try {
val response = client.call {
path = "/might-fail"
}.responseToModel<Data>()
} catch (e: NetFlowException) {
// Branch on the concrete NetFlowException subtype.
when (e) {
is NetworkException -> { /* handle network issues */ }
is SerializationException -> { /* handle parsing errors */ }
is HttpException -> {
// HTTP failures expose the status code and the error body.
val code = e.code
val errorBody = e.errorBody
}
}
}
single {
netFlowClient {
baseUrl = "https://api.example.com"
}
}
netflow-core and netflow-paging modules.
This project is licensed under the Apache License, Version 2.0.
- Optional Jetpack Paging 3 support (netflow-paging)
- API type (ApiType) separated from display type (DisplayType) — no trailing .map needed
- wrappedResponse flag for APIs that return { "data": ... } envelopes
- MockNetFlowClient for testing — no real network calls, with response delays and request history
- PagingCollectionViewController for bridging paging data to Swift

class PostPagingSource(private val database: AppDatabase) : PagingSource<Int, PostEntity>() {
private val query = database.postQueries.selectPosts()
// One-shot listener: on the first change notification it invalidates this
// PagingSource and then unregisters itself from the query.
private val listener = object : Query.Listener {
override fun queryResultsChanged() {
invalidate()
query.removeListener(this)
}
}
// Register the listener at construction time so changes to the query results invalidate this source.
init {
query.addListener(listener)
}
/**
 * Picks the page key to reload from after invalidation.
 *
 * Uses the page closest to the current anchor position: its `prevKey + 1`
 * when available, otherwise its `nextKey - 1`. Returns `null` when there is
 * no anchor position or neither key is available.
 */
override fun getRefreshKey(state: PagingState<Int, PostEntity>): Int? {
    val anchor = state.anchorPosition ?: return null
    val closestPage = state.closestPageToPosition(anchor)
    return closestPage?.prevKey?.plus(1) ?: closestPage?.nextKey?.minus(1)
}
/**
 * Loads the page described by [params] from the local database.
 *
 * Placeholder: replace the `TODO` with your own query. It is present so the
 * sample compiles (a non-Unit function needs a result) and fails loudly if
 * it is left unimplemented.
 */
override suspend fun load(params: LoadParams<Int>): LoadResult<Int, PostEntity> {
    // your load implementation
    TODO("Load the page described by params and return a LoadResult")
}
}
| Property | Default | Description |
|---|---|---|
| defaultPageSize | 20 | Items loaded per page |
| pageQueryName | "page" | URL query parameter name for the page number |
| onlyApiCall | false | true for network-only (no local DB) |
| wrappedResponse | false | true when API returns { "data": [...] } |
| deleteOnRefresh | true | Clear local DB before inserting on REFRESH. Set to false when handling delete inside insertAll |
| refresh | false | Force refresh on start, ignoring cache timeout |
| cacheTimeout | 1 hour | How long before re-fetching from the network |
import netflowCore // or your KMP framework name
/// Bridges the KMP paging view model to SwiftUI.
///
/// Feeds paging data into a `PagingCollectionViewController` and republishes
/// the resulting items and refresh state as observable properties.
final class PostListViewModel: ObservableObject {
    private let viewModel = // your KMP ViewModel from DI

    /// Items currently loaded. `@Published` so observers are notified when it changes.
    @Published private(set) var posts: [Post] = []
    /// `true` while the refresh load state is loading.
    @Published private(set) var isLoading: Bool = false

    private let delegate = PagingCollectionViewController<Post>()

    /// Long-running observation tasks; cancelled in `deinit`.
    private var observationTasks: [Task<Void, Never>] = []

    init() {
        observeData()
        observeLoadStates()
        observePagingData()
    }

    /// Asks the paging delegate to load the next page.
    func loadNextPage() { delegate.loadNextPage() }

    /// Forwards every `PagingData` emission from the KMP view model to the delegate.
    private func observePagingData() {
        // Capture the collaborators instead of `self`: the sequence may never
        // finish, and a strong `self` capture would keep this object alive forever.
        let delegate = self.delegate
        let pagingFlow = viewModel.posts
        observationTasks.append(Task {
            for await pagingData in pagingFlow {
                delegate.submitData(pagingData: pagingData)
            }
        })
    }

    /// Refreshes `posts` whenever the delegate reports that its pages were updated.
    private func observeData() {
        let delegate = self.delegate
        // Published UI state must be mutated on the main actor.
        observationTasks.append(Task { @MainActor [weak self] in
            for await _ in delegate.onPagesUpdatedFlow {
                // Hold `self` only for one iteration so it can be released between emissions.
                guard let self else { return }
                self.posts = delegate.getItems()
                self.isLoading = false
            }
        })
    }

    /// Mirrors the delegate's refresh load state into `isLoading`.
    private func observeLoadStates() {
        let delegate = self.delegate
        observationTasks.append(Task { @MainActor [weak self] in
            for await loadState in delegate.loadStateFlow {
                guard let loadState else { continue }
                guard let self else { return }
                self.isLoading = loadState.refresh is Paging_commonLoadStateLoading
            }
        })
    }

    deinit {
        // Stop the observers first, then release the delegate's scope.
        observationTasks.forEach { $0.cancel() }
        delegate.clearScope()
    }
}
/// Lists posts and requests the next page when the last row appears.
struct PostListView: View {
    // `@StateObject` makes SwiftUI own the view model across re-creations of this
    // view value and re-render when the object announces a change. A plain stored
    // property is re-initialised with the view and is not observed.
    @StateObject private var viewModel = PostListViewModel()

    var body: some View {
        NavigationStack {
            Group {
                // Full-screen spinner only while loading with nothing to show yet.
                if viewModel.isLoading && viewModel.posts.isEmpty {
                    ProgressView()
                        .frame(maxWidth: .infinity, maxHeight: .infinity)
                } else {
                    List {
                        ForEach(viewModel.posts, id: \.id) { post in
                            PostItemView(post: post)
                                .onAppear {
                                    // Reaching the last loaded row triggers the next page.
                                    if post.id == viewModel.posts.last?.id {
                                        viewModel.loadNextPage()
                                    }
                                }
                        }
                    }
                    .listStyle(.plain)
                }
            }
            .navigationTitle("Posts")
        }
    }
}
| Helper | Code | Description |
|---|---|---|
| NetFlowMockResponse.success(body) | 200 | Successful response with optional body |
| NetFlowMockResponse.error(code, errorBody) | custom | Client error |
| NetFlowMockResponse.notFound() | 404 | Not found |
| NetFlowMockResponse.serverError(errorBody) | 500 | Server error |
Surfaced from shared tags and platforms — no rankings paid for.