这几天项目中使用 Hadoop 遇到一个问题,对于这样 key-value 的数据集合:id-biz object,对 id 进行 partition(比如根据某特定的 hash 算法 P),分为 a 份;使用数量为 b 的 reducer,在 reducer 里面要使用第三方组件进行批量上传;上传成文件,文件数量为 c,但是有两个要求:
- 上述 a、b、c 都相等,从而使得每个 partition 的数据最终都通过同一个 reducer 上传到同一个文件中去;
- 每个 reducer 中上传的数据要求 id 必须有序。
最开始,想到的办法是,为了保证 reducer 中的批量上传,需要使得传入 reducer 的 key 变成一个经过 hash 算法 A 计算得到的 index,这样就使得 reducer 中的 value 是一个包含了数个 biz boject 的集合的 iterator,从而实现在一次 reducer 调用中批量上传并且提交。在批量上传提交的过程中,按照每上限个(例如 1000 个)文件提交一次的办法进行,以保证内存占用控制在一定范围内。
如何保证有序?
Hadoop 在 Reduce 之前会自动对 key 排序,但是上述的情况实际是要根据 id 来给 value 排序(因为在 map 之后 key 已经变成 index 了),凡是涉及到要给 value 排序的,都要使用 Hadoop 的 Secondary Sorting(见 stackoverflow 链接)。
这张图其实已经可以说明,把 value 要排序的关键属性放到 key 里面去,这样 key 就变成了 natural key(上述的 index)和 secondary key(上述的 id)这样两部分组成的一个 composite key。
1. Partition:Partition 的时候仅使用 natural key,保证所有 index 的数据都分在同一个 partition;
JobConf.setPartitionClass(...);
2. Sort:真正给 key 排序的比较算法要对 natural key 和 secondary key 两部分进行排序,从而保证了 key 在 id 维度上是有序的,而 id 和 value 是一一对应的,因此 value 也就是有序的。
JobConf.setOutputKeyComparatorClass(...);
3. Group:grouping 的比较算法忽略掉 secondary key,只对 natural keygrouping,使得属于同一 index 的数据都走到同一个 reducer 中去。
JobConf.setOutputValueGroupingComparatorClass(...);
总结一下,这样一来,在 reducer 中,input key 是上述这样一个 composite key 对象,包含了 index 和 id,input value 是一个可以遍历的元素为原始 biz object 类型的对象。
后话:这是 Secondary Sorting 的过程,可以解决我的问题,但是后来发现,实际上,我的问题并不需要要用这样啰嗦的方式来解决:
- 进入 reducer 的 key 只需要是 id,Hadoop 会对 key 自动排序;
- partition 策略不变,但是是在 partitioner 中计算 index 并根据它来 partition;
- 不需要单独指定 Grouping 和 Sorting 的算法;
- 在 reducer 中建立一个大小为上限(如 1000 个)的容器对象 p。
这样,既然对于每个 partition 的数据,都在同一个 reducer 中得到处理,而 reducer 中每次 reduce 方法彼此之间是根据 id 有序进行,那么就可以在每次调用时把数据放到 p 中,在 p 放满时提交一次即可。
测试通过。回头看看,真是刚开始的时候把问题想复杂了。
文章未经特殊标明皆为本人原创,未经许可不得用于任何商业用途,转载请保持完整性并注明来源链接 《四火的唠叨》