Python实现YSU登录取信息

项目结构

主体逻辑代码

import random
import time
import json
import urllib.parse
import requests
import urllib
from lxml import etree
from ddddocr import DdddOcr
from config.config import Config


class YSUinfo:
    URL_SERVICE = urllib.parse.quote("https://jwxt.ysu.edu.cn/jwmobile/index")
    URL_SERVICE_GPA = urllib.parse.quote(
        "https://jwxt.ysu.edu.cn/jwapp/sys/cjcx/*default/index.do?THEME=indigo&EMAP_LANG=zh#/cjcx?THEME=indigo&EMAP_LANG=zh"
    )
    URL_LOGIN = f"https://cer.ysu.edu.cn/authserver/login?service={URL_SERVICE}"
    URL_TEST = "https://jwxt.ysu.edu.cn/jwmobile/biz/v410/score/termScore"
    URL_LOGIN_SESSION = "https://jwxt.ysu.edu.cn/jwmobile/login/login.do"
    URL_AUTHORIZATION = "https://jwxt.ysu.edu.cn/jwmobile/auth/index"
    URL_TERMSCORE = "https://jwxt.ysu.edu.cn/jwmobile/biz/v410/score/termScore"
    URL_SCOREDETAIL = "https://jwxt.ysu.edu.cn/jwmobile/biz/v410/score/scoreDetail"
    URL_RECENTEXAMS = "https://jwxt.ysu.edu.cn/jwmobile/biz/v410/examTask/recentExams"
    URL_QUERYSCHEDULE = (
        "https://jwxt.ysu.edu.cn/jwmobile/biz/v410/schedule/querySchedule"
    )
    URL_WEEKS = "https://jwxt.ysu.edu.cn/jwmobile/biz/home/queryWelcomeContent"

    URL_GPA = "https://jwxt.ysu.edu.cn/jwapp/sys/cjcx/modules/cjcx/cxzxfaxfjd.do"

    def __init__(self, userName, passWord, useCache=True, loginType=0):
        """
        初始化
        :param username: 用户名
        :param password: 密码
        :param useCache: 是否使用缓存cookie
        :param type: 0为普通登录,1为获取绩点
        """
        self.username = userName
        self.password = passWord
        self.useCache = useCache
        self.loginType = loginType
        self.session = requests.session()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }
        self.session.headers.update(self.headers)
        self.cookies = None
        self.authorization = None
        self.set_login_url()
        self.load_session()

    def set_login_url(self):
        if self.loginType == 0:
            self.URL_LOGIN = (
                f"https://cer.ysu.edu.cn/authserver/login?service={self.URL_SERVICE}"
            )
        elif self.loginType == 1:
            self.URL_LOGIN = "https://cer.ysu.edu.cn/authserver/login?service=https%3A%2F%2Fjwxt.ysu.edu.cn%2Fjwapp%2Fsys%2Femapfunauth%2FcasValidate.do%3Fservice%3Dhttps%253A%252F%252Fjwxt.ysu.edu.cn%252Fjwapp%252Fsys%252Fcjcx%252F*default%252Findex.do%253FTHEME%253Dindigo%2526EMAP_LANG%253Dzh"

    def pushMessage(self, token, title, message):
        url = f"http://www.pushplus.plus/send?token={token}&title={title}&content={message}&template=html"
        requests.get(url).json()

    def load_session(self):
        if self.useCache and self.loginType == 0:
            try:
                with open("config/session.json", "r") as f:
                    session_data = json.load(f)
                self.session.cookies.update(session_data)
                self.authorization = session_data["Authorization"]
                self.cookies = session_data
            except FileNotFoundError:
                print("未找到session文件,正在重新登录获取..")
                self.refresh_session()
        else:
            self.refresh_session()

    def refresh_session(self):
        self.login()
        self.save_session()

    def save_session(self):
        with open("config/session.json", "w") as f:
            json.dump(self.session.cookies.get_dict(), f)

    def login(self):
        """
        Login into YSU CAS
        :return: session that contains cookies
        """
        lt_v, dllt_v, execution_v, event_id_v, pwdSalt = self._get_login_parameters(
            self.session
        )
        pwdEcy = self._encrypt_password(pwdSalt)

        workload = {
            "username": self.username,
            "password": pwdEcy,
            "rememberMe": "true",
            "lt": lt_v,
            "dllt": dllt_v,
            "execution": execution_v,
            "_eventId": event_id_v,
            "captcha": "",
        }

        if self._needs_captcha(self.session, self.username):
            code = self._get_captcha(self.session)
            workload["captcha"] = code

        result = self.session.post(self.URL_LOGIN, data=workload, allow_redirects=False)

        if "happyVoyage" in self.session.cookies:
            location = result.headers.get("Location")
            loc = self._get_MOD_AUTH_CAS(location)
            if self.loginType == 1:
                # 需要多次请求才能获取到最终236位的_WEU参数
                loc = self._get_WEU_1(loc)
                appid = self._get_WEU_2(loc)
                self._get_WEU_3(appid)
                self._get_WEU_4(appid)
                self._get_WEU_5()
                self._get_WEU_6()

            else:
                Authorization = self._get_Authorization()
                self.authorization = Authorization
            self.cookies = self.session.cookies.get_dict()
            return self.session
        else:
            print("Login failed, waiting for 5 seconds...")
            time.sleep(3)
            self.login()

    def _get_WEU_1(self, locationUrl):
        res = self.session.get(locationUrl, allow_redirects=False, headers=self.headers)
        location = res.headers.get("Location")
        return location

    def _get_WEU_2(self, locationUrl):
        res = self.session.get(locationUrl, allow_redirects=False, headers=self.headers)
        appid = res.text.split('"appId":"')[1].split('","_v"')[0]
        return appid

    def _get_WEU_3(self, appid):
        url = "https://jwxt.ysu.edu.cn/jwapp/sys/jwpubapp/modules/bb/cxjwggbbdqx.do"
        data = {"APP": appid, "SFQY": "1"}
        self.session.post(url, data=data, headers=self.headers)

    def _get_WEU_4(self, appid):
        url = f"https://jwxt.ysu.edu.cn/jwapp/sys/funauthapp/api/getAppConfig/cjcx-{appid}.do?v=03089445764325194"
        self.session.get(url, headers=self.headers)

    def _get_WEU_5(self):
        url = "https://jwxt.ysu.edu.cn/jwapp/sys/homeapp/index.do"
        self.session.get(url, headers=self.headers)

    def _get_WEU_6(self):
        url = "https://jwxt.ysu.edu.cn/jwapp/sys/emappagelog/config/cjcx.do"
        self.session.get(url, headers=self.headers)

    def _get_MOD_AUTH_CAS(self, locationUrl):
        res = self.session.get(locationUrl, allow_redirects=False)
        location = res.headers.get("Location")
        return location

    def _get_Authorization(self):
        url = self.URL_AUTHORIZATION
        res = self.session.get(url, allow_redirects=False)
        location = res.headers["Location"]
        Authorization = location.split("token=")[1]
        self.session.cookies.set("Authorization", Authorization)
        return Authorization

    def _get_login_parameters(self, session):
        result = session.get(self.URL_LOGIN)
        result.raise_for_status()  # Raise exception for non-200 response
        tree = etree.HTML(result.text)

        lt_v = tree.xpath('//input[@name="lt"]/@value')[0]
        dllt_v = tree.xpath('//input[@name="dllt"]/@value')[0]
        execution_v = tree.xpath('//input[@name="execution"]/@value')[0]
        event_id_v = tree.xpath('//input[@name="_eventId"]/@value')[0]
        pwdSalt = tree.xpath('//*[@id="pwdEncryptSalt"]/@value')[0]

        return lt_v, dllt_v, execution_v, event_id_v, pwdSalt

    def _encrypt_password(self, pwdSalt):
        if not self.username or not self.password:
            exit(2)
        from Encrypt import encryptAES

        return encryptAES(self.password, pwdSalt)

    def _needs_captcha(self, session, username):
        capt_result = session.get(
            f"https://cer.ysu.edu.cn/authserver/needCaptcha.html?username={username}&pwdEncrypt2=pwdEncryptSalt&_={int(time.time())}"
        )
        return "true" in capt_result.text

    def _get_captcha(self, session):
        captimg_result = session.get(
            f"https://cer.ysu.edu.cn/authserver/getCaptcha.htl?ts={random.randint(1, 999)}"
        )
        ocr = DdddOcr(show_ad=False)
        return ocr.classification(captimg_result.content)

    def _get_weeks(self):
        url = self.URL_WEEKS
        response = self.session.post(url, headers=self.headers)
        obj = response.json()
        return obj["data"]["dateTipText"].split("现在是第")[1].split("周")[0]

    def get_term_score(self):
        url = self.URL_TERMSCORE
        payload = {"termCode": "2023-2024-2", "courseNature": "", "courseName": ""}
        response = self.session.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_score_detail(self, id):
        """
        :param id: 课程id
        """
        url = self.URL_SCOREDETAIL
        payload = {"id": f"{id}"}
        response = self.session.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_recentExams(self):
        url = self.URL_RECENTEXAMS
        payload = {"termCode": "2023-2024-2", "courseNature": "", "courseName": ""}
        response = self.session.get(url, json=payload, headers=self.headers)
        return response.json()

    def get_querySchedule(self):
        url = self.URL_QUERYSCHEDULE
        playload = {"skzc": self._get_weeks(), "xnxqdm": "2023-2024-2"}
        response = self.session.post(url, json=playload, headers=self.headers)
        return response.json()

    def get_gpa(self, username):
        url = self.URL_GPA
        data = {
            "XH1": username,
            "XH2": username,
            "XH3": username,
            "XH4": username,
            "XH5": username,
            "XH6": username,
        }
        response = self.session.post(url, data=data, headers=self.headers)
        return response.json()


