.Net上的Rx库地址:https://docs.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242985(v=vs.103)
可以有0次、1次或多次数据产生,也就是数据流; 除了数据之外,还有能够标识错误和完成(正常结束); 数据流和数据流、数据流和过程的组合复杂度很高; 在上面的基础上,需要处理整个过程中线程切换、并发同步、数据缓冲等问题。
typealias Func = () -> ()
typealias OnData<Data> = (Data) -> ()
typealias OnError = (Error) -> ()
typealias OnComplete = Func
typealias StreamFunc<Data> = (@escaping OnData<Data>, @escaping OnError, @escaping OnComplete) -> ()
// 数据怎么回调,什么情况结束,onError和onComplete分别在什么情况回调,保证有且仅有一次回调
func load(onData : OnData<[OrderObject]>?, onError : OnError?, onComplete : OnComplete?) {
let orderServices = [OrderService("鹅鹅鹅"), OrderService("鸭鸭鸭")]
// 记录整体请求的完成状态
var listOrderFinish = false
var queryUserFinish = false
// 记录各个请求的结果
var listOrderResults = orderServices.map{_ in false}
var queryUserResults = [Bool]()
for (index, orderService) in orderServices.enumerated() {
orderService.listOrders { orders in
// 已结束不处理
if (listOrderFinish) {
return;
}
let index = queryUserResults.count
queryUserResults[index] = false
if let userService = getUserService(site: orderService.site){
let userIds = orders.map { order in
order.userId
}
userService.queryUserInfo(userIds: userIds) { userInfoDict in
if (listOrderFinish && queryUserFinish) {
return;
}
let orderObjects = orders.map { order in
OrderObject(order: order, userInfo: userInfoDict[order.userId])
}
onData?(orderObjects)
} onError: { error in
// 如果是第一个错误,直接回调,同时标记为结束
if (!listOrderFinish || !queryUserFinish) {
listOrderFinish = true
queryUserFinish = true
onError?(error)
}
} onComplete: {
// 外层结束,内层也结束,才是最终结束
if (!listOrderFinish || !queryUserFinish) {
queryUserResults[index] = true
// 所有都结束,回调
if (listOrderFinish && !queryUserResults.contains(false)) {
listOrderFinish = true
onComplete?()
}
}
}
} else {
let orderObjects = orders.map { order in
OrderObject(order: order)
}
onData?(orderObjects)
queryUserResults[index] = true
// 所有都结束,回调
if (listOrderFinish && !queryUserResults.contains(false)) {
listOrderFinish = true
onComplete?()
}
}
} onError: { error in
// 如果是第一个错误,直接回调,同时标记为结束
if (!listOrderFinish) {
listOrderFinish = true
onError?(error)
}
} onComplete: {
// 注意,即使所有的请求都结束了,也不能回调结束,因为这里的结束只是代表Order请求结束,userInfo请求不一定结束
if (!listOrderFinish) {
listOrderResults[index] = true
// 所有都结束,回调
if (!listOrderResults.contains(false)) {
listOrderFinish = true
}
}
}
}
}
什么时候回调错误?
什么时候回调完成?
func makeObservable<Data>(f : @escaping StreamFunc<Data>) -> Observable<Data> {
Observable<Data>.create { observer in
f { data in
observer.onNext(data)
} _: { error in
observer.onError(error)
} _: {
observer.onCompleted()
}
return Disposables.create()
}
}
func makeStreamFunc(orders : [Order], userInfoService : UserService?) -> StreamFunc<[OrderObject]> {
if let userInfoService = userInfoService {
// 核心是对queryUserInfo的userIds参数进行偏应用
let userInfoF : StreamFunc<[OrderObject]> = { onData, onError, onComplete in
let userIds = orders.map{$0.userId}
userInfoService.queryUserInfo(userIds: userIds, onData: { userInfoDict in
let orderObjects = orders.map { order in
OrderObject(order: order, userInfo: userInfoDict[order.userId])
}
onData(orderObjects)
}, onError: onError, onComplete: onComplete)
}
return userInfoF
} else {
return { onData, onError, onComplete in
onData(orders.map{OrderObject(order: $0)})
onComplete()
}
}
}
func rxLoad() -> Observable<[OrderObject]> {
let orderService = [OrderService("鹅鹅鹅"), OrderService("鸭鸭鸭")]
// 通过map构造Observable,通过flatMap对listOrder和queryUserInfo进行复合
let observables = orderService.map { orderService in
makeObservable(f: orderService.listOrders).flatMap { (orders) -> Observable<[OrderObject]> in
let userLoadF = makeStreamFunc(orders: orders, userInfoService: getUserService(site: orderService.site))
return makeObservable(f: userLoadF)
}
}
// merge两个平台的Observable
return Observable.merge(observables)
}
// 使用方调用
rxLoad().subscribe { orderObjects in
// onNext闭包中处理数据
} onError: { error in
// onError闭包中处理错误
} onCompleted: {
// onCompleted闭包中处理完成
} onDisposed: {
}
迭代器与序列
异步序列
for try await data in asyncDataList {
print("async get data : + \(data)")
}
// Combine的同步调用
for try await data in publisher.values {
print("async get publiser value \(data)")
}
CPS变换
响应式编程的同步形式
func makeSequence<Data>(f : StreamFunc<Data>) -> AsyncThrowingStream<Data, Error> {
AsyncThrowingStream<Data, Error>{ continuation in
f { data in
continuation.yield(data)
} _: { error in
continuation.finish(throwing: e)
} _: {
continuation.finish()
}
}
}
//多个AsyncSequence merge成一个AsyncSequence
func mergeSequence<Seq : AsyncSequence>(seqs : [Seq]) -> AsyncThrowingStream<Seq.Element, Error> {
makeSequence(f: mergeF(fs: seqs.map(makeLoadFunc)))
}
func makeLoadFunc<Seq : AsyncSequence>(ats : Seq) -> StreamFunc<Seq.Element>{
{ onData, onError, onComplete in
Task {
do {
for try await data in ats {
onData(data)
}
onComplete()
} catch {
onError(error)
}
}
}
}
func mergeF<Data>(fs : [StreamFunc<Data>]) -> StreamFunc<Data> {
{ onData, onError, onComplete in
var finish = false
var results = fs.map{_ in false}
for (index, f) in fs.enumerated() {
f { data in
if (!finish) {
onData(data)
}
} _: { e in
// 如果是第一个错误,直接回调,同时标记为结束
if (!finish) {
finish = true
onError(e)
}
} _: {
// 注意,即使所有的请求都结束了,回调成功
if (!finish) {
results[index] = true
// 所有都结束,回调
if (!results.contains(false)) {
finish = true
onComplete()
}
}
}
}
}
}
func asLoad() -> AsyncThrowingStream<[OrderObject], Error> {
let orderService = [OrderService("鹅鹅鹅"), OrderService("鸭鸭鸭")]
// 通过map构造AsyncSequence,通过flatMap对listOrder和queryUserInfo进行复合
let streams = orderService.map { orderService in
makeSequence(f: orderService.listOrders).flatMap { (orders) -> AsyncThrowingStream<[OrderObject], Error> in
makeSequence(f: makeLoadFunc(orders: orders, userInfoService: getUserService(site: orderService.site)))
}
}
// merge两个平台的AsyncSequence
return mergeSequence(seqs: streams)
}
for try await orderObject in asLoad() {
print("async get orderObject \(orderObject.first?.order.orderId)")
}
AsyncSequence替换Combine的相关讨论地址:https://forums.swift.org/t/should-asyncsequence-replace-combine-in-the-future-or-should-they-coexist/53370)
技 术 好 文
🔥通过部署流行 Web 框架掌握 Serverless 技术
🔥解析 RocketMQ 业务消息——“事务消息”
在阿里做前端程序员,我是这样规划的
三端一体计算方案:Unify SQL Engine
企 业 案 例
🔥 企业上云|数字化转型经验分享
阿里云主长春:助力“专精特新”,数字科技陪伴企业成长
云钉低代码新模式、新能力、新机遇
推文科技:AI 解决方案助力内容出海
云 专 栏

