Python调用C库获取企业微信会话存档

Fei Yu

项目涉及到一个读取企业微信会话存档的需求,但企微开发者中心并不直接提供 http 接口,而是提供 C 和 Java 的 SDK,需要通过集成 SDK 才能拉取聊天记录并解密

这里记录一下如何将 C 动态库封装为 Python 模块,以便在 Python 后端中调用

Python 调用 C 动态库方案

对于简单的需求,有两种简单的 C 库调用方式:ctypes 和 cffi

ctypes

ctypes 是 Python 标准库中的一个模块,通过 ctypes 可以直接加载动态库并调用 C 函数

import ctypes

# `libSystem.dylib` on macOS, `msvcrt` on Windows
libc = ctypes.CDLL("libc.so.6")

# ctypes assumes every foreign function returns a C `int` unless told
# otherwise, which would truncate the 64-bit `time_t` used on 64-bit Linux.
# Declare the real prototype: `time_t time(time_t *tloc)` (time_t == long).
libc.time.argtypes = [ctypes.POINTER(ctypes.c_long)]
libc.time.restype = ctypes.c_long

# Passing None sends a NULL pointer, so only the return value is used.
print(libc.time(None))

cffi

cffi 是一个更高级的调用 C 库的模块,但需要通过 cdef 方法声明 C 函数的接口

import cffi

ffi = cffi.FFI()
# The declaration must match the real C prototype `time_t time(time_t *)`.
# On 64-bit Linux `time_t` is a `long`; declaring it as `int` would truncate
# the returned timestamp to 32 bits.
ffi.cdef("""
long time(long *t);
""")

libc = ffi.dlopen("libc.so.6")

# ffi.NULL passes a null pointer, so only the return value is used.
print(libc.time(ffi.NULL))

但对于比较复杂的逻辑,更佳的方案是使用 Python/C API 编写 Python 模块,这样可以更好地进行内存管理和异常处理:

Python/C API

使用 C/C++ 编写的 Python/C API 模块

#include <Python.h>
#include <time.h>

static PyObject *time(PyObject *self, PyObject *args) {
time_t t;
time(&t);
return PyLong_FromLong(t);
}

static PyMethodDef methods[] = {
{"time", time, METH_VARARGS, "Return the current time"},
{NULL, NULL, 0, NULL}
};

static struct PyModuleDef module = {
PyModuleDef_HEAD_INIT,
"libc",
NULL,
-1,
methods
};

PyMODINIT_FUNC PyInit_libc(void) {
return PyModule_Create(&module);
}

使用 g++ 编译后

g++ -shared -fPIC libc.c -o libc.so $(python3-config --includes) -L$(python3-config --prefix)/lib -lpython3.10

得到的 libc.so 文件可以直接被 Python 解释器加载调用

import libc

print(libc.time())

企业微信 Python SDK

企业微信 C SDK 具体逻辑可以参考官方文档,本文基于其 Linux-v1.1 版本进行封装

引入 Python 库与企微 SDK 头文件

#define PY_SSIZE_T_CLEAN

#include <Python.h>
#include <structmember.h>
#include <string>
#include "WeWorkFinanceSdk_C.h"

auto METH_FLAG = METH_VARARGS | METH_KEYWORDS;

封装 SDK 对象

typedef struct {
PyObject_HEAD
WeWorkFinanceSdk_t *sdkInstance;
const char *proxy, *proxyAuth;
} SDKObject;


static void sdk_dealloc(SDKObject *self) {
DestroySdk(self->sdkInstance);
Py_TYPE(self)->tp_free((PyObject *) self);
}

