我正在尝试使用alamofire和rxswift来消费api。我编写了方法,但观察者的onNext只被调用一次。我试图通过递归调用来做到这一点。这有什么问题? Api将根据时间戳一次返回10个对象。所以我正在检查返回的数组是否包含10个对象。如果是,那么还有更多,如果不是那么那就结束了。
func fetchPersonalization(fromList:[Personalization],timeStamp:Int) -> Observable<PersonalizationContainer>
{
let dictHeader = ["Accept":"application/json","regid" : pushtoken , "os" : "ios" , "token" : token , "App-Version" : "1324" , "Content-Type" : "application/json"]
return fetchPersonalizationUtil(dictHeader: dictHeader, timeStamp: timeStamp)
.flatMap { (perList) -> Observable<PersonalizationContainer> in
let persoList:[Personalization] = perList.list
let finalList = fromList + persoList
if(persoList.count==10){
let newTimeStamp = persoList.last!.lastModifiedAt! - 1
return Observable.merge(Observable.just(PersonalizationContainer(l: finalList, d: perList.data)),
self.fetchPersonalization(fromList:finalList,timeStamp: newTimeStamp)
)
//self.fetchPersonalization(fromList:finalList,timeStamp: newTimeStamp)
}else {
return Observable.just(PersonalizationContainer(l: finalList, d: Data()))
}
}
}
func fetchPersonalizationUtil(dictHeader:[String:String],timeStamp:Int) -> Observable<PersonalizationContainer>
{
return Observable<PersonalizationContainer>.create({ (observer) -> Disposable in
Alamofire.request("https://mranuran.com/api/hubs/personalization/laterthan/\(timeStamp)/limit/10/" ,headers: dictHeader).responseData { response in
if let json = response.result.value {
//print("HUBs JSON: \(json)")
do {
let list = try JSONDecoder().decode([Personalization].self, from: json)
let pContainer = PersonalizationContainer(l: list, d: json)
print("ANURAN \(list[0].name)")
observer.onNext(pContainer)
observer.onCompleted()
}catch {
print(error)
observer.onError(error)
}
}
else{
observer.onError(response.result.error!)
}
}
return Disposables.create()
})
}
我在onNext方法上设置了一个断点,它似乎只被调用了一次。坚持了几个小时和RxSwift的官方github回购中的GithubRepo示例,我无法弄清楚他们在做什么。我的流程有什么问题?
我用Promises写了一段时间,这里是使用Singles。
你传入:
它最终返回一个包含所有结果数组的Single。如果任何内部网络调用错误,则会出错。
func accumulateWhile<T, U>(seed: U, pred: @escaping (T) -> U?, producer: @escaping (U) -> Single<T>) -> Single<[T]> {
return Single.create { observer in
var disposable = CompositeDisposable()
var accumulator: [T] = []
let lock = NSRecursiveLock()
func loop(_ u: U) {
let product = producer(u)
let subDisposable = product.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case let .success(value):
accumulator += [value]
if let u = pred(value) {
loop(u)
}
else {
observer(.success(accumulator))
}
case let .error(error):
observer(.error(error))
}
}
_ = disposable.insert(subDisposable)
}
loop(seed)
return disposable
}
}
我不认为锁实际上是必要的,但我把它放在以防万一。