RPA+OpenCV实现连连看游戏自动化

Fei Yu

看到身边有人沉迷连连看微信小游戏,经典的拉高难度 + 看广告获取道具的套路,想过一关得要看个好多次 30s 广告

于是想验证一下关卡的砖块布局是否有可行的解法,如果有的话就通过 RPA (Robotic process automation) 自动化操作来通关

gif

架构设计

用到的库主要是以下两个:

此外还涉及到 numpy/pillow/matplotlib/scipy 这些数据与图像处理的库就不一一介绍了

环境为 Windows + Python3.10 + PC端微信小游戏

具体实现三个核心模块:

  • CV 类调用 opencv 中的算法实现砖块的识别,生成游戏场景的对应的数据结构
  • Board 类实现游戏场景的逻辑,并通过算法寻找可行的解法
  • Operator 类实现与游戏窗口的交互,并调用上述两个类的方法生成操作序列来完成最终的自动化通关

实现流程

定位工具

由于后续的截图与点击操作都是基于屏幕坐标的,所以首先实现一个简单的定位工具,用于获取游戏窗口内特定位置坐标信息

import cv2
import numpy as np
import pyautogui as gui


class StopException(Exception):
pass


class Operator(object):
window_name = "xxx"

def __init__(self):
self.window = gui.getWindowsWithTitle(self.window_name)[0]
self.base_position = self.window.topleft
self.window_size = self.window.size

def run(self):
raise StopException

def locate_pointer(self):
self.window.activate()
img = gui.screenshot(region=(
self.base_position.x,
self.base_position.y,
self.window_size.width,
self.window_size.height
))
img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)

def on_mouse_click(event, x, y, _flags, _param):
if event == cv2.EVENT_LBUTTONDOWN:
print(x, y)

cv2.imshow("img", img)
cv2.setMouseCallback("img", on_mouse_click)
cv2.waitKey(0)
cv2.destroyAllWindows()


if __name__ == "__main__":
op = Operator()
try:
op.run()
except (gui.FailSafeException, StopException):
op.locate_pointer()
pass

通过在需要获取坐标定位的时候可以抛出自定义的 StopException 异常,通过在弹出的窗口中点击模板位置获取相对坐标

pyautogui 库的 screenshot 方法返回的是 PILImage 对象,在传递给 opencv 的方法前需要先进行转换

游戏场景识别

有了游戏窗口内的坐标信息,就可以完成游戏场景的识别了,下面实现 CV 类将游戏场景截图转换为对应的数据结构

  1. 计算砖块图像单元尺寸
  2. 切分图像单元
  3. 计算图像单元的图像特征,通过特征相似度来判断是否为同一种砖块,并赋予类型 id
import time


class CV(object):
rows = 14
cols = 8

def __init__(self, board_size):
self.board_size = board_size

self.cell_size = None
self.cell_padding = None
self.cells = None # image cells
self.grid = None # pattern id grid

self.solve_size(board_size)

def analyze(self, img):
self.get_cells(img)
self.get_grid()

def solve_size(self):
"""calculate cell size and padding"""
...

def get_cells(self, img):
"""split image into cells"""
...

def get_grid(self):
"""get pattern id from cells"""
...


class Operator(object):
...

board_position = (74, 168)
board_size = (468, 812)

def __init__(self):
...
self.cv = CV(self.board_size)

def run(self):
...
self.window.activate()
self.delay()
img_board = gui.screenshot(
region=(
self.base_position[0] + self.board_position[0],
self.base_position[1] + self.board_position[1],
self.board_size[0],
self.board_size[1]
)
)
self.cv.analyze(img_board)

@staticmethod
def delay(t=0.5):
time.sleep(t)

在截图前先激活游戏窗口并延迟一段时间,以保证截图不会被窗口切换等操作影响

计算单元格尺寸

通过游戏场景的画面尺寸计算出单元格的尺寸,以及单元格之间的间距,便于后续切分图像

这里简单的使用 scipy 库求解一个二元一次方程组即可

import scipy.optimize as opt


class CV(object):
...

def solve_size(self):
"""calculate cell size and padding"""

