项目涉及到一个读取企业微信会话存档的需求,但企微开发者中心并不直接提供 HTTP 接口,而是提供 C 和 Java 的 SDK,需要通过集成 SDK 才能拉取聊天记录并解密。
这里记录一下如何将 C 动态库封装为 Python 模块,以便在 Python 后端中调用。
Python 调用 C 动态库方案
对于简单的需求,有两种简单的 C 库调用方式:ctypes 和 cffi。
ctypes
ctypes 是 Python 标准库中的一个模块,通过 ctypes 可以直接加载动态库并调用 C 函数:
import ctypes

# Load the C runtime and call time(NULL).
# ctypes assumes a C int return value unless told otherwise; declare the
# prototype so the result is not truncated (assumes time_t is a C long, as on
# Linux/glibc, which is what "libc.so.6" implies).
libc = ctypes.CDLL("libc.so.6")
libc.time.argtypes = [ctypes.c_void_p]
libc.time.restype = ctypes.c_long
print(libc.time(None))
cffi
cffi 是一个更高级的调用 C 库的模块,但需要通过 cdef 方法声明 C 函数的接口:
import cffi

ffi = cffi.FFI()
# cdef() must mirror the real C prototype. time() takes and returns time_t,
# which is assumed here to be a C long (Linux/glibc), not an int.
ffi.cdef("""
    long time(long *t);
""")
libc = ffi.dlopen("libc.so.6")
print(libc.time(ffi.NULL))
但对于比较复杂的逻辑,更佳的方案是使用 Python/C API 编写 Python 模块,这样可以更好地进行内存管理和异常处理:
Python/C API
使用 C/C++ 编写的 Python/C API 模块:
#include <Python.h>
#include <time.h>

/*
 * libc.time() -> int
 *
 * Return the current calendar time as reported by the C library's time().
 * The wrapper is deliberately NOT named `time`: a static function with that
 * name conflicts with the declaration in <time.h> when built as C.
 */
static PyObject *libc_time(PyObject *self, PyObject *args) {
    time_t now = time(NULL);
    (void) self;
    (void) args;
    /* Convert through long long so a 64-bit time_t is never narrowed. */
    return PyLong_FromLongLong((long long) now);
}

/* Method table of the module; the all-NULL entry terminates it. */
static PyMethodDef methods[] = {
    {"time", libc_time, METH_VARARGS, "Return the current time"},
    {NULL, NULL, 0, NULL}
};

/* Module definition: name "libc", no docstring, no per-module state (-1). */
static struct PyModuleDef module = {
    PyModuleDef_HEAD_INIT, "libc", NULL, -1, methods
};

/* Entry point looked up by the interpreter on `import libc`. */
PyMODINIT_FUNC PyInit_libc(void) {
    return PyModule_Create(&module);
}
使用 g++ 编译后
# Take both the include flags and the linker flags from python3-config so the
# command is not tied to one hard-coded interpreter version.
g++ -shared -fPIC libc.c -o libc.so $(python3-config --includes) $(python3-config --ldflags)
得到的 libc.so 文件可以直接被 Python 解释器加载调用:
# Import the freshly built extension module (libc.so in the current directory)
# and call the function it exports.
import libc

