中国IT动力,最新最全的IT技术教程
最新100篇 | 推荐100篇 | 专题100篇 | 排行榜 | 搜索 | 在线API文档
首 页 | 程序开发 | 操作系统 | 软件应用 | 图形图象 | 网络应用 | 精文荟萃 | 教育认证 | 硬件维护 | 未整理篇 | 站长教程
ASP JS PHP工程 ASP.NET 网站建设 UML J2EESUN .NET VC VB VFP 网络维护 数据库 DB2 SQL2000 Oracle Mysql
服务器 Win2000 Office C DreamWeaver FireWorks Flash PhotoShop 上网宝典 CorelDraw 协议大全 网络安全 微软认证
硬件维护  CPU  主板  硬盘  内存  显卡  显示器  键盘鼠标  声卡音箱  打印机  机箱电源  BIOS  网卡  C#  Java  Delphi  vs.net2005
  当前位置:> IBM专区 > DB2 > Java 技术
用户自定义聚集函数提供分组支持
作者:佚名 时间:2005-08-31 15:46 出处:互连网 责编:小渔
              摘要:为 DB2 Universal Database 中用户自定义聚集函数提供分组支持

级别: 高级

Knut Stolze
信息集成开发, IBM Germany
2004 年 8 月

在前面一篇文章中,Knut Stolze 解释了如何在 DB2 UDB 中实现用户自定义的聚集函数。现在他将深入探讨此话题,展示如何通过两种不同的 Java 实现技术支持用户自定义聚集函数中的分组。

简介
在 前一篇文章 中,我介绍了在 DB2° Universal Database™ 中自己实现聚集(列)函数的机制。我那篇文章里所描述的方法可以总结如下:用一个用户自定义函数(如 computeAggregate )计算出一个中间结果,表示查询进行到当前点为止该聚集值的结果。该中间结果用二进制形式编码表示,并通过一个经过编码的计数器逐渐增加。实际的聚集计算是通过 DB2 的 MAX 函数实现的,它使用这个经过编码的计数器找到最新的中间结果。最后一部分是 UDF(如 getAggregateResult ),它将中间结果作为输入参数,并对其进行解码,用正确的数据类型返回其结果。清单 1 展示了采用这种技术的一个查询:

清单 1. 用户自定义聚集函数查询示例

            SELECT getAggregateResult(MAX(
            computeAggregate(yourColumn)))
            FROM   yourTable@

文章 [ 1] 指出查询中不能包含 GROUP BY 子句。然而,这是一个严重的限制,因为聚集函数最常见的用法是按照分组的方式进行的。本文展示了两种用 Java 编程语言实现的技术,可以为用户自定义聚集函数提供分组支持。 第一节解释了如何管理不同分组的中间结果。本节的前提是中间结果相当短小,不会超过某个固定的大小(以字节为单位)。

如果中间结果需要占用大量的存储空间,或者大小不固定,这时最好将中间结果保存在内存中,不在 UDF 和 DB2 引擎之间来回传递。 第二节将告诉您在这种情况下函数该如何实现。这时,中间结果由聚集函数和分组的标识构成,其中不包括实际的中间结果本身。DB2 MAX 函数对中间结果函数过滤之后,我们的函数会通过这些标识访问内部数据结构,找到相关的信息,从而实现对中间结果的二次分解。

这两种技术要求计算中间结果的函数不仅接收应该被聚集的值,也接收能够指出某个(组)特定的值属于哪个分组的信息。没有必要根据内部处理分组的方式对 getAggregateResult 函数进行任何更改。因此, computeAggregate getAggregateResult 的函数签名如 清单 2 所示。

清单 2. 函数签名

            >>--computeAggregate--(--
            parameters--,--
            group_id--)--><
            >>--getAggregateResult--(--
            intermediate_result --)---><
            

下面一节通过例子说明如何用函数计算每一个分组的加权平均数。computeAggregate 的参数包括:

  • 需要进行聚集计算的值。
  • 与之相关的权数。
可通过 VARCHAR 值来识别分组,但也可以将分组定义为整数。当然,加权平均数可以用类似“AVG(value * weight)”的 SQL 表达式来计算,但是这里介绍的技术还可以应用在很多没有这么简单的场景中,感兴趣的读者会喜欢的。本文中的 Java 和 SQL 代码也可以从“下载”一节中找到。

 

