网络编程
位置:首页>> 网络编程>> Python编程>> python 基于Appium控制多设备并行执行

python 基于Appium控制多设备并行执行

作者:北漂的雷子  发布时间:2022-12-11 12:00:06 

标签:python,Appium,并行执行

前言:

如何做到,控制多设备并行执行测试用例呢。

思路篇

我们去想下,我们可以获取参数的信息,和设备的信息,那么我们也可以针对每台设备开启不一样的端口服务。那么每个服务都对应的端口,我们在获取设备列表的时候,要和 每个服务对应起来,这样,我们开启一个进城池,我们在进程池里去控制设备,每个进程池 控制不一样的设备即可。

实现篇

首先实现对应的参数篇和对应的设备端口,


def startdevicesApp():
    l_devices_list=[]
    port_list=[]
    alldevices=get_devices()
    if len(alldevices)>0:
        for item in alldevices:
            port=random.randint(1000,6000)
            port_list.append(port)
            desired_caps = {
                    'platformName': 'Android',
                    'deviceName': item,
                    'platformVersion': getPlatForm(item),
                    'appPackage': get_apkname(apk_path),  # 包名
                    'appActivity': get_apk_lautc(apk_path),  # apk的launcherActivity
                    'skipServerInstallation': True,
                "port":port
                }
            l_devices_list.append(desired_caps)
    return  l_devices_list,port_list

  接下来,我们去写一个端口开启服务。


class RunServer(threading.Thread):#启动服务的线程
def __init__(self, cmd):
 threading.Thread.__init__(self)
 self.cmd = cmd
def run(self):
 os.system(self.cmd)
def start(port_list:list):
def __run(url):
 time.sleep(10)
 response = urllib.request.urlopen(url, timeout=5)
 if str(response.getcode()).startswith("2"):
  return True
for i in range(0, len(port_list)):
 cmd = "appium -p %s " % (
  port_list[i])
 if platform.system() == "Windows": # windows下启动server
  t1 =RunServer(cmd)
  p = Process(target=t1.start())
  p.start()
  while True:
   time.sleep(4)
   if __run("http://127.0.0.1:" + port_list[i]+ "/wd/hub/status"):
    break

我们开启服务了,接下来,我们怎样根据不同进程执行测试用例。


def runcase(devics):
#执行测试用例
pass
def run(deviceslist:list):

pool = Pool(len(deviceslist))
for i in deviceslist:
 pool.map(runcase, i)
pool.close()
pool.join()

  接下来,就是我们去组合形成最后的执行的代码。

    最终代码展示


from appium import webdriver
from androguard.core.bytecodes.apk import APK
import os
import random
apk_path = "/Users/lileilei/Downloads/com.tencent.mobileqq_8.5.0_1596.apk"

def get_devices() -> list:
all_devices = []
cmd = "adb devices"
reslut = os.popen(cmd).readlines()[1:]
for item in reslut:
 if item != "\n":
  all_devices.append(str(item).split("\t")[0])
return all_devices

def getPlatForm(dev: str) -> str:
cmd = 'adb -s {} shell getprop ro.build.version.release'.format(dev)
reslut = os.popen(cmd).readlines()[0]
return str(reslut).split("\n")[0]

def get_apkname(apk):
a = APK(apk, False, "r")
return a.get_package()

def get_apk_lautc(apk):
a = APK(apk, False, "r")
return a.get_main_activity()

import platform
from multiprocessing import Process,Pool
import time,urllib.request
import threading
class RunServer(threading.Thread):#启动服务的线程
def __init__(self, cmd):
 threading.Thread.__init__(self)
 self.cmd = cmd
def run(self):
 os.system(self.cmd)
def start(port_list:list):
def __run(url):
 time.sleep(10)
 response = urllib.request.urlopen(url, timeout=5)
 if str(response.getcode()).startswith("2"):
  return True
for i in range(0, len(port_list)):
 cmd = "appium -p %s " % (
  port_list[i])
 if platform.system() == "Windows": # windows下启动server
  t1 =RunServer(cmd)
  p = Process(target=t1.start())
  p.start()
  while True:
   time.sleep(4)
   if __run("http://127.0.0.1:" + port_list[i]+ "/wd/hub/status"):
    break

def startdevicesApp():
l_devices_list=[]
port_list=[]
alldevices=get_devices()
if len(alldevices)>0:
 for item in alldevices:
  port=random.randint(1000,6000)
  port_list.append(port)
  desired_caps = {
    'platformName': 'Android',
    'deviceName': item,
    'platformVersion': getPlatForm(item),
    'appPackage': get_apkname(apk_path), # 包名
    'appActivity': get_apk_lautc(apk_path), # apk的launcherActivity
    'skipServerInstallation': True,
   "port":port
   }
  l_devices_list.append(desired_caps)
return l_devices_list,port_list
def runcase(devics):
#执行测试用例
pass
def run(deviceslist:list):

pool = Pool(len(deviceslist))
for devices in deviceslist:
 pool.map(runcase, devices)
pool.close()
pool.join()
if __name__=="__main__":
l_devices_list,port_list=startdevicesApp()
start(port_list)
run(l_devices_list)

来源:https://www.cnblogs.com/leiziv5/p/14225882.html

0
投稿

猜你喜欢

手机版 网络编程 asp之家 www.aspxhome.com