概述
# -*- coding: utf-8 -*-
# @Author
: zx
import json
import os
from appium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
import time
import threading
from selenium.webdriver.support import expected_conditions as ec
from loguru import logger
from douyin_ele import douyinObject
import random
phone_w = 1080
phone_h = 2120
send_x = 350
send_y = 600
# 需要识别的标识
mark_text = "111"
class Common:
def __init__(self, driver):
self.driver = driver
self.size = self.driver.get_window_size()
self.file_path = "./d_y_data.json"
def swipe_up(self, t=5000, n=1):
# 根据手机屏幕的宽高,来确定起始坐标位置
# 上下滑动水平x轴不变,屏幕的宽度*0.5 表示屏幕的中间
start_x = self.size['width'] * 0.5
# x坐标
# 手势向上滑动,y轴的坐标从大到小
start_y = self.size['height'] * 0.75
# 起点y坐标
end_y = self.size['height'] * 0.25
# 终点y坐标
for i in range(n):
self.driver.swipe(start_x, start_y, start_x, end_y, t)
def swipe_down(self, duration=5000, n=1):
# 根据手机屏幕的宽高,来确定起始坐标位置
# 上下滑动水平x轴不变,屏幕的宽度*0.5 表示屏幕的中间
start_x = self.size['width'] * 0.5
# x坐标
# 手势向下滑动,y轴的坐标从小到大
start_y = self.size['height'] * 0.25
# 起始y坐标
end_y = self.size['height'] * 0.75
# 终点y坐标
for i in range(n):
self.driver.swipe(start_x, start_y, start_x, end_y, duration)
#
等待元素可见
def get_visible_element(self, loc, timeout=60):
try:
ele = WebDriverWait(self.driver, timeout).until(ec.visibility_of_element_located(loc))
return ele
except Exception as e:
raise logger.error("get_visible_element报错,元素%s" % (loc,), repr(e))
def get_presence_element(self, loc, timeout=60):
try:
ele = WebDriverWait(self.driver, timeout).until(ec.presence_of_element_located(loc))
return ele
except Exception as e:
logger.error("get_presence_element报错,元素%s" % (loc,), repr(e))
def get_find_elements(self, loc, timeout=60):
try:
ele = WebDriverWait(self.driver, timeout=timeout).until(lambda x: x.find_elements(*loc))
return ele
except Exception as e:
logger.error("get_find_elements报错,元素%s" % (loc,), repr(e))
def d_page_source(self):
return self.driver.page_source
def scroll(self, start_el, stop_el):
# stop_el = self.get_visible_element(loc1)
# start_el = self.get_visible_element(loc2)
self.driver.scroll(start_el, stop_el)
def swipe_down_search_user(self):
# 向下滑动屏幕
try:
# l = self.driver.get_window_size()
# 获取屏幕尺寸
# 根据比率来滑动屏幕
x1 = self.size['width'] * 0.5
# x坐标
y1 = self.size['height'] * 0.65
# 起始y坐标
y2 = self.size['height'] * 0.85
# 终点y坐标
self.driver.swipe(x1, y1, x1, y2)
except Exception as e:
logger.error("swipe_down_search_user 报错", repr(e))
# 在播放视频的时候获取size会报错
def get_win_size(self):
try:
size = self.driver.get_window_size()
# 获取屏幕尺寸
logger.info("获取屏幕的大小%s" % (size,))
return size
except Exception as e:
logger.error("获取屏幕大小报错", repr(e))
def send_button(self):
try:
a1 = send_x / phone_w
b1 = send_y / phone_h
# 获取当前手机屏幕大小X,Y
X = self.size['width']
Y = self.size['height']
logger.info("屏幕参数%s" % (a1 - X - b1 - Y))
# 屏幕坐标乘以系数即为用户要点击位置的具体坐标
self.driver.tap([(a1 * X, b1 * Y)])
time.sleep(0.3)
except Exception as e:
logger.error("send_button报错", repr(e))
def get_send_d_y_list(self):
self.get_visible_element(douyinObject.xiaoxi)
self.send_button()
self.get_presence_element(douyinObject.xiaoxi).click()
self.get_visible_element(douyinObject.qun_name1).click()
# 翻页到昨天的聊天记录
while True:
try:
self.get_visible_element(douyinObject.zt, timeout=5)
break
except Exception as e:
# 向上滑动
logger.error("没有找到昨天的文本", repr(e))
self.swipe_down()
# 一直向下滑获取屏幕获取今天的消息
while True:
try:
self.get_visible_element(douyinObject.time_class, timeout=5)
break
except Exception as e:
# 向下滑动
logger.error("没有找到不是昨天的文本", repr(e))
self.swipe_up()
# 在滑到爱心来
while True:
try:
self.get_visible_element((By.XPATH, douyinObject.ai_xin.format(mark_text)), timeout=5)
break
except Exception as e:
logger.error("没有今天%s的文本" % (mark_text,), repr(e))
self.swipe_up()
# 获取当前发送爱心的抖音用户的名称
xio_xi_list = []
while True:
# 滑动之前的页面元素
before_swipe = self.d_page_source()
# logger.info("滑动之前的页面元素%s" % (before_swipe,))
user_loc = self.get_find_elements((By.XPATH, douyinObject.user_name.format(mark_text)))
for user_name in user_loc:
xio_xi_list.append(user_name.text)
# 向下滑动
self.swipe_up()
# 滑动之后的页面元素
after_swipe = self.d_page_source()
# logger.info("滑动之后的页面元素%s" % (after_swipe,))
if before_swipe == after_swipe:
break
user_list = [i.split("\")[0] for i in set(xio_xi_list)]
logger.info("获取需要点赞的用户%s" % (user_list,))
return user_list
def go_in_search(self):
# 青少年模式点击,我知道了
iknow = self.get_find_elements(douyinObject.i_ok, timeout=5)
if iknow:
iknow.click()
# 点击消息
self.get_presence_element(douyinObject.xiaoxi).click()
# 进入群聊
self.get_visible_element(douyinObject.qun_name1).click()
# 点击更多
self.get_visible_element(douyinObject.geng_duo).click()
# 等待更多
self.get_visible_element(douyinObject.geng_duo)
# 下滑
self.swipe_down_search_user()
def cao_zuo(self, ming_dan):
# 点击搜索群成员
self.get_visible_element(douyinObject.search_user).click()
# 搜索框输入
self.get_visible_element(douyinObject.search).send_keys(ming_dan)
#
点击第一个用户
self.get_visible_element(douyinObject.user_one).click()
#
点击第一个视频
self.get_visible_element(douyinObject.video_one).click()
time.sleep(10)
# 点击暂定
self.send_button()
#
点击点赞
self.get_visible_element(douyinObject.fabulous).click()
#
收藏
self.get_visible_element(douyinObject.collection).click()
#
评论
self.get_visible_element(douyinObject.comment).click()
self.get_visible_element(douyinObject.comment).send_keys(random.choice(douyinObject.pl))
self.get_visible_element(douyinObject.send).click()
#
分享
self.get_visible_element(douyinObject.forward).click()
# 复制链接
self.get_visible_element(douyinObject.copy_link).click()
# 点击关闭
# self.get_visible_element(douyinObject.close).click()
# 点击返回三次
for i in range(3):
logger.info("切换用户视频,点击返回按钮%s次" % (i + 1,))
self.get_visible_element(douyinObject.fan_hui).click()
def disappear_account(self):
# 等待切换账号出现
for i in range(3):
try:
if self.get_visible_element(douyinObject.switch_account_ing, timeout=5):
# 等待切换账号消息
while True:
try:
self.get_visible_element(douyinObject.switch_account_ing, timeout=5)
time.sleep(2)
except Exception as e:
logger.warning("切换账号消失", repr(e))
break
break
except Exception as e:
logger.warning("第" + str(i) + "次切换账号没有消失", repr(e))
# 切换账号
def switch_account(self, index):
# 点击返回
for i in range(2):
logger.info("切换账号点击返回按钮%s次" % (i + 1,))
self.get_visible_element(douyinObject.fan_hui).click()
# 点击我
self.get_visible_element(douyinObject.we).click()
# 点击切换账号
self.get_visible_element(douyinObject.qie_huan).click()
# 点击切换的用户
self.get_visible_element((By.XPATH, douyinObject.account.format(index))).click()
#
等待切换账号成功
self.disappear_account()
def write_data(self, key, info_data):
with open(self.file_path, encoding="gbk") as jsonFile:
data = json.load(jsonFile)
data[key] = info_data
with open(self.file_path, "w", encoding="gbk") as jsonFile:
json.dump(data, jsonFile, indent=4, ensure_ascii=False)
def load_data(self):
with open(self.file_path, encoding="gbk") as jsonFile:
data = json.load(jsonFile)
return data
@staticmethod
def stop_appium():
# 关闭所有的appium进程
os.system("start /b taskkill /F /t /IM node.exe")
@staticmethod
def start_appium(port, bootstrap, udid):
a = os.popen('netstat -ano | findstr "%s" ' % port)
time.sleep(2)
t1 = a.read()
if "LISTENING" in t1:
print("appium服务已经启动:%s" % t1)
# s = t1.split(" ")
# s1 = [i for i in s if i != '']
# pip = s1[-1].replace("n", "")
else:
# 启动appium服务
# appium -a 127.0.0.1 -p 4740 -U emulator-5554 127.0.0.1:62001 --no-reset
# os.system("start /b appium -a 127.0.0.1 -p %s -U %s --no-reset" % (port, udid))
# appium -a 127.0.0.1 -p 4724 -bp 4725 -U 127.0.0.1:62001
os.system("start /b appium -a 127.0.0.1 -p %s -bp %s -U %s" % (port, bootstrap, udid))
def get_json_text(self, phone):
a = self.load_data()[phone]
if not a:
b = self.load_data()["count"]
b1 = b.copy()
# 对i进行操作
for i in b:
logger.info("现在对%s进行操作" % (i,))
self.cao_zuo(i)
b1.pop(0)
self.write_data(phone, b1)
else:
a1 = a.copy()
for j in a:
# 对j用户进行操作
logger.info("现在对%s进行操作" % (j,))
self.cao_zuo(j)
a1.pop(0)
self.write_data(phone, a1)
desired_caps_phone1 = {"platformName": "Android",
"platformVersion": "7.1.2",
"deviceName": "127.0.0.1:62001",
"appPackage": "com.ss.android.ugc.aweme",
"appActivity": "com.ss.android.ugc.aweme.splash.SplashActivity",
"noReset": "true",
"unicodeKeyboard": 'true',
"resetKeyboard": "true",
"udid": "127.0.0.1:62001"}
desired_caps_phone2 = {"platformName": "Android",
"platformVersion": "10",
"deviceName": "M7KJYL7TKZZHZHSS",
"appPackage": "com.ss.android.ugc.aweme",
"appActivity": "com.ss.android.ugc.aweme.splash.SplashActivity",
"noReset": "true",
"unicodeKeyboard": 'true',
"resetKeyboard": "true",
"udid": "M7KJYL7TKZZHZHSS"}
desired_caps_phone3 = {"platformName": "Android",
"platformVersion": "10",
"deviceName": "D6UCUKQG9DXWCYEY",
"appPackage": "com.ss.android.ugc.aweme",
"appActivity": "com.ss.android.ugc.aweme.splash.SplashActivity",
"noReset": "true",
"unicodeKeyboard": 'true',
"resetKeyboard": "true",
"udid": "D6UCUKQG9DXWCYEY"}
def task1():
Common.stop_appium()
Common.start_appium(4723, 4724, "127.0.0.1:62001")
while True:
try:
app = Common(webdriver.Remote('http://127.0.0.1:4723/wd/hub', desired_caps_phone1))
# 当status的值为true的时候才获取名单
if app.load_data()["status"] == "true":
app.write_data("count", app.get_send_d_y_list())
app.write_data("status", 'false')
# 当前手机的抖音账号
for i in range(2):
if i != 0:
# 切换账号操作, ---当i不等于0的时候才切换账号
app.switch_account(i)
app.go_in_search()
app.get_json_text("phone1")
break
except Exception as e:
print(repr(e))
continue
def task2():
while True:
try:
time.sleep(10)
app = Common(webdriver.Remote('http://127.0.0.1:4725/wd/hub', desired_caps_phone2))
# 当前手机的抖音账号
for i in range(2):
if i != 0:
# 切换账号操作, ---当i不等于0的时候才切换账号
app.switch_account(i)
app.go_in_search()
app.get_json_text("phone2")
break
except Exception as e:
print(repr(e))
continue
def task3():
while True:
try:
time.sleep(10)
app = Common(webdriver.Remote('http://127.0.0.1:4727/wd/hub', desired_caps_phone3))
# 当前手机的抖音账号
for i in range(2):
if i != 0:
# 切换账号操作, ---当i不等于0的时候才切换账号
app.switch_account(i)
app.go_in_search()
app.get_json_text("phone3")
break
except Exception as e:
print(repr(e))
continue
threads = []
t1 = threading.Thread(target=task1)
threads.append(t1)
# t2 = threading.Thread(target=task2)
# threads.append(t2)
#
# t3 = threading.Thread(target=task3)
# threads.append(t3)
if __name__ == '__main__':
for t in threads:
t.start()
最后
以上就是醉熏冬瓜为你收集整理的appium测试的全部内容,希望文章能够帮你解决appium测试所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复