Procesamiento paralelo con Python 3

Procesamiento paralelo con Python 3

Una de mis cosas favoritas de Python es su increíble módulo de subproceso. Sin embargo, por muy bueno que sea, todavía no quiero tener que escribir el mismo código dos veces.

Entonces, cuando trabajo con subprocesos en cualquier idioma, generalmente prefiero crear una clase que pueda manejar todo por mí. Aquí le mostraré cómo trabajar con subprocesos, envolver la lógica en una clase y configurarla para que todo lo que tenga que hacer sea pasar una lista de comandos.

Configuración

Asegúrese de tener Python 3 instalado. Cree un directorio para usarlo como directorio de su proyecto y asígnele el nombre que desee, usaré "proc_manager". Directamente debajo del directorio de su proyecto, cree dos archivos: "test_proc.py" y "test.py". Luego cree otro directorio en la carpeta de su proyecto llamado "clases". Dentro de clases crea un nuevo archivo llamado “__init__.py”, que dejaremos en blanco, y crea otro archivo llamado “proc.py”, que será nuestro archivo de clase. El propósito detrás de la creación de "__init__.py" es notificar a Python e importaremos archivos desde este directorio.

Copiado con éxito.

La estructura de archivos de su proyecto debería verse así:

 /proc_manager prueba_proc.py prueba.py /clases __init__.py proc.py

Copiar al portapapeles

Empezando

Primero, necesitaremos un script de prueba para imitar un subproceso. Sin embargo, lo más importante es que tenga en cuenta el hecho de que cuando realiza un procesamiento en paralelo, muy rara vez los procesos volverán en el mismo orden en que se generaron. Por lo tanto, haremos que el script acepte un argumento entero para definir el tiempo de suspensión para que los procesos se completen en momentos aleatorios.

El código

Pegue lo siguiente en "test.py" y guárdelo.

 importar análisis de argumentos, tiempo analizador = argparse.ArgumentParser(descripción='proceso de prueba') parser.add_argument('sleep', type=int, help='integer que define el tiempo de reposo', default=10) argumentos = analizador.parse_args() tiempo.dormir(argumentos.dormir) print("tiempo dormido:" + str(args.sleep))

Copiar al portapapeles

Lo primero que estoy haciendo es importar dos bibliotecas, primero es argparse para analizar los argumentos que llegan desde la línea de comando, y el otro es time para que podamos llamar a time.sleep mientras pasamos el entero de suspensión. A lo largo de las próximas líneas, estamos instanciando la clase argparse mientras pasamos una descripción, creando la configuración de argparse con el argumento "dormir" y llamando a parse_args que devuelve nuestro diccionario de argumentos. Finalmente, llamamos a time.sleep, le pasamos el argumento "sleep" y generamos "time slept:" concatenado con el argumento sleep. Otra cosa que quizás desee hacer en el futuro si tiene la intención de usar esto como un proceso simulado para la prueba es agregar alguna lógica de falla aleatoria para probar el manejo de errores. Una vez que haya guardado eso, debería poder probarlo con el comando "python3 test.py 5",

A continuación vamos a crear nuestra clase. Dentro del directorio de clases, abra el archivo "proc.py". Lo primero que haremos será agregar nuestras importaciones en la parte superior, así que pegue lo siguiente en la parte superior del archivo.

 de subproceso importar Popen, PIPE desde tempfile importar archivo temporal

Copiar al portapapeles

La primera línea simplemente importa el módulo de subprocesos, y todo lo que queremos de él es Popen para generar procesos y PIPE, que se usa para enviar al búfer. La segunda línea importa el módulo tempfile, que usaremos para la salida estándar porque el búfer de salida a veces no es lo suficientemente grande y puede provocar un punto muerto. Más sobre eso cuando lleguemos a eso.

Puedes leer:  Velocidad vs Rapidez: Diferencias y Aplicaciones

A continuación, pegue la definición de clase y algunas propiedades iniciales necesarias.

 proceso de clase: dicProcessList = {}#lista de procesos inicial dicActiveProcesses = {}#procesos activos actuales dicCompletedProcesses = {}#procesos completados dicProcessOutPuts = {}#salida estándar de los procesos intLimit = 4#limite el número de procesos a ejecutar en un momento dado

Copiar al portapapeles

