通过使用Http,我们调用一个执行网络调用并返回http observable的方法:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
如果我们采用这个可观察的并添加多个订阅者:
let network$ = getCustomer();
let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);
我们想要做的是确保这不会导致多个网络请求。
这可能看起来像是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了observable以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者。
在RxJs 5中这样做的正确方法是什么?
也就是说,这似乎工作正常:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json()).share();
}
但这是在RxJs 5中这样做的惯用方式,还是我们应该做其他事情呢?
注意:根据Angular 5新的HttpClient
,所有示例中的.map(res => res.json())
部分现在都是无用的,因为现在默认采用JSON结果。
缓存数据,如果可用缓存,则返回此项,否则发出HTTP请求。
import {Injectable} from '@angular/core';
import {Http, Headers} from '@angular/http';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of'; //proper way to import the 'of' operator
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/map';
import {Data} from './data';
@Injectable()
export class DataService {
private url:string = 'https://cors-test.appspot.com/test';
private data: Data;
private observable: Observable<any>;
constructor(private http:Http) {}
getData() {
if(this.data) {
// if `data` is available just return it as `Observable`
return Observable.of(this.data);
} else if(this.observable) {
// if `this.observable` is set then the request is in progress
// return the `Observable` for the ongoing request
return this.observable;
} else {
// example header (not necessary)
let headers = new Headers();
headers.append('Content-Type', 'application/json');
// create the request, store the `Observable` for subsequent subscribers
this.observable = this.http.get(this.url, {
headers: headers
})
.map(response => {
// when the cached data is available we don't need the `Observable` reference anymore
this.observable = null;
if(response.status == 400) {
return "FAILURE";
} else if(response.status == 200) {
this.data = new Data(response.json());
return this.data;
}
// make it shared so more than one subscriber can get the result
})
.share();
return this.observable;
}
}
}
这个artile https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html是一个很好的解释如何缓存shareReplay
。
使用Rxjs Observer / Observable + Caching + Subscription的可缓存HTTP响应数据
见下面的代码
*免责声明:我是rxjs的新手,所以请记住,我可能会滥用可观察/观察者的方法。我的解决方案纯粹是我发现的其他解决方案的集合,是未能找到一个简单的记录良好的解决方案的结果。因此,我提供了完整的代码解决方案(正如我希望找到的那样),希望它可以帮助其他人。
*请注意,此方法基于GoogleFirebaseObservables。不幸的是,我缺乏适当的经验/时间来复制他们在引擎盖下做的事情。但以下是提供对某些可缓存数据的异步访问的简单方法。
情况:“产品列表”组件的任务是显示产品列表。该网站是一个单页网页应用程序,其中包含一些菜单按钮,用于“过滤”页面上显示的产品。
解决方案:组件“订阅”服务方法。 service方法返回一个产品对象数组,组件通过订阅回调访问该对象。 service方法将其活动包装在新创建的Observer中并返回观察者。在这个观察者中,它搜索缓存的数据并将其传递回订阅者(组件)并返回。否则,它会发出http调用以检索数据,订阅响应,您可以在其中处理该数据(例如,将数据映射到您自己的模型),然后将数据传递回订阅者。
代码
产品list.component.ts
import { Component, OnInit, Input } from '@angular/core';
import { ProductService } from '../../../services/product.service';
import { Product, ProductResponse } from '../../../models/Product';
@Component({
selector: 'app-product-list',
templateUrl: './product-list.component.html',
styleUrls: ['./product-list.component.scss']
})
export class ProductListComponent implements OnInit {
products: Product[];
constructor(
private productService: ProductService
) { }
ngOnInit() {
console.log('product-list init...');
this.productService.getProducts().subscribe(products => {
console.log('product-list received updated products');
this.products = products;
});
}
}
product.service.ts
import { Injectable } from '@angular/core';
import { Http, Headers } from '@angular/http';
import { Observable, Observer } from 'rxjs';
import 'rxjs/add/operator/map';
import { Product, ProductResponse } from '../models/Product';
@Injectable()
export class ProductService {
products: Product[];
constructor(
private http:Http
) {
console.log('product service init. calling http to get products...');
}
getProducts():Observable<Product[]>{
//wrap getProducts around an Observable to make it async.
let productsObservable$ = Observable.create((observer: Observer<Product[]>) => {
//return products if it was previously fetched
if(this.products){
console.log('## returning existing products');
observer.next(this.products);
return observer.complete();
}
//Fetch products from REST API
console.log('** products do not yet exist; fetching from rest api...');
let headers = new Headers();
this.http.get('http://localhost:3000/products/', {headers: headers})
.map(res => res.json()).subscribe((response:ProductResponse) => {
console.log('productResponse: ', response);
let productlist = Product.fromJsonList(response.products); //convert service observable to product[]
this.products = productlist;
observer.next(productlist);
});
});
return productsObservable$;
}
}
product.ts(型号)
export interface ProductResponse {
success: boolean;
msg: string;
products: Product[];
}
export class Product {
product_id: number;
sku: string;
product_title: string;
..etc...
constructor(product_id: number,
sku: string,
product_title: string,
...etc...
){
//typescript will not autoassign the formal parameters to related properties for exported classes.
this.product_id = product_id;
this.sku = sku;
this.product_title = product_title;
...etc...
}
//Class method to convert products within http response to pure array of Product objects.
//Caller: product.service:getProducts()
static fromJsonList(products:any): Product[] {
let mappedArray = products.map(Product.fromJson);
return mappedArray;
}
//add more parameters depending on your database entries and constructor
static fromJson({
product_id,
sku,
product_title,
...etc...
}): Product {
return new Product(
product_id,
sku,
product_title,
...etc...
);
}
}
以下是我在Chrome中加载页面时看到的输出示例。请注意,在初始加载时,将从http获取产品(调用我的节点休息服务,该服务在端口3000上本地运行)。然后,当我单击以导航到产品的“已过滤”视图时,可以在缓存中找到产品。
我的Chrome日志(控制台):
core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode.
app.component.ts:19 app.component url: /products
product.service.ts:15 product service init. calling http to get products...
product-list.component.ts:18 product-list init...
product.service.ts:29 ** products do not yet exist; fetching from rest api...
product.service.ts:33 productResponse: {success: true, msg: "Products found", products: Array(23)}
product-list.component.ts:20 product-list received updated products
... [点击菜单按钮过滤产品] ...
app.component.ts:19 app.component url: /products/chocolatechip
product-list.component.ts:18 product-list init...
product.service.ts:24 ## returning existing products
product-list.component.ts:20 product-list received updated products
结论:这是我发现(到目前为止)实现可缓存的http响应数据的最简单方法。在我的角度应用程序中,每次导航到产品的不同视图时,产品列表组件都会重新加载。 ProductService似乎是一个共享实例,因此ProductService中的“products:Product []”的本地缓存在导航期间保留,随后对“GetProducts()”的调用返回缓存的值。最后一点,我已经阅读了关于在完成防止“内存泄漏”时需要关闭observable / subscriptions的评论。我没有把它包含在这里,但要记住这一点。
我假设@ngx-cache/core可以用于维护http调用的缓存功能,特别是如果在浏览器和服务器平台上进行HTTP调用。
假设我们有以下方法:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
您可以使用Cached
的@ngx-cache/core装饰器来存储从cache storage
进行HTTP调用的方法返回的值(storage
可以配置,请检查ng-seed/universal的实现) - 就在第一次执行时。下次调用该方法时(无论是在浏览器还是服务器平台上),都会从cache storage
中检索该值。
import { Cached } from '@ngx-cache/core';
...
@Cached('get-customer') // the cache key/identifier
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
还有可能使用has
使用缓存方法(get
,set
,caching API)。
anyclass.ts
...
import { CacheService } from '@ngx-cache/core';
@Injectable()
export class AnyClass {
constructor(private readonly cache: CacheService) {
// note that CacheService is injected into a private property of AnyClass
}
// will retrieve 'some string value'
getSomeStringValue(): string {
if (this.cache.has('some-string'))
return this.cache.get('some-string');
this.cache.set('some-string', 'some string value');
return 'some string value';
}
}
以下是客户端和服务器端缓存的软件包列表:
rxjs 5.3.0
我对.map(myFunction).publishReplay(1).refCount()
不满意
有多个订阅者,.map()
在某些情况下执行myFunction
两次(我希望它只执行一次)。一个修复似乎是publishReplay(1).refCount().take(1)
你可以做的另一件事是,不要使用refCount()
并让Observable立即变热:
let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;
无论订户如何,这都将启动HTTP请求。我不确定在HTTP GET完成之前取消订阅是否会取消它。
我们想要做的是确保这不会导致多个网络请求。
我个人最喜欢的是将async
方法用于发出网络请求的呼叫。方法本身不返回值,而是在同一服务中更新BehaviorSubject
,组件将订阅。
现在为什么使用BehaviorSubject
而不是Observable
?因为,
onnext
时触发。getValue()
方法。例:
customer.service.ts
public customers$: BehaviorSubject<Customer[]> = new BehaviorSubject([]);
public async getCustomers(): Promise<void> {
let customers = await this.httpClient.post<LogEntry[]>(this.endPoint, criteria).toPromise();
if (customers)
this.customers$.next(customers);
}
然后,只要有需要,我们就可以订阅customers$
。
public ngOnInit(): void {
this.customerService.customers$
.subscribe((customers: Customer[]) => this.customerList = customers);
}
或者您可能希望直接在模板中使用它
<li *ngFor="let customer of customerService.customers$ | async"> ... </li>
所以现在,在你再次调用getCustomers
之前,数据将保留在customers$
BehaviorSubject中。
那么如果你想刷新这些数据怎么办?只是打电话给getCustomers()
public async refresh(): Promise<void> {
try {
await this.customerService.getCustomers();
}
catch (e) {
// request failed, handle exception
console.error(e);
}
}
使用此方法,我们不必在后续网络调用之间明确保留数据,因为它由BehaviorSubject
处理。
PS:通常当一个组件被破坏时,摆脱订阅是一个好习惯,因为你可以使用this回答中建议的方法。
很棒的答案。
或者你可以这样做:
这是来自最新版本的rxjs。我使用的是5.5.7版本的RxJS
import {share} from "rxjs/operators";
this.http.get('/someUrl').pipe(share());
只需在map之后和任何订阅之前调用share()。
在我的例子中,我有一个通用服务(RestClientService.ts),它正在进行其余的调用,提取数据,检查错误并将observable返回到具体的实现服务(f.ex。:ContractClientService.ts),最后这个具体的实现将observable返回给de ContractComponent.ts,并且这个订阅更新视图。
RestClientService.ts:
export abstract class RestClientService<T extends BaseModel> {
public GetAll = (path: string, property: string): Observable<T[]> => {
let fullPath = this.actionUrl + path;
let observable = this._http.get(fullPath).map(res => this.extractData(res, property));
observable = observable.share(); //allows multiple subscribers without making again the http request
observable.subscribe(
(res) => {},
error => this.handleError2(error, "GetAll", fullPath),
() => {}
);
return observable;
}
private extractData(res: Response, property: string) {
...
}
private handleError2(error: any, method: string, path: string) {
...
}
}
ContractService.ts:
export class ContractService extends RestClientService<Contract> {
private GET_ALL_ITEMS_REST_URI_PATH = "search";
private GET_ALL_ITEMS_PROPERTY_PATH = "contract";
public getAllItems(): Observable<Contract[]> {
return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH);
}
}
ContractComponent.ts:
export class ContractComponent implements OnInit {
getAllItems() {
this.rcService.getAllItems().subscribe((data) => {
this.items = data;
});
}
}
我写了一个缓存类,
/**
* Caches results returned from given fetcher callback for given key,
* up to maxItems results, deletes the oldest results when full (FIFO).
*/
export class StaticCache
{
static cachedData: Map<string, any> = new Map<string, any>();
static maxItems: number = 400;
static get(key: string){
return this.cachedData.get(key);
}
static getOrFetch(key: string, fetcher: (string) => any): any {
let value = this.cachedData.get(key);
if (value != null){
console.log("Cache HIT! (fetcher)");
return value;
}
console.log("Cache MISS... (fetcher)");
value = fetcher(key);
this.add(key, value);
return value;
}
static add(key, value){
this.cachedData.set(key, value);
this.deleteOverflowing();
}
static deleteOverflowing(): void {
if (this.cachedData.size > this.maxItems) {
this.deleteOldest(this.cachedData.size - this.maxItems);
}
}
/// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
/// However that seems not to work. Trying with forEach.
static deleteOldest(howMany: number): void {
//console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
let iterKeys = this.cachedData.keys();
let item: IteratorResult<string>;
while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
//console.debug(" Deleting: " + item.value);
this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
}
}
static clear(): void {
this.cachedData = new Map<string, any>();
}
}
它是静态的,因为我们如何使用它,但随意使它成为一个普通的类和服务。我不确定角度是否在整个时间内保持单个实例(Angular2新增)。
这就是我使用它的方式:
let httpService: Http = this.http;
function fetcher(url: string): Observable<any> {
console.log(" Fetching URL: " + url);
return httpService.get(url).map((response: Response) => {
if (!response) return null;
if (typeof response.json() !== "array")
throw new Error("Graph REST should return an array of vertices.");
let items: any[] = graphService.fromJSONarray(response.json(), httpService);
return array ? items : items[0];
});
}
// If data is a link, return a result of a service call.
if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
{
// Make an HTTP call.
let url = this.data[verticesLabel][name]["link"];
let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
if (!cachedObservable)
throw new Error("Failed loading link: " + url);
return cachedObservable;
}
我认为可能有一种更聪明的方式,这将使用一些Observable
技巧,但这对我的目的来说很好。
只需使用此缓存层,它就可以执行您需要的所有操作,甚至可以管理ajax请求的缓存。
http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html
这很容易使用
@Component({
selector: 'home',
templateUrl: './html/home.component.html',
styleUrls: ['./css/home.component.css'],
})
export class HomeComponent {
constructor(AjaxService:AjaxService){
AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;});
}
articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]};
}
该层(作为可注入的角度服务)是
import { Injectable } from '@angular/core';
import { Http, Response} from '@angular/http';
import { Observable } from 'rxjs/Observable';
import './../rxjs/operator'
@Injectable()
export class AjaxService {
public data:Object={};
/*
private dataObservable:Observable<boolean>;
*/
private dataObserver:Array<any>=[];
private loading:Object={};
private links:Object={};
counter:number=-1;
constructor (private http: Http) {
}
private loadPostCache(link:string){
if(!this.loading[link]){
this.loading[link]=true;
this.links[link].forEach(a=>this.dataObserver[a].next(false));
this.http.get(link)
.map(this.setValue)
.catch(this.handleError).subscribe(
values => {
this.data[link] = values;
delete this.loading[link];
this.links[link].forEach(a=>this.dataObserver[a].next(false));
},
error => {
delete this.loading[link];
}
);
}
}
private setValue(res: Response) {
return res.json() || { };
}
private handleError (error: Response | any) {
// In a real world app, we might use a remote logging infrastructure
let errMsg: string;
if (error instanceof Response) {
const body = error.json() || '';
const err = body.error || JSON.stringify(body);
errMsg = `${error.status} - ${error.statusText || ''} ${err}`;
} else {
errMsg = error.message ? error.message : error.toString();
}
console.error(errMsg);
return Observable.throw(errMsg);
}
postCache(link:string): Observable<Object>{
return Observable.create(observer=> {
if(this.data.hasOwnProperty(link)){
observer.next(this.data[link]);
}
else{
let _observable=Observable.create(_observer=>{
this.counter=this.counter+1;
this.dataObserver[this.counter]=_observer;
this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]);
_observer.next(false);
});
this.loadPostCache(link);
_observable.subscribe(status=>{
if(status){
observer.next(this.data[link]);
}
}
);
}
});
}
}
它是.publishReplay(1).refCount();
或.publishLast().refCount();
,因为Angular Http observables在请求后完成。
这个简单的类缓存结果,因此您可以多次订阅.value并只发出1个请求。您还可以使用.reload()发出新请求并发布数据。
您可以像以下一样使用它:
let res = new RestResource(() => this.http.get('inline.bundleo.js'));
res.status.subscribe((loading)=>{
console.log('STATUS=',loading);
});
res.value.subscribe((value) => {
console.log('VALUE=', value);
});
和来源:
export class RestResource {
static readonly LOADING: string = 'RestResource_Loading';
static readonly ERROR: string = 'RestResource_Error';
static readonly IDLE: string = 'RestResource_Idle';
public value: Observable<any>;
public status: Observable<string>;
private loadStatus: Observer<any>;
private reloader: Observable<any>;
private reloadTrigger: Observer<any>;
constructor(requestObservableFn: () => Observable<any>) {
this.status = Observable.create((o) => {
this.loadStatus = o;
});
this.reloader = Observable.create((o: Observer<any>) => {
this.reloadTrigger = o;
});
this.value = this.reloader.startWith(null).switchMap(() => {
if (this.loadStatus) {
this.loadStatus.next(RestResource.LOADING);
}
return requestObservableFn()
.map((res) => {
if (this.loadStatus) {
this.loadStatus.next(RestResource.IDLE);
}
return res;
}).catch((err)=>{
if (this.loadStatus) {
this.loadStatus.next(RestResource.ERROR);
}
return Observable.of(null);
});
}).publishReplay(1).refCount();
}
reload() {
this.reloadTrigger.next(null);
}
}
您可以构建简单的类Cacheable <>,以帮助管理从具有多个订阅者的http服务器检索的数据:
declare type GetDataHandler<T> = () => Observable<T>;
export class Cacheable<T> {
protected data: T;
protected subjectData: Subject<T>;
protected observableData: Observable<T>;
public getHandler: GetDataHandler<T>;
constructor() {
this.subjectData = new ReplaySubject(1);
this.observableData = this.subjectData.asObservable();
}
public getData(): Observable<T> {
if (!this.getHandler) {
throw new Error("getHandler is not defined");
}
if (!this.data) {
this.getHandler().map((r: T) => {
this.data = r;
return r;
}).subscribe(
result => this.subjectData.next(result),
err => this.subjectData.error(err)
);
}
return this.observableData;
}
public resetCache(): void {
this.data = null;
}
public refresh(): void {
this.resetCache();
this.getData();
}
}
用法
声明Cacheable <>对象(可能是服务的一部分):
list: Cacheable<string> = new Cacheable<string>();
和处理程序:
this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}
从组件调用:
//gets data from server
List.getData().subscribe(…)
您可以订阅几个组件。
更多细节和代码示例如下:http://devinstance.net/articles/20171021/rxjs-cacheable
根据@Cristian的建议,这是一种适用于HTTP observable的方法,它只发出一次然后完成:
getCustomer() {
return this.http.get('/someUrl')
.map(res => res.json()).publishLast().refCount();
}
你可以简单地使用ngx-cacheable!它更适合您的场景。
使用它的好处
- 它仅调用rest API一次,缓存响应并为后续请求返回相同的响应。
- 可以在创建/更新/删除操作后根据需要调用API。
那么,你的服务类就是这样的 -
import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';
const customerNotifier = new Subject();
@Injectable()
export class customersService {
// relieves all its caches when any new value is emitted in the stream using notifier
@Cacheable({
cacheBusterObserver: customerNotifier,
async: true
})
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
// notifies the observer to refresh the data
@CacheBuster({
cacheBusterNotifier: customerNotifier
})
addCustomer() {
// some code
}
// notifies the observer to refresh the data
@CacheBuster({
cacheBusterNotifier: customerNotifier
})
updateCustomer() {
// some code
}
}
Here的链接供更多参考。
这正是我为库创建库ngx-rxcache的原因。
在https://github.com/adriandavidbrand/ngx-rxcache看看它,看看https://stackblitz.com/edit/angular-jxqaiv的一个工作示例
您是否尝试过运行已有的代码?
因为您正在根据getJSON()
产生的承诺构建Observable,所以在任何人订阅之前都会发出网络请求。由此产生的承诺由所有订户共享。
var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...
更新:Ben Lesh说5.2.0之后的下一个小版本,你将能够只调用shareReplay()来真正缓存。
先前.....
首先,不要使用share()或publishReplay(1).refCount(),它们是相同的,并且它的问题在于,只有在observable处于活动状态时建立连接时,如果在完成后连接,它才会共享,它再次创建一个新的可观察,翻译,而不是真正的缓存。
Birowski在上面提供了正确的解决方案,即使用ReplaySubject。在我们的案例1中,ReplaySubject将缓存你给它的值(bufferSize)。一旦refCount达到零并且你建立一个新的连接,这将是缓存的正确行为,它不会创建像share()这样的新的observable。
这是一个可重用的功能
export function cacheable<T>(o: Observable<T>): Observable<T> {
let replay = new ReplaySubject<T>(1);
o.subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
return replay.asObservable();
}
这是如何使用它
import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';
@Injectable()
export class SettingsService {
_cache: Observable<any>;
constructor(private _http: Http, ) { }
refresh = () => {
if (this._cache) {
return this._cache;
}
return this._cache = cacheable<any>(this._http.get('YOUR URL'));
}
}
下面是可缓存功能的更高级版本。这个允许有自己的查找表+提供自定义查找表的能力。这样,您不必像上面的示例中那样检查this._cache。另请注意,不是将observable作为第一个参数传递,而是传递一个返回observable的函数,这是因为Angular的Http立即执行,所以通过返回一个惰性执行函数,我们可以决定不调用它,如果它已经在我们的缓存。
let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
if (!!key && (customCache || cacheableCache)[key]) {
return (customCache || cacheableCache)[key] as Observable<T>;
}
let replay = new ReplaySubject<T>(1);
returnObservable().subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
let observable = replay.asObservable();
if (!!key) {
if (!!customCache) {
customCache[key] = observable;
} else {
cacheableCache[key] = observable;
}
}
return observable;
}
用法:
getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")
rxjs 5.4.0有一个新的shareReplay方法。
作者明确表示“理想的处理诸如缓存AJAX结果之类的东西”
rxjs PR #2443 feat(shareReplay): adds shareReplay
variant of publishReplay
shareReplay返回一个observable,它是在ReplaySubject上进行多播的源。该重放主题在来自源的错误时被回收,但不是在源完成时回收。这使得shareReplay非常适合处理诸如缓存AJAX结果之类的事情,因为它是可重试的。然而,它的重复行为与分享的不同之处在于它不会重复源观察,而是重复源可观察的值。
根据这个article
事实证明,我们可以通过添加publishReplay(1)和refCount轻松地将缓存添加到observable。
所以内部if语句只是附加
.publishReplay(1)
.refCount();
到.map(...)
我主演了这个问题,但我会试着去试试这个问题。
//this will be the shared observable that
//anyone can subscribe to, get the value,
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);
getCustomer().subscribe(customer$);
//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));
//here's the second subscriber
setTimeout(() => {
customer$.subscribe(val => console.log('subscriber 2: ' + val));
}, 1000);
function getCustomer() {
return new Rx.Observable(observer => {
console.log('api request');
setTimeout(() => {
console.log('api response');
observer.next('customer object');
observer.complete();
}, 500);
});
}
这是proof :)
只有一个外卖:getCustomer().subscribe(customer$)
我们没有订阅getCustomer()
的api响应,我们订阅了一个可观察的ReplaySubject,它也可以订阅一个不同的Observable并且(这很重要)保持它的最后一个值并将其重新发布到它的任何一个( ReplaySubject的订阅者。
我找到了一种方法来将http get结果存储到sessionStorage并将其用于会话,以便它永远不会再次调用服务器。
我用它来调用github API来避免使用限制。
@Injectable()
export class HttpCache {
constructor(private http: Http) {}
get(url: string): Observable<any> {
let cached: any;
if (cached === sessionStorage.getItem(url)) {
return Observable.of(JSON.parse(cached));
} else {
return this.http.get(url)
.map(resp => {
sessionStorage.setItem(url, resp.text());
return resp.json();
});
}
}
}
仅供参考,sessionStorage限制为5M(或4.75M)。因此,对于大型数据集,不应该像这样使用它。
------编辑------------- 如果你想用F5刷新数据,它使用内存数据而不是sessionStorage;
@Injectable()
export class HttpCache {
cached: any = {}; // this will store data
constructor(private http: Http) {}
get(url: string): Observable<any> {
if (this.cached[url]) {
return Observable.of(this.cached[url]));
} else {
return this.http.get(url)
.map(resp => {
this.cached[url] = resp.text();
return resp.json();
});
}
}
}
您选择的实现将取决于您是否希望unsubscribe()取消您的HTTP请求。
无论如何,TypeScript decorators是标准化行为的好方法。这是我写的那个:
@CacheObservableArgsKey
getMyThing(id: string): Observable<any> {
return this.http.get('things/'+id);
}
装饰者定义:
/**
* Decorator that replays and connects to the Observable returned from the function.
* Caches the result using all arguments to form a key.
* @param target
* @param name
* @param descriptor
* @returns {PropertyDescriptor}
*/
export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) {
const originalFunc = descriptor.value;
const cacheMap = new Map<string, any>();
descriptor.value = function(this: any, ...args: any[]): any {
const key = args.join('::');
let returnValue = cacheMap.get(key);
if (returnValue !== undefined) {
console.log(`${name} cache-hit ${key}`, returnValue);
return returnValue;
}
returnValue = originalFunc.apply(this, args);
console.log(`${name} cache-miss ${key} new`, returnValue);
if (returnValue instanceof Observable) {
returnValue = returnValue.publishReplay(1);
returnValue.connect();
}
else {
console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue);
}
cacheMap.set(key, returnValue);
return returnValue;
};
return descriptor;
}
rxjs version 5.4.0 (2017-05-09)增加了对shareReplay的支持。
为什么要使用shareReplay?
当您有副作用或计算不想要在多个订阅者之间执行的计算时,通常需要使用shareReplay。在您知道您将拥有需要访问先前发出的值的流的后期订阅者的情况下,它也可能很有价值。这种重放订阅值的能力是区分共享和共享重放的能力。
您可以轻松地修改角度服务以使用它并返回一个带有缓存结果的observable,该结果只会使http调用一次(假设第一次调用是成功的)。
这是一个使用shareReplay
的非常简单的客户服务。
customer.service.ts
import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';
@Injectable()
export class CustomerService {
private readonly _getCustomers: Observable<ICustomer[]>;
constructor(private readonly http: HttpClient) {
this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
}
getCustomers() : Observable<ICustomer[]> {
return this._getCustomers;
}
}
export interface ICustomer {
/* ICustomer interface fields defined here */
}
请注意,构造函数中的赋值可以移动到方法getCustomers
,但是从HttpClient
are "cold"返回的observables在构造函数中执行此操作是可以接受的,因为http调用只会在第一次调用subscribe
时生成。
此外,这里的假设是初始返回的数据在应用程序实例的生命周期中不会过时。