static int sdk_init(SDKObject *self, PyObject *args, PyObject *kwargs) {

const char *corpId, *secret;
const char *kwlist[] = {"corp_id", "secret", "proxy", "proxy_auth", nullptr};
if (!PyArg_ParseTupleAndKeywords(
args, kwargs, "ss|ss", const_cast<char **>(kwlist),
&corpId, &secret,
&self->proxy, &self->proxyAuth)) {
PyErr_Format(PyExc_RuntimeError, "invalid params");
return -1;
}
self->sdkInstance = NewSdk();
int ret = Init(self->sdkInstance, corpId, secret);
if (ret != 0) {
DestroySdk(self->sdkInstance);
PyErr_Format(PyExc_RuntimeError, "init sdk error: %d", ret);
return -1;
}

return 0;
}

封装获取聊天记录接口

// SDK.get_chat_data(seq, limit, timeout=5) -> str
//
// Calls GetChatData with the object's proxy settings and returns the slice
// buffer as a Python string. Raises RuntimeError carrying the SDK return code
// when the call fails, and ValueError for negative seq/limit.
static PyObject *sdk_getChatData(SDKObject *self, PyObject *args, PyObject *kwargs) {
    // The C types must match the format units exactly: "L" -> long long, "i" -> int.
    long long seq = 0;
    int limit = 10;
    int timeout = 5;
    const char *kwlist[] = {"seq", "limit", "timeout", nullptr};

    // Exactly one output pointer per format unit. Passing anything extra here
    // would make the parser store `timeout` through an unrelated pointer.
    if (!PyArg_ParseTupleAndKeywords(
            args, kwargs, "Li|i", const_cast<char **>(kwlist),
            &seq, &limit, &timeout)) {
        return nullptr;
    }

    if (seq < 0 || limit < 0) {
        PyErr_SetString(PyExc_ValueError, "seq and limit must be non-negative");
        return nullptr;
    }

    auto *chatData = NewSlice();

    int ret = GetChatData(
            self->sdkInstance,
            static_cast<unsigned long long>(seq),
            static_cast<unsigned int>(limit),
            self->proxy, self->proxyAuth,
            timeout,
            chatData
    );

    if (ret != 0) {
        FreeSlice(chatData);  // do not leak the slice on the error path
        PyErr_Format(PyExc_RuntimeError, "get chat data error: %d", ret);
        return nullptr;
    }

    // Copy the buffer into a Python str before the slice is released.
    auto res = Py_BuildValue("s", chatData->buf);
    FreeSlice(chatData);

    return res;
}

封装获取媒体消息接口

// SDK.get_media_file(file_id, save_to, timeout=10) -> None
//
// Downloads a media file chunk by chunk with GetMediaData and appends every
// chunk to `save_to`. The index string returned with each chunk is passed to
// the next request until the SDK reports is_finish.
// Raises RuntimeError when the SDK call or the file I/O fails.
static PyObject *sdk_getMediaFile(SDKObject *self, PyObject *args, PyObject *kwargs) {

    const char *fileId = nullptr, *filename = nullptr;
    int timeout = 10;
    const char *kwlist[] = {"file_id", "save_to", "timeout", nullptr};

    if (!PyArg_ParseTupleAndKeywords(
            args, kwargs, "ss|i", const_cast<char **>(kwlist),
            &fileId, &filename, &timeout)) {
        return nullptr;
    }

    std::string index;  // empty for the first request
    bool isFinish = false;

    while (!isFinish) {
        auto *mediaData = NewMediaData();
        int ret = GetMediaData(
                self->sdkInstance,
                index.c_str(),
                fileId,
                self->proxy, self->proxyAuth,
                timeout,
                mediaData
        );

        if (ret != 0) {
            FreeMediaData(mediaData);
            PyErr_Format(PyExc_RuntimeError, "get media data error: %d", ret);
            return nullptr;
        }

        FILE *fp = fopen(filename, "ab+");
        if (fp == nullptr) {
            FreeMediaData(mediaData);
            PyErr_Format(PyExc_RuntimeError, "cannot open file %s", filename);
            return nullptr;
        }

        // Count in bytes so a short write is detectable; an empty chunk is not an error.
        const size_t chunkSize =
                mediaData->data_len > 0 ? static_cast<size_t>(mediaData->data_len) : 0;
        const bool writeOk =
                chunkSize == 0 || fwrite(mediaData->data, 1, chunkSize, fp) == chunkSize;
        const bool closeOk = fclose(fp) == 0;  // buffered data may only fail at close
        if (!writeOk || !closeOk) {
            FreeMediaData(mediaData);
            PyErr_Format(PyExc_RuntimeError, "cannot write file %s", filename);
            return nullptr;
        }

        // Constructing std::string from a null pointer is undefined behaviour.
        index.assign(mediaData->outindexbuf != nullptr ? mediaData->outindexbuf : "");
        isFinish = mediaData->is_finish != 0;
        FreeMediaData(mediaData);
    }

    // Py_None must be returned as a new reference.
    Py_RETURN_NONE;
}