def pushScore():
    cfg = Config()
    try:
        ysuinfo = YSUinfo(cfg.username, cfg.password)
        scoreJson = ysuinfo.get_term_score()
        newScoreJson = scoreJson["data"]["termScoreList"][0]["scoreList"]
    except Exception as e:
        print(f"Error: {e}. Attempting to re-login.")
        ysuinfo = YSUinfo(cfg.username, cfg.password, False)
        scoreJson = ysuinfo.get_term_score()
        newScoreJson = scoreJson["data"]["termScoreList"][0]["scoreList"]
    try:
        with open("config/score.json", "r", encoding="utf-8") as f:
            fileJson = json.load(f)
    except:
        fileJson = newScoreJson
        with open("config/score.json", "w", encoding="utf-8") as f:
            f.write(json.dumps(fileJson))

    msg = ""
    for score in newScoreJson:
        if score not in fileJson:
            print("出分了!")
            msg = (
                "课程:"
                + score["courseName"]
                + "  成绩:"
                + score["score"]
                + "   学分:"
                + score["coursePoint"]
                + "\n"
            )
    if msg != "":
        with open("config/score.json", "w", encoding="utf-8") as f:
            f.write(json.dumps(newScoreJson))
        print(msg)
        ysuinfo.pushMessage(cfg.pushkey, "出分了!", msg)
    else:
        print("未出分")


