python自动化下载抖音高颜值小姐姐

续前篇:python利用appium自动化操作安卓app,进行抖音视频截屏,然后利用百度人脸识别api,识别高颜值小姐姐后,进行app的视频保存

#!/usr/bin/env python
# encoding=utf-8

import uuid
import os
import sys
import time
import json
import base64
import urllib3
import logging
from appium import webdriver
from selenium.common import exceptions
from pywinauto.application import Application
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from urllib.parse import urlencode
from cv2 import cv2
urllib3.disable_warnings()

class mumu:
    def __init__(self, exe_path):
        self.exe_path = exe_path

    def print_line(self, count=50):
        print('*'*count)

    def is_running(self):
        content_list = os.popen('tasklist | findstr ^Nemu').readlines()
        process_count = len(content_list)
        content_list = list(
            map(lambda x: x.split(), content_list))
        process_list = list(map(lambda x: x[1], content_list))
        if process_count > 1:
            return process_list
        else:
            return 0

    def kill_running(self, process_list):
        content_list = []
        for each in process_list:
            content_list.append(
                os.popen('taskkill /PID '+each+' /F').readline())
        if content_list:
            print(content_list)

    def start_server(self):
        self.stop_server()
        self.app = Application(backend='uia').start(self.exe_path, timeout=120)
        dlg_main = self.app.top_window()
        dlg_main.draw_outline()
        # dlg_main.print_control_identifiers()
        logger.info('*'*2+'启动mumu模拟器')

    def stop_server(self):
        process_list = self.is_running()
        if process_list:
            try:
                self.kill_running(process_list)
                logger.info('*'*2+'mumu进程成功关闭')
            except Exception as e:
                logger.error(str(e.__traceback__.tb_lineno)+str(e))
        else:
            pass
            # print('Appium进程不存在')

class adb_connect:
    def __init__(self, adb_path, adb_connect_str):
        logger.info('*'*2+'adb连接android模拟器')
        self.connect_result = os.popen(
            adb_path+" connect "+adb_connect_str).readlines()
        logger.info('*'*2+str(self.connect_result))

class appium:
    def __init__(self, exe_path):
        self.exe_path = exe_path

    def print_line(self, count=50):
        logger.info('*'*count)

    def is_running(self):
        content_list = os.popen('tasklist | findstr ^Appium').readlines()
        process_count = len(content_list)
        content_list = list(
            map(lambda x: x.split(), content_list))
        process_list = list(map(lambda x: x[1], content_list))
        if process_count > 1:
            return process_list
        else:
            return 0

    def kill_running(self, process_list):
        content_list = []
        for each in process_list:
            content_list.append(
                os.popen('taskkill /PID '+each+' /F').readline())
        if content_list:
            print(content_list)

    def start_server(self):
        logger.info('*'*2+'启动appium')
        self.stop_server()
        self.app = Application(backend='uia').start(
            self.exe_path, timeout=60)
        start_server_button = self.app.window(title="Appium", control_type="Pane").window(
            title="启动服务器 v1.15.0", auto_id="startServerBtn", control_type="Button")
        start_server_button.wait("exists enabled visible ready", timeout=60)
        start_server_button.draw_outline()
        start_server_button.click()

    def stop_server(self):
        process_list = self.is_running()
        if process_list:
            try:
                self.kill_running(process_list)
                logger.info('*'*2+'appium进程成功关闭')
            except Exception as e:
                logger.error(str(e.__traceback__.tb_lineno)+str(e))
        else:
            pass
            # print('Appium进程不存在')