封装解密聊天消息接口

// SDK.decrypt_data(encrypt_random_key, encrypt_msg) -> str
//
// Passes both strings to DecryptData and returns the resulting slice buffer
// as a Python string. Raises RuntimeError carrying the SDK return code on
// failure. The SDK handle is not needed, hence the unnamed first parameter.
static PyObject *sdk_decryptData(SDKObject *, PyObject *args, PyObject *kwargs) {
    const char *encryptRandomKey = nullptr, *encryptMsg = nullptr;
    const char *kwlist[] = {"encrypt_random_key", "encrypt_msg", nullptr};

    if (!PyArg_ParseTupleAndKeywords(
            args, kwargs, "ss", const_cast<char **>(kwlist),
            &encryptRandomKey, &encryptMsg)) {
        return nullptr;
    }

    auto *data = NewSlice();
    int ret = DecryptData(encryptRandomKey, encryptMsg, data);
    if (ret != 0) {
        FreeSlice(data);  // do not leak the slice on the error path
        PyErr_Format(PyExc_RuntimeError, "decrypt data error: %d", ret);
        return nullptr;
    }

    // Copy the plaintext into a Python str before the slice is released.
    auto res = Py_BuildValue("s", data->buf);
    FreeSlice(data);

    return res;
}

封装 Python 模块

// Methods exposed on the Python-level SDK type. Every entry uses METH_FLAG
// (METH_VARARGS | METH_KEYWORDS), so each handler takes (self, args, kwargs)
// and is cast to PyCFunction for storage in the table.
static PyMethodDef sdk_methods[] = {
{"get_chat_data", (PyCFunction) sdk_getChatData, METH_FLAG, "Get chat data"},
{"get_media_file", (PyCFunction) sdk_getMediaFile, METH_FLAG, "Download media file"},
{"decrypt_data", (PyCFunction) sdk_decryptData, METH_FLAG, "Decrypt data"},
// Sentinel entry marking the end of the table.
{nullptr, nullptr, 0, nullptr}
};

// Type object for wecom.SDK. Designated initializers must follow the
// declaration order of the PyTypeObject fields.
static PyTypeObject SDKType = {
    PyVarObject_HEAD_INIT(nullptr, 0)
    .tp_name="wecom.SDK",
    .tp_basicsize=sizeof(SDKObject),
    .tp_itemsize=0,
    .tp_dealloc=(destructor) sdk_dealloc,
    // The CPython documentation requires every type to include
    // Py_TPFLAGS_DEFAULT in its flags.
    .tp_flags=Py_TPFLAGS_DEFAULT,
    .tp_doc=PyDoc_STR("WeCom finance SDK handle"),
    .tp_methods=sdk_methods,
    .tp_init=(initproc) sdk_init,
    .tp_new=PyType_GenericNew,
};


// Module-level function table. The module exposes no free functions, so the
// table holds only the terminating sentinel.
PyMethodDef method_table[] = {
{nullptr, nullptr, 0, nullptr}
};

