已独立成项目在github上面 dataformat,
在进行hadoop测试时,需要造大量数据,例如某个表存在56列,但实际程序逻辑只适用到某几列,我们造的数据 也只需要某几列
#this script change data from your source to the dest data format
#2011-08-05 created version0.1
#2011-10-29 add row-row mapping ,default row value .rebuild all functions. version0.2
#next:add data auto generate by re expression
#2011-12-17 add new functions, add timestamp creator. version0.3
#2012-03-08 rebuild functions. version0.4
#2012-06-22 add function to support multi output separators
#2012-07-11 fix bug line 44,add if
#2012-09-03 rebuild functions,add help msg! version0.5
#2012-11-08 last version edited by lingyue.wkl
# this py: https://github.com/wklken/pytools/blob/master/data_process/dataformat.py
#read file and get each line without n
return [line[:-1] for line in lines ]
def one_line_proc(parts, total, ft_map, outsp, empty_fill, fill_with_sno):
for i in range(1, total + 1):
#加入使用默认值列 若是以d开头,后面是默认,否则取文件对应列 done
if fill_index.startswith("d"):
outline.append(fill_index[1:])
outline.append(handler_specal_part(parts[int(fill_index) - 1]))
outline.append(empty_fill)
default_outsp = outsp.get(0,"t")
result.append(outline[i])
result.append(outsp.get(i + 1, default_outsp))
def process(inpath, total, to, outpath, insp, outsp, empty_fill, fill_with_sno, error_line_out):
if r":" not in to_row and len(to_row.split(":")) == 2:
used_row.append(int(to_row.split(":")[1]))
if r"=" not in str(to_row) and len(str(to_row).split("=")) == 2:
if r"=" not in str(to_row) and len(str(to_row).split("=")) == 2:
ft_map.update({int(to_row.split("=")[0]): "d"+to_row.split("=")[1]})
elif r":" not in to_row and len(to_row.split(":")) == 2:
ft_map.update({int(to_row.split(":")[0]): to_row.split(":")[1]})
for i in range(1, total + 1):
ft_map.update({int(to_row): str(to_index)})
used_row.append(to_index)
#setp3 处理输出分隔符 outsp 0=t,1= 0代表默认的,其他前面带列号的代表指定的
if len(outsp) > 1 and len(outsp.split(",")) > 1:
outsps = re.findall(r"d=.+?", outsp)
k,v = outsp_kv.split("=")
outsp.update({int(k): v})
lines = read_file(inpath)
if len(insp.split("|")) > 0:
parts = re.split(insp, line)
if len(parts) >= in_count:
outline = one_line_proc(parts, total, ft_map, outsp, empty_fill, fill_with_sno)
result.append(outline + "n")
result.append(line + "n")
#特殊的处理入口,处理维度为每一行,目前只有时间处理
def handler_specal_part(part_str):
if part_str.startswith("TS") and "=" in part_str:
ts_format = {8: "%Y%m%d",
#step1 确认输出的格式 TS8 TS10 TS14 TS19
to_l = int(part_str[2:part_str.index("=")])
part_str = part_str.split("=")[1].strip()
inputdate = part_str.split("+")[0].strip()
interval = int(part_str.split("+")[1].strip())
parts = part_str.split("-")
if len(parts) == 2: #20101020 - XX
inputdate = parts[0].strip()
interval = -int(parts[1].strip())
elif len(parts) == 3: #2010-10-20
elif len(parts) == 4: #2010-10-20 - XX
inputdate = "-".join(parts[:-1])
interval = -int(parts[-1])
inputdate = part_str.strip()
part_str = get_timestamp(inputdate, ts_format, interval)
#step4 如果定义了输出格式,转换成目标格式,返回
part_str = time.strftime(ts_format.get(to_l), time.localtime(int(part_str)))
def get_timestamp(inputdate, ts_format, interval=0):
inputdate = time.strftime("%Y%m%d%H%M%S")
inputdate = inputdate.strip()
ts = time.strptime(inputdate, ts_format.get(size))
print "the input date and time expression error,only allow 'YYYYmmdd[HHMMSS]' or 'YYYY-MM-DD HH:MM:SS' "
print "the input date and time expression error,only allow 'YYYYmmdd[HHMMSS]' or 'YYYY-MM-DD HH:MM:SS' "
return str(int(time.mktime(ts)) + interval)
print("功能:原数据文件转为目标数据格式")
print("t -i inputfilepath [必输,input, 原文件路径]")
print("t -t n [必输,total, n为数字,目标数据总的域个数]")
print("t -a '1,3,4' [必输,array, 域编号字符串,逗号分隔。指定域用原数据字段填充,未指定用'0'填充]")
print("t -a '3,5=abc,6:2' 第5列默认值abc填充,第6列使用输入的第1列填充,第3列使用输入第1列填充")
print("t -o outputfilepath [可选,output, 默认为 inputfilepath.dist ]")
print("t -F 'FS' [可选,field Sep,原文件域分隔符,默认为\t,支持多分隔符,eg.'t|||' ]")
print("t -P 'OFS' [可选,out FS,输出文件的域分隔符,默认为\t,可指定多个,多个需指定序号=分隔符,逗号分隔,默认分隔符序号0 ]")
print("t -f 'fill_str' [可选,fill,未选列的填充值,默认为空 ]")
print("t -s [可选,serial number,当配置时,-f无效,使用列号填充未指派的列]")
print("t -e [可选,error, 源文件列切分不一致行/空行/注释等,会被直接输出,正确行按原逻辑处理]")
def must_be_defined(param, map, error_info):
opts,args = getopt.getopt(sys.argv[1:],"F:P:t:a:i:o:f:hse")
if op in ("-h", "-H", "--help"):
insp = value.decode("string_escape")
outsp = value.decode("string_escape")
print(sys.argv[0]+" : the amount of params must great equal than 3")
print("Command : ./dataformat.py -h")
except getopt.GetoptError:
print(sys.argv[0]+" : params are not defined well!")
print("Command : ./dataformat.py -h")
must_be_defined('inpath', params_map, sys.argv[0]+" : -i param is needed,input file path must define!")
must_be_defined('total', params_map, sys.argv[0]+" : -t param is needed,the fields of result file must define!")
must_be_defined('to', params_map, sys.argv[0]+" : -a param is needed,must assign the field to put !")
if not os.path.exists(inpath):
print(sys.argv[0]+" file : %s is not exists"%inpath)
if 'outpath' not in dir():
process(inpath, total, to, outpath, insp, outsp, empty_fill, fill_with_sno, error_line_out)
if __name__ =="__main__":
功能:可指定输入分隔,输出分隔,无配置字段填充,某列默认值,可按顺序填充,也可乱序映射填充
./dataformat.py –i in_file –t 65 -a “22,39,63” –F “^I” –P “^A” –f “0”
in_file中字段是以t分隔的[可不配-F,使用默认]。
将in_file的第1,2,3列分别填充到in_file.dist[use default]的第22,39,63列
in_file.dist共65列,以^A分隔,未配置列以0填充
-a中顺序与源文件列序有关,若-a “39,22,63” 则是将第1列填充到第39列,第二列填充到22列,第3列填充到63列
【需要对某些列填充相同的值,但不想在源文件中维护】
./dataformat.py -i in_file –t 30 –a “3=tag_1,9,7,12=0.0” –o out_file
in_file以t分隔,输出out_file以t分隔
将in_file的第1列,第2列填充到out_file的第9列,第7列
out_file共30列,第3列均用字符串”tag_1”填充,第12列用0.0填充,其他未配置列为空
注意:默认值 的取值,若是使用到等号和冒号,需转义,加 = :
./dataformat.py –i in_file –t 56 –a “3:2,9,5:3,1=abc,11”
目标文件第3列用输入文件第2列填充,目标文件第5列用输入文件第3列填充
目标文件第9列用输入文件第1列填充,第11列用输入文件第4列填充【未配置映射,使用从头开始还没有被用过的列】
脚本会对简单的字段数量等映射逻辑进行检测,复杂最好全配上,使用默认太抽象
本文链接:http://python.jobbole.com/83447/
点击文章底部阅读原文,查看CDA数据分析师认证考试考纲解析和报名流程。
回复关键字 看往期精彩~
1001 ☛ 一分钟读懂2015中国数据分析师行业峰会!
1002 ☛ 吴喜之:数据分析和数据挖掘是最大的求职法宝
1003 ☛ 33道Hadoop面试题,看看你能答对多少?(答案在后面)
1004 ☛ 成为首席数据官是一种什么样的体验?
1005 ☛ 超能教程 十分钟学会 Python!
