我在创建RSE引擎时添加了快照相关参数参数:
snapshotDir='/root/ddbsnaps'
,snapshotIntervalInMsgCount=1000
但我注意到,随着数据在引擎中积累,快照文件不会生成。是因为我的设置不正确吗?我应该如何设置它们?
要使用快照功能,需要设置参数
handlerNeedMsgId
,然后在handler中,需要使用appendMsg
方法写入数据。这是一个参考脚本:
def sum_diff(x, y){
return (x-y)/(x+y)
}
factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) - ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>
share streamTable(1:0, `sym`time`price, [STRING,DATETIME,DOUBLE]) as tickStream
share table(1000:0, `sym`time`factor1, [STRING,DATETIME,DOUBLE]) as result
rse = createReactiveStateEngine(name="reactiveDemo", metrics =[<time>, factor1], dummyTable=tickStream, outputTable=result, keyColumn="sym", snapshotDir="/tmp/", snapshotIntervalInMsgCount=50)
subscribeTable(tableName=`tickStream, actionName="factors", handler=appendMsg{getStreamEngine(`reactiveDemo)}
,msgAsTable=true, handlerNeedMsgId = true
)
data1 = table(take("000001.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 10+cumsum(rand(0.1, 100)-0.05) as price)
data2 = table(take("000002.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 20+cumsum(rand(0.2, 100)-0.1) as price)
data3 = table(take("000003.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 30+cumsum(rand(0.3, 100)-0.15) as price)
data = data1.unionAll(data2).unionAll(data3).sortBy!(`time)
replay(inputTables=data, outputTables=tickStream, timeColumn=`time)