本文概要介绍了 WebSphere Message Broker V6 计时器节点,并描述了使用这些节点的若干编程模式。本文的内容对 WebSphere Message Broker 信息中心文档中有关使用 TimeoutNotification 和 TimeoutControl 节点的信息形成补充。
引言
IBM® WebSphere® Message Broker V6(以下称为 Message Broker)引入了两个新的内置节点:TimeoutControl 和 TimeoutNotification。这些节点允许您基于时间相关的事件在 Message Broker 中启动流,例如使用 TimeoutNotification 节点来每小时运行一次消息流。对于可以连接到计时器节点启动的消息流中的节点并没有任何限制。
以下是两个新节点的简单概述。本文稍后将提供详细的技术信息。
TimeoutNotification 节点
TimeoutNotification 节点将是计时器控制的消息流中的第一个节点。它没有 In 端子(我们通常会在此类端子处放置一个 MQInput JMSInput、MqeInput JMSInput、JMSInput 或类似的节点)。此结点有一个属性,即它的唯一标识符——由 1 到 12 个字符组成的标签。此标识符必须在此节点部署到的代理实例内具有全局唯一性。TimeoutNotification 节点可以为独立节点,也可以与一个或多个 TimeoutControl 节点一起使用。
图 1. 消息流中的 TimeoutNotification 节点

作为独立节点,TimeoutNotification 节点支持简单的固定间隔计时器事件。例如,可以让消息流每隔 30 秒、一个小时甚至 17 年执行一次。后继部分将说明,当这些计时器事件其中的一个要发生时,如果代理未运行,将会发生何种情况。
当在消息流中仅使用一个 TimeoutNotification 节点时,几乎没有任何灵活性。一旦包含的消息流被部署,该节点将立即开始计时,第一个计时器事件将立即发生。无法停止计时器(除非停止包含的流本身),也无法动态地更改时间间隔。另外,如果 TimeoutNotification 节点指定某个事件每小时出现一次,则该事件将在部署时及其后的一小时准时出现。因此,如果流在 6:17am 部署,相应的事件将出现在 6:17 a.m.、7:17 a.m.、8:17 a.m.,依此类推。
TimeoutControl 节点
在相同或不同的消息流(甚至不同的执行组)中包含一个或多个 TimeoutControl 节点可使得生成的计时器事件的模式变得灵活得多。与 TimeoutNotification 节点类似,TimeoutControl 节点也有一个唯一标识符属性(称为 UID)。尽管该属性的名称如此,但在这种情况下,它不需要也不应该为唯一的。由于 TimeoutControl 节点是用于动态地设置 TimeoutNotification 节点的特性,因此 TimeoutNotification 节点的 UID 应与控制 TimeoutControl 节点的 UID 相同。多个 TimeoutControl 节点可以通过指定相同的 UID 来控制同一个 TimeoutNotification 节点。
图 2. 包含一个 TimeoutControl 节点。请注意向节点名称添加 UID (TIME) 的命名约定

TimeoutControl 节点具有一个 In 端子,可以将提供具有特定结构的消息树的源连接到其上(在下面详细讨论)。此树的元素包括:
- Identifier
- Action
- Start Date
- Start Time
- Interval
- Count
例如,可以向 TimeoutControl 节点传递一个消息,告知 TimeoutNotification 节点在 2006 年 7 月 8 日 17:50:08 启动消息流,然后每年重新启动该流一次,重复 50 次(即总持续时间为 50 年)。
可以将 TimeoutNotification 节点的输出连接回 TimeoutControl 节点。我们将稍后使用这个模式来设置"突发的"计时器事件。
计时器节点与外部流启动方法的比较
通过这些节点,可以基于计时器事件启动消息流。当然还可以采用其他方法启动消息流:
要让流准时发生,可以使用 UNIX cron 作业,该命令可提供丰富的语义来控制作业执行计划。该作业一般会使用 amqsput 等实用工具来向要运行的消息流的输入队列发送一条(虚拟)消息。
虽然这个方法可以正常工作,但与使用计时器节点相比,它增加了系统的复杂性。可以对 cron 作业的创建和消息流的部署进行协调。如果很多流具有这个需求,crontab 的维护任务就会变得繁重起来。由于引入了另一个容易出现故障的点,因此必须具有相应的机制来确保 amqsput 工具确实正常工作,且未生成 2035 等错误返回值。此方法使得故障确认工作变得更为复杂了。
通过使用 timeout 节点,可提供灵活的自包含机制来启动消息流。除了部署流通常的要求外,不需要对代理进行额外的协调。可以使用内置的代理本地跟踪机制来检测设置计时器时的故障。虽然 UNIX 通常会提供 cron,但并非所有安装都包括它,而在 Windows® 和 z/OS® 等其他平台上使用的又是不同的作业计划机制。timeout 节点可提供跨所有 WebSphere Message Broker V6 平台的一致机制。
TimeoutNotification 节点详细信息
图 3. TimeoutNotification 节点的属性框