// Definition of the `wecom` module: name, docstring, per-module state size
// (-1) and the method table above.
PyModuleDef wecom_module = {
PyModuleDef_HEAD_INIT,
"wecom",
"Python wrap for WeCom C-SDK",
-1,
method_table,
};

#pragma clang diagnostic push
#pragma ide diagnostic ignored "OCUnusedGlobalDeclarationInspection"
// Module entry point looked up by the interpreter on `import wecom`.
// Finalises the SDK type, creates the module and publishes the type as
// `wecom.SDK`. Returns nullptr with an exception set on failure.
PyMODINIT_FUNC PyInit_wecom(void) {
    if (PyType_Ready(&SDKType) < 0) {
        return nullptr;
    }

    PyObject *m = PyModule_Create(&wecom_module);
    if (m == nullptr) {
        return nullptr;
    }

    // PyModule_AddObject steals a reference only on success, so take one
    // first and give it back if the call fails.
    Py_INCREF(&SDKType);
    if (PyModule_AddObject(m, "SDK", (PyObject *) &SDKType) < 0) {
        Py_DECREF(&SDKType);
        Py_DECREF(m);
        return nullptr;
    }

    // No Py_Initialize() here: this function only runs inside an interpreter
    // that is already initialised; that call is meant for embedding programs.
    return m;
}

编译为 Python 模块

setup.py 文件作为 Python 模块的编译配置文件,可以使用 setuptools 的自定义扩展来编译 C++ 代码为 Python 模块

import os
import sys
import subprocess
from setuptools import setup, Extension
from setuptools.command.build_ext import build_ext


class CMakeExtension(Extension):
    """Extension placeholder whose sources are built by CMake, not setuptools."""

    def __init__(self, name, cmake_lists_dir="."):
        # No sources are handed to setuptools; only the location of the
        # CMake project is recorded, as an absolute path.
        super().__init__(name, sources=[])
        self.cmake_lists_dir = os.path.abspath(cmake_lists_dir)


class cmake_build_ext(build_ext):
    """`build_ext` command that configures and builds each extension with CMake."""

    def build_extensions(self) -> None:
        """Run CMake configure + build for every registered extension.

        :raises RuntimeError: if the ``cmake`` executable cannot be started
        :raises subprocess.CalledProcessError: if a CMake step exits non-zero
        """
        try:
            subprocess.check_output(["cmake", "--version"])
        except OSError as err:
            raise RuntimeError("Cannot find CMake executable") from err

        cfg = "Debug" if self.debug else "Release"

        # CMake is run with cwd=build_temp, so the path handed to it must be
        # absolute; a relative one would be resolved against the build tree
        # again and end up nested inside it.
        build_temp = os.path.abspath(self.build_temp)
        os.makedirs(build_temp, exist_ok=True)

        for ext in self.extensions:
            # Directory where setuptools expects the built module to appear.
            ext_dir = os.path.abspath(os.path.dirname(self.get_ext_fullpath(ext.name)))

            cmake_args = [
                "-DCMAKE_BUILD_TYPE=" + cfg,
                "-DCMAKE_LIBRARY_OUTPUT_DIRECTORY=" + ext_dir,
                "-DCMAKE_ARCHIVE_OUTPUT_DIRECTORY=" + build_temp,
                "-DPYTHON_EXECUTABLE=" + sys.executable,
            ]

            subprocess.check_call(["cmake", ext.cmake_lists_dir, *cmake_args], cwd=build_temp)
            subprocess.check_call(["cmake", "--build", ".", "--config", cfg], cwd=build_temp)


# Package metadata and build wiring for the distribution.
setup(
name="wecom-sdk",
version="1.0",
description="Python wrap for wecom C-sdk",
author="Fei Yu",
# The "wecom" package directory is mapped to ./stub and ships __init__.pyi.
packages=["wecom"],
package_dir={"wecom": "./stub"},
package_data={"wecom": ["__init__.pyi"]},
# The extension is built through the custom build_ext command class.
ext_modules=[CMakeExtension("wecom")],
cmdclass={"build_ext": cmake_build_ext}
)

