NestJS - 使用微服务获取文件流并使用 Observable 返回到网关

问题描述 投票:0回答:2

目前我正在nestjs中构建一个api网关,它使用一个微服务(也在nestjs中)负责从远程服务器获取文件作为流。

检查nestjs文档,我找到了StreamableFile类,但据我了解,这是直接在网关上使用。

我的微服务上的可读流已准备好返回到 api 网关,但是这应该以块的形式发送到 api 网关,并且该网关本身也会将这些块流式传输到前端。我不想在微服务或我的 api 网关中分配内存中的文件。

我的问题是,为了与微服务通信,我使用 ClientProxy,并且微服务的返回必须是可观察的。

考虑到这一点,我一直在尝试以下方法但没有成功:

NestJS 微服务:

@MessagePattern('get_file_by_key')
getFileStreamByKey(
    keyFileName : string,
): Observable<any> {
    const stream = this.remoteServerService.getFile(keyFileName).createReadStream();
    return from(stream);
}

NestJS 网关(微服务消费者)

@Get('/file')
public getFile(): StreamableFile {        
    return new StreamableFile(this.clientProxy.send('get_file_by_key', 'theFileName'));
}

上面只是表达我对此的想法,因为代码甚至无法编译,因为 clientProxy 的 send 方法返回一个可观察值,并且我需要 StreamableFile 构造函数的流。

不是寻找立即的解决方案,而是至少提供一些关于如何实施并从中学习的指导。我需要在 Nestjs 微服务上使用 gRPC 吗?

提前谢谢您

node.js observable microservices nestjs streaming
2个回答
0
投票

我发现的解决方案是使用额外的库将流转换为 rxjs observable,然后使用 npm 库 rxjs-stream 将 rxjs observable 转换为流。


0
投票

我遇到了同样的问题,花了一天时间才找到如何处理它。

NestJS 微服务:

  • 服务:
import { HttpStatus, Injectable, StreamableFile } from '@nestjs/common';
import { AppError, ServerError } from '../utils';
import { ConfigService } from '@nestjs/config';
import * as path from 'path';
import * as fs from 'fs';
import { promises as fsPromises } from 'fs';

@Injectable()
export class AppService {

  constructor(
    private readonly configService: ConfigService,
  ) {}

  async getImage({ dto }: { dto: GetImageRequestDto }): Promise<StreamableFile> {

      const fileName = dto.fileName;
      const storeDir = this.configService.get<string>('app.storeDir');
      if (!storeDir) {
        throw ServerError({
          message: 'Image store directory is not configured.',
          status: HttpStatus.INTERNAL_SERVER_ERROR,
        });
      }

      const filePath = path.resolve(storeDir, fileName);
      if (!fs.existsSync(filePath)) {
        throw AppError(this.i18n, {
          status: HttpStatus.NOT_FOUND,
          identifiers: ['image.not_found'],
        });
      }

      const readStream = fs.createReadStream(filePath);
      return new StreamableFile(readStream);
  }
}
  • 控制器:
import { Controller, ValidationPipe } from '@nestjs/common';
import { AppService } from './app.service';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { streamToRx } from 'rxjs-stream';

@Controller()
export class AppController {
  constructor(private readonly service: AppService) {}
@MessagePattern('image.get')
async findOneImage(
    @Payload(new ValidationPipe({ whitelist: true, transform: true })) dto: GetImageRequestDto,
) {
    const stream = await this.service.getImage({ dto });
    return streamToRx(stream.getStream());
  }
}

NestJs 网关:

控制器:

import { Controller, Get, HttpCode, HttpStatus, Param,  Res,  UseGuards, } from '@nestjs/common';
import { ApiBearerAuth,  ApiTags } from '@nestjs/swagger';
import { ImageService } from './image.service';
import { RolesGuard, SetRequiredRoles } from '../guards/roles.guard';
import { AuthenticationGuard } from '../guards/auth.guard';
import { USER_ROLE } from '../user/enums/user-roles.enum';
import { Response } from 'express';
import { firstValueFrom } from 'rxjs';
import { map } from 'rxjs/operators';
import { isServiceResponseContract } from '../contracts';
import { Logger } from '../utils';

@ApiBearerAuth()
@ApiTags('Image')
@Controller({
  path: 'images',
  version: '1',
})
export class ImageController {
  constructor(
    private readonly service: ImageService,
  ) { }

@Get(':fileName')
  @HttpCode(HttpStatus.OK)
  @SetRequiredRoles(USER_ROLE.AUTHOR)
  @UseGuards(AuthenticationGuard, RolesGuard)
  async getImage(
    @Param('fileName') fileName: string,
    @Res() res: Response,
  ) {
    const responseObservable = await this.service.getImage(fileName);
    const responseObject = await firstValueFrom(responseObservable);

    if (isServiceResponseContract(responseObject)) {
      return responseObject;
    }

    responseObservable
      .pipe(
        map((chunk) => {
          return !(chunk instanceof Buffer) ? Buffer.from(chunk) : chunk;
        }),
      )
      .subscribe({
        next: (chunk) => res.write(chunk),
        error: (error) => {
          Logger.logError({
            functionName: 'ImageController.getImage',
            message: 'Failed to retrieve image',
            error,
          });
          res.status(HttpStatus.INTERNAL_SERVER_ERROR).send('Failed to retrieve image');
        },
        complete: () => res.end(),
      });
  }
}

服务:

import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';

@Injectable()
export class ImageService {
    constructor(
      @Inject('IMAGE_SERVICE') private readonly imageServiceClient: ClientProxy,
    ){}
  
    async getImage(fileName: string) {
      const fileStreamObservable = this.imageServiceClient.send('image.get', { fileName });
      return fileStreamObservable;
    }
}

© www.soinside.com 2019 - 2024. All rights reserved.