Streaming Pipeline in Python - 2

Posted on | 548 words | ~2 mins
Python Design Pattern

除了上一篇文章中提到的几个问题,在使用Generator Expression的过程中,还遇到了一个bug。

pipeline中数据处理分为两步:

第一步,基于每一个旧数据生成一笔新数据,简要代码如下。请注意我们使用了生成器。

1def extend_old_data(items):
2    for item in items:
3        # generate a new item based on an old item
4        new_item = Item()
5        new_item['data'] = item['data'] + 1
6        yield new_item

第二步,使用chain将新旧数据生成器混合在一起,关于itertools

1from itertools import chain
2old_data = read_data('data.txt')
3new_data = extend_old_data(old_data)
4all_data = chain(old_data, new_data)

当然最后要把数据写入文件

1write_to_text_file(all_data, 'processed_data.txt')

如果data.txt中内容为

11       
22

processed_data.txt中会是什么内容?我们可是期盼得到

11          
22          
33        
44

可如果运行代码,我们得到是

11              
22

Generator Expression是生成器,生成器的特点是所有数据都是on the fly,“用后即焚”。我们把代码按照运行的顺序从新排布一下:

1old_data = read_data('data.txt')
2write_to_text_file(old_data, 'processed_data.txt')
3write_to_text_file(extend_old_data(old_data), 'processed_data.txt')

看出来了吗?old_data被write_to_text_file先进行了“用后即焚”。到了extend_old_data(old_data)处,old_data已经空了,自然不会有新的item生成。如果不在意新item和旧item的排列顺序的话,为了正确实现合并新旧数据的意图,需要修改extend_old_data方法如下:

 1def extend_old_data(items):
 2    for item in items:
 3        # generate a new item based on an old item
 4        new_item = Item()
 5        new_item['data'] = item['data'] + 1
 6        yield item
 7        yield new_item
 8
 9old_data = read_data('data.txt')
10all_data = extend_old_data(old_data)