Aquí estamos definiendo 4 contenedores, tres de los cuales usaremos para hacer malabares con los procesos para determinar qué está pendiente, activo y completado; el cuarto contendrá nuestra salida. También estamos definiendo un límite de enteros que se usará para limitar la cantidad de procesos simultáneos que se ejecutan en un momento dado.

Pegar en el constructor.

 def __init__(self, intLimit: int = 4): self.intLimit = intLimit

Copiar al portapapeles

Esto define el parámetro intLimit, establece la sugerencia de tipo aceptada como un número entero y un valor predeterminado de 4 procesos simultáneos en un momento dado.

Pasado en un método para determinar si estamos al máximo de procesos concurrentes.

 def limitMaxed(self): return len(self.dicActiveProcesses) = self.intLimit

Copiar al portapapeles

Esto hace una cosa simple, pero la lógica será necesaria en un par de lugares diferentes.

A continuación, pegue un método para recorrer nuestros comandos, determine si estamos por debajo del máximo simultáneo y, de ser así, llame a otro método para generar el proceso.

 def spawnProcesses(self): if(no self.limitMaxed()): para strKey en la lista (self.dicProcessList): lstCmd = self.dicProcessList[strKey] self.dicActiveProcesses[strKey] = self.runProcess(lstCmd, strKey) self.dicProcessList.pop(strKey) if(self.limitMaxed()): romper

Copiar al portapapeles

Lo primero que hacemos aquí es verificar el estado máximo concurrente para ver si ya estamos en el máximo, si no, continuar. Aquí estamos recorriendo las claves en el diccionario dicProcessList, que se usará para mantener la asociación con los procesos en ejecución y completados. Luego extraemos el comando de dicProcessesList en una variable local. En la siguiente línea pasamos el comando y su clave asociativa a otro método que devuelve el objeto puntero del subproceso, el cual asignaremos a dicActiveProcesses usando la misma clave asociativa. Luego sacaremos ese proceso de dicProcessList para eliminarlo de la lista de comandos pendientes y, finalmente, verificaremos el estado máximo concurrente para determinar si ese proceso nos lleva al límite. Si estamos en el límite, salga del bucle; de ​​lo contrario, continúe con la siguiente iteración.

Pegue un método para generar nuestros procesos.

 def ejecutarProceso(self, lstCmd, strKey): self.dicProcessOutPuts[strKey] = Archivo Temporal() devuelve Popen(lstCmd, stdout=self.dicProcessOutPuts[strKey], stderr=PIPE)

Copiar al portapapeles

Aquí estamos definiendo un método llamado runProcess, que acepta el comando a ejecutar y una clave entera para mantener la asociación durante todo el proceso. La primera línea del método define un archivo temporal y asigna su puntero a nuestro diccionario dicProcessOutPuts mientras usa la clave como índice. Luego invocamos a Popen pasándole nuestro comando, salida estándar a nuestro archivo temporal y error estándar a PIPE para el búfer de salida. Hay un par de cosas de las que hablar aquí. Primero, el comando, como notará, es una lista en lugar de una cadena. Esto se debe a que llamaremos a Popen con la opción predeterminada de shell=False, que es la forma recomendada de interactuar con el shell a través de Python y ayuda a evitar la inyección de datos no deseada. Definitivamente recomiendo investigar un poco en esta área y siempre usar shell=False. Mi consejo es, si alguna vez te encuentras en una posición en la que quieres usar shell=True para hacer que algo funcione, simplemente no lo hagas. La forma en que está estructurado el comando, el primer elemento de la matriz es el programa que se invocará, y cada elemento después de 0 es un parámetro que se pasará al programa. La otra cosa que necesito mencionar aquí es el razonamiento detrás del uso de un archivo temporal para la salida estándar. En la mayoría de las partes de la red, encontrará desarrolladores que simplemente usan PIPE para la salida estándar sin mucha explicación, y eso está bien si sabe que la salida estándar será lo suficientemente pequeña como para ser manejada por el búfer de salida. El problema con ese concepto aquí es que estamos desarrollando un proceso dinámico que debería poder manejar todos los escenarios, incluidos aquellos con suficiente estándar para inundar el búfer de salida y bloquear el proceso.

Puedes leer:  Encapsulamiento en Programación

