续前篇:使用 Python 通过 Appium 自动化操作安卓 App,对抖音视频进行截屏,再调用百度人脸识别 API 识别高颜值小姐姐,识别成功后在 App 中保存该视频。
#!/usr/bin/env python
# encoding=utf-8
import uuid
import os
import sys
import time
import json
import base64
import urllib3
import logging
from appium import webdriver
from selenium.common import exceptions
from pywinauto.application import Application
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from urllib.parse import urlencode
from cv2 import cv2
urllib3.disable_warnings()
class mumu:
    """Start and stop the MuMu Android emulator on Windows.

    Running emulator processes are looked up with
    ``tasklist | findstr ^Nemu`` and terminated with ``taskkill``; the
    emulator itself is launched through pywinauto's ``Application``.
    """

    def __init__(self, exe_path):
        """Store the path of the emulator executable used by ``start_server``."""
        self.exe_path = exe_path

    def print_line(self, count=50):
        """Print a separator row made of ``count`` asterisks."""
        print('*' * count)

    def is_running(self):
        """Return the PIDs of the matching processes, or 0.

        The PID is read from the second whitespace-separated column of every
        ``tasklist`` line that ``findstr`` lets through. The list is returned
        only when more than one line matched; otherwise the result is 0.
        """
        lines = os.popen('tasklist | findstr ^Nemu').readlines()
        pids = [line.split()[1] for line in lines]
        return pids if len(lines) > 1 else 0

    def kill_running(self, process_list):
        """Force-kill every PID in ``process_list`` and print taskkill's replies."""
        replies = [
            os.popen('taskkill /PID '+pid+' /F').readline()
            for pid in process_list
        ]
        if replies:
            print(replies)

    def start_server(self):
        """Kill any running emulator, launch a fresh one and outline its window."""
        self.stop_server()
        launcher = Application(backend='uia')
        self.app = launcher.start(self.exe_path, timeout=120)
        main_window = self.app.top_window()
        main_window.draw_outline()
        logger.info('*'*2+'启动mumu模拟器')

    def stop_server(self):
        """Terminate the emulator processes reported by ``is_running``, if any."""
        process_list = self.is_running()
        if not process_list:
            # Nothing to stop.
            return
        try:
            self.kill_running(process_list)
            logger.info('*'*2+'mumu进程成功关闭')
        except Exception as e:
            logger.error(str(e.__traceback__.tb_lineno)+str(e))
class adb_connect:
    """Connect adb to an Android emulator as soon as the object is created."""

    def __init__(self, adb_path, adb_connect_str):
        """Run ``<adb_path> connect <adb_connect_str>`` and keep its output.

        The output lines of the command are stored on ``self.connect_result``
        and written to the log.
        """
        logger.info('*'*2+'adb连接android模拟器')
        command = adb_path+" connect "+adb_connect_str
        pipe = os.popen(command)
        self.connect_result = pipe.readlines()
        logger.info('*'*2+str(self.connect_result))
class appium:
    """Start and stop the Appium desktop application on Windows.

    Running Appium processes are looked up with
    ``tasklist | findstr ^Appium`` and terminated with ``taskkill``; the
    application is launched and its start button pressed through pywinauto.
    """

    def __init__(self, exe_path):
        """Store the path of the Appium executable used by ``start_server``."""
        self.exe_path = exe_path

    def print_line(self, count=50):
        """Log a separator row made of ``count`` asterisks."""
        logger.info('*' * count)

    def is_running(self):
        """Return the PIDs of the matching processes, or 0.

        The PID is read from the second whitespace-separated column of every
        ``tasklist`` line that ``findstr`` lets through. The list is returned
        only when more than one line matched; otherwise the result is 0.
        """
        lines = os.popen('tasklist | findstr ^Appium').readlines()
        pids = [line.split()[1] for line in lines]
        return pids if len(lines) > 1 else 0

    def kill_running(self, process_list):
        """Force-kill every PID in ``process_list`` and print taskkill's replies."""
        replies = [
            os.popen('taskkill /PID '+pid+' /F').readline()
            for pid in process_list
        ]
        if replies:
            print(replies)

    def start_server(self):
        """Restart Appium and click its "start server" button.

        Any running instance is killed first. The button is located by its
        title and automation id, waited on for up to 60 seconds, outlined and
        then clicked.
        """
        logger.info('*'*2+'启动appium')
        self.stop_server()
        launcher = Application(backend='uia')
        self.app = launcher.start(self.exe_path, timeout=60)
        appium_pane = self.app.window(title="Appium", control_type="Pane")
        start_button = appium_pane.window(
            title="启动服务器 v1.15.0",
            auto_id="startServerBtn",
            control_type="Button")
        start_button.wait("exists enabled visible ready", timeout=60)
        start_button.draw_outline()
        start_button.click()

    def stop_server(self):
        """Terminate the Appium processes reported by ``is_running``, if any."""
        process_list = self.is_running()
        if not process_list:
            # Nothing to stop.
            return
        try:
            self.kill_running(process_list)
            logger.info('*'*2+'appium进程成功关闭')
        except Exception as e:
            logger.error(str(e.__traceback__.tb_lineno)+str(e))
