如何实现一个基于 jupyter 的 microservices
阅读原文时间:2024年06月17日阅读:1

零、背景:


现有基于 Node.js 的项目,但需要整合 Data Science 同事的基于 python(jupyter) 的代码部分,以实现额外的数据分析功能。于是设想实现一个 microservices。下面介绍一些库的使用方法、自己写的 demo和遇到的坑,方便以后查阅。

一、jupyter_kernel_gateway


第一步,是想办法把 jupyter 文件当成一个 http server 启动,以便可以接受来自任何异构项目的调用。这里可以用到jupyter_kernel_gatewaynotebook-http 功能。

官方文档:https://jupyter-kernel-gateway.readthedocs.io/en/latest/http-mode.html

1、安装

pip install jupyter_kernel_gateway

2、启动

jupyter kernelgateway --KernelGatewayApp.api='kernel_gateway.notebook_http' --KernelGatewayApp.seed_uri='/Users/xjnotxj/Program/PythonProject/main.ipynb'

seed_uri除了是本地路径,也可以是个url http://localhost:8890/notebooks/main.ipynb

3、使用

import json


# imitate REQUEST args (调试时候用,平时请忽略)
# REQUEST = json.dumps({'body': {'age': ['181']}, 'args': {'sex': ['male'], 'location': ['shanghai']}, 'path': {'name': 'colin'}, 'headers': {'Content-Type': 'multipart/form-data; boundary=--------------------------149817035181009685206727', 'Cache-Control': 'no-cache', 'Postman-Token': '96c484cb-8709-4a42-9e12-3aaf18392c92', 'User-Agent': 'PostmanRuntime/7.6.0', 'Accept': '*/*', 'Host': 'localhost:8888', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '161', 'Connection': 'keep-alive'}})

注释定义路由:# POST /post/:name(可以多个 cell 一起用),请求体自动绑定在 req 对象上:

# POST /post/:name

req = json.loads(REQUEST)

# defined return vars
return_status = 200
return_code = 0
return_message = ''
return_data = {}

这里定义了一个检查 req 参数的 function,因为 jupyter_kernel_gateway 不支持 return 或者 exit 退出当前 request,还是会继续往后执行,导致多个输出干扰最终 response 结果。所以我这边代码逻辑写的不简洁,如果有知道改进的朋友可以告诉我。

# POST /post/:name 

def checkReqValid(req):  

    global return_code
    global return_message

    # age
    if 100 <= req["age"] or req["age"] < 0:
        return_code = -2
        return_message = "'age' is out of range"
        return True

    return False

实现 controller 部分:

# POST /post/:name 

try :   

    name = req['path']['name']
    age = int(req['body']['age'][0])
    sex = req['args']['sex'][0]
    location = req['args']['location'][0]

    if checkReqValid({"name":name,
                        "age":age,
                        "sex":sex,
                        "location":location}) == True:
        pass
    else :
        # dosomething……
        return_data = {
            "name":name,
            "age":age,
            "sex":sex,
            "location":location,
            "req":req
        }

except KeyError: # check has field is empty
    return_code = -1
    return_message = "some field is empty"

finally: # return data
    print(json.dumps({
        "code":return_code,
        "message":return_message,
        "data":return_data
    })) 

# ResponseInfo POST /post/:name 定义输出响应头,用 print 写入stdout 的方式来响应请求:

# ResponseInfo POST /post/:name

print(json.dumps({
    "headers" : {
        "Content-Type" : "application/json"
    },
    "status" : return_status
}))

当我访问localhost:8888/post/colin?sex=male&location=shanghai且body体为 age:18时,返回值为:

{
    "code": 0,
    "message": "",
    "data": {
        "name": "colin",
        "age": 18,
        "sex": "male",
        "location": "shanghai",
        "req": {
            "body": {
                "age": [
                    "18"
                ]
            },
            "args": {
                "sex": [
                    "male"
                ],
                "location": [
                    "shanghai"
                ]
            },
            "path": {
                "name": "colin"
            },
            "headers": {
                "Content-Type": "multipart/form-data; boundary=--------------------------981201125716045634129372",
                "Cache-Control": "no-cache",
                "Postman-Token": "ec0f5364-b0ea-4828-b987-c12f15573296",
                "User-Agent": "PostmanRuntime/7.6.0",
                "Accept": "*/*",
                "Host": "localhost:8888",
                "Accept-Encoding": "gzip, deflate",
                "Content-Length": "160",
                "Connection": "keep-alive"
            }
        }
    }
}

关于响应码:

