目前我正在nestjs中构建一个api网关,它使用一个微服务(也在nestjs中)负责从远程服务器获取文件作为流。
检查nestjs文档,我找到了StreamableFile类,但据我了解,这是直接在网关上使用。
我的微服务上的可读流已准备好返回到 api 网关,但是这应该以块的形式发送到 api 网关,并且该网关本身也会将这些块流式传输到前端。我不想在微服务或我的 api 网关中分配内存中的文件。
我的问题是,为了与微服务通信,我使用 ClientProxy,并且微服务的返回必须是可观察的。
考虑到这一点,我一直在尝试以下方法但没有成功:
NestJS 微服务:
@MessagePattern('get_file_by_key')
getFileStreamByKey(
keyFileName : string,
): Observable<any> {
const stream = this.remoteServerService.getFile(keyFileName).createReadStream();
return from(stream);
}
NestJS 网关(微服务消费者)
@Get('/file')
public getFile(): StreamableFile {
return new StreamableFile(this.clientProxy.send('get_file_by_key', 'theFileName'));
}
上面只是表达我对此的想法,因为代码甚至无法编译,因为 clientProxy 的 send 方法返回一个可观察值,并且我需要 StreamableFile 构造函数的流。
不是寻找立即的解决方案,而是至少提供一些关于如何实施并从中学习的指导。我需要在 Nestjs 微服务上使用 gRPC 吗?
提前谢谢您
我发现的解决方案是使用额外的库将流转换为 rxjs observable,然后使用 npm 库 rxjs-stream 将 rxjs observable 转换为流。
我遇到了同样的问题,花了一天时间才找到如何处理它。
NestJS 微服务:
import { HttpStatus, Injectable, StreamableFile } from '@nestjs/common';
import { AppError, ServerError } from '../utils';
import { ConfigService } from '@nestjs/config';
import * as path from 'path';
import * as fs from 'fs';
import { promises as fsPromises } from 'fs';
@Injectable()
export class AppService {
constructor(
private readonly configService: ConfigService,
) {}
async getImage({ dto }: { dto: GetImageRequestDto }): Promise<StreamableFile> {
const fileName = dto.fileName;
const storeDir = this.configService.get<string>('app.storeDir');
if (!storeDir) {
throw ServerError({
message: 'Image store directory is not configured.',
status: HttpStatus.INTERNAL_SERVER_ERROR,
});
}
const filePath = path.resolve(storeDir, fileName);
if (!fs.existsSync(filePath)) {
throw AppError(this.i18n, {
status: HttpStatus.NOT_FOUND,
identifiers: ['image.not_found'],
});
}
const readStream = fs.createReadStream(filePath);
return new StreamableFile(readStream);
}
}
import { Controller, ValidationPipe } from '@nestjs/common';
import { AppService } from './app.service';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { streamToRx } from 'rxjs-stream';
@Controller()
export class AppController {
constructor(private readonly service: AppService) {}
@MessagePattern('image.get')
async findOneImage(
@Payload(new ValidationPipe({ whitelist: true, transform: true })) dto: GetImageRequestDto,
) {
const stream = await this.service.getImage({ dto });
return streamToRx(stream.getStream());
}
}
NestJs 网关:
控制器:
import { Controller, Get, HttpCode, HttpStatus, Param, Res, UseGuards, } from '@nestjs/common';
import { ApiBearerAuth, ApiTags } from '@nestjs/swagger';
import { ImageService } from './image.service';
import { RolesGuard, SetRequiredRoles } from '../guards/roles.guard';
import { AuthenticationGuard } from '../guards/auth.guard';
import { USER_ROLE } from '../user/enums/user-roles.enum';
import { Response } from 'express';
import { firstValueFrom } from 'rxjs';
import { map } from 'rxjs/operators';
import { isServiceResponseContract } from '../contracts';
import { Logger } from '../utils';
@ApiBearerAuth()
@ApiTags('Image')
@Controller({
path: 'images',
version: '1',
})
export class ImageController {
constructor(
private readonly service: ImageService,
) { }
@Get(':fileName')
@HttpCode(HttpStatus.OK)
@SetRequiredRoles(USER_ROLE.AUTHOR)
@UseGuards(AuthenticationGuard, RolesGuard)
async getImage(
@Param('fileName') fileName: string,
@Res() res: Response,
) {
const responseObservable = await this.service.getImage(fileName);
const responseObject = await firstValueFrom(responseObservable);
if (isServiceResponseContract(responseObject)) {
return responseObject;
}
responseObservable
.pipe(
map((chunk) => {
return !(chunk instanceof Buffer) ? Buffer.from(chunk) : chunk;
}),
)
.subscribe({
next: (chunk) => res.write(chunk),
error: (error) => {
Logger.logError({
functionName: 'ImageController.getImage',
message: 'Failed to retrieve image',
error,
});
res.status(HttpStatus.INTERNAL_SERVER_ERROR).send('Failed to retrieve image');
},
complete: () => res.end(),
});
}
}
服务:
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
@Injectable()
export class ImageService {
constructor(
@Inject('IMAGE_SERVICE') private readonly imageServiceClient: ClientProxy,
){}
async getImage(fileName: string) {
const fileStreamObservable = this.imageServiceClient.send('image.get', { fileName });
return fileStreamObservable;
}
}