如何在Python中使用ThreadPoolExecutor运行两个测试,并将一个的结果作为参数传递给另一个

0 投票
1 回答
20 浏览
提问于 2025-04-13 01:12

我正在使用ThreadPoolExecutor在Python中同时运行两个测试。不过,我遇到了一些问题。假设有一个主要测试和一个次要测试,它们会同时开始执行,但次要测试可能会先完成。我希望在次要测试完成后,能把它的结果返回来,然后在主要测试中使用这个结果作为参数。因为在我需要这个结果的时候,次要测试已经完成了,但我不知道该怎么实现。

我使用Selenium来进行测试,并且从我的main.py文件中调用这些方法,所以我的代码是这样的。

from concurrent.futures import ThreadPoolExecutor

def PruebaPrincipal(opcion_sistema, opcion_ambiente, opcion_usuario, opcion_especifica, tipo_de_credito, resultado_folio):
                        resultado_folio = resultado_folio_future.result()
                        with Flujos_obligatorios_originacion() as bot1:
                            limpiar_pantalla()
                            bot1.get_url(opcion_sistema, opcion_ambiente)
                            bot1.iniciar_sesion(opcion_ambiente, opcion_usuario)
                            bot1.Crear_Solicitud(tipo_de_credito)
                            resultado_database = bot1.Establecer_base_de_datos(opcion_ambiente)
                            curp_cliente = bot1.Llenar_Formulario_Solicitud(opcion_sistema, opcion_especifica, resultado_database, opcion_ambiente)
                            bot1.Solicitud_Tipo_de_Credito(tipo_de_credito)
                            bot1.Solicitud_Datos_del_Credito(resultado_folio, resultado_database, curp_cliente, tipo_de_credito)
                            bot1.cerrar_sesion()
                            bot1.close()

def PruebaSecundaria(opcion_ambiente, opcion_sistema):
                       with Flujos_obligatorios_originacion() as bot2:
                            resultado_folio_future = bot2.Obtener_Folio_Nav(opcion_ambiente, opcion_sistema)
                            return resultado_folio_future

with ThreadPoolExecutor(max_workers=2) as executor:
                       resultado_folio_future = executor.submit(PruebaSecundaria, opcion_ambiente, opcion_sistema)
                       executor.submit(PruebaPrincipal, opcion_sistema, opcion_ambiente, opcion_usuario, opcion_especifica, tipo_de_credito, resultado_folio_future)

我希望次要测试能给我一个值,以便我可以在主要测试中使用,把它作为参数传递。

1 个回答

1

当然可以。当你使用 executor.submit() 时,会得到一个叫做 Future 的东西。然后,如果你在这个 Future 上调用 .result(),这会让程序等到这个 Future 完成后再继续执行,并返回结果(如果出错了,它会抛出一个异常)。

from concurrent.futures import ThreadPoolExecutor, Future


def Suma(valor1: int, valor2: int):
    resultado_suma = valor1 + valor2
    print(f"El resultado de la suma es: {resultado_suma}")
    return resultado_suma


def Multiplicacion(resultado_suma: Future):
    resultado_multiplicacion = resultado_suma.result() * 2
    print(f"El resultado de la multiplicacion es: {resultado_multiplicacion}")
    return resultado_multiplicacion + 15


with ThreadPoolExecutor(max_workers=2) as executor:
    sum_future = executor.submit(Suma, 5, 5)
    mult_future = executor.submit(Multiplicacion, sum_future)
    print("Final result:", mult_future.result())

这段代码会输出

El resultado de la suma es: 10
El resultado de la multiplicacion es: 20
Final result: 35

撰写回答