本文是上篇介绍PyODPS《数据处理脚手架PyODPS入门体验》的后续篇,旨在分享如何利用PAI-Designer克服传统ODPS限制,实现动态配置联动、数据与图片处理集成的离线处理方案,通过实战经验与代码示例,深度揭示PAI-Designer在提升数据处理灵活性与功能扩展性方面的独特优势。
背景
接触PyODPS一年之后,现在已经是高端写手了。但仍然有解决不了的问题:
odps没有network,不能和在线配置联动。比如我就有个需求,需要根据在线配置动态决定每天什么上线什么下架。获取根据在线配置动态获取业务关系,用于离线业务处理。odps没有network,不能在强大的pyodps里按行处理数据并上传oss文件。并把url写入到数据表中。odps不能接入图片处理/算法能力,离线处理数据。基于以上,我简单研究了一下PAI-Designer,实现了几个需求。这里分享一下经验~
如何写出来一套牛逼哄哄的万能的离线处理PAI:
初始之地:创建工作流OSS RoleARN看?获取帮助~
嗯,没有这玩意,你写不了代码。
第一个Python脚本拖一个python脚本出来,在左侧栏的自定义脚本中:
这里可以看到只有4个输入+4个输出,明显是被限制了上下游依赖,这是明示咱们写代码模块划分清晰,每个模块单一职责不要太多代码~
不得不说,pai的这个代码模板写得真好:
import osimport argparseimport json"""Python脚本组件示例代码"""# 当前工作空间下的默认MaxCompute执行环境,包含MaxComputeProject的名称以及Endpoint.# 需要当前的工作空间下有MaxCompute项目时,作业的执行环境才会注入。# 示例: {"endpoint": "http://service.cn.maxcompute.aliyun-inc.com/api", "odpsProject": "lq_test_mc_project"}ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION"def init_odps(): """初始化一个ODPS实例,用于读写MaxCompute数据. 具体API请参考PyODPS的文档: https://pyodps.readthedocs.io/ """ from odps import ODPS # 当前工作空间的默认MaxCompute项目信息. mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION]) o = ODPS( access_id="<YouAccessKeyId>", # ODPS的AccessKey secret_access_key="<YourAccessKeySecret>", # ODPS的AccessSecret # 请根据Project所在的Region选择: https://help.aliyun.com/document_detail/34951.html endpoint=mc_execution["endpoint"], project=mc_execution["odpsProject"], ) return odef parse_odps_url(table_uri): """解析输入的MaxCompute Table URI 需要打开的MaxCompute表名,格式为odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/ 示例:odps://test/tables/iris/pa=1/pb=1,其中pa=1/pb=1是一个多级partition。 Returns: 返回三元组(ProjectName, TableName, Partition) """ from urllib import parse parsed = parse.urlparse(table_uri) project_name = parsed.hostname r = parsed.path.split("/", 2) # 这里血坑,注意把这里的2改成3!!!!!!!!可能是代码错误? table_name = r[2] if len(r) > 3: partition = r[3] else: partition = None return project_name, table_name, partitiondef parse_args(): """解析给到脚本的arguments.""" parser = argparse.ArgumentParser(description="Python component script example.") # 从上游连线输入当前组件端口的输入,会通过arguments的方式传递给到执行的脚本 # 1. 组件输入 # - OSS的输入: # 来自上游组件的OSS输入,会被挂载到脚本执行的节点上, 然后挂载后的文件路径,会arguments的形式,传递给到运行的脚本。 # 例如 "python main.py --input1 /ml/input/data/input1 " # - MaxComputeTable的输入: # MaxComputeTable的输入不支持挂载,对应的Table信息会以URI的形式,作为arguments传递给到运行脚本 # 例如 "python main.py --input1 odps://some-project-name/tables/table # 对于ODPS URI形式输入,可以用示例的parse_odps_url函数解析出对应的元信息。 parser.add_argument("--input1", type=str, default=None, help="Component input port 1.") parser.add_argument("--input2", type=str, default=None, help="Component input port 2.") parser.add_argument("--input3", type=str, default=None, help="Component input port 3.") parser.add_argument("--input4", type=str, default=None, help="Component input port 4.") # 组件输出 # - OSS输出 # 组件的输出端口1和输出端口2是两个OSS输出端口,可以用于下游的使用OSS路径作为输入的组件。 # 配置组件输出任务输出路径,对应的输出目录会被挂载到 /ml/output/ 下。 # 组件的输出端口 "OSS输出-1"和"OSS输出-2",分别对应子目录/ml/output/output1 和 ml/output/output2。 # - MaxComputeTable的输出 # 组件的输出端口3和输出端口4是MaxComputeTable输出. # 如果当前的工作空间配置了MaxComputeProject项目,则组件传递一个临时表URI给到脚本。 # 例如 python main.py --output3 odps://<some-project-name>/tables/<output-table-name> # 用户的代码可以构建对应的表,写出数据到对应表,然后通过组件连线将表传递给到下游组件。 parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.") parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.") parser.add_argument("--output3", type=str, default=None, help="Output MaxComputeTable 1.") parser.add_argument("--output4", type=str, default=None, help="Output MaxComputeTable 2.") args, _ = parser.parse_known_args() return argsdef write_table_example(args): """示例:复制将PAI提供公共表的数据,作为当前组件的临时表输出: 更多PyODPS请参考PyODPS文档: https://pyodps.readthedocs.io/ """ output_table_uri = args.output3 o = init_odps() project_name, table_name, partition = parse_odps_url(output_table_uri) o.run_sql(f"create table {project_name}.{table_name} as select * from pai_online_project.heart_disease_prediction;")def write_output1(args): """将数据结果写入Mount的OSS路径上(output1子目录),对应的结果可以通过连线传递到下游""" output_path = args.output1 os.makedirs(output_path, exist_ok=True) p = os.path.join(output_path, "result.text") with open(p, "w") as f: f.write("TestAccuracy=0.88")if __name__ == "__main__": args = parse_args() print("Input1={}".format(args.input1)) print("Output1={}".format(args.output1)) # write_table_example(args) # write_output1(args)模板的注释解释的贼清楚,这里就不多赘述了,之所以写这个文章,主要是想分享一下常用的几种Python脚本模板。
在线配置读取
在线配置选型选择了MT3:
用过的同学会发现配置文件都在CDN上,预发配置文件一个地址,线上配置文件一个地址。可以直接用requests来拉cdn文件。对应的pai脚本模板代码:
import requestsimport osimport argparseimport json# parse_args与模板一致.def parse_args(): parser = argparse.ArgumentParser(description="Python component script example.") parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.") args, _ = parser.parse_known_args() return args# 目标 URLurl = "https://xxx.alicdn.com/fpi/xxxx-data/v1/xxx-config.js?"# 发送 HTTP GET 请求response = requests.get(url)# 检查请求是否成功if response.status_code == 200: # 将响应内容写入文件 args = parse_args() """将数据结果写入Mount的OSS路径上(output1子目录),对应的结果可以通过连线传递到下游""" os.makedirs(args.output1, exist_ok=True) p = os.path.join(args.output1, "result.txt") # 代码里写入的output1,拉线的时候要从output1拉出~ with open(p, "wb") as f: f.write(response.content)else: # 失败及时终止任务 raise Exception("DIY C端配置读取失败")写完代码记得给python脚本的模块配置一个OSS目录。
ODPS数据处理脚本
import argparseimport jsonimport osfrom odps import ODPS, optionsfrom odps.df import DataFrame, Scalar, agg, func, outputoptions.lifecycle=7"""Python脚本组件示例代码"""# 当前工作空间下的默认MaxCompute执行环境,包含MaxComputeProject的名称以及Endpoint.# 需要当前的工作空间下有MaxCompute项目时,作业的执行环境才会注入。# 示例: {"endpoint": "http://service.cn.maxcompute.aliyun-inc.com/api", "odpsProject": "lq_test_mc_project"}ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION"def init_odps(): """初始化一个ODPS实例,用于读写MaxCompute数据. 具体API请参考PyODPS的文档: https://pyodps.readthedocs.io/ """ # 当前工作空间的默认MaxCompute项目信息. mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION]) o = ODPS( access_id="", secret_access_key="", # 请根据Project所在的Region选择: https://help.aliyun.com/document_detail/34951.html endpoint=mc_execution["endpoint"], project=mc_execution["odpsProject"], ) return odef parse_odps_url(table_uri): """解析输入的MaxCompute Table URI 需要打开的MaxCompute表名,格式为odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/ 示例:odps://test/tables/iris/pa=1/pb=1,其中pa=1/pb=1是一个多级partition。 Returns: 返回三元组(ProjectName, TableName, Partition) """ from urllib import parse parsed = parse.urlparse(table_uri) project_name = parsed.hostname r = parsed.path.split("/", 3) table_name = r[2] if len(r) > 3: partition = r[3] else: partition = None return project_name, table_name, partitiondef parse_args(): """解析给到脚本的arguments.""" parser = argparse.ArgumentParser(description="Python component script example.") # 从上游连线输入当前组件端口的输入,会通过arguments的方式传递给到执行的脚本 # 1. 组件输入 # - OSS的输入: # 来自上游组件的OSS输入,会被挂载到脚本执行的节点上, 然后挂载后的文件路径,会arguments的形式,传递给到运行的脚本。 # 例如 "python main.py --input1 /ml/input/data/input1 " # - MaxComputeTable的输入: # MaxComputeTable的输入不支持挂载,对应的Table信息会以URI的形式,作为arguments传递给到运行脚本 # 例如 "python main.py --input1 odps://some-project-name/tables/table # 对于ODPS URI形式输入,可以用示例的parse_odps_url函数解析出对应的元信息。 parser.add_argument( "--input1", type=str, default=None, help="Component input port 1." ) parser.add_argument( "--input2", type=str, default=None, help="Component input port 2." ) parser.add_argument( "--input3", type=str, default=None, help="Component input port 3." ) parser.add_argument( "--input4", type=str, default=None, help="Component input port 4." ) # 组件输出 # - OSS输出 # 组件的输出端口1和输出端口2是两个OSS输出端口,可以用于下游的使用OSS路径作为输入的组件。 # 配置组件输出任务输出路径,对应的输出目录会被挂载到 /ml/output/ 下。 # 组件的输出端口 "OSS输出-1"和"OSS输出-2",分别对应子目录/ml/output/output1 和 ml/output/output2。 # - MaxComputeTable的输出 # 组件的输出端口3和输出端口4是MaxComputeTable输出. # 如果当前的工作空间配置了MaxComputeProject项目,则组件传递一个临时表URI给到脚本。 # 例如 python main.py --output3 odps://<some-project-name>/tables/<output-table-name> # 用户的代码可以构建对应的表,写出数据到对应表,然后通过组件连线将表传递给到下游组件。 parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.") parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.") parser.add_argument( "--output3", type=str, default=None, help="Output MaxComputeTable 1." ) parser.add_argument( "--output4", type=str, default=None, help="Output MaxComputeTable 2." ) args, _ = parser.parse_known_args() return argsdef merge_item(args): """示例:复制将PAI提供公共表的数据,作为当前组件的临时表输出: 更多PyODPS请参考PyODPS文档: https://pyodps.readthedocs.io/ """ config = None # 这里是读OSS的配置,这个oss的配置是一段文本,可以直接加载进来用。 with open(args.input3 + "/result.txt") as f: config = json.loads(f.read()) o = init_odps() ( item_project_name, item_table_name, item_partition, ) = parse_odps_url(args.input1) output_project, output_table_name, output_partition = parse_odps_url(args.output3) # 这里的item表来自组件《读数据表》 item_table = DataFrame( o.get_table(f"{item_project_name}.{item_table_name}").get_partition(item_partition) ) # 如果不想要依赖某个表,可以用get_max_partition()取一个最大的分区。 room_table = DataFrame( o.get_table("project.room_table").get_max_partition() ) t = item_table.left_join(room_table, on=[item_table.room_id == room_table.id]) @output(["value"], ["string"]) def handle_value(row): import urllib.parse # url格式是%2D,这种urlencode的格式,这里做一下decode. yield { "sku_id": row.sku_id, "url": urllib.parse.unquote(row.url) } t = t[ t.item_id, t.apply(handle_value, axis = 1) ] ## 做一些超绝的数据聚合,避免重复,如何选择数据。 Agg(object): def buffer(self): # 第一个对象是以RoomId为key,值为url路径列表的value # 第二个对象是以RoomId为key,值为skuId列表的value # 第三个对象是以skuId为key,值为url路径的value # 目标:让尽可能多的skuId都在同一个RoomId下。 return { "roomid_2_url_list": {}, "roomid_2_skuid_list": {}, "skuid_2_url": {} } def __call__(self, buffer, value): value = json.loads(value) if value["sku_id"] not in buffer["skuid_2_url"].keys(): buffer["skuid_2_url"][value["sku_id"]]= value if value["room_id"] not in buffer[roomid_2_url_list]: buffer["roomid_2_url_list"][value["room_id"]] = [value] buffer["roomid_2_skuid_list"][value["room_id"]] = [value["sku_id"]] else: if value["sku_id"] not in buffer["roomid_2_skuid_list"][value["room_id"]]: buffer["roomid_2_url_list"][value["room_id"]].append(value) buffer["roomid_2_skuid_list"][value["room_id"]].append(value["sku_id"]) def merge(self, buffer, pbuffer): for room_id, value_list in pbuffer[0].items(): if room_id not in buffer["roomid_2_url_list"].keys(): buffer["roomid_2_url_list"][room_id] = pbuffer["roomid_2_url_list"][room_id] buffer["roomid_2_skuid_list"][room_id] = pbuffer["roomid_2_skuid_list"][room_id] continue for i in range(0, len(value_list)): if pbuffer["roomid_2_skuid_list"][room_id][i] not in buffer["roomid_2_skuid_list"][room_id]: buffer["roomid_2_url_list"][room_id].append(pbuffer["roomid_2_url_list"][room_id][i]) buffer["roomid_2_skuid_list"][room_id].append(pbuffer["roomid_2_skuid_list"][room_id][i]) for sku_id, value in pbuffer["skuid_2_url"].items(): if sku_id not in buffer["skuid_2_url"].keys(): buffer["skuid_2_url"][sku_id]= value def getvalue(self, buffer): res = [] # 优先roomId最多的那个组合,剩下的随机取一个。 for room_id, value_list in buffer[0].items(): if len(value_list) > len(res): res = value_list all_ready_sku_id = [x["skuId"] for x in res] for x, v in buffer[2].items(): if x not in all_ready_sku_id: # 从剩余的数据里随便挑一个. res.append(v) return json.dumps(res) to_agg = agg([t.value], Agg, rtype='string') t=t.groupby("item_id").agg(value=to_agg) t=t[ t.item_id, t.value, Scalar(1).rename("status") ] t.persist(f"{output_project}.{output_table_name}")if __name__ == "__main__": args = parse_args() merge_item(args)执行以下,可以看到我们能通过pyodps实现非常复杂的数据处理,这是之前ODPS SQL无法带给我们的能力。
我们会看到如下执行参数,这里的input就是之前的输入:
ODPS的input就是读数据表,或者上游的MAXCompute的输出而类似:--input2 /ml/input/data/input2/output1 就是文件资源,上游通常为oss的output。所以oss的文件在代码中是通过读文件的方式读取的。超绝的图片处理
可以看到我这个构建edge_color模块不仅根据配置读取了表的数据,并且将表中的url下载计算edge_color,写入了数据表。
极其强大!
但是我们会有个问题,pai是怎么具有图片处理的能力的呢?答案来自他的执行配置:
可以自定义镜像!当然,如果只是简单的图片处理也可以通过。
安装一些包。这里是为了探索镜像构建的流程,整理了个自己的Docker镜像。
▐镜像构建我的dockerfile:
FROM reg.docker.alibaba-inc.com/alibase/alios7u2-min:1.13COPY ./resource/Python-3.9.18.tar.xz /home/admin/Python-3.9.18.tar.xzWORKDIR /home/adminRUN rpm --rebuilddb && yum install -y gcc gcc-c++ automake autoconf libtool make zlib-devel openssl openssl-devel libxslt-devel libxml2-develRUN rpm --rebuilddb && yum install -y pcre pcre-devel zlib zlib-devel libffi-devel# 安装pythonRUN tar xJf Python-3.9.18.tar.xz && \cd Python-3.9.18 && ./configure --prefix=/usr/local/python && make && make install && \rm -rf /usr/bin/python && \ln -s /usr/local/python/bin/python3 /usr/bin/python && ln -s /usr/local/python/bin/pip3 /usr/bin/pip# 建议换一个自己相关的源RUN pip install --upgrade pip && pip config set global.index-url https://xxxx.xxxx-xxxx.cn/simple/RUN pip install setuptools>=3.0 pyodps pillow requests numpy scipy matplotlib下载Python的安装包确实太慢了,就把python的zip也传到了code的repo里。
PS: 大家不要模仿,完全可以把安装包传到OSS之类的地方加个速。不能什么大文件都往git传。
可以看到我的镜像里安装了pillow、numpy、pyodps。构建完镜像,上传到对应的镜像仓库,如:docker hub等。
▐脚本编写在pai-designer的 python脚本 的执行配置中配置上自己的景象地址之后就可以写main脚本了。
def execute(args): # 读线上的配置. room_config = None with open(args.input1 + "/result.txt") as f: room_config = json.loads(f.read()) room_ids = [int(x["roomId"]) for x in room_config] # 因为是要计算所有的room的edgeColor,所以这里再读一份预发的. room_config_pre = None with open(args.input2 + "/result.txt") as f: room_config_pre = json.loads(f.read()) for x in room_config_pre: if int(x["roomId"]) not in room_ids: room_ids.append(int(x["roomId"])) o = init_odps() output_project, output_table_name, output_partition = parse_odps_url(args.output3) room_table = DataFrame( o.get_table("project.room_table").get_max_partition() ) # 用room_ids的id做一下过滤 room_table = room_table[ room_table.id.isin(room_ids) ] edge_color_result = {} # 实际上配置不到100条数据. for current in room_table.head(100): # 下载图片并解析edgeColor url = current[1] image = download_image(url) edge_color = get_average_color(image) edge_color_result[current[0]]= edge_color os.makedirs(args.output1, exist_ok=True) p = os.path.join(args.output1, "result.txt") with open(p, "w") as f: f.write(json.dumps(edge_color_result))就贴一下关键的部分代码。这里get_average_color调用pillow去获取和计算图片边缘的颜色。
这里有几个注意点:
pyodps的@output以及自定义聚合函数Agg的数据处理方法并不能引用非基础库的依赖。 -- 这里给大家踩过坑啦。因为我的数据不多,所以可以head一下全拿出来,在主进程中做成一个dict,为后续链路使用。但实际上head是有上限的。而且不建议这么用。建议写法,参考帮助文档:https://pyodps.readthedocs.io/zh-cn/stable/base-tables.html#table-read
with t.open_reader(partition='pt=test,pt2=test2') as reader: count = reader.count for record in reader[5:10]: # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作 # 处理一条记录结语PAI-Designer作为一个强大的离线数据处理工具,不仅能够弥补传统ODPS在处理动态配置、网络访问及复杂数据处理上的不足,还通过高度可定制的Python脚本组件和灵活的环境配置,极大地扩展了其应用范围。用户能够借助PAI-Designer构建出既满足特定业务需求又具备高度灵活性的离线数据处理流程,尤其是在需要集成外部服务或进行复杂数据转换的场景下,其价值尤为显著。然而,随着功能的增强,用户也需谨记安全与合规操作的重要性,确保数据处理过程的安全性。
作者:特巴
来源-微信公众号:大淘宝技术
出处:https://mp.weixin.qq.com/s/bJ7mfcUdXeE6nlx6YF7cHw