此节点用于充当闹钟,会在计时器到点时启动消息流。可以独立使用此节点,也可以与一个或多个 TimeoutControl 节点一起使用。
图 3 显示了设置为采用独立模式操作的 TimeoutNotification 节点的属性框。通过将 Operation Mode 属性设置为 Automatic, 可选择独立模式。尽管没有 TimeoutControl 节点,仍然必须指定相应的 UID(在我们的例子中,其值为 Time),且仍然需要在代理中保持唯一性。UID 的长度必须为 1 到 12 个字符之间——直到部署消息流时,才会强制执行 12 个字符的限制。
在独立模式中,将 Transaction 的值设置为 True 或 False,以指示是否在计时器到点时启动事务。Interval 是计时器事件之间的时间间隔,以秒为单位。其他属性不会影响计时器功能。
按惯例连接 TimeoutNotification 节点的 Catch 端子,以避免任何讨厌的重试。获取更多信息,请参阅下面的“通用指南”。
在此节点的 Out 端子上创建的消息应与以下所示类似:
Root:(
(0x01000000):Properties = (
(0x03000000):MessageSet = '
(0x03000000):MessageType = '
(0x03000000):MessageFormat = '
(0x03000000):Encoding = 546
(0x03000000):CodedCharSetId = 0
(0x03000000):Transactional = TRUE
(0x03000000):Persistence = FALSE
(0x03000000):CreationTime = TIMESTAMP '2005-08-11 17:34:54.802123'
(0x03000000):ExpirationTime = -1
(0x03000000):Priority = 0
(0x03000000):ReplyIdentifier = X'
(0x03000000):ReplyProtocol = 'MQ'
(0x03000000):Topic = NULL
(0x03000000):ContentType = '
)
)
Environment:
LocalEnvironment:
(
(0x01000000):TimeoutRequest = (
(0x03000000):Action = 'SET'
(0x03000000):Identifier = 'Time'
(0x03000000):StartDate = '2005-08-11'
(0x03000000):StartTime = '17:34:53.794321'
(0x03000000):Count = 2
(0x03000000):Interval = 1
(0x03000000):IgnoreMissed = TRUE
(0x03000000):AllowOverwrite = TRUE
)
)
|
该节点的树仅包含一个 Properties and LocalEnvironment 子树。因此,如果计划将此消息放到队列上,将需要为 MQOutput 节点构建一个相应的消息(我们将在后面进行此工作)。StartDate 和 StartTime 指示 timeoutNotification 启动的时间。Count 表示事件已出现了多少次(在本例中为第二次出现),Properties.CreationTime 提供事件实际发生时的时间戳。
当代理不可操作,或 TimeoutNotification 节点对应的消息流已停止时,会发生何种情况?显然,应该是不会出现任何计时器事件。但假定代理停止工作 15 分钟,而设置了一个每分钟进行一次计时的计时器,代理是否会在重新启动时创建 15 条计时器消息?答案是不会。对于采用独立模式的节点,在重新启动代理时不会保持状态信息。在代理和流重新启动后,节点将重新启动,且 Count 字段会重置为 1。
TimeoutControl 节点详细信息
TimeoutControl 节点用于设置 TimeoutNotification 节点的参数。
可以将一个或多个 TimeoutControl 节点与单个 TimeoutNotification 节点联合使用。在最简单的配置中,单个消息流可以包含一个 TimeoutControl 及其关联的 TimeoutNotification 节点。所有 timeout 节点都通过在节点属性中指定的 UID 关联。
如图 2 中所示,TimeoutControl 节点具有一个 In 端子,必须将其连接到控制消息的源。这个源通常是 MQInput 节点,但也可以是能产生正确消息的任何节点。
如下面的图 4 中所示,此处指定的 UID 与图 3 中相同。Request Location 是一个可选参数,其缺省值为 LocalEnvironment.TimeoutRequest,用于指定计时器请求子树的位置。由于我们是从 MQInput 节点向此节点发送 XML 消息,因此将其指定为 InputRoot.XML.TimeoutRequest。Request persistence 选项决定了当代理重新启动或包含 TimeoutNotification 节点的执行组停止执行之后,超时请求是否能够持续可用。"Automatic" 意味着使用输入消息的持久性来决定当代理重启超时请求仍然持续可用。
图 4. TimeoutControl 节点基本属性

以下为能够发送到节点的最简单的输入请求消息:
<TimeoutRequest>
<Identifier>TimeReq1</Identifier>
<Action>SET</Action>
</TimeoutRequest>
|
XML 根名称 TimeoutRequest 与在属性面板中的设置对应 (InputRoot.XML.TimeoutRequest)。Identifier 标记为此超时请求提供唯一标识符,该值并不和节点本身的 UID 相关。Action 标记指示我们希望 SET(创建)一个新超时请求。另一个允许的操作是 CANCEL。当将此消息发送到流中时,将立即发生唯一的计时器事件。一旦发生这个唯一的计时器事件,则将该超时请求视为已完成。
当然,立即产生一个计时器事件的消息并不是很有用。更为实用的消息如下:
<TimeoutRequest>
<Identifier>TimeReq1</Identifier>
<Action>SET</Action>
<Count>6</Count>
<Interval>10</Interval>
</TimeoutRequest>
|
在此示例中,超时事件将立即发生,然后每 10 秒钟 (Interval) 重新发生一次,将重复 6 次 (Count)。Count 可以使用特殊值“-1”,表示将重复无限多次。
重要:TimeoutNotification 节点的 Operation Mode 属性必须由 Automatic 更改为 Controlled,以使 TimeoutControl 节点生效:
图 5. 设置了 Controlled 属性的 TimeoutNotification 节点