class app_automation:
    """Drive the Douyin Android app through an Appium WebDriver session.

    Every public action method catches its own exceptions, logs them and
    reports success as a boolean, so callers can branch on the result instead
    of handling WebDriver errors themselves.
    """

    def __init__(self, wait_time=10):
        """Open the Appium session and wait for the app to settle.

        Args:
            wait_time: Seconds to sleep after the session has been created.

        Exits the process with status 1 when the session cannot be created.
        """
        try:
            logger.info('*'*2+'执行app初始化启动操作')
            self.desired_caps = {
                "automationName": "UiAutomator2",
                "platformName": "Android",
                "platformVersion": "6.0.1",
                "deviceName": "127.0.0.1:7555 device",
                "appPackage": "com.ss.android.ugc.aweme",
                "appActivity": "com.ss.android.ugc.aweme.main.MainActivity"
            }
            self.driver = webdriver.Remote(
                'http://127.0.0.1:4723/wd/hub', self.desired_caps)
            time.sleep(wait_time)
        except Exception as e:
            logger.error('*'*2+str(e.__traceback__.tb_lineno)+str(e))
            sys.exit(1)

    def _click_xpath(self, xpath, timeout=0.8, poll=0.2):
        """Wait until an element matching ``xpath`` is present, then click it.

        Raises whatever ``WebDriverWait.until`` or ``click`` raises, e.g. on
        timeout; callers decide whether that is fatal.
        """
        WebDriverWait(self.driver, timeout, poll).until(
            EC.presence_of_element_located(("xpath", xpath))).click()

    def _click_text(self, text):
        """Click the element whose ``text`` attribute equals ``text``."""
        self._click_xpath("//*[@text='"+text+"']")

    def _click_text_if_present(self, text):
        """Click the element with the given text; only warn when it is absent."""
        try:
            self._click_text(text)
        except Exception as e:
            logger.warning('*'*3+text+str(e))

    def _swipe_to_next_video(self):
        """Swipe from 3/4 to 1/4 of the screen height along the vertical centre.

        The window size is fetched once and the coordinates are floored to
        whole pixels before being handed to ``driver.swipe``.
        """
        size = self.driver.get_window_size()
        center_x = size["width"] // 2
        height = size["height"]
        self.driver.swipe(center_x, height * 3 // 4, center_x, height // 4, 200)

    def judge_app_crash(self):
        """Return True while the app's main activity is in the foreground.

        The driver's current activity is matched as a substring of the
        configured ``appActivity``. Any driver error counts as a crash.
        """
        try:
            logger.info('*'*6+'判断app是否正常运行')
            if self.driver.current_activity in self.desired_caps['appActivity']:
                return True
            logger.error('*'*6+'app crashed')
            return False
        except Exception as e:
            logger.error('*'*6+str(e))
            return False

    def always_allow(self, number=3):
        """Dismiss first-run dialogs and grant the storage permission.

        Optional pop-ups (skip-ad, "got it", up to ``number`` permission
        prompts) are clicked when present and ignored otherwise. The method
        then swipes to the next video and performs one share -> save-locally
        -> allow sequence, whose steps are all mandatory.

        Args:
            number: How many permission prompts to try to accept up front.

        Returns:
            True when the mandatory sequence completed, False otherwise.
        """
        try:
            logger.info('*'*2+'执行app授权操作')
            for optional_text in ('跳过广告', '我知道了'):
                self._click_text_if_present(optional_text)
            for _ in range(number):
                self._click_text_if_present('允许')
            self.close_login()
            self._swipe_to_next_video()
            for required_text in ('分享', '保存本地', '允许'):
                self._click_text(required_text)
            self.close_login()
            return True
        except Exception as e:
            logger.error('*'*2+str(e))
            return False

    def screenshot(self, screenpath):
        """Save a screenshot of the device to ``screenpath``.

        Returns:
            True when the driver call completed without raising, else False.
        """
        try:
            logger.info('*'*6+'执行app截屏操作')
            self.driver.get_screenshot_as_file(screenpath)
            return True
        except Exception as e:
            logger.error('*'*6+str(e))
            return False

    def swipeUp(self):
        """Swipe up to the next video; return True on success, False on error."""
        try:
            logger.info('*'*6+'执行上滑操作')
            self._swipe_to_next_video()
            return True
        except Exception as e:
            logger.error('*'*6+str(e))
            return False

    def down_video(self, wait_time=5):
        """Save the current video through share -> save-locally.

        A close button that may appear afterwards is clicked when present.

        Args:
            wait_time: Seconds to sleep after triggering the save.

        Returns:
            True when both mandatory clicks succeeded, False otherwise.
        """
        try:
            logger.info('*'*10+'执行下载操作')
            self._click_text('分享')
            self._click_text('保存本地')
            try:
                self._click_xpath("//*[@content-desc='关闭']")
            except Exception as e:
                logger.warning('*'*11+'关闭'+str(e))
            time.sleep(wait_time)
            return True
        except Exception as e:
            logger.error('*'*10+str(e))
            return False

    def close_login(self):
        """Click the close button of the login page if one is showing."""
        try:
            self._click_xpath("//*[@content-desc='关闭']")
            logger.info('检测到登录跳转页')
        except Exception as e:
            # No close button within the timeout: there is nothing to dismiss.
            logger.warning('*'*10+str(e))

    def tearDown(self):
        """Quit the Appium session; return True on success, False on error."""
        try:
            self.driver.quit()
            logger.info('*'*2+'appium app session 关闭成功')
            return True
        except Exception as e:
            logger.error('*'*2+'appium app session关闭失败'+str(e))
            return False
class face_recognition:
    """Filter screenshots with the Baidu face-detection REST API."""

    def __init__(self, API_Key, Secret_Key):
        """Fetch an OAuth access token for the given credentials.

        The token is stored on ``self.access_token`` and the connection pool
        on ``self.http``. Exits the process with status 1 when the token
        cannot be obtained.
        """
        try:
            url = 'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=' + \
                API_Key+'&client_secret='+Secret_Key
            self.http = urllib3.PoolManager()
            response = self.http.request(
                'post', url, headers={'Content-Type': 'application/json; charset=UTF-8'})
            self.access_token = json.loads(response.data)['access_token']
        except Exception as e:
            logger.info('*'*6+str(e.__traceback__.tb_lineno)+str(e))
            sys.exit(1)

    @staticmethod
    def _annotated_path(img_all_path):
        """Return ``<path without extension>_new.png`` for ``img_all_path``.

        Only the final extension is stripped, so dots in directory names (or
        a leading ``./``) do not truncate the path.
        """
        return os.path.splitext(img_all_path)[0]+'_new.png'

    @staticmethod
    def _meets_criteria(face_dict):
        """Return True when one detected face passes every filter.

        Required: detection probability above 0.6, age between 18 and 45
        inclusive, beauty score above 80, gender ``female`` and a bounding box
        wider and taller than 80. A missing key raises ``KeyError``.
        """
        location = face_dict['location']
        return (face_dict['face_probability'] > 0.6
                and 18 <= face_dict['age'] <= 45
                and face_dict['beauty'] > 80
                and face_dict['gender']['type'] == 'female'
                and location['width'] > 80
                and location['height'] > 80)

    def recognize_single(self, img_all_path):
        """Detect faces in an image and report whether any passes the filter.

        When at least one face qualifies, a copy of the image with a red
        rectangle around every qualifying face is written next to the
        original with the suffix ``_new.png``.

        Args:
            img_all_path: Path of the image file to analyse.

        Returns:
            True when at least one face met the criteria, otherwise False.
            Request, parsing and file errors are logged and yield the flag
            value reached so far.
        """
        flag = False
        try:
            logger.info('*'*6+'执行脸部识别和筛选')
            url = 'https://aip.baidubce.com/rest/2.0/face/v3/detect?access_token='+self.access_token
            with open(img_all_path, 'rb') as f:
                img_b64 = base64.b64encode(f.read())
            # NOTE(review): the body is form-encoded while the header announces
            # JSON; left unchanged -- confirm against the API docs before touching.
            params = urlencode({"image": str(img_b64, 'utf-8'),
                                "image_type": "BASE64",
                                "face_field": "age,beauty,expression,gender,emotion",
                                "max_face_num": 10,
                                "face_type": "LIVE",
                                "liveness_control": "NONE"})
            response = self.http.request(
                'post', url, body=params, headers={'Content-Type': 'application/json; charset=UTF-8'})
            content = response.data
            if content:
                result = json.loads(content)['result']
                logger.info('*'*8+str(result))
                if result:
                    face_num = result['face_num']
                    face_list = result.get('face_list') or []
                    image = cv2.imread(img_all_path)
                    for face_dict in face_list[:face_num]:
                        # Best effort per face: one malformed entry must not
                        # discard the remaining faces, but it is logged now.
                        try:
                            if self._meets_criteria(face_dict):
                                flag = True
                                location = face_dict['location']
                                left = int(location['left'])
                                top = int(location['top'])
                                right = int(location['width']+location['left'])
                                bottom = int(location['height']+location['top'])
                                cv2.rectangle(image, (left, top), (right, bottom),
                                              (0, 0, 255), 2)
                        except Exception as e:
                            logger.warning('*'*8+'face entry skipped: '+str(e))
                    if flag:
                        logger.info('*'*8+'图片筛选成功')
                        cv2.imwrite(self._annotated_path(img_all_path), image)
                    else:
                        logger.info('*'*8+'图片筛选不合格')
        except Exception as e:
            logger.info('*'*6+str(e))
        # Returned after the try block rather than from ``finally`` so that
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        return flag
# The classes in this file log through the module-global name ``logger``, so it
# is created at import time; handlers are only attached when run as a script.
logger = logging.getLogger(__name__)


def _configure_logging():
    """Attach a UTF-8 file handler and a console handler to ``logger``.

    The log file is ``<script name>.log`` in the directory of this script.
    """
    logger.setLevel(level=logging.INFO)
    basedir = os.path.abspath(os.path.dirname(__file__))
    # __file__ may carry directory components; only its base name may be
    # joined onto basedir, otherwise those components appear twice.
    logfile_path = os.path.join(basedir, os.path.basename(__file__)+'.log')
    handler = logging.FileHandler(logfile_path, encoding='utf-8')
    handler.setLevel(logging.INFO)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    logger.addHandler(handler)
    logger.addHandler(console)


def _scan_videos(app_instance, face_instance, img_save_path, max_count=10000):
    """Screenshot, classify and optionally save videos until the app stops.

    Each round takes a screenshot, runs face recognition on it and saves the
    video when a face qualifies; screenshots that do not qualify are deleted.
    The round ends by swiping to the next video, retried up to five times.
    The loop stops when the app is no longer in the foreground, when any
    exception escapes a round, or when the counter reaches ``max_count``.
    """
    count = 1
    while count < max_count:
        try:
            logger.info('*'*4+'执行第'+str(count)+'次循环')
            if not app_instance.judge_app_crash():
                break
            count = count + 1
            # uuid1 without its trailing node field.
            img_name = 'img_' + \
                ''.join(str(uuid.uuid1()).split('-')[:-1])+'.png'
            img_path = os.path.join(img_save_path, img_name)
            if app_instance.screenshot(img_path):
                if face_instance.recognize_single(img_path):
                    if app_instance.down_video():
                        logger.info('*'*14+'download succeed')
                else:
                    os.remove(img_path)
            for _ in range(5):
                app_instance.close_login()
                if app_instance.swipeUp():
                    break
        except exceptions.InvalidSessionIdException:
            logger.info('*'*2+'InvalidSessionIdException')
            break
        except Exception as e:
            logger.error('*'*2+str(e.__traceback__.tb_lineno)+str(e))
            break


def main():
    """Start emulator, adb and Appium, run the scan loop, then clean up.

    Cleanup runs in ``finally`` so the Appium session, Appium and the emulator
    are shut down even when a step raises or the process is exiting.
    """
    mumu_instance = None
    appium_instance = None
    app_instance = None
    try:
        _configure_logging()
        logger.info('*'*20+'程序开始'+'*'*20)
        mumu_instance = mumu(
            'D:/MuMu/emulator/nemu/EmulatorShell/NemuPlayer.exe')
        mumu_instance.start_server()
        adb_connect(
            'C:/Users/George/AppData/Local/Android/Sdk/platform-tools/adb.exe', '127.0.0.1:7555')
        appium_instance = appium(
            'C:/Users/George/AppData/Local/Programs/Appium/Appium.exe')
        appium_instance.start_server()
        img_save_path = 'D:/VsCode/python_control_android/images/'
        # Without the directory every screenshot would fail.
        os.makedirs(img_save_path, exist_ok=True)
        # Launch the app.
        app_instance = app_automation()
        # Accept the agreement and grant permissions.
        if app_instance.always_allow():
            # SECURITY: credentials are embedded in source as defaults; prefer
            # supplying your own through the environment variables below.
            api_key = os.environ.get(
                'BAIDU_API_KEY', 'ErGqFgu5shIcseRx4nGiV472')
            secret_key = os.environ.get(
                'BAIDU_SECRET_KEY', 'enN4xQkjGtrSvm3InfGjk43m0jLPgFsb')
            # Initialise face recognition (fetches the access token).
            face_instance = face_recognition(api_key, secret_key)
            _scan_videos(app_instance, face_instance, img_save_path)
    except Exception as e:
        logger.error(str(e.__traceback__.tb_lineno)+str(e))
    finally:
        if app_instance is not None:
            app_instance.tearDown()
        if appium_instance is not None:
            appium_instance.stop_server()
        if mumu_instance is not None:
            mumu_instance.stop_server()
        logger.info('*'*20+'程序结束'+'*'*20)


if __name__ == "__main__":
    main()
注意:运行前请将代码中的 API_Key 和 Secret_Key 替换为自己在百度申请的密钥。