代码如下
import gevent
from gevent import monkey
monkey.patch_all()
import traceback
import sys
import threading
import time
import requests
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from pprint import pprint
from selenium import webdriver
import json
import hashlib
confirm_url = "https://wap.showstart.com/pages/order/activity/confirm/confirm?sequence=112533&ticketId=9e91b8d67588498ab759f8f3f5ae259a&ticketNum=1&ioswx=1&terminal=app&from=singlemessage&isappinstalled=0"
login_url = "https://www.showstart.com/artist/list"
wait_time = input("提前时间(秒):")
debug_flag = input("从post_list加载账号(2开启并继续添加 1开启 0关闭):")
start_time = input("开售时间(格式:2020 10 06 16 00 10):")
DEBUG = int(debug_flag)
if DEBUG != 1:
times = input("账号数量:")
confirm_url = input("confirm_url:")
caps = {
'browserName': 'chrome',
'loggingPrefs': {
'browser': 'ALL',
'driver': 'ALL',
'performance': 'ALL',
},
'goog:chromeOptions': {
'perfLoggingPrefs': {
'enableNetwork': True,
},
'w3c': False,
},
}
caps['goog:loggingPrefs'] = {'performance': 'ALL'}
options = webdriver.ChromeOptions()
options.add_argument('log-level=3')
options.add_argument('--window-size=400,700')
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option(
"mobileEmulation", {"deviceName": "Nexus 5"})
driver = webdriver.Chrome(desired_capabilities=caps, options=options)
if DEBUG == 2:
with open('post_list.json', 'r') as f:
post_list = json.load(f)
else:
post_list = []
for n in range(int(times)):
try:
print("登录第%s个账号" % str(n+1))
driver.get(login_url)
WebDriverWait(driver, 1000).until(EC.title_is(u"我的"))
driver.get(confirm_url)
b = WebDriverWait(driver, 10).until(
EC.visibility_of_element_located((By.CSS_SELECTOR, '.payBtn')))
time.sleep(1)
input("输入验证码或手机号后,按回车继续。")
driver.get_log('performance') # 清空
b.click()
time.sleep(1)
logs = [json.loads(log['message'])['message'] for log in driver.get_log('performance') if (json.loads(log['message'])['message']['method'] == 'Network.requestWillBeSent' and 'order.json' in json.loads(
log['message'])['message']['params']['request']['url']) or json.loads(log['message'])['message']['method'] == 'Network.requestWillBeSentExtraInfo']
headers = dict()
for i in logs:
if i['method'] == 'Network.requestWillBeSentExtraInfo' and ('Accept' in i['params']['headers'] or 'accept' in i['params']['headers']):
for u in i['params']['headers']:
if ':' in u:
headers[u.strip(':')] = i['params']['headers'][u]
else:
headers[u] = i['params']['headers'][u]
if i['method'] == 'Network.requestWillBeSent':
order_url = i['params']['request']['url']
postData = json.loads(i['params']['request']['postData'])
post_list.append([order_url, postData, headers])
driver.delete_all_cookies()
except:
print(traceback.format_exc())
print("出错,跳过第%s个账号" % str(n+1))
with open('post_list.json', 'w') as f:
json.dump(post_list, f)
try:
driver.quit()
except:
pass
else:
with open('post_list.json', 'r') as f:
post_list = json.load(f)
def worker(i):
n=10
while(1):
try:
d = str(int(time.time()*1000))
u = '/wap/order/order.json'
st = i[2]['st_flpv']
l = 'xVgXtOUSos6jzR3mqb4aLHYybqqPFFGfx12r'
i[2]['r'] = str(int(time.time()*1000))
i[2]['s'] = hashlib.md5((d+u+st+l).encode('utf8')).hexdigest()
r = requests.post(i[0], json=i[1], headers=i[2],
timeout=(2, 0.001))
d = json.loads(r.text)
print(time.asctime(time.localtime(time.time())),time.time(),
i[1]['telephone'], '发包成功', d)
except requests.exceptions.ReadTimeout:
print(time.asctime(time.localtime(time.time())),time.time(),
i[1]['telephone'], '发包成功,不等待响应')
except requests.exceptions.ConnectTimeout:
print(time.asctime(time.localtime(time.time())),time.time(),
i[1]['telephone'], '请求超时')
except:
print(time.asctime(time.localtime(time.time())),time.time(),
i[1]['telephone'], traceback.format_exc())
finally:
n-=1
if(n==0):
print(time.asctime(time.localtime(time.time())),time.time(),
i[1]['telephone'], '已发包10次')
break
# time.sleep(float(wait_time))
if __name__ == '__main__':
thread_l = list()
for i in post_list:
thread_l.append(gevent.spawn(worker, i=i))
# 处理时间
t1 = time.mktime(time.strptime(start_time, "%Y %m %d %H %M %S"))
while(1):
if(t1-time.time()<float(wait_time)):
break
gevent.joinall(thread_l)
input()
# for i in post_list:
# worker(i)
解析
这个Python脚本使用了多个技术和库来实现一个自动化的网页操作和网络请求发送功能,主要用于自动化抢购或购票场景,下面将详细解释脚本的主要部分和功能:
库和模块
– gevent 和 monkey: 这是用于并发编程的库,monkey.patch_all() 是将标准库中适合的部分做上猴子补丁,使得它们变得异步化。
– selenium: 用于自动化网页操作,如点击、输入文本等。
– requests: 用于发送HTTP请求。
– hashlib: 提供了一个强大的字符串哈希功能,通常用于加密操作。
主要流程和功能
1. 初始化和配置变量:
– confirm_url 和 login_url: 分别存储登录和确认订单的URL。
– wait_time: 用户输入的提前多少秒开始执行。
– start_time: 开售时间。
– DEBUG: 调试标志,用于控制脚本的行为。
2. 浏览器自动化:
– 配置Chrome浏览器的性能和日志记录设置。
– 进行浏览器操作,自动登录,并获取特定按钮进行点击。
3. 网络请求:
– 捕获通过浏览器发出的请求并记录其详情。
– 保存请求的URL、POST数据和头部到本地文件。
4. 发起并发请求:
– 使用gevent和requests库发送存储好的请求。
– 使用hashlib来计算请求的签名。
5. 异常处理:
– 捕捉并打印错误堆栈,以便调试。
6. 多线程和定时控制:
– 在预定时间开始前的指定秒数开始执行并发请求。
– 使用`gevent`来管理并发执行的`worker`函数。
关键代码块解释
– 网络请求相关代码:
r = requests.post(i[0], json=i[1], headers=i[2], timeout=(2, 0.001))
这行代码使用requests库发送POST请求。i[0]、i[1]和i[2]分别是从之前保存的列表中读取的URL、POST数据和头部信息。
– 时间控制逻辑:
if(t1-time.time()<float(wait_time)):
break
此段代码检查当前时间是否已接近用户设定的开始时间,用于触发并发执行。
注意:
本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。技术层面需要提供帮助,可以通过打赏的方式进行探讨。
1. 转载请在文内以超链形式注明出处,谢谢合作!
2. 本站除原创内容,其余所有内容均收集自互联网,仅限用于学习和研究目的,本站不对其内容的合法性承担任何责任。如有版权内容,请通知我们或作者删除,其版权均归原作者所有,本站虽力求保存原有版权信息,但因众多资源经多次转载,已无法确定其真实来源,或已将原有信息丢失,所以敬请原作者谅解!
3. 本站用户所发布的一切资源内容不代表本站立场,并不代表本站赞同其观点和对其真实性负责,若您对本站所载资源作品版权归属存有异议,请留言附说明联系邮箱,我们将在第一时间予以处理 ,同时向您表示歉意!为尊重作者版权,请购买原版作品,支持您喜欢的作者,谢谢!
4. 本站一律禁止以任何方式发布或转载任何违法的相关信息,访客如有发现请立即向站长举报;本站资源文件大多存储在云盘,如发现链接或图片失效,请联系作者或站长及时更新。
暂无评论内容