print(libc.time())
企业微信 Python SDK
企业微信 C SDK 的具体逻辑可以参考官方文档,本文基于其 Linux-v1.1 版本进行封装。
引入 Python 库与企微 SDK 头文件 #define PY_SSIZE_T_CLEAN #include <Python.h> #include <structmember.h> #include <string> #include "WeWorkFinanceSdk_C.h" auto METH_FLAG = METH_VARARGS | METH_KEYWORDS;
封装 SDK 对象 typedef struct { PyObject_HEAD WeWorkFinanceSdk_t *sdkInstance; const char *proxy, *proxyAuth; } SDKObject; static void sdk_dealloc (SDKObject *self) { DestroySdk (self->sdkInstance); Py_TYPE (self)->tp_free ((PyObject *) self); } static int sdk_init (SDKObject *self, PyObject *args, PyObject *kwargs) { const char *corpId, *secret; const char *kwlist[] = {"corp_id" , "secret" , "proxy" , "proxy_auth" , nullptr }; if (!PyArg_ParseTupleAndKeywords ( args, kwargs, "ss|ss" , const_cast <char **>(kwlist), &corpId, &secret, &self->proxy, &self->proxyAuth)) { PyErr_Format (PyExc_RuntimeError, "invalid params" ); return -1 ; } self->sdkInstance = NewSdk (); int ret = Init (self->sdkInstance, corpId, secret); if (ret != 0 ) { DestroySdk (self->sdkInstance); PyErr_Format (PyExc_RuntimeError, "init sdk error: %d" , ret); return -1 ; } return 0 ; }
封装获取聊天记录接口 static PyObject *sdk_getChatData (SDKObject *self, PyObject *args, PyObject *kwargs) { uint64_t seq = 0 , limit = 10 ; int timeout = 5 ; const char *kwlist[] = {"seq" , "limit" , "timeout" , nullptr }; if (!PyArg_ParseTupleAndKeywords ( args, kwargs, "ii|i" , const_cast <char **>(kwlist), &seq, &limit, self->proxy, self->proxyAuth, &timeout)) { return nullptr ; } auto *chatData = NewSlice (); int ret = GetChatData ( self->sdkInstance, seq, limit, self->proxy, self->proxyAuth, timeout, chatData ); if (ret != 0 ) { PyErr_Format (PyExc_RuntimeError, "get chat data error: %d" , ret); return nullptr ; } auto res = Py_BuildValue ("s" , chatData->buf); FreeSlice (chatData); return res; }
封装获取媒体消息接口 static PyObject *sdk_getMediaFile (SDKObject *self, PyObject *args, PyObject *kwargs) { char *fileId, *filename; int timeout = 10 ; const char *kwlist[] = {"file_id" , "save_to" , "timeout" , nullptr }; if (!PyArg_ParseTupleAndKeywords ( args, kwargs, "ss|i" , const_cast <char **>(kwlist), &fileId, &filename, &timeout)) { return nullptr ; } std::string index; bool isFinish = false ; while (!isFinish) { auto mediaData = NewMediaData (); int ret = GetMediaData ( self->sdkInstance, index.c_str (), fileId, self->proxy, self->proxyAuth, timeout, mediaData ); if (ret != 0 ) { FreeMediaData (mediaData); PyErr_Format (PyExc_RuntimeError, "get media data error: %d" , ret); return nullptr ; } FILE *fp = fopen (filename, "ab+" ); if (fp == nullptr ) { FreeMediaData (mediaData); PyErr_Format (PyExc_RuntimeError, "cannot open file %s" , filename); return nullptr ; } fwrite (mediaData->data, mediaData->data_len, 1 , fp); fclose (fp); index.assign (std::string (mediaData->outindexbuf)); isFinish = mediaData->is_finish; FreeMediaData (mediaData); } return Py_None; }
封装解密聊天消息接口 static PyObject *sdk_decryptData (SDKObject *, PyObject *args, PyObject *kwargs) { char *encryptRandomKey, *encryptMsg; const char *kwlist[] = {"encrypt_random_key" , "encrypt_msg" , nullptr }; if (!PyArg_ParseTupleAndKeywords ( args, kwargs, "ss" , const_cast <char **>(kwlist), &encryptRandomKey, &encryptMsg)) { return nullptr ; } auto data = NewSlice (); int ret = DecryptData (encryptRandomKey, encryptMsg, data); if (ret != 0 ) { PyErr_Format (PyExc_RuntimeError, "decrypt data error: %d" , ret); return nullptr ; } auto res = Py_BuildValue ("s" , data->buf); FreeSlice (data); return res; }
封装 Python 模块 static PyMethodDef sdk_methods[] = { {"get_chat_data" , (PyCFunction) sdk_getChatData, METH_FLAG, "Get chat data" }, {"get_media_file" , (PyCFunction) sdk_getMediaFile, METH_FLAG, "Download media file" }, {"decrypt_data" , (PyCFunction) sdk_decryptData, METH_FLAG, "Decrypt data" }, {nullptr , nullptr , 0 , nullptr } }; static PyTypeObject SDKType = { PyVarObject_HEAD_INIT (nullptr , 0 ) .tp_name="wecom.SDK" , .tp_basicsize=sizeof (SDKObject), .tp_itemsize=0 , .tp_dealloc=(destructor) sdk_dealloc, .tp_methods=sdk_methods, .tp_init=(initproc) sdk_init, .tp_new=PyType_GenericNew, }; PyMethodDef method_table[] = { {nullptr , nullptr , 0 , nullptr } }; PyModuleDef wecom_module = { PyModuleDef_HEAD_INIT, "wecom" , "Python wrap for WeCom C-SDK" , -1 , method_table, }; #pragma clang diagnostic push #pragma ide diagnostic ignored "OCUnusedGlobalDeclarationInspection" PyMODINIT_FUNC PyInit_wecom (void ) { PyObject *m; if (PyType_Ready (&SDKType) < 0 ) { return nullptr ; } m = PyModule_Create (&wecom_module); if (m == nullptr ) { return nullptr ; } Py_INCREF (&SDKType); if (PyModule_AddObject (m, "SDK" , (PyObject *) &SDKType) < 0 ) { Py_DECREF (&SDKType); Py_DECREF (m); return nullptr ; } Py_Initialize (); return m; }
编译为 Python 模块
setup.py 文件作为 Python 模块的编译配置文件,可以使用 setuptools 的自定义扩展来编译 C++ 代码为 Python 模块:
import os
import subprocess
import sys

