在 PyWebIO 中使用验证码

本文最后更新于:3 个月前

Demo

由于 iframevaptcha 的限制,移动端可能无法验证。

验证流程

受 PyWebIO 实现方式的限制,前端没有能向后端隐式发送自定义信息的功能。
一种解决方式是利用 Cookie 传输信息。

  1. 用户触发验证,后端给用户设置一个加密 Cookie 并展示验证界面;
  2. 用户执行验证结束,将结果 post 到验证服务器;
  3. 验证服务器进行二次验证,决定是否通过验证并以 Cookie 的形式返回验证结果;
  4. 后端循环获取 Cookie。当验证服务器返回 Cookie 后,获取 Cookie 并解密,决定是否通过验证。

验证码平台

这里使用了 vaptcha,其他平台的接入方式自行参考对应的文档。

参考 vaptcha文档,获取 VIDKey

本文中与 PyWebIO 有关的代码默认导入了以下包

1
2
3
from pywebio.input import *
from pywebio.output import *
from pywebio.session import *

如果你对PyWebIO不太熟悉,建议先看看 PyWebIO文档

vaptcha 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def init_vaptcha():
# 引入 vaptcha 的 js
run_js(r"""$('head').append('<script src="https://v-cn.vaptcha.com/v3.js"></script>')""")

def validate():
# vaptcha 初始化并调用验证
run_js(r"""
vaptcha({
vid: 'enter your vid', // 填写刚才获取的 VID
type: 'invisible', // 隐藏式验证码
scene: 1, // 场景
area: 'cn' // 地区
}).then(function (vaptchaObj) {
vobj = vaptchaObj
vaptchaObj.listen('pass', function () {
serverToken=vaptchaObj.getServerToken()
var data = {
server: serverToken.server,
token: serverToken.token,
scene: 1 // 场景,需要和上面的一致
}
$.ajaxSetup({ // 跨域 post 设置
crossDomain: true,
xhrFields: {
withCredentials: true
}
});
$.post('https://example.com/endpoint/', data) // post到验证服务器,修改成你的地址。
})
vaptchaObj.listen('close', function () {
vaptchaObj.reset()
})
vobj.validate()
})""")

在网页初始化时调用 init_vaptcha() 引入js,在需要验证的时候调用 validate()
在验证的时候才创建vaptcha对象是因为引入js需要加载,如果在引入的同时创建对象有可能创建失败。

PyWebIO 没有提供直接获取 cookie 的方式,所以需要一些工具代码。

cookie.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from pywebio.session import run_js, eval_js

# 设置 Cookie
def set_cookie(key, value="", expire=-1):
run_js("setCookie(key, value, sec)", key=key, value=value, sec=expire)

# 获取 Cookie
def get_cookie(key):
return eval_js("getCookie(key)", key=key)

# SameSite, Secure, domain 用于跨域请求,domain注意替换成自己的域名
# 该函数应在网页初始化时调用
def init_cookie():
run_js(r"""
window.setCookie = function(name,value,sec) {
var expires = "";
if (sec) {
var date = new Date();
date.setTime(date.getTime() + (sec*1000));
expires = "; expires=" + date.toUTCString();
}
document.cookie = name + "=" + (value || "") + expires + "; path=/; SameSite=None; Secure; domain=.lo-li.cc";
}
window.getCookie = function(name) {
var nameEQ = name + "=";
var ca = document.cookie.split(';');
for(var i=0;i < ca.length;i++) {
var c = ca[i];
while (c.charAt(0)==' ') c = c.substring(1,c.length);
if (c.indexOf(nameEQ) == 0) return c.substring(nameEQ.length,c.length);
}
return null;
}""")

AES加密

