[Spark 源码阅读笔记] FP-Growth 算法在 Spark 中的实现(一)

FPTree 的实现

Posted by Nicodechal on 2018-03-07

FP-growth (Frequent-Pattern Growth)是数据挖掘中用于挖掘频繁项集的经典算法之一。相较于 Apriori 算法,该算法消除了候选项集,并减少了对数据库扫描的次数,因而效率更高。具体算法思路可以参考数据挖掘教材 data mining concepts and techniques 第六章的内容。

本文主要介绍 FPTree 的实现,FPTree 结构是 Spark 中实现 FP-Growth 算法的主要数据结构。

FPTree 的实现

Spark 中 FPTree 的结构如图所示:

图中每个部分都可以和教材中给出的结构相对应。

Spark 中的实现结构如下,下面先概括性的知道一下每个变量和函数的用途,之后会详细说明,另外阅读时要注意变量的类型,以及它们和上图的对应关系,方便下面理解源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class FPTree[T] {
/**
* 树的根结点,Node类型定义在object FPTree中(在下方)
*/
val root: Node[T]
/**
* 项头表,记录了每个记录项的计数以及结点链(如上方图)
* 类型是Map类型,key是传入的事务集合中元素的类型(上图中Item列),
* value是summary类型(上图中Count列+Nodes列,具体定义在下方)
*/
val summaries: mutable.Map[T, summary[T]]
/**
* 向树上添加一个事务
* @t: 该事务中元素的集合,是一个迭代器
* @count: 事务的计数,即数据库中有几个相同的事务,默认是1
* #return 这棵树
*/
def add(t: Iterator[T], count: Long = 1L): this.type
/**
* 合并一棵树到当前树
* @other: 另一颗同类型(泛型相同)的FP树
* #return 这棵树
*/
def merge(other: FPTree[T]): this.type
/**
* 根据后缀元素生成子树
* @suffix: 后缀元素
* #return 对应于后缀元素的 从根结点开始的路径前缀组成的条件模式基 作为事务数据构造生成的FPTree
*/
def project(suffix: T): FPTree[T]
/**
* 返回整棵树上的所有事务,通过调用getTransactions(root)实现
* #return (事务, 事务计数)组成的迭代器
*/
def transactions: Iterator[(List[T], Long)]
/**
* 获得所有在node结点下的事务
* @node: 某棵子树的根结点
* #return 返回一个迭代器类型,其中元素是(事务,事务计数)这样的元组
*/
def getTransactions(node: Node[T]): Iterator[(List[T], Long)]
/**
* 根据 通过验证的后缀 和 最小支持度计数 得到频繁模式组成的列表
* #return (频繁模式集,支持度计数)这样的元组
*/
def extract(minCount: Long, validateSuffix: T => Boolean): Iterator[(List[T], Long)]
}
object FPTree {
class Node[T](val parent: Node[T]) {
//结点中的元素
var item: T
//结点计数
var count: Long
//结点的子结点,用一个map表示方便取item,key为item,value为以key为item的node。
val children: mutable.Map[T, Node[T]]
//返回结点是否为根结点
def isRoot: Boolean
}
class Summary[T] {
//支持度计数
var count: Long
//结点链,直接用可变列表表示
val nodes: ListBuffer[Node[T]]
}
}

下面主要说明对树的几种基本操作。

向树中增加事务

根据教材中的描述,将事务添加到树中时,需要更新 Summaries 的内容和结点树。

下面看一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** Adds a transaction with count. */
def add(t: Iterable[T], count: Long = 1L): this.type = {
require(count > 0) //确保count为正
var curr = root //curr初始为root,用来指示 即将插入的item的结点的父结点
curr.count += count //这个count将来会用来将树转回事务,暂时用不到,其值表示事务数
t.foreach { item => //这里遍历事务中的每个item
//确认有没有对应于item的summary没有就建个空的
val summary = summaries.getOrElseUpdate(item, new Summary)
summary.count += count //更新支持度计数
//在curr的子结点中找一下item,如果没有就创建个新的结点
val child = curr.children.getOrElseUpdate(item, {
//这里就处理新建结点的情况
val newNode = new Node(curr)
newNode.item = item
summary.nodes += newNode //因为添加了新结点,需要加入到结点链中
newNode
})
//无论是新加的结点还是已有的结点,都应该加上当前事务的count值(参数里的count,这里的含义是item的数量)
child.count += count
curr = child //更新curr,因为这个child将成为下个结点的父结点
}
this
}

将FP树转换成事务集

FP树中包含了事务集的所有信息,所以也可以从结点树解析出事务集,Spark 中提供了getTransactions方法,这个方法可以求得以某个结点为根结点的子树下的事务集(注意不包括这个结点),调用getTransactions(root)即可得到整棵树的事务集。