class app_automation:
    def __init__(self, wait_time=10):
        try:
            logger.info('*'*2+'执行app初始化启动操作')
            desired_caps = {
                "automationName": "UiAutomator2",
                "platformName": "Android",
                "platformVersion": "6.0.1",
                "deviceName": "127.0.0.1:7555  device",
                "appPackage": "com.ss.android.ugc.aweme",
                "appActivity": "com.ss.android.ugc.aweme.main.MainActivity"
            }
            self.desired_caps = desired_caps
            self.driver = webdriver.Remote(
                'http://127.0.0.1:4723/wd/hub', desired_caps)
            time.sleep(wait_time)
        except Exception as e:
            logger.info('*'*2+str(e.__traceback__.tb_lineno)+str(e))
            sys.exit(1)

    def judge_app_crash(self):
        try:
            logger.info('*'*6+'判断app是否正常运行')
            if self.driver.current_activity in self.desired_caps['appActivity']:
                return True
            else:
                logger.error('*'*6+'app crashed')
                return False
        except Exception as e:
            logger.error('*'*6+str(e))
            return False

    def always_allow(self, number=3):
        try:
            logger.info('*'*2+'执行app授权操作')
            try:
                WebDriverWait(self.driver, 0.8, 0.2).until(
                    EC.presence_of_element_located(("xpath", "//*[@text='跳过广告']"))).click()
            except Exception as e:
                logger.warning('*'*3+'跳过广告'+str(e))
            try:
                WebDriverWait(self.driver, 0.8, 0.2).until(
                    EC.presence_of_element_located(("xpath", "//*[@text='我知道了']"))).click()
            except Exception as e:
                logger.warning('*'*3+'我知道了'+str(e))
            for i in range(number):
                try:
                    WebDriverWait(self.driver, 0.8, 0.2).until(
                        EC.presence_of_element_located(("xpath", "//*[@text='允许']"))).click()
                except Exception as e:
                    logger.warning('*'*3+'允许'+str(e))
            self.close_login()
            x = self.driver.get_window_size()["width"]
            y = self.driver.get_window_size()["height"]
            self.driver.swipe(x/2, y*3/4, x/2, y/4, 200)
            WebDriverWait(self.driver, 0.8, 0.2).until(
                EC.presence_of_element_located(("xpath", "//*[@text='分享']"))).click()
            WebDriverWait(self.driver, 0.8, 0.2).until(
                EC.presence_of_element_located(("xpath", "//*[@text='保存本地']"))).click()
            WebDriverWait(self.driver, 0.8, 0.2).until(
                EC.presence_of_element_located(("xpath", "//*[@text='允许']"))).click()
            self.close_login()
            return True
        except Exception as e:
            logger.info('*'*2+str(e))
            return False

    def screenshot(self, screenpath):
        try:
            logger.info('*'*6+'执行app截屏操作')
            self.driver.get_screenshot_as_file(screenpath)
            return True
        except Exception as e:
            logger.info('*'*6+str(e))
            return False

    def swipeUp(self):
        try:
            logger.info('*'*6+'执行上滑操作')
            x = self.driver.get_window_size()["width"]
            y = self.driver.get_window_size()["height"]
            self.driver.swipe(x/2, y*3/4, x/2, y/4, 200)
            return True
        except Exception as e:
            logger.info('*'*6+str(e))
            return False

    def down_video(self, wait_time=5):
        try:
            logger.info('*'*10+'执行下载操作')
            WebDriverWait(self.driver, 0.8, 0.2).until(
                EC.presence_of_element_located(("xpath", "//*[@text='分享']"))).click()
            WebDriverWait(self.driver, 0.8, 0.2).until(
                EC.presence_of_element_located(("xpath", "//*[@text='保存本地']"))).click()
            try:
                WebDriverWait(self.driver, 0.8, 0.2).until(
                    EC.presence_of_element_located(("xpath", "//*[@content-desc='关闭']"))).click()
            except Exception as e:
                logger.warning('*'*11+'关闭'+str(e))
            time.sleep(wait_time)
            return True
        except Exception as e:
            logger.info('*'*10+str(e))
            return False

    def close_login(self):
        try:
            WebDriverWait(self.driver, 0.8, 0.2).until(
                EC.presence_of_element_located(("xpath", "//*[@content-desc='关闭']"))).click()
            logger.info('检测到登录跳转页')
        except Exception as e:
            logger.warning('*'*10+str(e))

    def tearDown(self):
        try:
            self.driver.quit()
            logger.info('*'*2+'appium app session 关闭成功')
            return True
        except Exception as e:
            logger.error('*'*2+'appium app session关闭失败'+str(e))
            return False