默认下为200 OK (且Content-Type: text/plain

如果发生运行错误,则返回500 Internal Server Error

如果没有找到路由,则返回404 Not Found

如果找到路由但是 get/post 等这类请求方法还是没匹配上,则返回405 Not Supported

4、坑

(1)cell 里涉及到注释实现的路由功能时,首行不能是空行,不然报错:
✘ xjnotxj@jiangchengzhideMacBook-Pro  ~/Program/PythonProject  jupyter kernelgateway --KernelGatewayApp.api='kernel_gateway.notebook_http' --KernelGatewayApp.seed_uri='/Users/xjnotxj/Program/PythonProject/tuo.ipynb'
[KernelGatewayApp] Kernel started: bb13bcd6-514f-4682-b627-e6809cbb13ac
Traceback (most recent call last):
  File "/anaconda3/bin/jupyter-kernelgateway", line 11, in <module>
    sys.exit(launch_instance())
  File "/anaconda3/lib/python3.7/site-packages/jupyter_core/application.py", line 266, in launch_instance
    return super(JupyterApp, cls).launch_instance(argv=argv, **kwargs)
  File "/anaconda3/lib/python3.7/site-packages/traitlets/config/application.py", line 657, in launch_instance
    app.initialize(argv)
  File "/anaconda3/lib/python3.7/site-packages/kernel_gateway/gatewayapp.py", line 382, in initialize
    self.init_webapp()
  File "/anaconda3/lib/python3.7/site-packages/kernel_gateway/gatewayapp.py", line 449, in init_webapp
    handlers = self.personality.create_request_handlers()
  File "/anaconda3/lib/python3.7/site-packages/kernel_gateway/notebook_http/__init__.py", line 112, in create_request_handlers
    raise RuntimeError('No endpoints were discovered. Check your notebook to make sure your cells are annotated correctly.')
RuntimeError: No endpoints were discovered. Check your notebook to make sure your cells are annotated correctly.
 ✘ xjnotxj@jiangchengzhideMacBook-Pro  ~/Program/PythonProject  [IPKernelApp] WARNING | Parent appears to have exited, shutting down.
(2)response 里argsbody体里的参数值是一个长度为1的数组
# 注意取法
sex = req['args']['sex'][0]

二、papermill


第二步,就是用类似胶水的东西,把不同的 Data Science 处理脚本,粘连起来,依次调用。

为什么要使用papermill,而不是直接调用脚本?

(1)规范了调用jurpyter文件和传参的模式

(2)执行jurpyter文件后可以生成 out 文件,方便回溯

(3)上下文变量按照每一个jurpyter文件划分区域去存储,互不干扰

1、安装

https://github.com/nteract/papermill

pip install papermill

2、使用

(1)a.ipynb
import papermill as pm 

for i, item in enumerate(data):
    data[i] = item * multiple

pm.record("data", data)
print(data)
(2)main.ipynb
data=[1,2,3]
data


# 也可以通过命令行运行,详细看文档
pm.execute_notebook(
   'a.ipynb',
   'a_out.ipynb',
   parameters = dict(data=data,multiple=3)
)

Papermill 支持输入和输出路径有以下几种类型:

(1)本地文件系统: local

(2)HTTP,HTTPS协议: http://, https://

(3)亚马逊网络服务:AWS S3 s3://

(4)Azure:Azure DataLake Store,Azure Blob Store adl://, abs://

(5)Google Cloud:Google云端存储 gs://

执行main.ipynb后:

1、会生成a_out.ipynb新文件(见下文的(3))

2、有绑定在a_out.ipynb上的上下文变量:

re = pm.read_notebook('a_out.ipynb').dataframe
re

name

value

type

filename

0

data

[1, 2, 3]

parameter

a_out.ipynb

1

multiple

3

parameter

a_out.ipynb

2

data

[3, 6, 9]

record

a_out.ipynb

获取参数稍微有一些繁琐,我这里封装了个 function:

# getNotebookData args
# [filename] .ipynb的文件路径
# [field] 取值变量
# [default_value] 默认返回值(default:None)
# [_type] 'parameter'|'record'(default)

def getPMNotebookData(filename, field ,default_value = None,_type='record'):
    result = default_value
    try:
        re = pm.read_notebook(filename).dataframe
        result = re[re['name']==field][re['type']==_type]["value"].values[0]
    except:
        pass
    finally:
        return result
data = getPMNotebookData('a_out.ipynb', 'data', 0)
data
# [3, 6, 9]
(3)a_out.ipynb

生成的这个新文件,会多出两块内容:

1、在所有 cell 的最开头,会自动插入新的 cell,里面有我们传入的参数

# Parameters
data = [1, 2, 3]
multiple = 3

2、cell 对应的 out 信息

[3, 6, 9]

3、坑

(1)参数不能传 pd.Dataframe 类型

会报错:

TypeError: Object of type DataFrame is not JSON serializable

解决办法:

1、序列化 Dataframe

Dataframe提供了两种序列化的方式,df.to_json()df.to_csv(),解析或者详细的用法请看:https://github.com/nteract/papermill/issues/215

缺点:

在序列化的过程中,Dataframe 每列的数据类型会发生丢失,重新读取后需重新指定。

2、不通过 papermill 的传参机制去传输 Dataframe,而是通过 csv 中间文件承接 【推荐】

三、docker 封装


第三步,就是用 docker ,封装设计好的 microservices,以便部署。

待写……