width, height = self.board_size

def equation(x):
cell_size, cell_padding = x
eq_1 = self.cols * cell_size + (self.cols - 1) * cell_padding - width
eq_2 = self.rows * cell_size + (self.rows - 1) * cell_padding - height
return [eq_1, eq_2]

x0 = np.array([width / self.cols, 0])

solution = opt.root(equation, x0)
self.cell_size, self.cell_padding = solution.x

切分单元格

使用 pillow 库的 Image.crop 方法切分图像,这里为了避免边缘的干扰,对每个单元格还进行了一定的裁剪

class CV(object):
...

crop = 10

def get_cells(self, img):
"""split image into cells"""

cells = []
for i in range(self.rows):
row = []
for j in range(self.cols):
x = int(j * (self.cell_size + self.cell_padding))
y = int(i * (self.cell_size + self.cell_padding))
cell = img.crop(
(x + self.crop, y + self.crop, x + self.cell_size - self.crop, y + self.cell_size - self.crop))
row.append(cell)
cells.append(row)

self.cells = cells

匹配单元格图案

通过计算单元格的图像特征,通过相似度来判断是否为同一种砖块,并赋予类型 id

由于这里的图案相似度较高,所以直接使用直方图相似度即可:判断两个图片中像素值的分布是否相似

class CV(object):
...

threshold = 0.85
_feature_cache = {}

def get_grid(self):
"""get pattern id from cells"""

grid = [[None for _ in range(self.cols)] for _ in range(self.rows)]
idx = 0
for i in range(self.rows):
for j in range(self.cols):
if grid[i][j] is not None:
continue
grid[i][j] = idx
for k in range(i, self.rows):
for l in range(self.cols):
if grid[k][l] is not None:
continue
sim = self.calc_hist_similarity(self.cells[i][j], self.cells[k][l])
if sim > self.threshold:
grid[k][l] = grid[i][j]
idx += 1
self.grid = grid

def calc_hist_similarity(self, img1, img2):
hist1 = self.hist_feature(img1)
hist2 = self.hist_feature(img2)
similarity = cv2.compareHist(hist1, hist2, cv2.HISTCMP_CORREL)
return similarity

def hist_feature(self, img):
key = hash(img.tobytes())
if key in self._feature_cache:
return self._feature_cache[key]

img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
hist_b = cv2.calcHist([img], [0], None, [64], [0, 256])
hist_g = cv2.calcHist([img], [1], None, [64], [0, 256])
hist_r = cv2.calcHist([img], [2], None, [64], [0, 256])
hist = np.concatenate((hist_b, hist_g, hist_r))
hist[np.argsort(hist, axis=0)[-3:]] = 0
hist = cv2.normalize(hist, hist).flatten()
self._feature_cache[key] = hist
return hist

这里有几个需要注意的地方:

  1. 为了避免重复计算,使用了一个缓存字典来存储图像的特征
  2. 为了避免图像中的噪声干扰,计算直方图特征时 bin 的数量设置为 64,即将原本为 256 颜色空间划分为 64 个区间,计算每个区间的像素数量
  3. 为了加快计算速度,分别计算了 RGB 三个通道的直方图特征后拼接在一起,这样特征维度是 64*3=192;如果同时计算三个通道,特征维度就是 64*64*64=262144,计算量会大大增加
  4. 为了提高不同图案的区分度,将直方图中像素数量最多的三个区间的像素数量置为 0,降低相同的背景颜色带来的影响
  5. 为了便于使用阈值判断相似度,对特征分布进行了归一化处理,并使用了 cv2.HISTCMP_CORREL 即相关系数作为相似度的计算方法,0 表示完全不相关,1 表示完全相关

比如其中一个图像单元与对应的直方图特征为:

cell hist

可视化

为了方便调试,可以将识别结果可视化输出,检查是否正确识别了砖块的图案

class CV(object):

def show_grid(self):
"""recreate image from grid with pattern id"""

img = Image.new('RGB', (self.board_size[0], self.board_size[1]))
for i in range(self.rows):
for j in range(self.cols):
x = int(j * (self.cell_size + self.cell_padding) + self.crop)
y = int(i * (self.cell_size + self.cell_padding) + self.crop)
img.paste(self.cells[i][j], (x, y))