class face_recognition:
    def __init__(self, API_Key, Secret_Key):
        try:
            url = 'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=' + \
                API_Key+'&client_secret='+Secret_Key
            self.http = urllib3.PoolManager()
            request = self.http.request(
                'post', url, headers={'Content-Type': 'application/json; charset=UTF-8'})
            self.access_token = json.loads(request.data)['access_token']
        except Exception as e:
            logger.info('*'*6+str(e.__traceback__.tb_lineno)+str(e))
            sys.exit(1)

    def recognize_single(self, img_all_path):
        flag = False
        try:
            logger.info('*'*6+'执行脸部识别和筛选')
            url = 'https://aip.baidubce.com/rest/2.0/face/v3/detect?access_token='+self.access_token
            with open(img_all_path, 'rb') as f:
                img_conver_data = base64.b64encode(f.read())
                params = urlencode({"image": str(img_conver_data, 'utf-8'),
                                    "image_type": "BASE64",
                                    "face_field": "age,beauty,expression,gender,emotion",
                                    "max_face_num": 10,
                                    "face_type": "LIVE",
                                    "liveness_control": "NONE"})
                request = self.http.request(
                    'post', url, body=params, headers={'Content-Type': 'application/json; charset=UTF-8'})
                content = request.data
                if (content):
                    content_dict = json.loads(content)
                    result = content_dict['result']
                    logger.info('*'*8+str(result))
                    if result:
                        face_num = result['face_num']
                        image = cv2.imread(img_all_path)
                        for i in range(face_num):
                            try:
                                face_dict = result['face_list'][i]
                                location = face_dict['location']
                                # 人脸特征条件限定
                                if face_dict['face_probability'] > 0.6 and face_dict['age'] >= 18 and face_dict['age'] <= 45 and face_dict['beauty'] > 80 and face_dict['gender']['type'] == 'female' and location['width'] > 80 and location['height'] > 80:
                                    flag = True
                                    cv2.rectangle(image,
                                                  (int(location['left']),
                                                   int(location['top'])),
                                                  (int(location['width']+location['left']),
                                                   int(location['height']+location['top'])),
                                                  (0, 0, 255),
                                                  2)
                            except:
                                pass
                        if flag:
                            logger.info('*'*8+'图片筛选成功')
                            # cv2.imshow('image', image)
                            # cv2.waitKey(3000)  # 单位是毫秒
                            # cv2.destroyAllWindows()
                            cv2.imwrite(img_all_path.split(
                                '.')[0]+'_new.png', image)
                        else:
                            logger.info('*'*8+'图片筛选不合格')
        except Exception as e:
            logger.info('*'*6+str(e))
        finally:
            return flag

if __name__ == "__main__":
    try:
        logger = logging.getLogger(__name__)
        logger.setLevel(level=logging.INFO)
        basedir = os.path.abspath(os.path.dirname(__file__))
        logfile_name = __file__+'.log'
        logfile_path = os.path.join(basedir, logfile_name)
        handler = logging.FileHandler(logfile_path, encoding='utf-8')
        handler.setLevel(logging.INFO)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        console = logging.StreamHandler()
        console.setLevel(logging.INFO)
        logger.addHandler(handler)
        logger.addHandler(console)

        logger.info('*'*20+'程序开始'+'*'*20)
        mumu_instance = mumu(
            'D:/MuMu/emulator/nemu/EmulatorShell/NemuPlayer.exe')
        mumu_instance.start_server()

        adb_instance = adb_connect(
            'C:/Users/George/AppData/Local/Android/Sdk/platform-tools/adb.exe', '127.0.0.1:7555')

        appium_instance = appium(
            'C:/Users/George/AppData/Local/Programs/Appium/Appium.exe')
        appium_instance.start_server()

        img_save_path = 'D:/VsCode/python_control_android/images/'
        # 启动app
        app_instance = app_automation()
        # 同意协议和允许权限
        if (app_instance.always_allow()):
            face_instance = face_recognition(
                'ErGqFgu5shIcseRx4nGiV472', 'enN4xQkjGtrSvm3InfGjk43m0jLPgFsb')  # 执行人脸识别初始化操作
            count = 1
            while (count < 10000):
                try:
                    logger.info('*'*4+'执行第'+str(count)+'次循环')
                    if app_instance.judge_app_crash():
                        count = count + 1
                        img_name = 'img_' + \
                            ''.join(str(uuid.uuid1()).split('-')[:-1])+'.png'
                        img_path = img_save_path+img_name
                        if app_instance.screenshot(img_path):
                            if face_instance.recognize_single(img_path):
                                if app_instance.down_video():
                                    logger.info('*'*14+'download succeed')
                            else:
                                os.remove(img_path)
                        swipe_count = 0
                        while swipe_count < 5:
                            app_instance.close_login()
                            if app_instance.swipeUp():
                                break
                            else:
                                swipe_count = swipe_count+1
                    else:
                        break
                except exceptions.InvalidSessionIdException:
                    logger.info('*'*2+'InvalidSessionIdException')
                    break
                except Exception as e:
                    logger.error('*'*2+str(e.__traceback__.tb_lineno)+str(e))
                    break
        app_instance.tearDown()
        appium_instance.stop_server()
        mumu_instance.stop_server()
    except Exception as e:
        logger.info(str(e.__traceback__.tb_lineno)+str(e))
    logger.info('*'*20+'程序结束'+'*'*20)

注意:输入自己申请的百度账号的API_Key, Secret_Key

You may also like...

发表评论