我有一些代码可以在较小的块中执行大文件的上传。这是我有的:
upload(project: Project): Observable<any> {
var chunks = this.chunkFiles(project);
var totalPercent = project.file
.map(file => Math.ceil(file.size / this.chunkSize))
.reduce((sum, curr) => sum + curr) * 100;
var finishedPercent = 0;
var currentPercent = 0;
return from(chunks)
.pipe(
concatMap(request =>
{
return this.http.request(request)
.pipe(
retryWhen(error =>
{
return interval(1000)
.pipe(
flatMap(count =>
{
if (count == 2)
{
return throwError(error);
}
return of(count);
})
);
}));
}),
map(event =>
{
if (event.type === HttpEventType.UploadProgress)
{
var progress = event as HttpProgressEvent;
currentPercent = Math.round(100 * progress.loaded / progress.total);
return {
type: event.type,
loaded: finishedPercent + currentPercent,
total: totalPercent
};
}
else if (event instanceof HttpResponse)
{
finishedPercent += 100;
if (finishedPercent == totalPercent)
{
return event;
}
}
}),
filter(response => response !== undefined)
);
}
目的是它开始上传文件的块,一旦其中一个块上传失败,它应该停止并且错误应该传播到调用我的上传功能的代码。该特定代码如下:
onStartUpload = this.actions$
.ofType(forRoot.START_UPLOAD).pipe(
switchMap((action: forRoot.StartUpload) =>
this.uploadService
.upload(action.project).pipe(
map((event) => {
if (event.type === HttpEventType.UploadProgress) {
const progress = event as HttpProgressEvent;
const percentDone = Math.round(100 * progress.loaded / progress.total);
return new forRoot.UploadProgress(percentDone);
} else if (event instanceof HttpResponse) {
return new forRoot.UploadSucceeded();
} else {
console.log(event);
}
}),
filter(a => a !== undefined),
catchError(error => {
if (error instanceof HttpErrorResponse && error.status === 400) {
const message = this.getMessage(error);
return of(new forRoot.UploadFailed(message || "Bad request"));
} else {
return of(new forRoot.UploadFailed("Network error"));
}
})))
);
问题是我只收到“网络错误”消息,而我期待错误状态为400的内容(所以我应该得到特定消息或“错误请求”)。
我假设我在重试时没有正确处理错误,但我尝试了一些不同的想法,但似乎都没有。我错过了什么?
编辑以显示工作代码而不重试:
这是具有正确错误消息处理的代码,但它没有重试:
return from(chunks)
.pipe(
concatMap(request =>
{
return this.http.request(request)
}),
map(event =>
{
if (event.type === HttpEventType.UploadProgress)
{
var progress = event as HttpProgressEvent;
currentPercent = Math.round(100 * progress.loaded / progress.total);
return {
type: event.type,
loaded: finishedPercent + currentPercent,
total: totalPercent
};
}
else if (event instanceof HttpResponse)
{
finishedPercent += 100;
if (finishedPercent == totalPercent)
{
return event;
}
}
}),
filter(response => response !== undefined)
);
我一直在假设有些东西可以添加到http.request调用中,在重试之间延迟重试指定的次数,然后如果它们全部失败,就像http.request那样抛出一个错误。似乎发生的是它通过concatMap代码继续到地图代码。我可以添加console.log(事件)调用,并查看从服务器返回的错误消息。我仍然是rxjs的新手,所以也许我不明白,但我一直期待错误发生时它不会执行地图代码。
问题在于:
retryWhen(error => { // << error is a stream
return interval(1000)
.pipe(
flatMap(count => {
if (count == 2){
return throwError(error); // so here a stream is thrown
}
return of(count);
})
);
})
error
参数不是单个错误,而是实际上发生的所有错误的Observable。
为了解决这个问题,你应该返回从error
派生的observable,例如:
retryWhen(errors$ =>
errors$.pipe(
delay(1000)
)
)
有限错误处理的更新示例
retryWhen(errors =>
errors.pipe(
// switchMap to add limiting logic
switchMap((error, index) =>
index == 3 // if it is a 4th error
? throwError(error) // -- we rethrow it upstream
: of(error).pipe( delay(1000) ) // -- delay a retry otherwise
)
)
)
继承人retryWhen example with exponential backoff。
还有一篇文章on error handling in rxjs(关于retryWhen
早期完成的说明)。
希望这可以帮助