我有一个角度6应用程序,我最初设置我的后端与休息api,但我开始转换部件使用socket.io。
当我从我的rest api返回数据时,以下工作:
this.http.get(api_url + '/versions/entity/' + entityId).pipe(
mergeMap((versions:IVersion[]) => versions),
groupBy((version:IVersion) => version.type),
mergeMap(group => group.pipe(
toArray(),
map(versions=> {
return {
type: group.key,
versions: versions
}
}),
toArray()
)),
reduce((acc, v) => acc.concat(v), [])
);
快递路线:
router.get('/entity/:entityId', (req, res) => {
const entityId = req.params.entityId;
Version.getVersionsByEntity(entityId, (err, versions) => {
if (err) {
res.json({success: false, msg: err});
} else {
res.json(versions);
}
});
});
哪个用mongoose从我的mongo数据库中获取数据:
export function getVersionsByEntity(entityId, callback) {
console.log('models/version - get versions by entity');
Version.find({'entity.entityId': entityId})
.exec(callback);
}
但是,当我使用socket.io进行完全相同的调用时,observable不会返回任何内容。我猜它是因为它永远不会完成?成功传输数据时,http呼叫是否发送“完整”消息?
套接字从此服务发送:
getVersionsByEntity(entityId): Observable<IVersion[]> {
// create observable to list to refreshJobs message
let observable = new Observable(observer => {
this._socketService.socket.on('versionsByEntity', (data) => {
observer.next(data);
});
});
this._socketService.event('versionsByEntity', entityId);
return <Observable<IVersion[]>> observable;
}
从服务器调用相同的mongoose函数。
从(套接字)服务返回的observable确实包含数据,只有当我添加toArray()
函数时它才会在我订阅时从未打印任何内容。
有人可以帮我解决这个问题并解释其背后的理论吗?它没有完成吗? http是否使用Angular发送“已完成”消息?
编辑:我已经创建了一个简单的stackblitz,它正在做我想要的但我想从_dataService中删除take(1)因为我可能想要更新数据所以我想保持observable open - https://stackblitz.com/edit/rxjs-toarray-problem
编辑2:我接近用扫描操作符替换为Array,但它似乎是为阵列发射两次。 reduce()发出正确的数据,但似乎只发出完整(如toArray)所以它没有更好 - https://stackblitz.com/edit/rxjs-toarray-problem-tpeguu
处理事件流有点不同。我试图提出一些代码:
getVersionsByEntity(entityId): Observable<IVersion[]> {
return defer(() => {
this._socketService.event('versionsByEntity', entityId);
return fromEvent(this._socketService.socket, 'versionsByEntity')
.pipe(take(1))
}) as Observable<IVersion[]>;
}
主要思想是将所有内容包装在defer
中,这将使Observable变得懒惰,并且只在订阅时调用socketService.event
(您的原始实现是通过立即调用socketService.event
而急切的)。实施急切可能会产生意想不到的后果 - 如果Observable订阅得太晚,很容易错过事件。
我还建议使用fromEvent
Observable工厂 - 处理事件监听器设置和拆除。
最后在第一次发射后完成Observable我添加了take(1)
- 这将限制排放数量为1并取消订阅Observable。
除了@ m1ch4ls所说的,你必须考虑toArray
的工作原理。 toArray
将Observable通知的所有数据转换为此类数据的数组。为了使其工作,Observable必须完成。
Angular http客户端返回的Observable总是在第一次通知后完成,因此toArray
工作。一个socket.io流在关闭时完成,因此在这样的流上使用toArray
可能会导致在流未关闭的情况下永远不会从Observable中获取任何值。
另一方面,如果您想在一次通知后关闭流,如果您使用take(1)
会发生这种情况,那么您最好考虑使用http请求。套接字流被认为是一种长生活频道,因此如果你必须在传输一条消息后不得不关闭它们,它就不适合它们的性质。
评论后更新版本
这是没有take
的代码
getData() {
return this._dataService.getVersions().pipe(
map(
(versions: Array<any>) => versions.reduce(
(acc, val) => {
const versionGroup = (acc.find(el => el.type === val.type));
if (versionGroup) {
versionGroup.versions.push(val)
} else {
acc.push({type: val.type, versions: [val]})
}
return acc;
}, []
)
),
)
要理解的关键点是,您的服务正在为您返回一个Array
的东西。为了实现您的结果,您可以使用Array
方法直接在Array
上工作,而您不需要使用Observable
运算符。请不要将我上面使用的分组逻辑的代码视为正确的实现 - 真正的实现可能会有利于使用像lodash
这样的东西进行分组,但我不想让事情复杂化。
这是你的原始代码
getData() {
return this._dataService.getVersions().pipe(
mergeMap((versions: Array<any>) => versions),
groupBy((version:IVersion) => version.type),
mergeMap(group => group.pipe(
toArray(),
map(versions=> {
return {
type: group.key,
versions: versions
}
}),
toArray()
)),
reduce((acc, v) => acc.concat(v), [])
)
}
这是做什么的
mergeMap
应用于服务返回的ArraygroupBy
运算符,您可以创建一个新的Observable,它可以发出mergeMap
,它接受groupBy
发出的数组,将它们变成一个对象流,只是立即通过第一个toArray
将它们转换为数组,然后转换为类型为{type:string,versions:[]}的Object,然后最终再次调用toArray
来创建一个Array数组reduce
来创建你的最终数组为什么它只适用于take
?因为groupBy
仅在其源Observable完成时才会被执行,这是有道理的,你想要对一组有限的东西进行分组。同样,reduce
Observable运算符。
take
是一种完成源Observable的方法。