大某监控回流脚本

import json
import logging
import time
from hashlib import md5

import requests
from requests import Response

from Monitor import Monitor


class DM(Monitor):

    def __init__(self, perform: dict) -> None:
        super().__init__()
        logging.info(f"大某 {perform.get('show_name')} 开始加载")
        self.cookies = self.get_cookies(True)
        self.performId = perform.get('show_id')
        self.show_info = dict()
        self.get_show_infos()
        logging.info(f"大某 {self.show_info.get('show_name')} 加载成功")

    def get_show_infos(self):
        t, sign = self.get_show_params()
        response = self.request('https://mtop.大某.cn/h5/mtop.alibaba.大某.detail.getdetail/1.2/?jsv=2.7.2&appKey=12574478&t=%d&sign=%s&api=mtop.alibaba.大某.detail.getdetail&v=1.2&H5Request=true&type=originaljson&timeout=10000&dataType=json&valueType=original&forceAntiCreep=true&AntiCreep=true&useH5=true&data={"itemId":%s,"platform":"8","comboChannel":"2","dmChannel":"大某@d大某h5_h5"}' % (t, sign, self.performId))
        data = json.loads(response.text).get("data")
        if not data:
            self.cookies = self.get_cookies(True)
            return
        show_info = json.loads(data.get("result")).get("detailViewComponentMap").get("item")
        self.show_info["show_id"] = show_info.get("staticData").get("itemBase").get("itemId")
        self.show_info["show_name"] = show_info.get("staticData").get("itemBase").get("itemName")
        self.show_info["seat_info"] = list()
        self.show_info["platform"] = 0
        for session in show_info.get("item").get("performBases"):
            session_id = session.get("performs")[0].get("performId")
            session_name = session.get("performs")[0].get("performName")
            t, sign = self.get_seat_params(session_id)
            response = self.request('https://mtop.大某.cn/h5/mtop.alibaba.detail.subpage.getdetail/2.0/?jsv=2.7.2&appKey=12574478&t=%d&sign=%s&api=mtop.alibaba.detail.subpage.getdetail&v=2.0&H5Request=true&type=originaljson&timeout=10000&dataType=json&valueType=original&forceAntiCreep=true&AntiCreep=true&useH5=true&data={"itemId":"%s","bizCode":"ali.china.大某","scenario":"itemsku","exParams":"{\\"dataType\\":2,\\"dataId\\":\\"%s\\",\\"privilegeActId\\":\\"\\"}","platform":"8","comboChannel":"2","dmChannel":"大某@大某h5_h5"}' % (t, sign, self.performId, session_id))
            data = json.loads(response.text)
            if not data:
                self.cookies = self.get_cookies(True)
                return
            show_session_info = json.loads(data.get("data").get("result"))
            for seat in show_session_info.get("perform").get("skuList"):
                seat_info = dict()
                seat_info["session_id"] = session_id
                seat_info["session_name"] = session_name
                seat_info["seat_plan_id"] = seat.get("skuId")
                seat_info["seat_plan_name"] = seat.get("priceName")
                self.show_info["seat_info"].append(seat_info)

    def monitor(self) -> list:
        logging.info(f"大某 {self.show_info.get('show_name')} 监控中")
        can_buy_list = list()
        t, sign = self.get_show_params()
        response = self.request('https://mtop.大某.cn/h5/mtop.alibaba.大某.detail.getdetail/1.2/?jsv=2.7.2&appKey=12574478&t=%d&sign=%s&api=mtop.alibaba.大某.detail.getdetail&v=1.2&H5Request=true&type=originaljson&timeout=10000&dataType=json&valueType=original&forceAntiCreep=true&AntiCreep=true&useH5=true&data={"itemId":%s,"platform":"8","comboChannel":"2","dmChannel":"大某@大某h5_h5"}' % (t, sign, self.performId))
        data = json.loads(response.text).get("data")
        if not data:
            self.cookies = self.get_cookies(True)
            return list()
        show_info = json.loads(data.get("result")).get("detailViewComponentMap").get("item")
        for session in show_info.get("item").get("performBases"):
            session_id = session.get("performs")[0].get("performId")
            t, sign = self.get_seat_params(session_id)
            response = self.request('https://mtop.大某.cn/h5/mtop.alibaba.detail.subpage.getdetail/2.0/?jsv=2.7.2&appKey=12574478&t=%d&sign=%s&api=mtop.alibaba.detail.subpage.getdetail&v=2.0&H5Request=true&type=originaljson&timeout=10000&dataType=json&valueType=original&forceAntiCreep=true&AntiCreep=true&useH5=true&data={"itemId":"%s","bizCode":"ali.china.大某","scenario":"itemsku","exParams":"{\\"dataType\\":2,\\"dataId\\":\\"%s\\",\\"privilegeActId\\":\\"\\"}","platform":"8","comboChannel":"2","dmChannel":"大某@大某h5_h5"}' % (t, sign, self.performId, session_id))
            data = json.loads(response.text)
            if not data:
                self.cookies = self.get_cookies(True)
                return list()
            show_session_info = json.loads(data.get("data").get("result"))
            for seat in show_session_info.get("perform").get("skuList"):
                if seat.get("skuSalable") == "false":
                    continue
                can_buy_list.append(seat.get("skuId"))
        return can_buy_list

    def get_cookies(self, generate=False):
        self.cookies = {'_m_h5_tk': '3248763e77796ca30ca6f491c67e295d_1695299807188', '_m_h5_tk_enc': 'b5f681b816498129282f003be7b8c547'}
        if generate:
            response = self.request("https://mtop.大某.cn/h5/mtop.大某.wireless.search.project.classify/1.0/?jsv=2.7.2&appKey=12574478&t=1695289192037&sign=26ff9869af995563292758401df0e5c5&type=originaljson&dataType=json&v=1.0&H5Request=true&AntiCreep=true&AntiFlood=true&api=mtop.大某.wireless.search.project.classify&requestStart=1695288564353&data=%7B%22currentCityId%22%3A%220%22%2C%22cityOption%22%3A1%2C%22pageIndex%22%3A1%2C%22pageSize%22%3A15%2C%22sortType%22%3A3%2C%22categoryId%22%3A0%2C%22returnItemOption%22%3A4%2C%22dateType%22%3A0%2C%22dmChannel%22%3A%22大某%40大某h5_h5%22%7D")
            cookies = requests.utils.dict_from_cookiejar(response.cookies)
        return cookies

    def get_show_params(self):
        token = self.cookies.get("_m_h5_tk").split("_")[0]
        t = int(time.time() * 1000)
        raw = '%s&%d&12574478&{"itemId":%s,"platform":"8","comboChannel":"2","dmChannel":"大某@大某h5_h5"}' % (token, t, self.performId)
        return t, self.encrypt_md5(raw)

    def get_seat_params(self, showId):
        token = self.cookies.get("_m_h5_tk").split("_")[0]
        t = int(time.time() * 1000)
        raw = '%s&%d&12574478&{"itemId":"%s","bizCode":"ali.china.大某","scenario":"itemsku","exParams":"{\\"dataType\\":2,\\"dataId\\":\\"%s\\",\\"privilegeActId\\":\\"\\"}","platform":"8","comboChannel":"2","dmChannel":"大某@大某h5_h5"}' % (token, t, self.performId, showId)
        return t, self.encrypt_md5(raw)

    @staticmethod
    def encrypt_md5(s:str):
        new_md5 = md5()
        new_md5.update(s.encode(encoding='utf-8'))
        return new_md5.hexdigest()

    def request(self, url: str) -> Response:
        return requests.get(
            url=url,
            cookies=self.cookies,
            headers={
                'pragma': 'no-cache',
                'sec-ch-ua-mobile': '?0',
                'sec-fetch-mode': 'cors',
                'sec-fetch-dest': 'empty',
                'cache-control': 'no-cache',
                'authority': 'mtop.大某.cn',
                'accept': 'application/json',
                'sec-fetch-site': 'same-site',
                'origin': 'https://m.大某.cn',
                'sec-ch-ua-platform': '"macOS"',
                'referer': 'https://m.大某.cn/',
                'accept-language': 'zh-CN,zh;q=0.9',
                'content-type': 'application/x-www-form-urlencoded',
                'sec-ch-ua': '"Not/A)Brand";v="99", "Google Chrome";v="115", "Chromium";v="115"',
                'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            },
            proxies=super()._proxy,
            verify=False,
            timeout=10
        )

解析

这段代码定义了一个名为 DM 的类,它继承自 `Monitor` 类。`DM` 类主要用于自动化监控和抢购大某网上的演出票。下面是代码的详细解析:

类定义和初始化

– __init__ 方法: 这是类的构造函数,它首先调用基类 Monitor 的构造方法,然后设置一些初始化的值,如演出的 ID (`performId`) 和其他相关信息。它还调用 `get_show_infos` 方法来拉取演出的具体信息。

方法说明

– get_show_infos 方法: 该方法通过发起网络请求,获取并解析演出的详细信息(如时间、场次、座位等)。它使用了自定义的请求参数生成方法 `get_show_params` 来获取正确的签名和时间戳,这是为了满足大某网 API 的安全要求。

– monitor 方法: 这个方法定期检查演出的可购买座位,并返回可以购买的座位列表。这是通过定期调用 `get_show_infos` 方法并解析返回数据来实现的。

– get_cookies 方法: 用于获取或刷新网站的 cookies。如果 `generate` 参数为 `True`,它会尝试通过一个具体的请求来生成新的 cookies。

– get_show_params 和 get_seat_params 方法: 这两个方法用于生成访问大某网 API 时所需的参数,包括 URL 签名。它们使用了 MD5 加密来确保请求的有效性。

– encrypt_md5 方法: 这是一个静态方法,用于对给定的字符串进行 MD5 加密。

– request 方法: 这是一个用于发起网络请求的方法。它接收一个 URL 并发起 GET 请求,同时处理 cookies 和 headers。

通过这个类,用户可以自动化地检查并尝试抢购大某网上的演出票。它通过精心设计的网络请求和参数处理确保了与大某网 API 的兼容性,同时通过周期性的监控来提高抢票的成功率。

注意事项

– 网络请求的安全性: 代码中应确保所有网络请求都是安全的,避免可能的数据泄露。

– 代码维护性: 鉴于大某网的页面和 API 可能会变动,代码需要定期更新和维护。

注意

本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。

© 版权声明
THE END
喜欢就支持一下吧
点赞33 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容