一个完全指定好的控制消息应与以下所示类似:(请参阅信息中心有关导入 IBM 提供的模式 (6.0.0\ibm\nodes\timeout\timeoutrequest.xsd) 的信息,以创建映射此消息的消息定义。)
<TimeoutRequest>
<Identifier>TimeReq3</Identifier>
<Action>SET</Action>
<StartDate>2005-08-25</StartDate>
<StartTime>NOW</StartTime>
<Count>6</Count>
<Interval>5</Interval>
<IgnoreMissed>TRUE</IgnoreMissed>
<AllowOverwrite>TRUE</AllowOverwrite>
</TimeoutRequest>
|
此示例演示了如何请求计时器在特定日期和时间启动。StartDate 采用“yyyy-mm-dd”格式,如果省略此标记,其缺省值为当前日期。StartTime 标记将指示开始生成计时器事件的本地时间。其格式为“hh:mm:ss.mmmmmm”,其中“mmmmmm”为可选部分(不过计时器事件间隔的精度仅为 1 秒)。可以为 StartTime 指定特殊值“NOW”,如果省略该标记,会使用此值作为缺省值。StartDate 的缺省值为特殊值“TODAY”。StartDate 也可以为“Date”类型的元素,而 StartTime 可以为“Time”类型的元素。
IgnoreMissed 标志指示,当消息流再次启动时,是否应生成代理关闭时(或包含 TimeoutNotification 节点的消息流停止时)应该发生的计时器事件。缺省值为“True”,表示应忽略这些事件(在重新启动时不生成)。“False”表示应在重新启动时生成这些事件。不过,为了使其按照预期的工作,必须在 TimeoutControl 节点上将 Request Persistence 设置为“Yes”(或者,如果其设置为“automatic”,则必须在持久消息中进行发送)。
AllowOverwrite 标志指示是否允许在计时器仍然处于活动状态时更改计时器设置。活动计时器是仍然在产生计时器事件的计时器。因此,假如您创建了名为“TimeReq1”的计时器请求,Count 为 10,Interval 为 6,该计时器的活动状态将持续五十秒(注意,第一个计时器事件会立即发生)。在此期间,如果使用相同的请求标识符(“TimeReq1”)向 TimeoutControl 节点发送了新控制消息,而之前将 AllowOverwrite 标志(在第一个请求上)设置为“false”,则新消息将引发异常条件(“Timeout Set Identifier already used”)。不过,如果控制消息在五十秒后发送,由于该计时器不再处于活动状态,因此会处理此消息。将 AllowOverwrite 设置为 true,会始终允许处理第二条控制消息,而不会考虑计时器的状态如何。
务必要注意,计时器标识符的范围是在 TimeoutNotification 节点的 UID 内。这意味着,如果具有两个 TimeoutNotification 节点,一个的 UID 为“Time”,另一个的 UID 为“Tiempo”,则这两个节点都可以接受标识符为“TimeReq1”的控制消息,这二者是计时器的不同实例。(可以将一个节点中的计时器标识符看作“Time.TimeReq1”,而将另一个节点中的计时器标识符看作“Tiempo.TimeReq1”。)
最后,可以通过发送具有“CANCEL”操作的控制消息来取消计时器。在这种情况下,只需要在控制消息中指定 Action 和 Identifier。取消不处于活动状态的计时器将抛出“Timeout Set Identifier not in store”。
由于 TimeoutNotification 节点的 Operation Mode 现在为“Controlled”,发送到 Out 端子的消息与设置为 Automatic 时的消息并不相同。在此情况下,消息树将包含发送到启动此计时器的 TimeoutControl 节点的消息的 Root.MQMD 和 Root.XML。(实际上,我们可以控制是存储整个控制消息以便传播,还是仅存储该消息的部分内容。请参阅 TimeoutControl 节点的“Stored Message Location”属性。)这就允许我们将启动计时器的消息中的更多信息传递到处理计时器事件的流中。以下是一个发送到 TimeoutNotification 节点的“Out”端子的示例消息(为了清楚起见,删除了一些字段):
Root:
(
(0x01000000):Properties = (
(0x03000000):MessageSet = '
...
(0x03000000):ContentType = '
)
(0x01000000):MQMD = (
(0x03000000):SourceQueue = '
(0x03000000):Transactional = TRUE
(0x03000000):Encoding = 546
(0x03000000):CodedCharSetId = 1208
(0x03000000):Format = 'MQSTR '
...
(0x03000000):OriginalLength = -1
)
(0x01000010):XML = (
(0x01000000):TimeoutRequest = (
(0x01000000):Identifier = 'TimeReq3'
(0x01000000):Action = 'SET'
(0x01000000):StartDate = 'TODAY'
(0x01000000):Count = '60'
(0x01000000):Interval = '5'
(0x01000000):IgnoreMissed = 'FALSE'
(0x01000000):AllowOverwrite = 'FALSE'
(0x01000000):Redrive = '60'
)
)
)
Environment:
LocalEnvironment:
(
(0x01000000):TimeoutRequest = (
(0x03000000):Action = 'SET'
(0x03000000):Identifier = 'TimeReq3'
(0x03000000):StartDate = '2005-08-29'
(0x03000000):StartTime = '11:29:06.654868'
(0x03000000):Count = 1
(0x03000000):Interval = 5
(0x03000000):IgnoreMissed = FALSE
|
在发送到 TimeoutControl 节点的原始消息中,有一个名为“Redrive”的字段(值为 60)。此字段是我自己设置的字段,因此对 TimeoutControl 节点本身没有任何影响。不过,会从 TimeoutNotification 节点传递该字段和控制消息中的所有其他字段,这是一项很有用的功能,我们将在稍后对其加以利用。请注意另外两个有意义的字段:Root.XML.TimeoutRequest.Count(值为 60)和 LocalEnvironment.TimeoutRequest.Count(值为 1)。这两个字段表示所请求的计时器事件原始数量和当前计时器事件数量。可利用此信息确定是否在处理给定请求的最后一个计时器事件。
将 MRM 域消息发送到节点
可以将任何经过正确格式化的消息子树发送到控制节点。例如,可以通过为“Request Location”指定 InputRoot.MRM 来发送 MRM 域中的控制消息(GA 版本包含一个错误。CWF 字段必须带空格(而不是 NULL)。此问题已通过 Fixpack FP1 得到解决。)
进入该节点的消息可能与以下所示类似:
Root:
((0x0100001B):MRM = (
(0x0300000B):Action = 'SET'
(0x0300000B):Identifier = 'TIMEREQ89012'
(0x0300000B):StartDate = 'TODAY'
(0x0300000B):StartTime = 'NOW'
(0x0300000B):Interval = 60
(0x0300000B):Count = 10
(0x0300000B):IgnoreMissed = 'FALSE'
(0x0300000B):AllowOverwrite = 'FALSE'
)
|
编程模式
通用指南
- 为
TimeoutNotification 和 TimeoutControl 节点使用有意义的名称。建议:“TimeoutNotification_UID”和“TimeoutControl_UID”,其中 UID 为由 1 到 12 个字符组成的节点唯一标识符。如果需要在流中使用多个 TimeoutNotification 节点,则向名称追加“_1”、“_2”等后缀。
- 总是连接
TimeoutNotification 节点的 Catch 端子。TimeoutControl 节点不具有 Catch 端子,因此错误将发送到 Failure 端子或向上引发。应该提供相应的措施来处理此类错误。
- 除非有迫切的需要,否则应将
TimeoutControl 节点上的 Request Persistence 设置为“Yes”或“No”。允许设置为“Automatic”,会使得系统行为太过依赖输入消息的持久性设置。
- 将
Reset Content Descriptor (RCD) 节点连接到 TimeoutNotification 节点的 Catch 节点。这可防止后续节点(如 Trace 节点)在以下情况引发异常:TimeoutControl 节点所存储的消息无法由 TimeoutNotification 节点解析,也不能由 Trace 节点解析。RCD 节点应将域重置为 BLOB。(截至 2006 年 3 月,RCD 的节点处理使用“Automatic”的 TimeoutNotification 节点的输出方面存在一个缺陷。此缺陷会在将来的 Fixpack 中得以解决。)
场景 1. 永远每隔“n”秒触发一个流
需求
| 特性 | 值 |
| 每“n”秒都需要计时器事件消息 |
是 |
| 只要包含的消息流正在运行,就应该生成计时器事件 |
是 |
| 如果消息流(或代理)未运行,应在重新启动时生成错过的计时器事件 |
否 |
| 计时器事件的实际执行时间(挂钟时间)是重要的 |
否 |
可能的场景
如果我们需要出于维护目的而运行代理流(如处理或清除在数据基表中积累的行),则很可能出现这种情况。通常,我们并不考虑这些维护过程出现的具体时间,只要它们定期出现即可(例如:“每小时清理数据库一次”)。
编程模式
考虑到这些要求,最佳实践解决方案将是,编码将单个“独立”TimeoutNotification 节点放在流的开始位置:
图 6. 场景 1 的消息流

在 Catch 节点后插入了一个 Reset Content Descriptor (RCD) 节点。这可防止 Trace 节点在以下情况引发异常:TimeoutControl 节点所存储的消息无法由 TimeoutNotification 节点解析,也不能由 Trace 节点解析。RCD 节点会将域重置为 BLOB。
正如前面所说,采用“Automatic”的 TimeoutNotification 节点不能产生适合发送到 MQOuput 节点的消息树(例如,其中不包含 MQMD 子树)。以下 ESQL 代码可用于创建恰当的消息(将在上面“BuildTimeoutMessage”节点中进行编码)。
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
CALL CopyMessageHeaders();
-- Capture the request as an output message
SET "OutputRoot"."Properties"."MessageDomain" = 'XML';
SET "OutputRoot"."MQMD"."Format" = MQFMT_STRING;
SET "OutputRoot"."XML"."TimeoutRequest" =
"InputLocalEnvironment"."TimeoutRequest";
RETURN TRUE;
END;
|
场景 2:在特定时间每“n”秒触发一次流
需求
| 特性 | 值 |
| 每“n”秒都需要计时器事件消息 |
是 |
| 计时器事件要求在特定日期-时间开始。 |
否,应在代理启动时开始 |
| 启动后,应发生“m”个事件。 |
有可能 |
| 计时器事件的实际执行时间(挂钟时间)是重要的 |
是 |
| 如果消息流(或代理)未运行,应在重新启动时生成遗漏的计时器事件 |
有可能 |
可能的场景
此场景和前一场景的重要区别在于,我们希望计时器事件在特定时间发生,如“每小时一次,在整点后 10 分钟发生”。“m”可以为固定的事件数或一个无穷值。
编程模式
由于计时器事件必须在特定时间发生(例如,在整点时发生,如 1:00 p.m),因此有必要使用 TimeoutControl 节点。可采用两种方法使用此节点。
第一个方法是直接将 MQInput 节点连接到 TimeoutControl 节点,然后向与 MQInput 节点关联的队列发送一条控制消息。以下是需要发送的消息的一个示例:
<TimeoutRequest>
<Identifier>HourlyReq</Identifier>
<Action>SET</Action>
<StartDate>TODAY</StartDate>
<StartTime>13:00:00</StartTime>
<Count>-1</Count>
<Interval>3600</Interval>
</TimeoutRequest>
|
如果 Request Persistence 设置为“Yes”,则此消息仅需要发送一次。将需要在部署了包含 TimeoutNotification 节点的消息流后立即发送此消息。此处有一个问题,即“StartTime”字段的设置。必须将此值设置为下一个小时(或需要应用程序的任何具体时间)。因此,必须编写一个自定义应用程序来创建具有恰当“StartTime”值的消息。这明显是不可取的。
第二个解决方案是最佳实践,将使用采用“Automatic”模式的 TimeoutNotification 节点来生成控制消息。图 7 显示了自动从采用“Automatic”模式的 TimeoutNotification 节点为 TimeoutControl 节点生成控制消息的流。
图 7. 自动生成控制消息的消息流

首次部署此流时(以及进行后续部署时),“TimeoutNotification”节点会立即创建一条消息(会将此消息传播到其“Out”端子)。该节点的属性如下所述:
| 属性 | 值 |
| Unique Identifier |
Start |
| Transaction Mode |
Yes |
| Operation Mode |
Automatic |
| Timeout Interval |
3300 |
Timeout Interval 设置为 3300 秒(55 分钟),以对以下情况加以考虑:流在非常接近整点时部署,而消息未在整点前达到 TimeoutControl 节点。通过将 Timeout Interval 设置为 55 分钟,我们可以保证控制消息将为下一个小时设置相应的控制消息。
我们从此消息使用一个计算节点(“Make_Hourly_TimeoutRequest”)来创建一条控制消息,以要求计时器事件在整点发生。请记得允许节点在其属性框中更新“Message + LocalEnvironment”树(Compute 模式)。此节点的 ESQL 如下所示:
CREATE COMPUTE MODULE Make_Hourly_TimeoutRequest
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
declare nexthour_c character;
declare nexthour_i integer;
declare nexthour_t time;
declare nextday_d date;
CALL CopyEntireMessage_and_Environment();
-- grab just the hour out of the current time ...
set nexthour_c = cast(CURRENT_TIME as character format 'HH');
set nexthour_i = cast(nexthour_c as integer);
-- set the timer for the next hour ....
set nexthour_i = nexthour_i + 1;
-- if its already after 11pm, we have to bump the date as well.
if (nexthour_i > 23) then
-- select midnight
set nexthour_i = 0;
-- select tomorrow's date
set nextday_d = cast(InputLocalEnvironment.TimeoutRequest.StartDate as date);
set nextday_d = nextday_d + INTERVAL '1' day;
set OutputLocalEnvironment.TimeoutRequest.StartDate = nextday_d;
end if;
set nexthour_t = cast(nexthour_i, 0, 0 as time);
set OutputLocalEnvironment.TimeoutRequest.StartTime = cast(nexthour_t as time);
-- forever
set OutputLocalEnvironment.TimeoutRequest.Count = -1;
-- once an hour
set OutputLocalEnvironment.TimeoutRequest.Interval = 3600;
set OutputLocalEnvironment.TimeoutRequest.AllowOverwrite = TRUE;
RETURN TRUE;
END;
CREATE PROCEDURE CopyEntireMessage_and_Environment() BEGIN
SET OutputRoot = InputRoot;
set OutputLocalEnvironment = InputLocalEnvironment;
END;
END MODULE;
|
“TimeoutControl_HOURLY”的属性如下所示:
| 属性 | 值 |
| Unique Identifier |
HOURLY |
| Request Location |
(空白) |
| Request Persistence |
No |
| Message Domain |
XML |
由于“TimeoutNotification_Start”节点将始终在代理重新启动(或消息流重新启动)后生成一个事件,因此不需要保持此计时器。会为 Request Location 使用“InputLocalEnvironment.TimeoutRequest”缺省值。
“TimeoutNotification_HOURLY”的属性如下所示:
| 属性 | 值 |
| Unique Identifier |
HOURLY |
| Transaction Mode |
Yes |
| Operation Mode |
Controlled |
| Timeout Interval |
1(由于这是一个“Controlled”节点,因此会被忽略) |
由于“Operation Mode”设置为“Controlled”,因此将忽略“Timeout Interval”的值。
为了给 MQOutput 构造相应的树,需要使用“Make_MQ_tree”计算节点(请参阅场景 1)。和前面使用 TimeoutNotification 节点的示例一样,没有从 TimeoutControl 节点传播的 MQMD 或 XML 子树,因此我们需要设置域并创建 MQMD 树:
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
CALL CopyMessageHeaders();
-- Capture the request as an output message
SET "OutputRoot"."Properties"."MessageDomain" = 'XML';
SET "OutputRoot"."MQMD"."Format" = MQFMT_STRING;
SET "OutputRoot"."XML"."TimeoutRequest" =
"InputLocalEnvironment"."TimeoutRequest";
RETURN TRUE;
END;
|
场景 3. 每“n”秒触发一次流,重复“m”次,然后等待“z”秒后再次进行此过程
需求
| 特性 | 值 |
| 每“n”秒都需要计时器事件消息 |
是,连续出现事件后,我们希望等待特定时长再触发下一组连续事件。 |
| 计时器事件要求在特定日期-时间开始。 |
是 |
| 启动后,应发生“n”个事件。 |
是 |
| 如果消息流(或代理)未运行,应在重新启动时生成遗漏的计时器事件 |
有可能 |
可能的场景
在此场景中,我们希望在一段时间内连续触发计时器事件,随后暂停一段时间,然后再触发连续计时器事件。因此,在每个小时中,我们可以希望在 10 分钟内每一分钟触发一次计时器事件,而接下来的 50 分钟内不触发事件。
编程模式
由于计时器事件必须在特定时间发生(例如,在整点时发生,如 1:00 p.m)且/或必须连续触发特定数量的此类事件,因此有必要使用 TimeoutControl 节点。和在场景 2 中一样,可以通过送达外部消息或使用 TimeoutNotification 节点来设置 TimeoutControl 节点。对于此场景,我们将使用消息作为输入。
例如,可以将初始控制消息的 Count 值设置为 10,Interval 值设置为 60。这样就可以获得第一组连续计时器事件。出现了最后一个计时器事件后,我们需要将另一个控制消息发送回 TimeoutControl 节点,该消息中可以包含相同值,也可以使用不同的值(可以设置 StartDate 或 StartTime)。以下就是一个可以完成此任务的流:
图 8. 场景 3 的消息流

与前一个场景不同,我们将使用 MQInput 节点来发送第一个控制流消息。以下是输入消息包含的 XML:
<TimeoutRequest>
<Identifier>Time0</Identifier>
<Action>SET</Action>
<Count>3</Count>
<Interval>10</Interval>
<BurstInterval>60</BurstInterval>
</TimeoutRequest>
|
由于并未指定 StartDate 或 StartTime,这两个属性的缺省值为 TODAY 和 NOW。此消息要求发送 3 条消息,消息间的时间间隔为 10 秒。字段“BurstTime”不是 Timeout 节点能识别的字段。这是我们将稍后用于设置连续事件组间的时间间隔的字段。如前面所述,TimeoutControl 节点将保存此字段,我们将从 TimeoutNotification 节点传递的消息树中检索该字段。
MQInput 节点会将控制消息发送到 TimeoutControl 节点“TimeoutControl_BURST”中,后者具有以下属性:
| 属性 | 值 |
| Unique Identifier |
BURST |
| Request Location |
InputRoot.XML.TimeoutRequest |
| Request Persistence |
Yes |
| Message Domain |
XML |
“TimeoutNotification_BURST”节点具有以下属性:
| 属性 | 值 |
| Unique Identifier |
BURST |
| Transaction Mode |
Yes |
| Operation Mode |
Controlled |
| Timeout Interval |
1(由于这是一个“Controlled”节点,因此会被忽略) |
将初始控制消息发送到此节点时,“TimeoutNotify_BURST”节点将按照指定的时间间隔创建计时器事件。所有这些计时器事件消息都将发送到“BuildTimeoutMessage”计算节点进行进一步处理。只需要在此节点将超时消息移动到 XML 域即可:
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
CALL CopyMessageHeaders();
SET "OutputRoot"."Properties"."MessageDomain" = 'XML';
SET "OutputRoot"."MQMD"."Format" = MQFMT_STRING;
SET "OutputRoot"."XML"."TimeoutRequest" =
"InputLocalEnvironment"."TimeoutRequest";
RETURN TRUE;
END;
|
在连续发送计时器消息后,我们将发送另一个计时器控制消息,以设置下一组连续的计时器事件。首先,我们必须检测是否已经收到了连续计时器事件中的最后一个计时器事件。可以通过使用 Filter 节点,并根据 Root.XML.TimeoutRequest.Count 测试“LocalEnvironment.TimeoutRequest.Count”的值来完成此任务,其中 LocalEnvironment.TimeoutRequest.Count 的值是由 TimeoutNotification 节点设置的,指示其表示连续事件中的哪个计时器事件,而 Root.XML.TimeoutRequest.Count 则是由初始控制消息设置的,在缺省情况下随计时器事件消息传播。TimeoutNotification 节点传播的子树由 TimeoutControl 节点的“Stored Message Location”属性进行控制。在缺省情况下,会传播整个控制消息。)以下是筛选器节点“ResetControlMessage?”的代码:
CREATE FILTER MODULE ResetControlMessageQ
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
if LocalEnvironment.TimeoutRequest.Count = Root.XML.TimeoutRequest.Count then
RETURN TRUE;
else
RETURN FALSE;
end if;
END;
END MODULE;
|
如果确实检测到这是连续事件中的最后一个计时器事件,则必须构建一个新的控制消息,以设置下一组连续事件。以下是用于获取计时器事件消息(存储在 LocalEnvironment 中)并通过向 StartTime 添加“BurstInterval”来设置此属性的不完整代码片段。请记住,StartTime 表示第一个计时器事件的时间,而不是最后一个事件的时间,因此正确添加“BurstInterval”的值可设置下一个事件的时间。
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
declare inter interval;
-- copy Root and LocalEnvironment
set OutputRoot = InputRoot;
set OutputLocalEnvironment = InputLocalEnvironment;
-- make BurstInterval an "interval" type
set inter = cast(InputRoot.XML.TimeoutRequest.BurstInterval as interval second);
-- and fix up the time to be a "burstInterval" from the last startTime ....
set OutputLocalEnvironment.TimeoutRequest.StartTime =
cast(InputLocalEnvironment.TimeoutRequest.StartTime as time) + inter;
-- if we go over midnight, the date will also need to be fixed
-- this is left as an exercise for the student
RETURN TRUE;
END;
|
最后,第二个 TimeoutControl 节点“TimeoutControl_Burst_1”具有以下属性:
| 属性 | 值 |
| Unique Identifier |
BURST |
| Request Location |
InputLocalEnvironment.TimeoutRequest |
| Request Persistence |
Yes |
| Message Domain |
XML |
最后要注意,我们可以在此流中使用单个 TimeoutControl 节点。如果采用了这样的方式,则有必要更改计算节点,以将控制消息放置在从 MQInput 节点接受时的相同位置(即,将需要位于 Root.XML 中)。此处选择的两个节点可让连接变得更为清楚,并演示了如何为一个通知节点使用两个控制节点。
场景 4. 从周一到周五给定时间发生的计时器事件
需求
| 特性 | 值 |
| 在特定时间周期性地需要计时器事件消息 |
是 |
| 计时器事件要求在特定日期-时间开始。 |
是 |
| 应该排除一周中特定的日期。 |
是 |
| 如果消息流(或代理)未运行,应在重新启动时生成遗漏的计时器事件 |
有可能 |
可能的场景
这实际上就是“场景 2”,不过希望从中筛选掉特定的计时器事件而已。例如,我们可能希望将星期六和星期天排除掉。
编程模式
我们将使用“场景 2”中的消息流,并添加一个筛选器节点:
图 9. 场景 4 的消息流

要设置筛选器节点,我们要进行以下测试:
CREATE FILTER MODULE Exclude_Sat_and_Sunday_Filter
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
declare day_of_week_i integer;
declare day_of_week_c character;
-- 'e' is day of week :
-- Monday = 1, Friday = 5, Sat = 6, Sun = 7
set day_of_week_c = cast(current_date as char format 'e');
set day_of_week_i = cast(day_of_week_c as integer);
if (day_of_week_i > 5) then
RETURN FALSE;
else
return true;
end if;
END;
END MODULE;
|
场景 5. 使用超时节点向流中引入定时延迟
需求
可能的场景
在此场景中,我们具有一个希望延迟一段时间的消息流。这将通常是因为我们需要流等待异步事件发生(需要等待一段固定的时间)。流将开始处理消息,等待一定时间(例如,60 秒),然后在此时间段结束后继续处理消息。
编程模式
在此场景中,我们将 TimeoutControl 节点和 TimeoutNotification 节点连接到流中,并专门通过 Timeout 节点本身传播消息树。此示例流显示了一个解决方案:
图 10. 场景 5 的消息流

“Make_Delay_Request”节点中的 ESQL 为:
declare myCounter shared integer 0;
CREATE COMPUTE MODULE Make_Delay_Request_Compute
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
CALL CopyEntireMessage();
begin atomic
set OutputLocalEnvironment.TimeoutRequest.Identifier =
'TimeReq' || cast (myCounter as character) || SQL.ExecutionGroupLabel;
set myCounter = myCounter + 1;
end;
set OutputLocalEnvironment.TimeoutRequest.Action = 'SET';
set OutputLocalEnvironment.TimeoutRequest.IgnoreMissed = FALSE;
set OutputLocalEnvironment.TimeoutRequest.StartTime =
cast(CURRENT_TIMESTAMP + INTERVAL '60' second as char format 'HH:mm:ss');
RETURN TRUE;
END;
END MDULE;
|
此时我们并不真正关心消息树的内容是什么,我们主要考虑的是使用“Identifier”、“Action”和“StartTime”值创建“LocalEnvironment.TimeoutRequest”子树。
与不使用此技术的流相比,此消息流有一个重要的不同之处。即,如果只允许一个执行线程,而在输入队列上有多个消息,将在读取第二个消息前完成第一个消息的处理。在此流中,将在 TimeoutControl 节点完成前一直处理第一个消息,然后处理第二个消息。因此,可能出现在第一个消息放置到输出队列前已从输入队列处理了很多消息的情况。这意味着,很可能出现同时有多个计时器处于活动状态的情况,正是由于这个原因,因此每个计时器都必须具有唯一标识符。为了创建能跨执行线程共享的计数器,我们将使用新“shared”数据类型类声明“myCounter”整数。当流部署时(或重新启动代理或消息流时),将初始化此整数。(截至 2006 年 3 月,存在一个程序错误,会在首次流计时器到点时重置此变量。)由于此计数器未跨执行组共享,因此有必要也将执行组名称追加到计数器,以确保唯一性(因为计时器事件是跨执行组共享的)。为了跨此流的多个实例确保完整性,变量的使用和更新都包含在“begin atomic...end”块中。在请求上设置了“IgnoreMissed”属性,因为可能不希望出现由于代理未运行而导致消息丢失的情况。
TimeoutControl 节点“TimeoutControl_DELAY”具有以下属性:
| 属性 | 值 |
| Unique Identifier |
DELAY |
| Request Location |
空白,其缺省值为 InputLocalEnvironment.TimeoutRequest |
| Request Persistence |
Yes |
| Stored Message Location |
InputRoot |
| Message Domain |
BLOB |
最后两个属性“Stored Message Location”和“Message Domain”非常重要,因为它们指示在何处查找要在 TimeoutNotification 节点外传播的消息树。在本例中,我们希望传播 InputRoot(MQInput 节点所读取的原始消息)的整个原始消息树。这就减少了 MQMD 和消息有效负载(我们希望将 MQMD 保留,以稍后供 MQOutput 节点使用)。由于我们并不关心消息的结构,因此将域设置为“BLOB”。
TimeoutNotify 节点“TimeoutNotify_Delay”具有以下属性:
| 属性 | 值 |
| Unique Identifier |
Delay |
| Transaction Mode |
Yes |
| Operation Mode |
Controlled |
| Timeout Interval |
1(由于这是一个“Controlled”节点,因此会被忽略) |
退出队列是一个单线程操作,即使消息流本身配置为多实例时也是如此。因此,我们直接将 TimeoutNotification 节点连接到 MQOutput 节点(后者可以为将执行业务流程的后续流的多个实例提供信息)。
场景 6. 使用超时节点向流中引入定时延迟作为异常处理的一部分
需求
可能的场景
在此场景中,我们具有一个希望在出现异常后延迟一段时间的消息流。这与场景 5 类似。在此场景中,我们有理由相信导致异常的原因是暂时的,可以在一段时间后重新尝试事务。可能的例子包括“queue full”和“database connection not available”(由于数据库关闭)。
编程模式
这实际上是场景 5 的一个延续。如场景 6 中所示,我们将 TimeoutNotification 节点连接到一个 Catch 端子。Filter 节点用于仅选择希望在延迟后重新进行处理的异常条件(其他条件将直接送到 Trace 节点)。筛选器还应该确保消息的重新处理次数少于“n”次,从而确保没有遇到“中毒消息”的情况。(可以通过在计时器请求消息本身中带一个计数器,并在每次尝试之后进行递增,从而完成此任务,请参阅前面“Redrive”的讨论。)
如场景 5 中一样,我们需要在将原始消息传递给 TimeoutControl 节点前得到一个计时器请求消息。
由于要同时将原始 MQ 消息(来自 MQInput 节点)和来自 TimeoutNotification 节点的树传递给“TryCatch”节点,因此必须还要确保来自两个源的树完全相同。我们使用“Make_MQ_Msg”计算节点来完成此工作。
图 11. 场景 6 的消息流

有关重试失败消息流的完整解决方案,请参阅 Generic message retry and requeue with WebSphere Message Broker V6。
结束语
WebSphere Message Broker V6 引入了通过使用 TimeoutNotification 和 TimeoutControl 节点启动消息流的新方法。本文说明了在若干不同的场景中有效使用这些节点的最佳实践。 |