from setuptools import Extension, setup
from setuptools.command.build_ext import build_ext


class CMakeExtension(Extension):
    """Extension placeholder whose real build is delegated to CMake."""

    def __init__(self, name, cmake_lists_dir="."):
        # No sources are listed: setuptools must not try to compile anything.
        super().__init__(name, sources=[])
        self.cmake_lists_dir = os.path.abspath(cmake_lists_dir)


class cmake_build_ext(build_ext):
    """``build_ext`` command that configures and builds each extension with CMake."""

    def build_extensions(self) -> None:
        """Run ``cmake`` configure and build for every registered extension.

        Raises:
            RuntimeError: if the ``cmake`` executable cannot be started.
            subprocess.CalledProcessError: if a CMake invocation fails.
        """
        try:
            subprocess.check_output(["cmake", "--version"])
        except OSError as err:
            raise RuntimeError("Cannot find CMake executable") from err

        cfg = "Debug" if self.debug else "Release"
        os.makedirs(self.build_temp, exist_ok=True)

        for ext in self.extensions:
            # Directory in which setuptools expects the built module to appear.
            ext_dir = os.path.abspath(os.path.dirname(self.get_ext_fullpath(ext.name)))
            cmake_args = [
                "-DCMAKE_BUILD_TYPE=" + cfg,
                "-DCMAKE_LIBRARY_OUTPUT_DIRECTORY=" + ext_dir,
                "-DCMAKE_ARCHIVE_OUTPUT_DIRECTORY=" + self.build_temp,
                "-DPYTHON_EXECUTABLE=" + sys.executable,
            ]
            subprocess.check_call(
                ["cmake", ext.cmake_lists_dir] + cmake_args, cwd=self.build_temp
            )
            subprocess.check_call(
                ["cmake", "--build", ".", "--config", cfg], cwd=self.build_temp
            )


