本文最后更新于:1 分钟前
验证流程
受 PyWebIO 实现方式的限制,前端没有能向后端隐式发送自定义信息的功能。
一种解决方式是利用 Cookie
传输信息。
- 用户触发验证,后端给用户设置一个加密
Cookie
并展示验证界面;
- 用户执行验证结束,将结果 post 到验证服务器;
- 验证服务器进行二次验证,决定是否通过验证并以
Cookie
的形式返回验证结果;
- 后端循环获取
Cookie
。当验证服务器返回 Cookie
后,获取 Cookie
并解密,决定是否通过验证。
验证码平台
这里使用了 vaptcha,其他平台的接入方式自行参考对应的文档。
参考 vaptcha文档,获取 VID
和 Key
本文中与 PyWebIO
有关的代码默认导入了以下包
| from pywebio.input import * from pywebio.output import * from pywebio.session import *
|
如果你对PyWebIO不太熟悉,建议先看看 PyWebIO文档
vaptcha 初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| def init_vaptcha(): run_js(r"""$('head').append('<script src="https://v-cn.vaptcha.com/v3.js"></script>')""")
def validate(): run_js(r""" vaptcha({ vid: 'enter your vid', // 填写刚才获取的 VID type: 'invisible', // 隐藏式验证码 scene: 1, // 场景 area: 'cn' // 地区 }).then(function (vaptchaObj) { vobj = vaptchaObj vaptchaObj.listen('pass', function () { serverToken=vaptchaObj.getServerToken() var data = { server: serverToken.server, token: serverToken.token, scene: 1 // 场景,需要和上面的一致 } $.ajaxSetup({ // 跨域 post 设置 crossDomain: true, xhrFields: { withCredentials: true } }); $.post('https://example.com/endpoint/', data) // post到验证服务器,修改成你的地址。 }) vaptchaObj.listen('close', function () { vaptchaObj.reset() }) vobj.validate() })""")
|
在网页初始化时调用 init_vaptcha()
引入js,在需要验证的时候调用 validate()
。
在验证的时候才创建vaptcha对象是因为引入js需要加载,如果在引入的同时创建对象有可能创建失败。
cookie 操作
PyWebIO 没有提供直接获取 cookie 的方式,所以需要一些工具代码。
cookie.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| from pywebio.session import run_js, eval_js
def set_cookie(key, value="", expire=-1): run_js("setCookie(key, value, sec)", key=key, value=value, sec=expire)
def get_cookie(key): return eval_js("getCookie(key)", key=key)
def init_cookie(): run_js(r""" window.setCookie = function(name,value,sec) { var expires = ""; if (sec) { var date = new Date(); date.setTime(date.getTime() + (sec*1000)); expires = "; expires=" + date.toUTCString(); } document.cookie = name + "=" + (value || "") + expires + "; path=/; SameSite=None; Secure; domain=.lo-li.cc"; } window.getCookie = function(name) { var nameEQ = name + "="; var ca = document.cookie.split(';'); for(var i=0;i < ca.length;i++) { var c = ca[i]; while (c.charAt(0)==' ') c = c.substring(1,c.length); if (c.indexOf(nameEQ) == 0) return c.substring(nameEQ.length,c.length); } return null; }""")
|
AES加密
用到的库是 pycryptodome
代码从不知道哪里复制粘贴的,反正能用(
aes.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import base64
from Crypto import Random from Crypto.Cipher import AES
class AESCoder: def __init__(self, key): self.key = key self.BS = 16 self.mode = AES.MODE_CBC self.pad = lambda s: s + (self.BS - len(s.encode()) % self.BS)*chr(self.BS - len(s.encode()) % self.BS) self.unpad = lambda s: s[:-ord(s[len(s) - 1:])]
def encrypt(self, raw): raw = self.pad(raw).encode() iv = Random.new().read(AES.block_size) cipher = AES.new(self.key, self.mode, iv) return base64.b64encode(iv + cipher.encrypt(raw)).decode()
def decrypt(self, enc): enc = base64.b64decode(enc) iv = enc[:self.BS] cipher = AES.new(self.key, self.mode, iv) return self.unpad(cipher.decrypt(enc[self.BS:])).decode()
aes = AESCoder(b'123456789012')
|
接下来两节中,验证函数和验证服务器都调用了这个模块,两处的aes密钥要相同。
验证函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| from aes import aes from cookie import get_cookie, set_cookie
@use_scope("vaptcha", clear=True) def vaptcha(timeout=60): validate_key = ''.join(random.sample(string.ascii_lowercase + string.digits, 4)) set_cookie("validate", aes.encrypt(validate_key), timeout + 30) put_buttons(["进行验证"], onclick=[validate]) validate() start_time = time.time() time.sleep(3) while True: key = get_cookie("validated") if key: validated_key = aes.decrypt(key) if validated_key == validate_key + "/pass": clear() set_cookie("validated") return True else: put_error("验证失败:{0},请重试。".format(validated_key.split("/")[1]), closable=True) set_cookie("validated") else: time.sleep(1) if time.time() - start_time > timeout: clear() return False
|
搭建验证服务器
使用 Flask
搭建验证服务器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| import requests from flask import Flask, make_response, request from aes import aes
app = Flask(__name__)
@app.route('/', methods=['POST']) def vaptcha_validate(): server = request.form.get("server") token = request.form.get("token") scene = request.form.get("scene") cookie = request.cookies.get("validate") ip = request.headers.get("X-Forwarded-For").split(",")[0]
if not (server or token): return {"code":501, "msg":"no token or server"}
data = dict( id = "填写 VID", secretkey = "填写 Key", scene = scene, token = token, ip = ip ) result = requests.post(server, data=data).json()
if result["success"] == 1: try: validate_key = aes.decrypt(cookie) cookie = aes.encrypt('/'.join([validate_key, "pass"])) resp = make_response({code: 200}) resp.set_cookie("validated", cookie, secure=True, samesite="None", domain=".example.com") resp.set_cookie("validate", "", secure=True, samesite="None", domain=".example.com") except: resp = make_response({code: 500}) print(f"Fail to decode cookie: {cookie}, ip: {ip}")
print(f"Receive a validation from {ip}, msg:{result['msg']}, score:{result['score']}")
resp.headers["Access-Control-Allow-Origin"] = request.headers.get("Origin") resp.headers["Access-Control-Allow-Credentials"] = "true" return resp
if __name__ == '__main__': app.run(host="0.0.0.0", port=11001, debug=False)
|