CWYAlpha

Just another WordPress.com site

Thought this was cool: 基于python实现map-reduce并用来计算co-occurance (一)

leave a comment »


最近我们的一些离线计算平台迁移到linux,于是研究了一下Unix编程艺术。感受颇深,其中的管道编程和模块化的思想启发我可以自己实现一个单机的map-reduce。其实,map-reduce的核心一个函数式编程的思想,将所有的操作抽象成map和reduce两个函数。而并行化则是说在做了这样的抽象之后就很容易并行化了,所以map-reduce本身并不代表并行化,我们写单机的程序也可以用map-reduce。

我们先讨论一个非并行的map-reduce。

简单概括。map函数的作用是将数据流中的纪录映射到一个key-value pair。然后经过sort,可以将相同的key join起来,然后传给reduce函数。所以如果用linux的管道可以如下表示整个map-reduce的过程

input | map | sort | reduce | output

mapreduce的库函数包含两个函数,一个是do_map。代码如下所示。这个函数接受3个输入函数 map_func, map_setup, map_cleanup,这三个函数的作用和hadoop里面的map, setup, cleanup的作用是一样的。这个函数的过程大致如下,从fin中读入以\t分隔的一行,从而得到这一行的key和value,然后通过map_func处理这个key value pair。

def do_map(map_func, map_cleanup = None, map_setup = None, \
           fin = sys.stdin, config = None):
    if config == None:
        config = dict()
    if map_setup != None:
        map_setup(config)

    for line in fin:
        tks = line.strip().split('\t')
        key = tks[0]
        value = '\t'.join(tks[1:])
        map_func(key, value, config)
    if map_cleanup != None:
        map_cleanup(config)

令一个函数是do_reduce,它接受do_map的输出经过sort后的结果,在sort之后,同样的key将会被sort在一起,这样我们就可以得到具有同样key的所有value,然后将key和values发送给reduce_func进行处理。

def do_reduce(reduce_func, reduce_cleanup = None, \
              reduce_setup = None, fin = sys.stdin, config = None):
    if config == None:
        config = dict()
    if reduce_setup != None:
        reduce_setup(config)
    prev_key = ''
    values = []
    for line in fin:
        tks = line.strip().split('\t')
        key = tks[0]
        value = '\t'.join(tks[1:])

        if prev_key != key:
            if len(prev_key) > 0 and len(values) > 0:
                reduce_func(prev_key, values, config)
            prev_key = key
            values = []
        values.append(value)
    if reduce_cleanup != None:
        reduce_cleanup(config)

有了这两个函数,我们就可以写自己的map_func和reduce_func来进行很多常见的处理了。比如我们最为经典的wordcount问题。

假设我们的输入是一个文本文件hello.txt,每行是空格分隔的单词。因为我们的do_map函数认为输入文件的每一行必须是一个\t分隔的key value pair。这个没有关系,我们可以通过sed处理一下。比如我们用下面的命令就可以得到每个word的词频:

cat hello.txt | sed s/^/0″\t”/ | python wordcount.py map | sort | python wordcount.py reduce

下面的任务就是如何实现wordcount.py,实现如下:

import mapreduce
import sys

def map_func(key, value, config):
    for word in value.split(' '):
        print word + '\t' + '1'

def reduce_func(key, values, config):
    print key + '\t' + str(sum([int(x) for x in values]))

if sys.argv[1] == 'map':
    mapreduce.do_map(map_func = map_func)
else if sys.argv[1] == 'reduce':
    mapreduce.do_reduce(reduce_func = reduce_func)

上面简单介绍了这个python的map-reduce结构。对于这个结构,我们可以用简单的bash脚本将它改成一个基于linux多进程的并行map-reduce框架。如果再做的复杂一点,不利用进程的管道通信,而是基于socket通信,我们就可以将它改成一个基于集群机器的多机并行的map-reduce。当然,如果要做的更复杂,最好有分布式文件系统的支持。不过我们就还是只讨论上面提到的基于管道的单机实现。

下面可以介绍一下如何利用上面的框架实现movielens数据集中电影的相似度计算,也就是传统的itemcf算法。(明天待续)

您可能也喜欢:

python 爬虫

Hadoop Map Reduce TextInputFormat的编码问题 (GZIP)

用Python写一个小小的爬虫程序 (Python Spider)

Test XMLRPC By Python

高精度计算问题

无觅

from xlvector – Recommender System: http://xlvector.net/blog/?p=874

Written by cwyalpha

十二月 30, 2012 在 7:08 上午

发表在 Uncategorized

发表评论

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / 更改 )

Twitter picture

You are commenting using your Twitter account. Log Out / 更改 )

Facebook photo

You are commenting using your Facebook account. Log Out / 更改 )

Google+ photo

You are commenting using your Google+ account. Log Out / 更改 )

Connecting to %s

%d 博主赞过: