某动预约抢票脚本

代码如下

import gevent
from gevent import monkey
monkey.patch_all()

import traceback
import sys
import threading
import time
import requests
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from pprint import pprint
from selenium import webdriver
import json
import hashlib





confirm_url = "https://wap.showstart.com/pages/order/activity/confirm/confirm?sequence=112533&ticketId=9e91b8d67588498ab759f8f3f5ae259a&ticketNum=1&ioswx=1&terminal=app&from=singlemessage&isappinstalled=0"
login_url = "https://www.showstart.com/artist/list"


wait_time = input("提前时间(秒):")

debug_flag = input("从post_list加载账号(2开启并继续添加 1开启 0关闭):")

start_time = input("开售时间(格式:2020 10 06 16 00 10):")

DEBUG = int(debug_flag)

if DEBUG != 1:
    times = input("账号数量:")
    confirm_url = input("confirm_url:")
    caps = {
        'browserName': 'chrome',
        'loggingPrefs': {
            'browser': 'ALL',
            'driver': 'ALL',
            'performance': 'ALL',
        },
        'goog:chromeOptions': {
            'perfLoggingPrefs': {
                'enableNetwork': True,
            },
            'w3c': False,
        },
    }
    caps['goog:loggingPrefs'] = {'performance': 'ALL'}
    options = webdriver.ChromeOptions()
    options.add_argument('log-level=3')
    options.add_argument('--window-size=400,700')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option(
        "mobileEmulation", {"deviceName": "Nexus 5"})
    driver = webdriver.Chrome(desired_capabilities=caps, options=options)

    if DEBUG == 2:
        with open('post_list.json', 'r') as f:
            post_list = json.load(f)
    else:
        post_list = []

    for n in range(int(times)):
        try:
            print("登录第%s个账号" % str(n+1))
            driver.get(login_url)
            WebDriverWait(driver, 1000).until(EC.title_is(u"我的"))
            driver.get(confirm_url)

            b = WebDriverWait(driver, 10).until(
                EC.visibility_of_element_located((By.CSS_SELECTOR, '.payBtn')))
            time.sleep(1)
            input("输入验证码或手机号后,按回车继续。")

            driver.get_log('performance')  # 清空

            b.click()

            time.sleep(1)

            logs = [json.loads(log['message'])['message'] for log in driver.get_log('performance') if (json.loads(log['message'])['message']['method'] == 'Network.requestWillBeSent' and 'order.json' in json.loads(
                log['message'])['message']['params']['request']['url']) or json.loads(log['message'])['message']['method'] == 'Network.requestWillBeSentExtraInfo']
            headers = dict()
            for i in logs:
                if i['method'] == 'Network.requestWillBeSentExtraInfo' and ('Accept' in i['params']['headers'] or 'accept' in i['params']['headers']):
                    for u in i['params']['headers']:
                        if ':' in u:
                            headers[u.strip(':')] = i['params']['headers'][u]
                        else:
                            headers[u] = i['params']['headers'][u]
                if i['method'] == 'Network.requestWillBeSent':
                    order_url = i['params']['request']['url']
                    postData = json.loads(i['params']['request']['postData'])
            post_list.append([order_url, postData, headers])
            driver.delete_all_cookies()
        except:
            print(traceback.format_exc())
            print("出错,跳过第%s个账号" % str(n+1))
    with open('post_list.json', 'w') as f:
        json.dump(post_list, f)

    try:
        driver.quit()
    except:
        pass
else:
    with open('post_list.json', 'r') as f:
        post_list = json.load(f)


def worker(i):
    n=10
    while(1):
        try:
            d = str(int(time.time()*1000))
            u = '/wap/order/order.json'
            st = i[2]['st_flpv']
            l = 'xVgXtOUSos6jzR3mqb4aLHYybqqPFFGfx12r'
            i[2]['r'] = str(int(time.time()*1000))
            i[2]['s'] = hashlib.md5((d+u+st+l).encode('utf8')).hexdigest()
            r = requests.post(i[0], json=i[1], headers=i[2],
                              timeout=(2, 0.001))
            d = json.loads(r.text)
            print(time.asctime(time.localtime(time.time())),time.time(),
                  i[1]['telephone'], '发包成功', d)
        except requests.exceptions.ReadTimeout:
            print(time.asctime(time.localtime(time.time())),time.time(),
                  i[1]['telephone'], '发包成功,不等待响应')
        except requests.exceptions.ConnectTimeout:
            print(time.asctime(time.localtime(time.time())),time.time(),
                  i[1]['telephone'], '请求超时')
        except:
            print(time.asctime(time.localtime(time.time())),time.time(),
                  i[1]['telephone'], traceback.format_exc())
        finally:
            n-=1
            if(n==0):
                print(time.asctime(time.localtime(time.time())),time.time(),
                  i[1]['telephone'], '已发包10次')
                break
            # time.sleep(float(wait_time))


if __name__ == '__main__':
    thread_l = list()
    for i in post_list:
        thread_l.append(gevent.spawn(worker, i=i))
    # 处理时间
    t1 = time.mktime(time.strptime(start_time, "%Y %m %d %H %M %S"))
    while(1):
        if(t1-time.time()<float(wait_time)):
            break

    gevent.joinall(thread_l)

    input()
    # for i in post_list:
    #     worker(i)

解析

这个Python脚本使用了多个技术和库来实现一个自动化的网页操作和网络请求发送功能,主要用于自动化抢购或购票场景,下面将详细解释脚本的主要部分和功能:

库和模块

– gevent 和 monkey: 这是用于并发编程的库,monkey.patch_all() 是将标准库中适合的部分做上猴子补丁,使得它们变得异步化。

– selenium: 用于自动化网页操作,如点击、输入文本等。

– requests: 用于发送HTTP请求。

– hashlib: 提供了一个强大的字符串哈希功能,通常用于加密操作。

主要流程和功能

1. 初始化和配置变量: 

   – confirm_url 和 login_url: 分别存储登录和确认订单的URL。

   – wait_time: 用户输入的提前多少秒开始执行。

   – start_time: 开售时间。

   – DEBUG: 调试标志,用于控制脚本的行为。

2. 浏览器自动化:

   – 配置Chrome浏览器的性能和日志记录设置。

   – 进行浏览器操作,自动登录,并获取特定按钮进行点击。

3. 网络请求:

   – 捕获通过浏览器发出的请求并记录其详情。

   – 保存请求的URL、POST数据和头部到本地文件。

4. 发起并发请求:

   – 使用gevent和requests库发送存储好的请求。

   – 使用hashlib来计算请求的签名。

5. 异常处理:

   – 捕捉并打印错误堆栈,以便调试。

6. 多线程和定时控制:

   – 在预定时间开始前的指定秒数开始执行并发请求。

   – 使用`gevent`来管理并发执行的`worker`函数。

关键代码块解释

– 网络请求相关代码:

r = requests.post(i[0], json=i[1], headers=i[2], timeout=(2, 0.001))

  这行代码使用requests库发送POST请求。i[0]、i[1]和i[2]分别是从之前保存的列表中读取的URL、POST数据和头部信息。

– 时间控制逻辑:

if(t1-time.time()<float(wait_time)):
    break

此段代码检查当前时间是否已接近用户设定的开始时间,用于触发并发执行。

注意

本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。

© 版权声明
THE END
喜欢就支持一下吧
点赞51 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容