getTransactions方法是一个递归的方法,该方法能够返回参数node结点下方的所有事务(注意不包含 node 结点本身),如果要得到包含node结点的所有事务只需在getTransactions调用的返回值之中,给每一项前加入node结点的item即可。

如下图,如果以 I2I2 为根结点,得到的事务集为 [(I2,I3),(I2)][(I2, I3), (I2)] ,而调用getTransactions时,将返回 [(I3),(Nil)][(I3), (Nil)] ,此时将调用结点 I2I2
加入返回值中的每一项 [(I2::I3),(I2::Nil)]=[(I2,I3),(I2)][(I2::I3), (I2::Nil)] = [(I2, I3), (I2)] ,即可得到完整的事务。

下面看下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** Returns all transactions under this node. */
private def getTransactions(node: Node[T]): Iterator[(List[T], Long)] = {
//这个count最后用于记录node结点的余量,并以(Nil, count)的形式保存在迭代器中,供后续过程使用
var count = node.count
//取出每个子结点这里item = child.item
node.children.iterator.flatMap { case (item, child) =>
//递归获取子结点生成的事务++(Nil+node结点余量)
getTransactions(child).map { case (t, c) =>
count -= c //在这里更新count
(item :: t, c) //将该结点加入事务中,生成一个事务“片段”
}
} ++ {
//这里记录余量
if (count > 0) {
//将余量合并成结点,最晚在根结点处生成完整事务
//(注意此时根结点处会把根结点的子结点连接进这条事务而非root本身)
Iterator.single((Nil, count))
} else {
//像根结点这样没有余量的情况就什么都不加
Iterator.empty
}
}
}

生成相关后缀的子树

该过程输入后缀元素,生成并返回相关子树。这个过程遍历后缀元素对应的结点链,对于每个结点,逆序遍历得到一条事务,事务的频度就是后缀元素的 countcount值。然后将这条事务加入到一棵新建的树中。重复直到结点链尾,最后返回这棵树即可。

注意生成的后缀树有如下性质:

  1. 该树不包含该后缀元素
  2. 后缀元素不一定是叶子结点
  3. 后缀元素指是该事务的最后一项(事务中的项已经有序)

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/** Gets a subtree with the suffix. */
private def project(suffix: T): FPTree[T] = {
val tree = new FPTree[T] //返回的树初始化
if (summaries.contains(suffix)) { //在Summaries中查找后缀
val summary = summaries(suffix) //去除结点链
summary.nodes.foreach { node =>
var t = List.empty[T] //用来存放解析出来的事务
var curr = node.parent
while (!curr.isRoot) { //循环遍历直到根结点
t = curr.item :: t
curr = curr.parent
}
tree.add(t, node.count) //向tree中添加这条事务,频度等于node的count值
}
}
tree //返回
}

根据验证的后缀和最小支持度计数生成模式

这个函数根据和最小支持度计数生成模式,验证后缀使得函数可以只生成指定后缀的模式,注意由于事务中的 itemitem 已经排序,所以所有的模式组成的集合可以根据后缀元素不相互覆盖的划分

Spark 中这个过程由函数extract实现,函数有两个参数,一个最小支持度计数,另一个是用于验证后缀的函数,这个函数根据输入的元素返回一个布尔值用以决定是否输出相关联的模式,该参数用于分布式生成模式,避免重复输出。

下面看下源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** Extracts all patterns with valid suffix and minimum count. */
def extract(
minCount: Long,
// 下面的函数默认值是一个永远返回true的函数
validateSuffix: T => Boolean = _ => true): Iterator[(List[T], Long)] = {
// 遍历树对应的summaries
summaries.iterator.flatMap { case (item, summary) =>
//判断是否是需要的后缀,因为是后缀所以只会在第一次调用时验证
//同时当item的count大于minCount时才有可能有符合条件的模式,这是一个必要条件
if (validateSuffix(item) && summary.count >= minCount) {
//先生成最小的以item为后缀的模式
Iterator.single((item :: Nil, summary.count)) ++ //连接
//这里调用就不需要验证后缀了,因为这里查找的是以item为结尾的事务组成的子树(但是树中不包含item),
//只有这些路径才有可能生成以item为后缀的模式
project(item).extract(minCount).map { case (t, c) =>
//将后缀元连接入结果
(item :: t, c)
}//返回的结果就是所有以item结尾的模式
} else { //如果不符合条件(最小支持度)或者不是指定后缀的结果就返回空
Iterator.empty
}
} //注意flatMap函数返回值类型是Iterator
}

小结

本文主要讲解了 Spark 中 FPTree 的实现,对该结构的重点操作的源代码(包括增加事务、树与事务集的转换以及频繁模式生成)进行了分析。