我的需求是,我需要统计一天处理的消息,所以,我创建了一个子流程,在这里我创建了一个持久的objectstore。我检索它最初的默认值为0。然后,我把它传给java AtomicInteger类,让它在每次有请求的时候做增量。现在我存储新的值。
<sub-flow name="impl-message-counter:\counter-set-flow" doc:id="929c5fa5-74a2-4e8e-b1a0-43209b57222c" >
<os:retrieve doc:name="Retrieve" doc:id="565ecf6c-ffba-491e-b2c9-35ef4b4f52ad" key="counter" objectStore="Object_store" target="count">
<os:default-value ><![CDATA[0]]></os:default-value>
</os:retrieve>
<java:new constructor="AtomicInteger(int)" doc:name="New" doc:id="f0e75c0b-914a-44ee-9e24-a597025afd77" class="java.util.concurrent.atomic.AtomicInteger" target="atomicinteger">
<java:args ><![CDATA[#[output application/java
---
{
arg0 : vars.count as Number {class: "java.lang.Integer"}
}]]]></java:args>
</java:new>
<java:invoke doc:name="Invoke" doc:id="60779082-ad71-4770-b871-8d86044aa62a" instance="#[vars.atomicinteger]" class="java.util.concurrent.atomic.AtomicInteger" method="incrementAndGet()" target="newVal"/>
<os:store doc:name="Store" doc:id="8f85aa4f-5bce-4248-8a3b-a8cde27a1307" key="counter" objectStore="Object_store">
<os:value><![CDATA[#[vars.newVal]]]></os:value>
</os:store>
</sub-flow>
在上面的流程中,我想添加一个检查处理器,以查看一天是否通过。如果过了午夜,我想把它重新赋值为0,否则继续递增。
<choice doc:name="Choice" doc:id="120b1f05-7afe-4cfc-a103-0b26f9df7140" >
<when expression="#[now().hour==23 and now().minutes==59 and now().seconds==60]">
<os:store doc:name="Store" doc:id="3d995b22-a648-4c7f-af54-33d70b23705c" key="counter" objectStore="Object_store" >
<os:value ><![CDATA[#[0]]]></os:value>
</os:store>
</when>
<otherwise >
<os:store doc:name="Store" doc:id="8f85aa4f-5bce-4248-8a3b-a8cde27a1307" key="counter" objectStore="Object_store">
<os:value><![CDATA[#[vars.newVal]]]></os:value>
</os:store>
</otherwise>
</choice>
但是上面的方法能行吗?我想如果同一个键存在于同一个对象商店中,它会跳过吗?我如何更新现有的键?
这是一个好主意,但你必须知道,对象存储是不可靠的。你可以检查它,如果你写一个简单的测试,做负载和压力测试。特别是在集群中.首先,通常它的工作,但后来,特别是在同一时刻的并发请求,它失败。所以,如果谨慎使用它,不要依赖它。最糟糕的是--它会打破你的流程。我们可以做的是 - https:/simpleflatservice.commule4IgnoreUnreliableObjectStorage.html。 - 将每一次对对象存储的调用都封装为try块,并简单地忽略任何失败或意外的结果。
下面是代码
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:os="http://www.mulesoft.org/schema/mule/os"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
<flow name="selftestFlow" doc:id="79a89806-adea-4017-a838-8b8c3e2002df" >
<http:listener doc:name="Listener" doc:id="63128e3f-34f5-4f04-9b8a-0bebe67f763c" config-ref="HTTP_Listener_config" path="/counter"/>
<try doc:name="Try" doc:id="d5d83aa3-b271-41b0-98a6-381823164de4" >
<os:retrieve doc:name="Retrieve" doc:id="f6d3cccd-a7be-44fa-a51d-36bddbc5892f" key="test" target="localTest">
<os:default-value><![CDATA[N/A]]></os:default-value>
</os:retrieve>
<error-handler >
<on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="b14ab2e4-c871-4af8-a488-9c70284a525f" />
</error-handler>
</try>
<ee:transform doc:name="Transform Message" doc:id="bc50c855-34b4-4661-9f42-2b18e441782d" >
<ee:message >
</ee:message>
<ee:variables >
<ee:set-variable variableName="localTest" ><![CDATA[%dw 2.0
output application/java
---
if (vars.localTest is Number) ( vars.localTest as Number + 1 ) else ( 1 as Number )]]></ee:set-variable>
</ee:variables>
</ee:transform>
<try doc:name="Try" doc:id="2d54c616-7fb6-41ba-bc05-0918199bcba4" >
<os:store doc:name="Store" doc:id="2847d9db-d2f6-4bc4-a0a5-dca0eafbdd49" key="test">
<os:value><![CDATA[#[vars.localTest]]]></os:value>
</os:store>
<error-handler >
<on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="90e6616a-af11-4af6-a82f-e9823f764ceb" />
</error-handler>
</try>
<ee:transform doc:name="Transform Message" doc:id="bdc2a164-3d80-427f-b95d-a7dfc2b69400" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{ counter: vars.localTest }]]></ee:set-payload>
</ee:message>
</ee:transform>
</flow>
</mule>