Pegue un método para verificar el estado de nuestros procesos, migrar los procesos completados al diccionario de procesos completos y asignar la salida estándar al diccionario de salida.

 def pollProcesses(self): para strKey en la lista (self.dicActiveProcesses): proc = self.dicActiveProcesses[strKey] si proc.poll() no es Ninguno: (strNone, strStdErr) = proc.communicate() self.dicProcessOutPuts[strKey].seek(0) strStdOut = self.dicProcessOutPuts[strKey].read() self.dicProcessOutPuts[strKey].close() self.dicCompletedProcesses[strKey] = {"stdout":strStdOut, "stderr":strStdErr, "retcode":proc.returncode} self.dicActiveProcesses.pop(strKey)

Copiar al portapapeles

Aquí estamos recorriendo las claves de cada proceso, extrayendo el objeto de subproceso del diccionario de procesos activos y luego ejecutando proc.poll para determinar si el proceso está completo o no. Si el proceso devuelve algo menos Ninguno, llamamos a proc.communicate para obtener el error estándar. Estoy usando strNone en lugar de donde normalmente vería una variable para la salida estándar para indicar que no obtendremos nada de la salida estándar ya que estamos usando un archivo temporal. Luego llamamos a seek en nuestro archivo temporal y le pasamos 0 para mover el puntero de regreso al principio del archivo, extraemos el contenido en la variable strStdOut y cerramos el recurso del archivo temporal. Luego, asignamos la salida estándar, el error estándar y el código de retorno al diccionario de procesos completados mientras sacamos el proceso del diccionario de procesos activos para dejar espacio para el siguiente proceso.

Pegue un método para recuperar información asociada con los procesos completados.

 def getProcessData(self, strKey: str = False): si (no strKey): devolver self.dicCompletedProcesses elif strKey en self.dicCompletedProcesses: devolver self.dicCompletedProcesses[strKey] demás: falso retorno

Copiar al portapapeles

Este método toma un parámetro, que es un índice entero con un valor predeterminado de False. Hemos mantenido la clave asociada a la lista de comandos original pasada al método de ejecución, que cubriré después de este segmento. Si strKey es False o no se pasó nada al método, simplemente devolveremos la lista completa de datos; de lo contrario, buscaremos en el diccionario completo la clave solicitada y, si está presente, devolveremos ese elemento; de lo contrario, devolveremos False.

Finalmente, pegue el método de ejecución.

 def ejecutar(self, dicProcessList: dict): self.dicProcessList = dicProcessList mientras que es cierto: self.spawnProcesses() self.pollProcesses() if(len(self.dicProcessList) == 0 and len(self.dicActiveProcesses) == 0): romper

Copiar al portapapeles

En este método aceptamos el diccionario dicProcessList, que es la lista de comandos inicial. Configuré esto como un diccionario en lugar de solo una lista para que los desarrolladores puedan usar índices de cadenas más significativos en lugar de numéricos si lo prefieren. En este punto, simplemente llamamos a spawnProcesses y pollProcesses en un ciclo hasta que el diccionario de procesos activos y el diccionario de comandos originales estén vacíos, y una vez que ese es el caso, rompemos.

Ahora abra el archivo llamado "test.py" y copie y pegue el siguiente código en él.

 importar sistema operativo, aleatorio de clases.proc importación Proc strCurrentDir = os.path.dirname(os.path.abspath(__file__)) dicProcesos = {} para i en el rango (5): dicProcesses["proceso" + str(i)] = ["python3", os.path.join(strCurrentDir, "test.py"), str(random.randint(2, 10))] objPrcessManager = Proc() objPrcessManager.run(dicProcesos) imprimir(objPrcessManager.getProcessData("proceso2")) imprimir(objPrcessManager.getProcessData())

Copiar al portapapeles

Este es solo un fragmento de código que probará nuestra clase. Importaremos os, random y luego nuestra clase desde el directorio de clases en la línea 3. La variable strCurrentDirectory contendrá el directorio de este archivo, que luego se puede usar en nuestros subcomandos para hacer referencia a nuestro script test.py creado al principio. Defino el diccionario dicProcesses, recorro un número aleatorio que he establecido en un máximo de 5 bucles y luego creo el diccionario de comandos que se pasará a la clase proc. Luego creo una instancia de la clase Proc, llamo a ejecutar mientras le paso nuestro diccionario y luego llamo a getProcessData en nuestro objeto una vez que se completa el método de ejecución.

Si te ha gustado este artículo puedes leer más como este en: Desarrollo.

Maria Rodriguez

Seguir leyendo

Subir