在 spring webflux 或 project reactor 中,使用 `mergewith` 时需注意其不可变性——它不会原地修改流,而是返回新流;错误地忽略返回值会导致数据丢失,正确做法是用 `flatmap` 或链式 `fold` 累积合并。
在响应式编程中,常见的误区之一是将命令式思维(如 for 循环 + 累加变量)直接套用于 Reactor 的 Flux 操作。你提供的代码:
val ids = repository.findIds().map { it.ekycId }
val allEventFlux = Flux.empty()
for (id in ids) {
val events: Flux = eventStore.readEvents(id)
allEventFlux.mergeWith(events) // ❌ 错误:返回新 Flux,但未赋值!
} 问题核心在于:mergeWith 是纯函数式操作,返回一个全新的 Flux,而非修改原流。因此 allEventFlux.mergeWith(events) 执行后,结果被丢弃,allEventFlux 始终保持为初始的空流 Flux.empty()。

val allEvents: Flux= Flux.fromIterable(repository.findIds()) .map { it.ekycId } .flatMap { id -> eventStore.readEvents(id) }
val ids = repository.findIds().map { it.ekycId }
val allEvents: Flux = ids.fold(Flux.empty()) { acc, id ->
acc.mergeWith(eventStore.readEvents(id))
} val allEventsInOrder: Flux= ids.fold(Flux.empty ()) { acc, id -> acc.concatWith(eventStore.readEvents(id)) // ✅ 严格串行:ID1 → ID2 → ... }
| 场景 | 推荐操作符 | 特点 |
|---|---|---|
| 高吞吐、事件无需严格 ID 顺序 | flatMap | 并发执行,自动背压,最常用 |
| 各 ID 事件需严格串行输出 | concatWith(配合 fold) | 顺序执行,延迟高,适合强序要求 |
| 多流静态合并(已知固定数量) | Flux.merge(flux1, flux2, flux3) | 更直观,适用于编译期确定流数 |
始终牢记:Reactor 是声明式、不可变的响应式流模型——每一次操作都在定义“未来如何处理数据”,而非立即执行。 正确理解这一范式,是写出健壮响应式代码的前提。