def pushGPA():
    cfg = Config()
    try:
        ysuinfo = YSUinfo(cfg.username, cfg.password, False, 1)
        gpaJson = ysuinfo.get_gpa(cfg.username)
    except Exception as e:
        print(f"Error: {e}. Attempting to re-login.")
        ysuinfo = YSUinfo(cfg.username, cfg.password, False, 1)
        gpaJson = ysuinfo.get_gpa(cfg.username)
    print(gpaJson)


if __name__ == "__main__":
    pushScore()
    pushGPA()

登录AES加密复刻

import base64
import random
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad


def _gas(data, key0, iv0):
    key = key0.strip().encode("utf-8")
    iv = iv0.strip().encode("utf-8")
    cipher = AES.new(key, AES.MODE_CBC, iv)
    encrypted = cipher.encrypt(pad(data.encode("utf-8"), AES.block_size))
    return base64.b64encode(encrypted).decode("utf-8")


def encryptAES(data, _p1):
    ## Used by CAS Server to encrypt password.
    ## Pattern: encryptAES(pwd, key), key can be found from HTML.
    if not _p1:
        return data
    random_str = _rds(64) + data
    encrypted = _gas(random_str, _p1, _rds(16))
    return encrypted


def _ep(p0, p1):
    try:
        return encryptAES(p0, p1)
    except Exception as e:
        pass
    return p0


_chars = "ABCDEFGHJKMNPQRSTWXYZabcdefhijkmnprstwxyz2345678"
_chars_len = len(_chars)


def _rds(length):
    ret_str = "".join(random.choice(_chars) for _ in range(length))
    return ret_str

配置项

from pydantic import BaseModel


class Config(BaseModel):
    # 配置项
    username: str = "123"
    password: str = "123"
    pushkey: str = "123"
    

测试结果

 

52733