这里用了 CMake 作为编译工具,可以在 CMakeLists.txt 中配置编译选项

# Build script for the `wecom` extension module.
cmake_minimum_required(VERSION 3.18)
project(wecom-sdk)

set(CMAKE_CXX_STANDARD 14)


# Locate the Python 3.10 interpreter and development files.
# NOTE(review): the FindPythonInterp/FindPythonLibs modules are deprecated in
# recent CMake releases in favour of FindPython3 - confirm before upgrading.
find_package(PythonInterp 3.10 REQUIRED)
find_package(PythonLibs 3.10 REQUIRED)


# Select the SDK headers/libraries for the host platform; any platform other
# than Linux or Windows aborts the configure step.
if (${CMAKE_SYSTEM_NAME} MATCHES Linux)
include_directories(lib/linux)
link_directories(lib/linux)
elseif (${CMAKE_SYSTEM_NAME} MATCHES Windows)
include_directories(lib/win)
link_directories(lib/win)
else ()
message(FATAL_ERROR "Unsupported Platform: " ${CMAKE_SYSTEM_NAME})
endif()
include_directories(${PYTHON_INCLUDE_PATH})


# MODULE produces a loadable plugin library rather than a link-time library.
add_library(wecom MODULE src/wecom_module.cpp)

# An empty PREFIX drops the default "lib" prefix so the output file name
# starts with the module name "wecom".
set_target_properties(
wecom
PROPERTIES
PREFIX ""
OUTPUT_NAME "wecom"
LINKER_LANGUAGE CXX
)

# Link against the Python libraries and the vendor SDK library.
target_link_libraries(wecom ${PYTHON_LIBRARIES})
target_link_libraries(wecom WeWorkFinanceSdk_C)

为了让 IDE 能够正确识别封装的模块接口,可以在 __init__.pyi 文件中声明函数签名,并与主模块一同打包

class SDK:
    """
    Error code reference

    Code    Meaning
    0       ok
    10000   parameter error: invalid request parameters
    10001   network error: the network request failed
    10002   data parsing failed
    10003   system failure
    10004   wrong key, encryption failed
    10005   invalid fileid
    10006   decryption failed
    10007   private key for the message's encryption version not found;
            the key pair must be supplied again
    10008   failed to parse encrypt_key
    10009   illegal IP
    10010   data expired
    10011   certificate error
    """

    def __init__(self, corp_id: str, secret: str, proxy: str | None = None, proxy_auth: str | None = None) -> None:
        """

        :param corp_id: Enterprise ID, shown in the WeCom admin console under
            "My Company" -> "Company Info", e.g. wwd08c8exxxx5ab44d
        :param secret: Secret of the chat content archive, shown in the WeCom
            admin console under "Management Tools" -> "Chat Content Archive"
        :param proxy: The SDK talks to "https://qyapi.weixin.qq.com"; if that
            host cannot be reached directly, configure a proxy here,
            e.g. socks5://10.0.0.1:8081 or http://10.0.0.1:8081
        :param proxy_auth: Proxy account and password, e.g. user_name:passwd_123
        """

    def get_chat_data(self, seq: int, limit: int, timeout: int = 5) -> str:
        """
        :param seq: Sequence number of the archived message. It increases
            monotonically; the recommended value is the largest seq returned
            by the previous pull. Pass 0 on the first pull and the SDK returns
            the earliest message still within the retention period
        :param limit: Maximum number of messages to pull, in the range 1~1000
        :param timeout: Timeout for pulling the archive, in seconds
        :return: The pulled data as a JSON string containing errcode/errmsg and
            the content of every message
        e.g.
        {"errcode":0,"errmsg":"ok","chatdata":[{"seq":196,"msgid":"CAQ...gs=","publickey_ver":3,"encrypt_random_key":"ftJ...A==","encrypt_chat_msg":"898...RqL"}]}
        {"errcode":41001,"errmsg":"access_token missing, hint: [1684401769_399_465504c915f130b50294a4abe4eb82d0], from ip: 117.131.109.99, more info at https://open.work.weixin.qq.com/devtool/query?e=41001","chatdata":[]}
        """

    def get_media_file(self, file_id: str, save_to: str, timeout: int = 10) -> None:
        """
        :param file_id: Media file ID, taken from a decrypted chat message
        :param save_to: Path the media file is saved to
        :param timeout: Timeout for pulling the media file, in seconds
        :return: None
        """

    def decrypt_data(self, encrypt_random_key: str, encrypt_msg: str) -> str:
        """
        :param encrypt_random_key: The encrypt_key obtained by decrypting the
            encrypt_random_key returned by the archive pull, using the private
            key that matches the RSA public key configured in the WeCom admin
            console
        :param encrypt_msg: The encrypt_chat_msg returned by the archive pull
        :return: The decrypted message plaintext as a JSON string; for the
            message format see https://developer.work.weixin.qq.com/document/path/91774
        """

