Dies ist der Artikel am 25. Tag von Python Advent Calendar 2015 von Adventar.
Dieser Artikel beschreibt die Testwiederholung von Pytest als einfaches Beispiel für die Erstellung einer Job-Pipeline mit Luigi.
Luigi ist ein Job-Pipeline-Konstruktionswerkzeug von Python. Mit Luigi können die folgenden Dinge, die zum Erstellen einer Job-Pipeline erforderlich sind, in Python-Code ausgedrückt werden.
--Aufgabe ausführen
Es scheint, dass das Hauptziel darin besteht, eine Job-Pipeline mit Aufgaben zu erstellen, die einige Zeit in Anspruch nehmen, z. B. die Jobausführung von Hadoop und Spark, das Laden von Daten aus der Datenbank in / usw. und ein Modul zum Verknüpfen mit diesen Tools. Wird standardmäßig unterstützt (http://luigi.readthedocs.org/en/stable/api/luigi.contrib.html#submodules).
Es ist eine unbestreitbare Tatsache, dass es sich in diesem Beispiel um ein Kuhschwert handelt, aber ich hatte das Verdienst, die Pipeline innerhalb eines festen Rahmens erweitern zu können, und habe mich daher entschlossen, sie zu verwenden, um mich daran zu gewöhnen.
Die Grundlagen der Aufgabendefinition von Luigi sind wie folgt.
luigi.Task
erbt.luigi.Task
erbt.
--run (self): Taskausführungsprozess
--requires (self): Aufgabenabhängigkeiten
--output (self): Speichern Sie die Verarbeitung der Ergebnisse der AufgabenausführungDas Folgende ist die Aufgabendefinition zum Ausführen von "pytest".
Aufgabe des Pytests
root = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))
class PytestTask(luigi.Task):
#Aufgabenargumente
pytest_args = luigi.Parameter(default='tests')
repeat_id = luigi.IntParameter()
#Prozess zur Ausführung von Aufgaben
def run(self):
cmd = ['py.test']
cmd.extend(self.pytest_args.split(' '))
os.chdir(root)
process = Popen(cmd, stdout=PIPE, stderr=PIPE)
for line in iter(process.stdout.readline, ''):
print(line.rstrip())
# self.output()Sie können den Stream dazu bringen, das Ausführungsergebnis zu schreiben.
out = self.output().open('w')
with open(lastfailed) as f:
out.write(f.read())
out.close()
#Aufgabenabhängigkeiten
#Gibt eine Liste abhängiger Aufgaben zurück.(ex. return [A(), B()])
#Diesmal aus verschiedenen Gründen eine leere Liste(
def requires(self):
return []
#Speichern Sie die Verarbeitung der Ergebnisse der Aufgabenausführung
# luigi.Gibt eine von Target abgeleitete Klasse zurück. Im folgenden Beispiel wird das Ausführungsergebnis im lokalen Dateisystem gespeichert.
# (ex) http://luigi.readthedocs.org/en/stable/api/luigi.html#luigi.Target
def output(self):
return luigi.LocalTarget('test_repeat_{0}.txt'.format(self.repeat_id))
Dieses Mal wollte ich eine Pipeline bauen, die nicht nur "pytest" von Luigi aus ausführt, sondern auch die folgenden Anforderungen erfüllt und den Test automatisch erneut ausführt.
Luigi kann nicht nur statische Abhängigkeiten hinzufügen, indem oben "erforderlich (selbst)" erforderlich ist, sondern auch abhängig von den Bedingungen dynamisch Aufgabenabhängigkeiten hinzufügen.
Aufgabe zur erneuten Ausführung, wenn der Test nicht erfolgreich war
#Eine Datei, die Tests aufzeichnet, die während des letzten Laufs fehlgeschlagen sind
lastfailed = '.cache/v/cache/lastfailed'
class RepeatPytestTask(luigi.Task):
pytest_args = luigi.Parameter(default='tests')
repeat = luigi.IntParameter(default=1)
def is_success(self, target):
i = target.open('r')
#Wenn alle erfolgreich sind, wird ein leeres Wörterbuch generiert
success = bool(not json.load(i))
i.close()
return success
def run(self):
#Einmal ausführen und bei Erfolg beenden
out = self.output().open('w')
target = yield PytestTask(
pytest_args=self.pytest_args,
repeat_id=1)
if self.is_success(target):
out.write('success')
out.close()
return
#Ab dem zweiten Mal mit der Option lf ausführen
for i in range(0, self.repeat - 1):
# yield <Aufgabeninstanz>Kann dynamische Abhängigkeiten mit hinzufügen
target = yield PytestTask(
pytest_args='{0} --lf'.format(self.pytest_args),
repeat_id=i + 2)
#Die Ausführung endet bei Erfolg
if self.is_success(target):
out.write('success')
out.close()
return
#Das Scheitern blieb bis zum Ende
out.write('failure')
out.close()
def output(self):
return luigi.LocalTarget('test_repeats.txt')
Zusätzlich zu der oben beschriebenen Aufgabendefinition lautet das gesamte Programm, das den Pipeline-Startprozess enthält, wie folgt.
pytest_pipeline.py
import json
import os
import sys
from contextlib import contextmanager
from subprocess import Popen, PIPE
import luigi
root = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))
lastfailed = '.cache/v/cache/lastfailed'
class PytestTask(luigi.Task):
pytest_args = luigi.Parameter(default='tests')
repeat_id = luigi.IntParameter()
def output(self):
return luigi.LocalTarget('test_repeat_{0}.txt'.format(self.repeat_id))
def run(self):
cmd = ['py.test']
cmd.extend(self.pytest_args.split(' '))
os.chdir(root)
process = Popen(cmd, stdout=PIPE, stderr=PIPE)
for line in iter(process.stdout.readline, ''):
print(line.rstrip())
out = self.output().open('w')
with open(lastfailed) as f:
out.write(f.read())
out.close()
class RepeatPytestTask(luigi.Task):
pytest_args = luigi.Parameter(default='tests')
#Die Anzahl der Wiederholungen wird als Argument von außen angegeben
repeat = luigi.IntParameter(default=1)
def is_success(self, target):
i = target.open('r')
success = bool(not json.load(i))
i.close()
return success
def output(self):
return luigi.LocalTarget('test_repeats.txt')
def run(self):
out = self.output().open('w')
target = yield PytestTask(
pytest_args=self.pytest_args,
repeat_id=1)
if self.is_success(target):
out.write('success')
out.close()
return
for i in range(0, self.repeat - 1):
target = yield PytestTask(
pytest_args='{0} --lf'.format(self.pytest_args),
repeat_id=i + 2)
if self.is_success(target):
out.write('success')
out.close()
return
out.write('failure')
out.close()
#Pipeline-Startvorgang
if __name__ == '__main__':
argv = ['RepeatPytestTask']
if len(sys.argv) > 1:
argv.extend(sys.argv[1:])
luigi.run(argv)
Indem Sie dem obigen Programm die Anzahl der Wiederholungen ("--repeat") geben und es ausführen, kann eine Testpipeline realisiert werden, die automatisch neu ausgeführt wird, wenn sie fehlschlägt.
Ausführung der Job-Pipeline
#Luigi wird ausgegeben(self)Wird ausgegeben=Betrachten Sie die zu erledigende Aufgabe.
#Wenn Sie die Aufgabe von Anfang an ausführen möchten, löschen Sie alle Ausgaben.
$ rm -rf test_repeat_1.txt test_repeats.txt test_repeat_2.txt
#Für die Verarbeitung in großem Maßstab kann ein Taskplaner separat erstellt werden.
#Diesmal ist es ein kleiner Prozess, also planen Sie ihn lokal(--local-Scheduler-Option)
# http://luigi.readthedocs.org/en/stable/central_scheduler.html?highlight=scheduler%20server
$ python pytest_pipeline.py --local-scheduler --repeat 3
Recommended Posts