23 de abril de 2013

Multicore Blast con Python

El otro día Bruno publicó en una entrada del blog un script en Perl para optimizar el tiempo de cálculo de Blast en ordenadores con procesadores multicore. Un gran script, pero creo que se puede hacer más sencillo usando el módulo subprocess de Python.

Bruno usó el módulo Parallel::ForkManager de Perl. en el pasado ya habíamos mostrado en otra entrada las ventajas y peligros de paralelizar procesos con Perl con el módulo threads.

La solución con Python creo que es la más sencilla, consiste en usar el módulo subprocess. Otro módulo de Python que podríamos usar sería multiprocessing, pero no es muy sencillo retornar el standard output de programas externos como Blast para un procesamiento posterior de los resultados.

En el siguiente ejemplo se paraleliza la ejecución del comando 'blastp -version' que retorna un texto con la versión instalada de blastp. Dicho comando se puede cambiar por cualquier otro, así como añadir código al script para procesar los resultados. En el ejemplo se ejecuta el comando 10 veces con 4 procesos simultáneos, chequeando si han terminado cada 2 segundos.

 #!/usr/bin/python  
 # -*- coding: utf-8 -*-   
 #  
   
 # Importar módulos  
 import time  
 import os, sys  
 import subprocess  
 from pprint import pprint  
   
 maximum_process_number = 4 # Número máximo de procesos simultáneos  
 checking_processes_interval = 2 # (segundos) Tiempo de espera para checkear si han terminado los procesos  
 data_to_process = range(10) # Datos a procesar, como ejemplo una lista de números del 0 al 9  
   
 # El proceso a ejecutar (como ejemplo se muestran los datos de la version de Blastp)  
 # Se puede cambiar por cualquier otro programa y comando que no sea Blast  
 def run_process(processes) :  
      print "Running process"  
      processes.append(subprocess.Popen("blastp -version", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE))  
   
 # Programa principal  
 if __name__ == '__main__':  

      # Definir variables  
      count_processes = 0  
      finished_processes = []  
      processes = []  
      results = {}

      # Procesar múltiples datos  
      for i in data_to_process:

           # Chequear si los procesos han terminado (cuando hay tantos procesos como el máximo permitido)  
           while count_processes == maximum_process_number:  
                for p in processes:  
                     p.wait()  
                     if p.returncode == 0:  
                          results[p.pid] = "\n".join(p.stdout)  
                          processes.remove(p)  
                          count_processes -= 1  
                          print "Process finished"  
                     else:   
                          results[p.pid] = "\n".join(p.stdout)  
                          processes.remove(p)  
                          count_processes -= 1  
                          print "Process failed"  
                # Si no han terminado, esperar el tiempo establecido  
                if count_processes == maximum_process_number:  
                     print "Waiting for finished processes"  
                     time.sleep(checking_processes_interval)  

           # Añadir procesos a la cola de ejecución  
           run_process(processes)  
           count_processes += 1  
           print "Process started", i, count_processes  

      # Chequear si los procesos han terminado los últimos procesos  
      while count_processes != 0:  
           for p in processes:  
                p.wait()  
                if p.returncode == 0:  
                     results[p.pid] = "\n".join(p.stdout)  
                     processes.remove(p)  
                     count_processes -= 1  
                     print "Last processes finishing"  
                else:   
                     results[p.pid] = "\n".join(p.stdout)  
                     processes.remove(p)  
                     count_processes -= 1  
                     print "Process failed"  
           if count_processes == maximum_process_number:  
                print count_processes,maximum_process_number  
                print "Waiting for last finished processes"  
                time.sleep(checking_processes_interval) 
 
      # Imprimir resultados  
      print "Output from the processes:"  
      pprint(results)  

No hay comentarios:

Publicar un comentario