固定大小的短中间结果
根据 DB2 Application Development Guide [ 3] 中的描述,当 DB2 为包含用户自定义函数的 Java 类创建实例时,如果 UDF 用 FINAL CALL 子句注册过,那么在整个 SQL 语句的执行过程中这样的实例化(对于语句中每一次出现的 UDF)只进行一次。或者,换句话说,在第一次调用 UDF 之前,就创建该类的一个对象,并且一直保留到最后一次调用这个 UDF 之后。了解了实例的生命期之后,我们就可以在类的属性中存储任何聚集计算所需要的数据。

可用一个属性保存中间结果列表,其中的每一个元素都代表一个分组的聚集信息。按照例子的要求,聚集信息需要保存目前为止所有(该组)传入值的加权和,并保存已经处理过的数值的数量。因为我们需要找到某个特定分组的聚集信息,因此用 Map 保存从分组到聚集信息的映射关系。这样,我们可定义如 清单 3所示的类和属性:

清单 3. AggregateGroup 类和属性定义

            private class AggrInfo {
            public double weightedSum;
            public int count;
            }
            
            private Map groupMap;
            

下面, computeAggregate 函数的任务就简单了。第一次调用 UDF 的时候,需要初始化 groupMap 属性,然后再继续正常调用过程中的一般性处理。正常调用中使用上面的 map 查找当前分组的聚集函数(分组 id 作为输入参数提供),然后计算下一个中间结果。相应的代码如 清单 4所示:

