rxjs - observable什么都不发出(没有完成?)

问题描述 投票:0回答:2

我有一个角度6应用程序,我最初设置我的后端与休息api,但我开始转换部件使用socket.io。

当我从我的rest api返回数据时,以下工作:

this.http.get(api_url + '/versions/entity/' + entityId).pipe(
  mergeMap((versions:IVersion[]) => versions),
  groupBy((version:IVersion) => version.type),
  mergeMap(group => group.pipe(
    toArray(),
    map(versions=> {
      return {
        type: group.key,
        versions: versions
      }
    }),
    toArray()
  )),
  reduce((acc, v) => acc.concat(v), [])
);

快递路线:

router.get('/entity/:entityId', (req, res) => {
  const entityId = req.params.entityId;

  Version.getVersionsByEntity(entityId, (err, versions) => {
    if (err) {
      res.json({success: false, msg: err});
    } else {
      res.json(versions);
    }
  });
});

哪个用mongoose从我的mongo数据库中获取数据:

export function getVersionsByEntity(entityId, callback) {
  console.log('models/version - get versions by entity');

  Version.find({'entity.entityId': entityId})
          .exec(callback);
}

但是,当我使用socket.io进行完全相同的调用时,observable不会返回任何内容。我猜它是因为它永远不会完成?成功传输数据时,http呼叫是否发送“完整”消息?

套接字从此服务发送:

getVersionsByEntity(entityId): Observable<IVersion[]> {
    // create observable to list to refreshJobs message
    let observable = new Observable(observer => {
      this._socketService.socket.on('versionsByEntity', (data) => {
        observer.next(data);
      });
    });

    this._socketService.event('versionsByEntity', entityId);

    return <Observable<IVersion[]>> observable;
  }

从服务器调用相同的mongoose函数。

从(套接字)服务返回的observable确实包含数据,只有当我添加toArray()函数时它才会在我订阅时从未打印任何内容。

有人可以帮我解决这个问题并解释其背后的理论吗?它没有完成吗? http是否使用Angular发送“已完成”消息?

编辑:我已经创建了一个简单的stackblitz,它正在做我想要的但我想从_dataService中删除take(1)因为我可能想要更新数据所以我想保持observable open - https://stackblitz.com/edit/rxjs-toarray-problem

编辑2:我接近用扫描操作符替换为Array,但它似乎是为阵列发射两次。 reduce()发出正确的数据,但似乎只发出完整(如toArray)所以它没有更好 - https://stackblitz.com/edit/rxjs-toarray-problem-tpeguu

angular group-by rxjs toarray
2个回答
1
投票

处理事件流有点不同。我试图提出一些代码:

getVersionsByEntity(entityId): Observable<IVersion[]> {
    return defer(() => {
        this._socketService.event('versionsByEntity', entityId);

        return fromEvent(this._socketService.socket, 'versionsByEntity')
            .pipe(take(1))
    }) as Observable<IVersion[]>;
}

主要思想是将所有内容包装在defer中,这将使Observable变得懒惰,并且只在订阅时调用socketService.event(您的原始实现是通过立即调用socketService.event而急切的)。实施急切可能会产生意想不到的后果 - 如果Observable订阅得太晚,很容易错过事件。

我还建议使用fromEvent Observable工厂 - 处理事件监听器设置和拆除。

最后在第一次发射后完成Observable我添加了take(1) - 这将限制排放数量为1并取消订阅Observable。


1
投票

除了@ m1ch4ls所说的,你必须考虑toArray的工作原理。 toArray将Observable通知的所有数据转换为此类数据的数组。为了使其工作,Observable必须完成。

Angular http客户端返回的Observable总是在第一次通知后完成,因此toArray工作。一个socket.io流在关闭时完成,因此在这样的流上使用toArray可能会导致在流未关闭的情况下永远不会从Observable中获取任何值。

另一方面,如果您想在一次通知后关闭流,如果您使用take(1)会发生这种情况,那么您最好考虑使用http请求。套接字流被认为是一种长生活频道,因此如果你必须在传输一条消息后不得不关闭它们,它就不适合它们的性质。

评论后更新版本

这是没有take的代码

 getData() {
    return this._dataService.getVersions().pipe(
      map(
        (versions: Array<any>) => versions.reduce(
          (acc, val) => {
            const versionGroup = (acc.find(el => el.type === val.type));
            if (versionGroup) {
              versionGroup.versions.push(val)
            } else {
              acc.push({type: val.type, versions: [val]})
            }
            return acc;
          }, []
        )
      ),
    )

要理解的关键点是,您的服务正在为您返回一个Array的东西。为了实现您的结果,您可以使用Array方法直接在Array上工作,而您不需要使用Observable运算符。请不要将我上面使用的分组逻辑的代码视为正确的实现 - 真正的实现可能会有利于使用像lodash这样的东西进行分组,但我不想让事情复杂化。

这是你的原始代码

getData() {
    return this._dataService.getVersions().pipe(
      mergeMap((versions: Array<any>) => versions),
      groupBy((version:IVersion) => version.type),
      mergeMap(group => group.pipe(
        toArray(),
        map(versions=> {
          return {
            type: group.key,
            versions: versions
          }
        }),
        toArray()
      )),
      reduce((acc, v) => acc.concat(v), [])
    )
  }

这是做什么的

  1. 您创建一个对象流,将第一个mergeMap应用于服务返回的Array
  2. 通过groupBy运算符,您可以创建一个新的Observable,它可以发出
  3. 根据你的逻辑分组的对象然后你输入第二个更复杂的mergeMap,它接受groupBy发出的数组,将它们变成一个对象流,只是立即通过第一个toArray将它们转换为数组,然后转换为类型为{type:string,versions:[]}的Object,然后最终再次调用toArray来创建一个Array数组
  4. 你要做的最后一件事是运行reduce来创建你的最终数组

为什么它只适用于take?因为groupBy仅在其源Observable完成时才会被执行,这是有道理的,你想要对一组有限的东西进行分组。同样,reduce Observable运算符。

take是一种完成源Observable的方法。

© www.soinside.com 2019 - 2024. All rights reserved.