用到的库是 pycryptodome
代码从不知道哪里复制粘贴的,反正能用(

aes.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import base64

from Crypto import Random
from Crypto.Cipher import AES


# AESCoder
class AESCoder:
def __init__(self, key):
self.key = key
# 数据块的大小 16位
self.BS = 16
# CBC模式 相对安全 因为有偏移向量 iv 也是16位字节的
self.mode = AES.MODE_CBC
# 填充函数 因为AES加密是一段一段加密的 每段都是BS位字节,不够的话是需要自己填充的
self.pad = lambda s: s + (self.BS - len(s.encode()) % self.BS)*chr(self.BS - len(s.encode()) % self.BS)
# 将填充的数据剔除
self.unpad = lambda s: s[:-ord(s[len(s) - 1:])]

def encrypt(self, raw):
raw = self.pad(raw).encode()
# 随机获取iv
iv = Random.new().read(AES.block_size)
# 定义初始化
cipher = AES.new(self.key, self.mode, iv)
# 此处是将密文和iv一起 base64 解密的时候就可以根据这个iv来解密
return base64.b64encode(iv + cipher.encrypt(raw)).decode()

def decrypt(self, enc):
# 先将密文进行base64解码
enc = base64.b64decode(enc)
# 取出iv值
iv = enc[:self.BS]
# 初始化自定义
cipher = AES.new(self.key, self.mode, iv)
# 返回utf8格式的数据
return self.unpad(cipher.decrypt(enc[self.BS:])).decode()

# 直接实例化方便我调用了(
# 参数是 bytes 类,aes加密密钥
# 密钥长度必须为8、12或16位
aes = AESCoder(b'123456789012')

接下来两节中,验证函数和验证服务器都调用了这个模块,两处的aes密钥要相同。

验证函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from aes import aes
from cookie import get_cookie, set_cookie

@use_scope("vaptcha", clear=True)
def vaptcha(timeout=60):
# 生成随机字符
validate_key = ''.join(random.sample(string.ascii_lowercase + string.digits, 4))
# aes加密,设置名为 validate 的 cookie
set_cookie("validate", aes.encrypt(validate_key), timeout + 30)
# 放个按钮,以防万一
put_buttons(["进行验证"], onclick=[validate])
# 自动调用验证
validate()
start_time = time.time()
time.sleep(3)
while True:
# 循环获取名为 validated 的 cookie
key = get_cookie("validated")
if key:
# 解密,验证
validated_key = aes.decrypt(key)
if validated_key == validate_key + "/pass":
clear()
set_cookie("validated")
return True
else:
put_error("验证失败:{0},请重试。".format(validated_key.split("/")[1]), closable=True)
set_cookie("validated")
else:
time.sleep(1)
if time.time() - start_time > timeout:
clear()
return False

搭建验证服务器

使用 Flask 搭建验证服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import requests
from flask import Flask, make_response, request
from aes import aes

app = Flask(__name__)

@app.route('/', methods=['POST'])
def vaptcha_validate():
server = request.form.get("server")
token = request.form.get("token")
scene = request.form.get("scene")
# 获取原始 cookie
cookie = request.cookies.get("validate")
# 获取用户真实ip,我使用了nginx反向代理,所以需要从headers中获取真实ip。
# 随便填一个ip好像也行(
ip = request.headers.get("X-Forwarded-For").split(",")[0]

if not (server or token):
return {"code":501, "msg":"no token or server"}

data = dict(
id = "填写 VID",
secretkey = "填写 Key",
scene = scene,
token = token,
ip = ip
)
result = requests.post(server, data=data).json()

if result["success"] == 1:
try:
# 解密 cookie
validate_key = aes.decrypt(cookie)
# 给 cookie 加个pass字符串代表通过,然后加密
cookie = aes.encrypt('/'.join([validate_key, "pass"]))
resp = make_response({code: 200})
# 设置新 cookie,并擦掉旧 cookie
# secure, samesite, domain 设置用于跨域请求,域名替换成自己的。
resp.set_cookie("validated", cookie, secure=True, samesite="None", domain=".example.com")
resp.set_cookie("validate", "", secure=True, samesite="None", domain=".example.com")
except:
resp = make_response({code: 500})
print(f"Fail to decode cookie: {cookie}, ip: {ip}")

print(f"Receive a validation from {ip}, msg:{result['msg']}, score:{result['score']}")

# 处理跨域请求
resp.headers["Access-Control-Allow-Origin"] = request.headers.get("Origin")
resp.headers["Access-Control-Allow-Credentials"] = "true"
return resp


if __name__ == '__main__':
# 开启服务,注意修改port
app.run(host="0.0.0.0", port=11001, debug=False)

本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 协议 ,转载请注明出处!