清单 4. computeAggregate 函数的 Java 代码

            public void computeWeightedMeanAggr(double value,
            double weight, String group, Blob result)
            throws Exception
            {
            switch (getCallType()) {
            case SQLUDF_FIRST_CALL:
            groupMap = new HashMap();
            /* fall through to NORMAL call */
            case SQLUDF_NORMAL_CALL:
            // get the previous aggregation information
            // for that group or start with a new group
            AggrInfo info = (AggrInfo)groupMap.get(group);
            if (info == null) {
            info = new AggrInfo();
            info.weightedSum = 0;
            info.count = 0;
            }
            
// update aggregate information info.weightedSum += value * weight; info.count++; groupMap.put(group, info);
// generate the next intermediate result and // return it as VARCHAR FOR BIT DATA ByteArrayOutputStream blobOut = new ByteArrayOutputStream(); DataOutputStream blobData = new DataOutputStream(blobOut); blobData.writeInt(info.count); blobData.writeDouble( info.weightedSum / (double)info.count); result = Lob.newBlob(); OutputStream blobResult = result.getOutputStream(); blobResult.write(blobOut.toByteArray()); set(4, result); } }

从清单 4 中您可以看到,为每一个分组计算中间结果并保存在内存中的部分都用 粗体标识出来。返回给 DB2 的中间结果中包含着常用的计数器(DB2 MAX 函数需要),以及当前分组到目前为止计算出来的实际加权平均数。这一部分在上述代码中以 加粗斜体表示。

您也许已经注意到,中间结果中并不包含任何表示当前分组的信息。这里并不需要分组信息,因为 DB2 会记住输入参数属于哪个分组,因此,也就知道返回的中间结果属于哪个分组。现在, getAggregateResult UDF 的实现就非常简单了,它只需要将加权平均数从最新的中间结果中提取出来即可。 清单 5在这个 UDF 的代码中用 加粗斜体将这种情况显示出来:

清单 5. getAggregateResult 函数的 Java 代码

            void getAggregateResult(Blob intermediateResult,
            double result) throws Exception
            {
            DataInputStream dataIn = new DataInputStream(>
            intermediateResult.getInputStream());
            dataIn.readInt(); // ignore "count"
            
            set(2, dataIn.readDouble());
            
            }
            

现在,我们只需要用 清单 6所示的 CREATE FUNCTION 语句将函数注册到数据库中,就可以对其进行验证。您也许注意到,中间结果并没有暂存起来。由于我们将所有的信息保存在类的实例中,而由于使用了 FINAL CALL,该实例在整个语句的执行过程中都一直存在,所以不需要任何额外的内存,可忽略将中间结果暂存起来的情况。

清单 6. getAggregateResult 函数的 Java 代码

            CREATE FUNCTION computeAggregate (
            value DOUBLE, weight DOUBLE, group VARCHAR(100) )
            RETURNS VARCHAR(20) FOR BIT DATA
            SPECIFIC compAggrClass
            EXTERNAL NAME 'AggregateGroup.computeWeightedMeanAggr'
            LANGUAGE JAVA  PARAMETER STYLE DB2GENERAL
            NOT DETERMINISTIC  NOT FENCED  THREADSAFE
            RETURNS NULL ON NULL INPUT  NO SQL
            NO EXTERNAL ACTION  NO SCRATCHPAD  FINAL CALL
            DISALLOW PARALLEL  NO DBINFO@
            
CREATE FUNCTION getAggregateResult ( intermResult VARCHAR(20) FOR BIT DATA ) RETURNS DOUBLE SPECIFIC getAggrResClass EXTERNAL NAME 'AggregateGroup.getAggregateResult' LANGUAGE JAVA PARAMETER STYLE DB2GENERAL DETERMINISTIC NOT FENCED THREADSAFE RETURNS NULL ON NULL INPUT NO SQL NO EXTERNAL ACTION NO SCRATCHPAD NO FINAL CALL ALLOW PARALLEL NO DBINFO@
SELECT group, getAggregateResult(MAX( computeAggregate(value, weight, group))) FROM TABLE ( VALUES (1, 1, 'a'), (1, 20, 'b'), (3, 3, 'a'), (4.6, 4, 'a'), (2, 0.1, 'a') ) AS t(value, weight, group) GROUP BY group@
GROUP 2 ----- ------------------------ a +7.15000000000000E+000 b +2.00000000000000E+001 2 record(s) selected.
SELECT group, getAggregateResult(MAX( computeAggregate(value1, weight1, group))), getAggregateResult(MAX( computeAggregate(value2, weight2, group))) FROM TABLE ( VALUES (1, 1, 65, 2, 'a'), (1, 20, 13, 4, 'b'),(4.6, 4, 12.1, 1.2, 'a'), (3, 3, 78, 1, 'a'),(2, 0.1, 0.4, 21.8, 'a') ) AS t(value1, weight1, value2, weight2, group) GROUP BY group@
GROUP 2 3 ----- ------------------------ ------------------------ a +7.15000000000000E+000 +5.78100000000000E+001 b +2.00000000000000E+001 +5.20000000000000E+001 2 record(s) selected

手工验证一下该函数,可知其工作正常。

可变大小的中间结果
第一节中介绍的技术只适用于生成最终结果的所有信息都可保存为中间结果的情况。然而,有时候中间结果可能非常复杂,所以更加有效的方式可能是向 DB2 的 MAX 函数传递一些短小的标识(加上计数器),然后通过这些标识访问真正的中间结果。现在详细介绍每一个必要的步骤。

我们需要一种在不同 UDF 之间共享内存(或者更确切地说是一种 Java 对象)的机制。首先需要记住一点,DB2 试图用尽可能经济的方式管理系统资源。因此,它不会为一个 db2agent [ 4] 进程启动多个 Java 虚拟机(JVM)实例。JVM 也可能在多个 db2agent 进程之间共享。不同的外部 Java 例程(UDF 或存储过程)处于 JVM 的独立线程中。

DB2 所采用的这种方法使我们能够跨越多个不同的(在同一个 db2agent 进程中的)UDF 共享 Java 对象,只需使用单件(singleton)对象即可。单件的基本属性就是环境中仅存在该对象的一个实例,本例中的环境即 JVM。JVM 在 UDF 之间共享,所以单件实例也可自动实现共享。单件为我们提供了一种可以将 SQL 语句中每一个 UDF 进行聚集计算时需要的数据存储在另外的对象当中的方式。(注意,一条 SQL 语句中可以包含多个聚集。)这些对象与 第一节 中使用的 AggregateGroup 类似,也跟踪记录 UDF 所处理的每一个分组的聚集信息。 图 1 说明了我们需要的类。有关这些类的详细说明如下所示。

图 1. 处理复杂用户定义聚集函数的类图
处理复杂用户定义聚集函数的类图

用于缓存聚集对象的单件(AggrCache)
单件类 AggrCache 中包含着 Aggregate 对象组成的向量。其中的每一个聚集对象都通过它在向量中的位置进行标识。缓存提供了一种方法,可以获取(也可能是新建)一个聚集对象,如 清单 7中的 粗体部分所示。这个方法要求指定一个 id。也可以指出不再需要某个聚集对象,可将其释放。用 加粗斜体 标识出来的 releaseAggregate 方法就负责这项工作。该类中仅剩的另一个方法(用 斜体标识)用于获取该单件的一个引用,如果尚未实例化则创建它。

该缓存的完整代码如清单 7 所示。

清单 7. 聚集缓存单件的代码

            private static AggrCache instance = null;
            private Vector aggregates;
            
private AggrCache() { aggregates = new Vector();
} public static synchronized AggrCache getInstance() { if(instance == null) { instance = new AggrCache(); } return instance;
}
public synchronized Aggregate getAggregate(Integer id) { Aggregate newAggr = null; if (id == null) { boolean foundGap = false; // find a gap in the list of aggregate objects // where we will store our new object for (int i = 0; i < aggregates.size(); i++) { if (aggregates.elementAt(i) == null) { newAggr = new Aggregate(i); aggregates.setElementAt(newAggr, i); foundGap = true; break; } } // no gap found --> append at end of vector if (foundGap == false) { newAggr = new Aggregate(aggregates.size()); aggregates.addElement(newAggr); } } else { newAggr = (Aggregate)aggregates.elementAt( id.intValue()); } return newAggr; }
public void releaseAggregate(int aggrId) { aggregates.setElementAt(null, aggrId); // shorten the vector if we just removed the // last element if (aggrId == aggregates.size()) { int lastUsed = aggregates.size(); while (lastUsed > 0 && aggregates.elementAt( lastUsed) == null) { lastUsed--; } aggregates.setSize(lastUsed-1); } }

一个 UDF 的 Aggregate 对象(Aggregate)
我们用一个 Java Map 将聚集信息及其分组关联起来。在实现方面,我们遵从与 第一节中几乎相同的方法。映射对象由构造方法来分配(见其中的 斜体部分)。该类还维护着自己的标识(实例化的时候由缓存来设置),该标识可以被查询,如 清单 8中代码的 粗体部分所示。最后,我们可以获取和更新特定分组的聚集信息。用 加粗斜体标识的那两个方法就是完成这项任务的。

清单 8. Aggregate 类的代码

            int aggrId;
            HashMap groupMap;
            
public Aggregate(int id) { aggrId = id; groupMap = new HashMap(); }
public int getId() { return aggrId; } public Object getAggrInfo(Object groupId) { return groupMap.get(groupId); } public void setAggrInfo(Object groupId, Object aggrInfo) { groupMap.put(groupId, aggrInfo); }

UDF 逻辑(AggrUDF)及其聚集信息(AggrInfo)
AggrUDF 类的逻辑将所有的东西都组织起来。在第一次调用 computeAggregate 函数时,通过 AggrCache 分配一个聚集对象。在所有的后续调用中,我们都通过缓存、根据 id 来获取该对象。对聚集对象的访问如 清单 9中的 粗体部分所示。现在,我们可从当前分组(或一个新开始的分组)中获得聚集信息,并将分组信息和传递给 UDF 的参数结合起来。新的聚集信息在聚集对象中更新。这些类都在代码中用 加粗斜体标出。惟一缺少的部分就是函数所返回的二进制串的构造。这个二进制串由常用的计数器(DB2 MAX 函数进行实际聚集计算时需要)、聚集对象的标识以及分组标识构成。

getAggregateResult UDF(设置为 斜体)需要对二进制串解码,然后访问正确的聚集对象。我们也在这个串的编码中加入了当前分组的信息。有了分组信息之后,我们就可以识别出正确的聚集信息,并计算最终应该返回的结果。在示例代码中,我们再次计算加权平均数,但是感兴趣的读者很容易就能看出来,其实在聚集信息中完全可以加入复杂得多的结构。

清单 9. UDF 的入口点

            private class AggrInfo {
            public double value;
            public int count;
            }
            int aggrId;
            
public void computeWeightedMeanAggr(double value, double weight, String group, Blob result) throws Exception { Aggregate aggregate = null; boolean firstCall = false; switch (getCallType()) { case SQLUDF_FIRST_CALL: // get a new aggregation object and its ID aggregate = AggrCache.getInstance(). getAggregate(null); aggrId = aggregate.getId(); firstCall = true; /* fall through to NORMAL call */
case SQLUDF_NORMAL_CALL: if (firstCall != true) { // retrieve the aggregate object aggregate = AggrCache.getInstance(). getAggregate(new Integer(aggrId)); }
// get the previous aggregation information // for current group or start with a new group AggrInfo info = (AggrInfo)aggregate. getAggrInfo(group); if (info == null) { info = new AggrInfo(); info.value = 0; info.count = 0; } info.value += value * weight; info.count++;
// cache the updated group information aggregate.setAggrInfo(group, info);

// generate the next intermediate result, // consisting of the counter and the weighted // mean value, and return it as the result of // the current function invocation ByteArrayOutputStream blobOut = new ByteArrayOutputStream(); DataOutputStream blobData = new DataOutputStream(blobOut); blobData.writeInt(info.count); // counter blobData.writeInt(aggrId); blobData.writeUTF(group); result = Lob.newBlob(); OutputStream blobResult = result.getOutputStream(); blobResult.write(blobOut.toByteArray()); set(4, result); } }
public void close()
{ try { if (aggrId >= 0) { // release aggregate object in FINAL call AggrCache.getInstance(). releaseAggregate(aggrId); } } catch (Exception e) { } }
void getAggregateResult(Blob intermediateResult, double result) throws Exception { // get ID of aggr object from binary encoding DataInputStream dataIn = new DataInputStream( intermediateResult.getInputStream()); dataIn.readInt(); // ignore counter int aggrId = dataIn.readInt(); String group = dataIn.readUTF();
// get aggr information for (encoded) group Aggregate aggregate = AggrCache.getInstance(). getAggregate(new Integer(aggrId)); AggrInfo info = (AggrInfo)aggregate. getAggrInfo(group); if (info == null) { setSQLstate("38A00"); setSQLmessage("Invalid aggregate identifier"); return; }
// set result based on the aggregation information set(2, info.value / info.count); }

注册并测试 UDF
最后一步很常见,即在数据库中注册这两个函数,并执行一些测试。注意,根据分组参数的数据类型不同,您可能需要使用更长的二进制串。这是将分组信息编码到二进制串中的缘故,分组参数类型为 VARCHAR,比简单的 INTEGER 类型潜在地要求更多的存储空间。从 清单 10的 SQL 语句中能够反映出这一点。

清单 10. 注册并测试 UDF 函数

            CREATE FUNCTION computeAggregate (
            value DOUBLE, weight DOUBLE, group VARCHAR(100) )
            RETURNS VARCHAR(200) FOR BIT DATA
            SPECIFIC compAggrCache
            EXTERNAL NAME 'AggrUDF.computeWeightedMeanAggr'
            LANGUAGE JAVA  PARAMETER STYLE DB2GENERAL
            NOT DETERMINISTIC  NOT FENCED  THREADSAFE
            RETURNS NULL ON NULL INPUT  NO SQL
            NO EXTERNAL ACTION  NO SCRATCHPAD  FINAL CALL
            DISALLOW PARALLEL  NO DBINFO@
            
CREATE FUNCTION getAggregateResult ( intermResult VARCHAR(200) FOR BIT DATA ) RETURNS DOUBLE SPECIFIC getAggrResCache EXTERNAL NAME 'AggrUDF.getAggregateResult' LANGUAGE JAVA PARAMETER STYLE DB2GENERAL DETERMINISTIC NOT FENCED THREADSAFE RETURNS NULL ON NULL INPUT NO SQL NO EXTERNAL ACTION NO SCRATCHPAD NO FINAL CALL ALLOW PARALLEL NO DBINFO@
SELECT group, getAggregateResult(MAX( computeAggregate(value, weight, group))) FROM TABLE ( VALUES (1, 1, 'a'), (1, 20, 'b'), (3, 3, 'a'), (4.6, 4, 'a'), (2, 0.1, 'a') ) AS t(value, weight, group) GROUP BY group@
GROUP 2 ----- ------------------------ a +7.15000000000000E+000 b +2.00000000000000E+001 2 record(s) selected. SELECT group, getAggregateResult(MAX( computeAggregate(value1, weight1, group))), getAggregateResult(MAX( computeAggregate(value2, weight2, group))) FROM TABLE ( VALUES (1, 1, 65, 2, 'a'), (1, 20, 13, 4, 'b'),(4.6, 4, 12.1, 1.2, 'a'), (3, 3, 78, 1, 'a'),(2, 0.1, 0.4, 21.8, 'a') ) AS t(value1, weight1, value2, weight2, group) GROUP BY group@
GROUP 2 3 ----- ------------------------ ------------------------ a +7.15000000000000E+000 +5.78100000000000E+001 b +2.00000000000000E+001 +5.20000000000000E+001 2 record(s) selected.

测试成功,返回期望的值。

结束语
用户自定义函数 [ 1] 方面的第一篇文章详细介绍了如何实现用户自定义的聚集函数。然而,那篇文章没有解决在聚集中支持分组操作符的问题。本文弥补了这一不足,介绍了两种实现方法。这两种方法各自维护了一个对象列表,其中的每一个对象都负责管理每一个具体分组的聚集信息。

如果待计算的中间结果长度固定,而且可以很容易地编为二进制串返回给 computeAggregate 函数,可以用第一种方法。第二种方法更适合更长更复杂的中间结果。二进制串中只保存了一组标识,有了这些标识,就可以通过单件对象找到真正的值。

关闭本页
 
首页 | 投资与合作 | 服务条款 | 隐私政策 | 收藏本站 | 设为首页 | 新用户注册 | 免责声明 | 使用帮助
Copyright ©2005-2008 chinaitpower.com All rights reserved. www.chinaitpower.com 版权所有