img = np.array(img)
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
for i in range(self.rows):
for j in range(self.cols):
x = int(j * (self.cell_size + self.cell_padding))
y = int(i * (self.cell_size + self.cell_padding))
cv2.putText(
img, str(self.grid[i][j]), (x + 30, y + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2
)
cv2.imshow("Grid", img)
cv2.waitKey(0)
cv2.destroyAllWindows()
grid

游戏逻辑模拟

有了游戏场景的数据结构,就可以实现游戏逻辑的模拟了,下面实现 Board

  1. 通过矩阵操作模拟砖块的消除与移动
  2. 通过广度优先搜索来寻找符合消除条件的砖块
  3. 通过深度优先搜索来寻找可行的解法
class Board(object):
def __init__(self, grid):
self.grid = np.array(grid, dtype=np.object_)
self.rows, self.cols = self.grid.shape

def make_move(self, x1, y1, x2, y2):
"""simulate move"""
...

def available_moves(self):
"""find current available moves"""
...

def solve(self):
"""find solution"""
...

这里的 grid 是一个 numpy 二维数组,每个元素是一个砖块的类型 id,空缺处为 None

模拟消除

大部分关卡在消除砖块后会向某一方向移动填充空缺,这里加入了额外的 gravity 参数来控制这一过程

另外为了便于后续的搜索过程,需要记录每一步的操作序列以便回退

class Board(object):
...

def __init__(self, grid, gravity=None):
...

self.gravity = gravity
self.grid_records = [self.grid.copy()]
self.move_records = [] # (x1, y1, x2, y2, idx)

def make_move(self, x1, y1, x2, y2):
new_grid = self.grid.copy()
new_grid[x1, y1] = None
new_grid[x2, y2] = None
idx = self.grid[x1, y1]
self.grid = self.apply_gravity(new_grid)
self.grid_records.append(self.grid.copy())
self.move_records.append((x1, y1, x2, y2, idx))

def undo_move(self):
if len(self.grid_records) > 1:
self.grid_records.pop()
self.grid = self.grid_records[-1]
self.move_records.pop()

def apply_gravity(self, grid):
if self.gravity is None:
return grid

if self.gravity == "up":
grid = np.flipud(grid)
elif self.gravity == "left":
grid = np.rot90(grid, k=1)
elif self.gravity == "right":
grid = np.rot90(grid, k=-1)

new_grid = np.full_like(grid, None)
for j in range(new_grid.shape[1]):
stack = grid[:, j][grid[:, j] != None]
if stack.size == 0:
continue
new_grid[-len(stack):, j] = stack

if self.gravity == "up":
new_grid = np.flipud(new_grid)
elif self.gravity == "left":
new_grid = np.rot90(new_grid, k=-1)
elif self.gravity == "right":
new_grid = np.rot90(new_grid, k=1)

return new_grid

apply_gravity 方法中,通过 numpyflipud/rot90 函数来实现矩阵的翻转与旋转,以简化不同方向的填充操作

寻找可消除砖块

通过广度优先搜索来寻找符合消除条件的砖块,根据游戏规则,两个砖块之间的路径不能有超过两个拐角

from collections import deque


class Board(object):
...

def __init__(self, *args, **kwargs):
...

self.search_order = {
"down": [(i, j) for i in range(self.rows) for j in range(self.cols)],
"up": [(i, j) for i in range(self.rows - 1, -1, -1) for j in range(self.cols)],
"right": [(i, j) for j in range(self.cols) for i in range(self.rows)],
"left": [(i, j) for j in range(self.cols - 1, -1, -1) for i in range(self.rows)]
}

def available_moves(self):
moves = []
if self.gravity is None:
search_direction = "down"
else:
search_direction = self.gravity

for i, j in self.search_order[search_direction]:
if self.grid[i, j] is None:
continue
for k, l in self.search_order[search_direction]:
if self.grid[k, l] is None:
continue
if self.is_valid_move(i, j, k, l):
moves.append((i, j, k, l))
return moves

def is_valid_move(self, x1, y1, x2, y2):
if (x1 == x2 and y1 == y2) or (self.grid[x1, y1] != self.grid[x2, y2]):
return False

padded_grid = np.pad(
self.grid,
pad_width=1,
mode="constant",
constant_values=np.object_(None)
)
x1, y1, x2, y2 = x1 + 1, y1 + 1, x2 + 1, y2 + 1

def is_valid(x, y):
return (
0 <= x < padded_grid.shape[0]
and 0 <= y < padded_grid.shape[1]
and padded_grid[x, y] is None
)

def bfs(start, end):
queue = deque([(start, -1, -1)]) # (x, y), turns, direction
visited = set()

while queue:
(x, y), turns, direction = queue.popleft()
if (x, y) == end:
return True

for d, (dx, dy) in enumerate([(0, 1), (1, 0), (0, -1), (-1, 0)]):
nx, ny = x + dx, y + dy
if (is_valid(nx, ny) or (nx, ny) == end) and ((nx, ny, d) not in visited):
new_turns = turns if direction == d else turns + 1
if new_turns <= 2:
queue.append(((nx, ny), new_turns, d))
visited.add((nx, ny, d))
return False

return bfs((x1, y1), (x2, y2))

这里的几个需要注意的地方:

  1. 连接砖块的路径可以在外围,因此寻找路径时需要在原有矩阵的基础上进行扩展,这里使用了 numpypad 函数在矩阵的外围填充了一圈 None
  2. 搜索可消除砖块时优先选择与填充方向相反的砖块,可以更快的找到可行的解法:填充可能会导致原本可消除的砖块不再可消除,因此优先消除对砖块布局影响较小的砖块
  3. 广度优先搜索时需要记录路径的拐角次数,由于开始时可向任意方向移动,因此起点拐角次数初始化为 -1

寻找可行解法

class LimitExceededException(Exception):
pass


class Board(object):
...

fail_limit = 200

def is_win(self):
return np.all(self.grid == None)

def solve(self):
fails = 0
max_depth_result = []

def dfs(depth=0):
if self.is_win():
return True
moves = self.available_moves()
if not moves:
nonlocal fails, max_depth_result
fails += 1
if len(self.move_records) > len(max_depth_result):
max_depth_result = self.move_records.copy()
if fails > self.fail_limit:
raise LimitExceededException("No solution found")
return False
for idx, move in enumerate(moves):
self.make_move(*move)
if dfs(depth + 1):
return True
self.undo_move()
return False

try:
return dfs()
except LimitExceededException as e:
self.move_records = max_depth_result
return False

普通的深度优先搜索,当搜索到不可再消除时回退到上一步

由于搜索的深度很大,因此超过一定失败次数后直接停止搜索,返回能达到最大深度的解法

自动化操作

有了操作的序列,就可以实现自动化操作了,在 Operator 类中进行具体的模拟操作

添加了一个弹窗在开始时选择重力方向,以便后续模拟计算

class Operator(object):
...

def click_cell(self, i, j):
"""translate matrix coordinate to screen coordinate and click"""

base_x, base_y = np.add(self.base_position, self.board_position)
click_x = base_x + j * (self.cv.cell_size + self.cv.cell_padding) + self.cv.cell_size // 2
click_y = base_y + i * (self.cv.cell_size + self.cv.cell_padding) + self.cv.cell_size // 2
gui.click(click_x, click_y)

def run(self):
...
# pop up gravity selection dialog
gravity = gui.confirm(
"Please select the gravity direction",
buttons=["none", "up", "down", "left", "right"]
)
if gravity == "none":
gravity = None

self.window.activate()
self.delay()
img_board = gui.screenshot(
region=(
self.base_position[0] + self.board_position[0],
self.base_position[1] + self.board_position[1],
self.board_size[0],
self.board_size[1]
)
)
self.cv.analyze(img_board)
board = Board(self.cv.grid, gravity)
if board.solve():
for x1, y1, x2, y2, idx in board.move_records:
self.click_cell(x1, y1)
self.delay(0.2)
self.click_cell(x2, y2)
self.delay()
else:
print("No solution found")

那么问题出现了:

  1. 场景中有些宝箱砖块,消除后会弹出看广告获取道具的界面,这时候需要额外的操作来关闭这个界面
grid
  1. 当有填充方向时,布局极大概率是无解的,这时候需要通过点击看广告获取刷新道具来打乱布局
grid

处理宝箱砖块

这里使用另一种图像特征算法来识别宝箱砖块:SIFT(Scale-Invariant Feature Transform) 尺度不变特征变换

class CV(object):
...

def locate_cell(self, img):
max_sim = 0
max_idx = None
for i in range(self.rows):
for j in range(self.cols):
sim = self.calc_sift_similarity(img, self.cells[i][j])
if sim > max_sim:
max_sim = sim
max_idx = (i, j)
return max_idx

@staticmethod
def calc_sift_similarity(img1, img2):
img1 = cv2.cvtColor(np.array(img1), cv2.COLOR_RGB2GRAY)
img2 = cv2.cvtColor(np.array(img2), cv2.COLOR_RGB2GRAY)

sift = cv2.SIFT_create()
kp1, des1 = sift.detectAndCompute(img1, None)
kp2, des2 = sift.detectAndCompute(img2, None)
if des1 is None or des2 is None:
return 0

bf = cv2.BFMatcher()
matches = bf.knnMatch(des1, des2, k=2)

if len(matches[0]) < 2:
return 0

good = []
for m, n in matches:
if m.distance < 0.75 * n.distance:
good.append([m])

return len(good)

提供一张宝箱砖块的图片,通过计算与每个单元图片间 SIFT 算子的匹配度,选择相似度最高的单元返回其坐标

在序列操作时消除宝箱砖块后额外点击指定位置关闭弹窗即可

class Operator(object):
button_positions = {
"deny": (310, 840),
}

def click_button(self, name):
x, y = self.button_positions[name]
gui.click(self.base_position.x + x, self.base_position.y + y)

def run(self):
...

img_chest = Image.open("chest.png")
i, j = self.cv.locate_cell(img_chest)
idx_chest = self.cv.grid[i][j]

...

for x1, y1, x2, y2, idx in board.move_records:
self.click_cell(x1, y1)
self.delay(0.2)
self.click_cell(x2, y2)
if idx_chest == idx:
self.delay()
self.click_button("deny")
self.delay()

刷新布局

当布局无解时,在尽可能消除砖块后,自动点击刷新道具看广告后,重新识别布局求解消除路径,直至全部消除

class Board(obejct):

...

def __init__(self, grid, gravity=None, mask=None):
self.grid = np.array(grid, dtype=np.object_)
if mask is not None:
self.grid[mask] = None

...

class Operator(object):
...

button_positions = {
"accept": (310, 760),
"close": (580, 67),
"refresh": (535, 925),
}

def run(self):
...

grid_mask = None

while True:
self.window.activate()
self.delay()
img_board = gui.screenshot(
region=(
self.base_position[0] + self.board_position[0],
self.base_position[1] + self.board_position[1],
self.board_size[0],
self.board_size[1]
)
)
self.cv.analyze(img_board)

...

board = Board(self.cv.grid, gravity, grid_mask)
board.solve()

self.window.activate()
self.delay(1)
for x1, y1, x2, y2, idx in board.move_records:
self.click_cell(x1, y1)
self.delay(0.2)
self.click_cell(x2, y2)
if idx_chest == idx:
self.delay()
self.click_button("deny")
self.delay()

if board.is_win():
break
else:
self.click_button("refresh")
self.delay(1)
self.click_button("accept")
self.delay(35)
self.click_button("close")
self.delay(3)

grid_mask = board.grid == None

这里打乱布局后砖块位置保持不变但图案改变,因此在循环中通过 mask 参数来过滤掉已经消除的砖块,

Game Over

比起直接玩游戏,写一个自动化脚本来通关可能更有意思也更有意义一些

就这个游戏本身来说,在自动生成布局时特意不保证可解性,往往一个关卡需要看 3-5 次广告才能通关,可见这类游戏的背后盈利模式

完整代码见 GitHub