Python 中调用 SDK

初始化 SDK

CORP_ID/SECRET 从企业微信管理后台获取

import wecom

# direct connection: only the corp id and the archive secret are required
sdk = wecom.SDK("CORP_ID", "SECRET")

# use proxy
sdk = wecom.SDK("CORP_ID", "SECRET", proxy="http://10.0.0.1:8081", proxy_auth="username:password")

获取聊天对话数据

import json

data = sdk.get_chat_data(seq=0, limit=1)  # JSON format
data = json.loads(data)

# sample data:
# {
#     "errcode": 0,
#     "errmsg": "ok",
#     "chatdata": [
#         {
#             "seq": 0,
#             "msgid": "CAQ...gs=",
#             "publickey_ver": 3,
#             "encrypt_random_key": "ftJ...A==",
#             "encrypt_chat_msg": "898...RqL"
#         }
#     ]
# }

解密聊天消息

使用前需要生成 RSA 密钥对,并在企业微信管理后台配置公钥

SDK 获取的聊天消息被随机密钥加密,随机密钥被配置的公钥加密

解密时需要先通过 RSA 私钥解密随机密钥,再调用 SDK 使用随机密钥解密聊天消息

import json
from base64 import b64decode
from Crypto.PublicKey import RSA  # pycryptodome
from Crypto.Cipher import PKCS1_v1_5

# load RSA private key; the context manager closes the file deterministically
with open("private.key", "r") as key_file:
    rsa_key = RSA.import_key(key_file.read())
cipher = PKCS1_v1_5.new(rsa_key)

chat_data = data["chatdata"][0]

# decrypt random key
random_key = b64decode(chat_data["encrypt_random_key"])
# the second argument is the sentinel returned when decryption fails
random_key = cipher.decrypt(random_key, b'')
if not random_key:
    raise ValueError("failed to decrypt encrypt_random_key with the RSA private key")
random_key = str(random_key, encoding="utf-8")

encrypt_chat_msg = chat_data["encrypt_chat_msg"]

# decrypt chat message
chat_msg = sdk.decrypt_data(encrypt_random_key=random_key, encrypt_msg=encrypt_chat_msg)  # JSON format
chat_msg = json.loads(chat_msg)

# Sample data:
# {
#     "msgid": "CAQ...AE=",
#     "action": "send",
#     "from": "XuJinSheng",
#     "tolist": ["icefog"],
#     "roomid": "",
#     "msgtime": 1547087894783,
#     "msgtype": "text",
#     "text": {"content": "test"}
# }

获取媒体文件

sdk_file_id = chat_msg["file"]["sdkfileid"] # assumes chat_msg is a file-type message carrying a "file" entry
sdk.get_media_file(file_id=sdk_file_id, save_to="/path/to/file")

手动释放 SDK 占用内存

del sdk

完整项目代码已上传至 GitHub