setup(
    name="wecom-sdk",
    version="1.0",
    description="Python wrap for wecom C-sdk",
    author="Fei Yu",
    packages=["wecom"],
    package_dir={"wecom": "./stub"},
    package_data={"wecom": ["__init__.pyi"]},
    ext_modules=[CMakeExtension("wecom")],
    cmdclass={"build_ext": cmake_build_ext},
)
这里用了 CMake 作为编译工具,可以在 CMakeLists.txt 中配置编译选项:
cmake_minimum_required(VERSION 3.18)
project(wecom-sdk)

set(CMAKE_CXX_STANDARD 14)

find_package(PythonInterp 3.10 REQUIRED)
find_package(PythonLibs 3.10 REQUIRED)

# Headers and libraries are picked from a per-platform directory under lib/.
if (${CMAKE_SYSTEM_NAME} MATCHES Linux)
    include_directories(lib/linux)
    link_directories(lib/linux)
elseif (${CMAKE_SYSTEM_NAME} MATCHES Windows)
    include_directories(lib/win)
    link_directories(lib/win)
else ()
    message(FATAL_ERROR "Unsupported Platform: " ${CMAKE_SYSTEM_NAME})
endif ()

# PYTHON_INCLUDE_DIRS is the documented FindPythonLibs result variable
# (PYTHON_INCLUDE_PATH is its deprecated alias).
include_directories(${PYTHON_INCLUDE_DIRS})

# MODULE: a loadable plugin. The empty PREFIX drops the "lib" prefix from the
# output file name, which is then based on OUTPUT_NAME "wecom".
add_library(wecom MODULE src/wecom_module.cpp)
set_target_properties(
    wecom
    PROPERTIES
    PREFIX ""
    OUTPUT_NAME "wecom"
    LINKER_LANGUAGE CXX
)

target_link_libraries(wecom ${PYTHON_LIBRARIES})
target_link_libraries(wecom WeWorkFinanceSdk_C)
为了让 IDE 能够正确识别封装的模块接口,可以在 __init__.pyi 文件中声明函数签名,并与主模块一同打包:
from typing import Optional


class SDK:
    """Type stub for the ``wecom.SDK`` extension type.

    SDK error codes:

    =====  ====================================================================
    code   meaning
    =====  ====================================================================
    0      ok
    10000  parameter error: invalid request parameters
    10001  network error: the network request failed
    10002  failed to parse data
    10003  system failure
    10004  encryption failed because of a wrong key
    10005  invalid fileid
    10006  decryption failed
    10007  private key for the message's encryption version was not found;
           the key pair has to be supplied again
    10008  failed to parse encrypt_key
    10009  illegal IP
    10010  data expired
    10011  certificate error
    =====  ====================================================================
    """

    def __init__(
        self,
        corp_id: str,
        secret: str,
        proxy: Optional[str] = None,
        proxy_auth: Optional[str] = None,
    ) -> None:
        """
        :param corp_id: Corp ID, shown in the WeCom admin console under
            "My Company" -> "Company Info", e.g. ``wwd08c8exxxx5ab44d``
        :param secret: Secret of the chat-archive application, shown in the
            WeCom admin console under "Management Tools" -> "Chat Archive"
        :param proxy: The SDK talks to ``https://qyapi.weixin.qq.com``; when
            that host cannot be reached directly, configure a proxy here, e.g.
            ``socks5://10.0.0.1:8081`` or ``http://10.0.0.1:8081``
        :param proxy_auth: Proxy credentials, e.g. ``user_name:passwd_123``
        """
        ...

    def get_chat_data(self, seq: int, limit: int, timeout: int = 5) -> str:
        """
        :param seq: Sequence number of archived messages. It increases
            monotonically; pass the largest ``seq`` returned by the previous
            pull. Pass 0 for the first pull and the SDK returns the earliest
            message still within the retention period
        :param limit: Maximum number of messages to pull, in the range 1~1000
        :param timeout: Timeout for pulling the chat archive, in seconds
        :return: JSON string with ``errcode``/``errmsg`` and one entry per
            message (ciphertext abbreviated with ``...`` below), e.g.
            {"errcode":0,"errmsg":"ok","chatdata":[{"seq":196,"msgid":"CAQQ2fbb4QUY0On2rYSAgAMgip/yzgs=","publickey_ver":3,"encrypt_random_key":"ftJ+uz3n/z1Dsxlk...","encrypt_chat_msg":"898WSfGMnIeytTse..."}]}
            {"errcode":41001,"errmsg":"access_token missing, hint: [1684401769_399_465504c915f130b50294a4abe4eb82d0], from ip: 117.131.109.99, more info at https://open.work.weixin.qq.com/devtool/query?e=41001","chatdata":[]}
        """
        ...

    def get_media_file(self, file_id: str, save_to: str, timeout: int = 10) -> None:
        """
        :param file_id: Media file id, taken from a decrypted chat message
        :param save_to: Path the media file is saved to
        :param timeout: Timeout for pulling the media file, in seconds
        :return: None
        """
        ...

    def decrypt_data(self, encrypt_random_key: str, encrypt_msg: str) -> str:
        """
        :param encrypt_random_key: The ``encrypt_key`` obtained by decrypting
            the ``encrypt_random_key`` returned by the chat-archive pull with
            the private key that matches the RSA public key configured in the
            WeCom admin console
        :param encrypt_msg: The ``encrypt_chat_msg`` returned by the
            chat-archive pull
        :return: Decrypted message plaintext as a JSON string; for the message
            format see https://developer.work.weixin.qq.com/document/path/91774
        """
        ...
Python 中调用 SDK
初始化 SDK
CORP_ID / SECRET 从企业微信管理后台获取:
import wecom

# Direct connection to the WeCom API.
sdk = wecom.SDK("CORP_ID", "SECRET")

# Alternatively, route the SDK traffic through a proxy.
sdk = wecom.SDK(
    "CORP_ID",
    "SECRET",
    proxy="http://10.0.0.1:8081",
    proxy_auth="username:password",
)
# Fetch chat data
import json

# get_chat_data returns a JSON string; parse it into a dict.
data = sdk.get_chat_data(seq=0, limit=1)
data = json.loads(data)
解密聊天消息
使用前需要生成 RSA 密钥对,并在企业微信管理后台配置公钥。
SDK 获取的聊天消息被随机密钥加密,随机密钥被配置的公钥加密。
解密时需要先通过 RSA 私钥解密随机密钥,再调用 SDK 使用随机密钥解密聊天消息:
import json
from base64 import b64decode

from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5

# Load the RSA private key; `with` closes the key file deterministically.
with open("private.key", "r") as key_file:
    rsa_key = RSA.import_key(key_file.read())
cipher = PKCS1_v1_5.new(rsa_key)

chat_data = data["chatdata"][0]

# Step 1: recover the random key (base64 text -> RSA ciphertext -> plaintext).
random_key = chat_data["encrypt_random_key"]
random_key = b64decode(random_key)
# b'' is the sentinel handed to decrypt(); getting it back means decryption failed.
random_key = cipher.decrypt(random_key, b"")
if not random_key:
    raise ValueError("failed to decrypt encrypt_random_key with private.key")
random_key = str(random_key, encoding="utf-8")

# Step 2: let the SDK decrypt the message body with the recovered key.
encrypt_chat_msg = chat_data["encrypt_chat_msg"]
chat_msg = sdk.decrypt_data(encrypt_random_key=random_key, encrypt_msg=encrypt_chat_msg)
chat_msg = json.loads(chat_msg)
# Download a media file
# The file id is read from the decrypted chat message.
sdk_file_id = chat_msg["file"]["sdkfileid"]
sdk.get_media_file(file_id=sdk_file_id, save_to="/path/to/file")
手动释放 SDK 占用内存
完整项目代码已上传至 GitHub