Python ist eine der beliebtesten Sprachen für Datenverarbeitung und Datenwissenschaft im Allgemeinen. Das Ökosystem bietet viele Bibliotheken und Frameworks, die das Hochleistungs-Computing ermöglichen. Parallele Programmierung in Python kann sich jedoch als recht schwierig herausstellen.
In diesem Lernprogramm werden wir untersuchen, warum Parallelität besonders im Python-Kontext schwierig ist. Dazu werden wir Folgendes durchgehen:
Das Global Interpreter Lock (GIL) ist eines der umstrittensten Themen in der Python-Welt. In CPython, der beliebtesten Implementierung von Python, ist die GIL ein Mutex, der die Dinge Thread-sicher macht. Die GIL erleichtert die Integration in externe Bibliotheken, die nicht threadsicher sind, und macht nicht parallelen Code schneller. Dies kostet jedoch einen Preis. Aufgrund der GIL können wir durch Multithreading keine echte Parallelität erreichen. Grundsätzlich können zwei verschiedene native Threads desselben Prozesses Python-Code nicht gleichzeitig ausführen.
Die Dinge sind jedoch nicht so schlimm, und hier ist der Grund: Was außerhalb des GIL-Bereichs geschieht, ist frei, parallel zu sein. In diese Kategorie fallen lang andauernde Aufgaben wie E / A und zum Glück Bibliotheken wie numpy
.
Python ist also nicht wirklich Multithreading. Aber was ist ein Thread? Lassen Sie uns einen Schritt zurückgehen und die Dinge aus der Perspektive betrachten.
Ein Prozess ist eine grundlegende Abstraktion des Betriebssystems. Es ist ein Programm, das gerade ausgeführt wird, also Code, der ausgeführt wird. Auf einem Computer werden immer mehrere Prozesse ausgeführt, und sie werden parallel ausgeführt.
Ein Prozess kann mehrere Threads haben. Sie führen denselben Code aus, der zum übergeordneten Prozess gehört. Im Idealfall laufen sie parallel, aber nicht unbedingt. Der Grund, warum Prozesse nicht ausreichen, ist, dass Anwendungen reagieren müssen und auf Benutzeraktionen achten müssen, während die Anzeige aktualisiert und eine Datei gespeichert wird.
Wenn das noch ein bisschen unklar ist, hier ein Cheatsheet:
PROZESSE | GEWINDE |
---|---|
Prozesse teilen sich nicht den Speicher | Threads teilen sich den Speicher |
Laich- / Wechselprozesse sind teuer | Das Laichen / Schalten von Fäden ist weniger teuer |
Prozesse erfordern mehr Ressourcen | Threads benötigen weniger Ressourcen (werden manchmal auch als „leichte Prozesse“ bezeichnet) |
Es ist keine Speichersynchronisation erforderlich | Sie müssen Synchronisationsmechanismen verwenden, um sicherzustellen, dass Sie die Daten korrekt verarbeiten |
Es gibt kein Rezept, das für alles geeignet ist. Die Auswahl einer Option hängt stark vom Kontext und der Aufgabe ab, die Sie erreichen möchten.
Jetzt gehen wir noch einen Schritt weiter und tauchen in die Parallelität ein. Parallelität wird oft missverstanden und für Parallelität gehalten. Das ist nicht der Fall. Parallelität bedeutet, dass ein unabhängiger Code für die Zeitplanung kooperativ ausgeführt wird. Nutzen Sie die Tatsache, dass ein Stück Code auf E / A-Vorgänge wartet, und führen Sie während dieser Zeit einen anderen, aber unabhängigen Teil des Codes aus.
In Python können wir ein leichtes gleichzeitiges Verhalten über Greenlets erreichen. Aus Sicht der Parallelisierung ist die Verwendung von Threads oder Greenlets gleichwertig, da keines von ihnen parallel ausgeführt wird. Das Erstellen von Greenlets ist noch kostengünstiger als Threads. Aus diesem Grund werden Greenlets häufig für die Durchführung einer großen Anzahl einfacher E / A-Aufgaben verwendet, wie sie normalerweise in Netzwerk- und Webservern zu finden sind.
Da wir nun den Unterschied zwischen Threads und Prozessen (parallel und gleichzeitig) kennen, können wir veranschaulichen, wie unterschiedliche Aufgaben in den beiden Paradigmen ausgeführt werden. Folgendes tun wir: Wir werden mehrmals eine Aufgabe außerhalb der GIL und eine innerhalb dieser ausführen. Wir führen sie seriell aus, verwenden Threads und verwenden Prozesse. Definieren wir die Aufgaben:
Import OS Importzeit Import Threading Import Multiprocessing NUM_WORKERS = 4 def only_sleep (): "" "Nichts tun, warten, bis ein Zeitgeber abläuft" "" print ("PID:% s, Prozessname:% s, Threadname:% s.) "% (os.getpid (), multiprocessing.current_process (). name, threading.current_thread (). name)) time.sleep (1) def crunch_numbers ():" "" "" "" einige Berechnungen ausführen "" "(" PID :% s, Prozessname:% s, Threadname:% s "% (os.getpid (), multiprocessing.current_process (). name, threading.current_thread (). name)) x = 0, während x < 10000000: x += 1
Wir haben zwei Aufgaben erstellt. Beide sind langwierig, aber nur crunch_numbers
führt aktiv Berechnungen durch. Lass uns laufen nur Schlaf
seriell, multithreaded und mit mehreren Prozessen und vergleichen Sie die Ergebnisse:
## Tasks seriell starten start_time = time.time () für _ im Bereich (NUM_WORKERS): only_sleep () end_time = time.time () print ("Serial time =", end_time - start_time) # Führt Tasks mit den Threads start_time = time aus .time () thread = [threading.Thread (target = only_sleep) für _ im Bereich (NUM_WORKERS)] [thread.start () für Thread in Threads] [Thread.join () für Thread in Threads] end_time = time.time () print ("Threads time =", end_time - start_time) # Ausführen von Tasks mit den Prozessen start_time = time.time () processing = [multiprocessing.Process (target = only_sleep ()) für _ im Bereich (NUM_WORKERS)] [Prozess. start () für Prozess in Prozessen] [process.join () für Prozess in Prozessen] end_time = time.time () print ("Parallel time =", end_time - start_time)
Hier ist die Ausgabe, die ich bekommen habe (Ihre sollte ähnlich sein, obwohl PIDs und Zeiten etwas variieren):
PID: 95726, Prozessname: MainProcess, Threadname: MainThread PID: 95726, Prozessname: MainProcess, Threadname: MainThread PID: 95726, Prozessname: MainProcess, Threadname: MainThread PID: 95726, Prozessname: MainProcess, Threadname : MainThread Serial time = 4.018089056015015 PID: 95726, Prozessname: MainProcess, Threadname: Thread-1 PID: 95726, Prozessname: MainProcess, Threadname: Thread-2 PID: 95726, Prozessname: Thread-: 3 PID: 95726, Prozessname: MainProcess, Thread-Name: Thread-4-Thread-Zeit = 1.0047411918640137 PID: 95728, Prozessname: Process-1, Thread-Name: MainThread PID: 95729, Prozessname: Process-2, Thread-Name: MainThread PID: 95730, Prozessname: Process-3, Threadname: MainThread PID: 95731, Prozessname: Process-4, Threadname: MainThread Parallele Zeit = 1.014023780822754
Hier einige Beobachtungen:
Im Falle der serieller Ansatz, Dinge sind ziemlich offensichtlich. Wir führen die Aufgaben nacheinander aus. Alle vier Läufe werden von demselben Thread des gleichen Prozesses ausgeführt.
Prozesse nutzen Wir reduzieren die Ausführungszeit auf ein Viertel der ursprünglichen Zeit, einfach weil die Aufgaben parallel ausgeführt werden. Beachten Sie, wie jede Aufgabe in einem anderen Prozess ausgeführt wird Haupt-Bedroung
dieses Prozesses.
Threads verwenden wir nutzen die tatsache, dass die aufgaben gleichzeitig ausgeführt werden können. Die Ausführungszeit wird ebenfalls auf ein Viertel reduziert, obwohl nichts parallel läuft. Das geht so: Wir erzeugen den ersten Thread und warten darauf, dass der Timer abläuft. Wir unterbrechen die Ausführung, warten, bis der Timer abläuft, und in dieser Zeit erzeugen wir den zweiten Thread. Wir wiederholen dies für alle Threads. Zu einem Zeitpunkt läuft der Timer des ersten Threads ab, so dass die Ausführung darauf umgestellt und beendet wird. Der Algorithmus wird für den zweiten und für alle anderen Threads wiederholt. Am Ende ist das Ergebnis so, als ob die Dinge parallel laufen würden. Sie werden auch feststellen, dass die vier verschiedenen Threads aus demselben Prozess heraus verzweigen und darin leben: Hauptprozess
.
Möglicherweise stellen Sie sogar fest, dass der Thread-Ansatz schneller ist als der wirklich parallele. Das liegt an dem Aufwand von Laichprozessen. Wie bereits erwähnt, ist das Laichen und Wechseln von Prozessen ein kostspieliger Vorgang.
Lass uns die gleiche Routine machen, aber diesmal die crunch_numbers
Aufgabe:
start_time = time.time () für _ im Bereich (NUM_WORKERS): crunch_numbers () end_time = time.time () print ("Serial time =", end_time - start_time) start_time = time.time () thread = [threading.Thread (target = crunch_numbers) für _ im Bereich (NUM_WORKERS)] [thread.start () für Thread in Threads] [Thread.join () für Thread in Threads] end_time = time.time () print ("Threads time =", end_time - start_time) start_time = time.time () verarbeitet = [Multiprocessing.Process (target = crunch_numbers) für _ im Bereich (NUM_WORKERS)] [process.start () für Prozess in Prozessen] [process.join () für den Prozess in verarbeitet] end_time = time.time () print ("Parallel time =", end_time - start_time)
Hier ist die Ausgabe, die ich habe:
PID: 96285, Prozessname: MainProcess, Threadname: MainThread PID: 96285, Prozessname: MainProcess, Threadname: MainThread PID: 96285, Prozessname: MainProcess, Threadname: MainThread PID: 96285, Prozessname: MainProcess, Threadname : MainThread Serial time = 2.705625057220459 PID: 96285, Prozessname: MainProcess, Threadname: Thread-1 PID: 96285, Prozessname: MainProcess, Threadname: Thread-2 PID: 96285, Prozessname: Thread- 3 PID: 96285, Prozessname: MainProcess, Thread-Name: Thread-4-Thread-Zeit = 2.6961309909820557 PID: 96289, Prozessname: Process-1, Thread-Name: MainThread PID: 96290, Prozessname: Process-2, Thread-Name: MainThread PID: 96291, Prozessname: Process-3, Threadname: MainThread PID: 96292, Prozessname: Process-4, Threadname: MainThread Parallele Zeit = 0.8014059066772461
Der Hauptunterschied liegt hier im Ergebnis des Multithread-Ansatzes. Diesmal funktioniert es sehr ähnlich wie beim seriellen Ansatz. Der Grund dafür ist: Da es Berechnungen ausführt und Python keine echte Parallelität durchführt, laufen die Threads im Wesentlichen nacheinander und führen die Ausführung aufeinander aus, bis sie alle beendet sind.
Python verfügt über umfangreiche APIs für parallele / gleichzeitige Programmierung. In diesem Tutorial behandeln wir die beliebtesten, aber Sie müssen wissen, dass es für jeden Bedarf in dieser Domäne wahrscheinlich bereits etwas gibt, das Ihnen dabei helfen kann, Ihr Ziel zu erreichen.
Im nächsten Abschnitt erstellen wir eine praktische Anwendung in vielen Formen. Dabei werden alle vorgestellten Bibliotheken verwendet. Hier sind die Module / Bibliotheken, die wir behandeln werden:
Einfädeln
: Die Standardmethode für das Arbeiten mit Threads in Python. Es handelt sich um einen API-Wrapper auf höherer Ebene, der über die von der bereitgestellten Funktionalität verfügt _Faden
Dieses Modul ist eine einfache Schnittstelle über die Thread-Implementierung des Betriebssystems.
gleichzeitige Merkmale
: Ein Modulteil der Standardbibliothek, der eine übergeordnete Abstraktionsebene über Threads bereitstellt. Die Threads werden als asynchrone Aufgaben modelliert.
Multiprocessing
: Ähnlich wie Einfädeln
Modul, das eine sehr ähnliche Schnittstelle bietet, jedoch Prozesse anstelle von Threads verwendet.
Gevent und Greenlets
: Greenlets, auch Micro-Threads genannt, sind Ausführungseinheiten, die gemeinsam geplant werden können und Aufgaben ohne viel Aufwand gleichzeitig ausführen können.
Sellerie
: Eine verteilte Task-Warteschlange auf hoher Ebene. Die Aufgaben werden in Warteschlange gestellt und gleichzeitig ausgeführt, wobei verschiedene Paradigmen wie verwendet werden Multiprocessing
oder Gevent
.
Die Theorie zu kennen ist schön und gut, aber der beste Weg zu lernen ist, etwas Praktisches zu bauen, oder? In diesem Abschnitt werden wir eine klassische Anwendung entwickeln, die alle verschiedenen Paradigmen durchläuft.
Lassen Sie uns eine Anwendung erstellen, die die Verfügbarkeit von Websites überprüft. Es gibt viele solcher Lösungen, von denen die bekanntesten wahrscheinlich Jetpack Monitor und Uptime Robot sind. Der Zweck dieser Apps besteht darin, Sie zu benachrichtigen, wenn Ihre Website inaktiv ist, sodass Sie schnell Maßnahmen ergreifen können. So funktionieren sie:
Deshalb ist es wichtig, dass Sie das Problem parallel und gleichzeitig angehen. Wenn die Liste der Websites wächst, kann die serielle Durchsicht der Liste nicht garantieren, dass jede Website alle fünf Minuten überprüft wird. Die Websites könnten stundenlang ausfallen und der Besitzer wird nicht benachrichtigt.
Beginnen wir mit dem Schreiben einiger Hilfsprogramme:
# utils.py Importzeit Importprotokollierung Importanfragen der Klasse WebsiteDownException (Ausnahme): pass def ping_website (Adresse, Timeout = 20): "" "Überprüfen Sie, ob eine Website inaktiv ist. Eine Website wird als inaktiv betrachtet, wenn entweder status_code> = 400 oder Wenn das Zeitlimit abgelaufen ist. Eine WebsiteDownException auslösen, wenn eine der Bedingungen für das Herunterfahren der Website "" "erfüllt ist: try = response.head (Adresse, Zeitlimit = Zeitlimit), wenn response.status_code> = 400: logging.warning ("% s wurde zurückgegeben status_code =% s "% (address, response.status_code)) erhöht WebsiteDownException (), mit der Ausnahme von request.exceptions.RequestException: logging.warning (" Zeitüberschreitung für Website% s abgelaufen "%). WebsiteDownException () def notify_owner (Adresse) erhöhen: "" "Senden Sie dem Besitzer der Adresse eine Benachrichtigung, dass seine Website nicht verfügbar ist. Jetzt werden wir nur 0,5 Sekunden lang schlafen, aber hier würden Sie eine E-Mail-, Pushbenachrichtigungs- oder Textnachrichtenprotokollierung senden. info ("% s website des Inhabers benachrichtigen"% address) time.sleep (0.5) def check_webs ite (address): "" "Dienstprogrammfunktion: Überprüfen Sie, ob eine Website inaktiv ist. Benachrichtigen Sie den Benutzer" "". versuchen Sie es: ping_website (address) außer WebsiteDownException: notify_owner (address)
Wir benötigen eine Website-Liste, um unser System auszuprobieren. Erstellen Sie Ihre eigene Liste oder verwenden Sie meine:
# websites.py WEBSITE_LIST = ["http://envato.com", "http://amazon.co.uk", "http://amazon.com", "http://facebook.com", " http://google.com ',' http://google.fr ',' http://google.es ',' http://google.co.uk ',' http://internet.org ' , 'http://gmail.com', 'http://stackoverflow.com', 'http://github.com', 'http://heroku.com', 'http: // really-cool- available-domain.com "," http://djangoproject.com "," http://rubyonrails.org "," http://basecamp.com "," http://trello.com "," http: //yiiframework.com ',' http://shopify.com ',' http://another-really-interesting-domain.co ',' http://airbnb.com ',' http: // instagram. com ',' http://snunteer.com ',' http://youtube.com ',' http://baidu.com ',' http://yahoo.com ',' http: // live. com ',' http://linkedin.com ',' http://yandex.ru ',' http://netflix.com ',' http://wordpress.com ',' http: // bing. com ',]
Normalerweise sollten Sie diese Liste zusammen mit den Kontaktinformationen des Besitzers in einer Datenbank aufbewahren, damit Sie mit ihnen Kontakt aufnehmen können. Da dies nicht das Hauptthema dieses Tutorials ist, verwenden wir zur Vereinfachung nur diese Python-Liste.
Wenn Sie wirklich gut aufgepasst haben, haben Sie vielleicht zwei wirklich lange Domains in der Liste entdeckt, die keine gültigen Websites sind (ich hoffe, dass niemand sie gekauft hat, wenn Sie dies hier lesen, um mich als falsch zu bezeichnen!). Ich habe diese beiden Domains hinzugefügt, um sicher zu sein, dass bei jedem Durchlauf einige Websites verfügbar sind. Nennen wir auch unsere App UptimeSquirrel.
Lassen Sie uns zunächst den seriellen Ansatz ausprobieren und sehen, wie schlecht er arbeitet. Wir werden dies als Basis betrachten.
# serial_squirrel.py import time start_time = time.time () für Adresse in WEBSITE_LIST: check_website (address) end_time = time.time () print ("Zeit für SerialSquirrel:% ssecs"% (end_time - start_time)) # WARNUNG: root : Zeitüberschreitung für Website http://really-cool-available-domain.com # WARNUNG: root: Zeitüberschreitung für Website http://another-really-interesting-domain.co # WARNUNG: root: Website http: // bing.com lieferte status_code = 405 # Zeit für SerialSquirrel: 15.881232261657715secs
Mit der Implementierung des Threaded-Ansatzes werden wir etwas kreativer. Wir verwenden eine Warteschlange, um die Adressen einzufügen und Arbeitsthreads zu erstellen, um sie aus der Warteschlange zu entfernen und sie zu verarbeiten. Wir werden warten, bis die Warteschlange leer ist, was bedeutet, dass alle Adressen von unseren Arbeitsthreads verarbeitet wurden.
# thread_squirrel.py Importzeit aus Warteschlange importieren Warteschlange aus Threading importieren Thread NUM_WORKERS = 4 task_queue = Queue () def worker (): # Überprüfen Sie die Warteschlange ständig auf Adressen, während sie wahr ist: address = task_queue.get () check_website (address) # Mark Die verarbeitete Aufgabe als erledigt. task_queue.task_done () start_time = time.time () # Erzeuge die Worker-Threads = [Thread (Ziel = Worker) für _ im Bereich (NUM_WORKERS)] # Fügen Sie der Task-Warteschlange die Websites hinzu [task_queue]. put (item) für ein Element in WEBSITE_LIST] # Alle Arbeiter starten [thread.start () für Thread in Threads] # Warten Sie, bis alle Tasks in der Warteschlange verarbeitet sind. task_queue.join () end_time = time.time () print ("Zeit für ThreadedSquirrel:% ssecs"% (end_time - start_time)) # WARNUNG: root: Zeitüberschreitung für Website http://really-cool-available-domain.com # WARNUNG: Root: Zeitüberschreitung für Website http: / /another-really-interesting-domain.co # WARNUNG: root: Website http://bing.com hat status_code = 405 # Zeit für ThreadedSquirrel: 3.1107530 zurückgegeben 59387207secs
Wie bereits erwähnt, gleichzeitige Merkmale
ist eine übergeordnete API für die Verwendung von Threads. Der Ansatz, den wir hier verfolgen, impliziert die Verwendung von a ThreadPoolExecutor
. Wir werden Aufgaben in den Pool einreichen und Futures zurückbekommen, welche Ergebnisse uns zukünftig zur Verfügung stehen werden. Natürlich können wir darauf warten, dass alle Futures zu tatsächlichen Ergebnissen werden.
# future_squirrel.py import time import concurrent.futures NUM_WORKERS = 4 start_time = time.time () mit concurrent.futures.ThreadPoolExecutor (max_workers = NUM_WORKERS) als Ausführender: futures = executor.submit (check_website, address) für die Adresse in WEBSITE_LIST. concurrent.futures.wait (futures) end_time = time.time () print ("Zeit für FutureSquirrel:% ssecs"% (end_time - start_time)) # WARNUNG: root: Zeitüberschreitung für Website http: // really-cool-available -domain.de # WARNUNG: root: Zeitüberschreitung für Website http://another-really-interesting-domain.co # WARNUNG: Root: Website http://bing.com hat status_code = 405 # Zeit für FutureSquirrel: 1.812899112701416secs zurückgegeben
Das Multiprocessing
Die Bibliothek stellt eine fast Drop-In-Ersatz-API für die Einfädeln
Bibliothek. In diesem Fall werden wir dem Ansatz ähnlicher vorgehen gleichzeitige Merkmale
ein. Wir richten ein Multiprocessing.Pool
und übermitteln Sie Aufgaben, indem Sie der Liste der Adressen eine Funktion zuordnen (denken Sie an den klassischen Python) Karte
Funktion).
# multiprocessing_squirrel.py import time import socket import multiprocessing NUM_WORKERS = 4 start_time = time.time () mit multiprocessing.Pool (Prozesse = NUM_WORKERS) als pool: results = pool.map_async (check_website, WEBSITE_LIST) results_waist () end_time = .time () print ("Zeit für MultiProcessingSquirrel:% ssecs"% (end_time - start_time)) # WARNUNG: root: Zeitüberschreitung für Website http://really-cool-available-domain.com # WARNUNG: Root: Zeitüberschreitung abgelaufen für Website http://another-really-interesting-domain.co # WARNUNG: root: Website http://bing.com hat status_code = 405 # Zeit für MultiProcessingSquirrel: 2.8224599361419678secs zurückgegeben
Gevent ist eine beliebte Alternative, um massive Parallelität zu erreichen. Es gibt einige Dinge, die Sie vor der Verwendung wissen müssen:
Code, der gleichzeitig von Greenlets ausgeführt wird, ist deterministisch. Im Gegensatz zu den anderen vorgestellten Alternativen garantiert dieses Paradigma, dass Sie für zwei identische Läufe immer die gleichen Ergebnisse in derselben Reihenfolge erhalten.
Sie müssen die Standardfunktionen der Patches anpassen, damit sie mit gevent zusammenarbeiten. Das meine ich damit. Normalerweise blockiert ein Socket-Vorgang. Wir warten, bis die Operation abgeschlossen ist. In einer Multithread-Umgebung würde der Scheduler einfach zu einem anderen Thread wechseln, während der andere auf E / A wartet. Da wir uns nicht in einer Multithread-Umgebung befinden, korrigiert gevent die Standardfunktionen, sodass sie nicht blockieren und die Kontrolle an den gevent-Scheduler zurückgeben.
Um gevent zu installieren, führen Sie Folgendes aus: pip install gevent
So verwenden Sie gevent, um unsere Aufgabe mit a auszuführen gevent.pool.Pool
:
# green_squirrel.py importiere Zeit von gevent.pool import Pool von gevent import monkey # Beachten Sie, dass Sie mit gevent viele Worker erzeugen können, da die Kosten für das Erstellen und Wechseln sehr niedrig sind. NUM_WORKERS = 4 # Monkey-Patch-Socket-Modul für HTTP-Anforderungen. patch_socket () start_time = time.time () pool = pool (NUM_WORKERS) für die Adresse in WEBSITE_LIST: pool.spawn (check_website, address) # Warten auf den Abschluss von pool.join () end_time = time.time () print (" Zeit für GreenSquirrel:% ssecs "% (end_time - start_time)) # Zeit für GreenSquirrel: 3.8395519256591797secs
Sellerie ist ein Ansatz, der sich meistens von dem unterscheidet, was wir bisher gesehen haben. Es wird in sehr komplexen und leistungsstarken Umgebungen getestet. Die Einrichtung von Celery erfordert etwas mehr Bastelarbeit als alle oben genannten Lösungen.
Zuerst müssen wir Celery installieren:
pip installieren Sie Sellerie
Aufgaben sind die zentralen Konzepte im Celery-Projekt. Alles, was Sie in Celery ausführen möchten, muss eine Aufgabe sein. Celery bietet große Flexibilität für die Ausführung von Aufgaben: Sie können diese synchron oder asynchron, in Echtzeit oder geplant auf demselben Computer oder auf mehreren Computern ausführen und Threads, Prozesse, Eventlet oder Gevent verwenden.
Die Anordnung wird etwas komplexer. Celery verwendet andere Dienste zum Senden und Empfangen von Nachrichten. Diese Nachrichten sind normalerweise Aufgaben oder Ergebnisse von Aufgaben. Wir werden in diesem Tutorial Redis für diesen Zweck verwenden. Redis ist eine großartige Wahl, da es sehr einfach zu installieren und zu konfigurieren ist und es durchaus möglich ist, dass Sie es bereits in Ihrer Anwendung für andere Zwecke verwenden, z. B. für Caching und Pub / Sub.
Sie können Redis installieren, indem Sie die Anweisungen auf der Schnellstartseite von Redis befolgen. Vergessen Sie nicht, das zu installieren Redis
Python-Bibliothek, pip install redis
, und das erforderliche Paket für die Verwendung von Redis und Sellerie: Pip Install Sellerie [Redis]
.
Starten Sie den Redis-Server wie folgt: $ redis-server
Um mit Celery etwas zu bauen, müssen wir zuerst eine Celery-Anwendung erstellen. Danach muss Celery wissen, welche Aufgaben es ausführen kann. Um dies zu erreichen, müssen wir Aufgaben in der Celery-Anwendung registrieren. Wir machen das mit der @ app.task
Dekorateur:
# celler_squirrel.py Importzeit aus utils import_checkwebsite aus Daten importieren WEBSITE_LIST aus Sellerie importieren Sellerie aus Sellerie.ergebnis importieren ResultSet app = Sellerie ('sellerie_squirrel', broker = "redis: // localhost: 6379/0", backend = "redis : // localhost: 6379/0 ") @ app.task def check_website_task (address): return check_website (address) if __name__ ==" __main__ ": start_time = time.time () # Mit 'delay' wird die Task async rs ausgeführt = ResultSet ([check_website_task.delay (Adresse) für Adresse in WEBSITE_LIST]) # Warten Sie, bis die Aufgaben abgeschlossen sind. Rs.get () end_time = time.time () print ("CelerySquirrel:", end_time - start_time) # CelerySquirrel: 2.4979639053344727
Keine Panik, wenn nichts passiert. Denken Sie daran, dass Sellerie eine Dienstleistung ist und wir müssen sie ausführen. Bis jetzt haben wir nur die Aufgaben in Redis platziert, aber Celery haben sie nicht gestartet. Dazu müssen wir diesen Befehl in dem Ordner ausführen, in dem sich unser Code befindet:
Sellerie-Arbeiter -A do_celery --loglevel = debug --concurrency = 4
Führen Sie nun das Python-Skript erneut aus und sehen Sie, was passiert. Eine Sache zu beachten: Beachten Sie, wie wir die Redis-Adresse zweimal an unsere Redis-Anwendung übergeben haben. Das Makler
Parameter gibt an, wo die Aufgaben an Celery übergeben werden Backend
Hier setzt Celery die Ergebnisse an, damit wir sie in unserer App verwenden können. Wenn wir kein Ergebnis angeben Backend
, Es gibt keine Möglichkeit für uns zu wissen, wann die Aufgabe bearbeitet wurde und was das Ergebnis war.
Beachten Sie außerdem, dass sich die Protokolle jetzt in der Standardausgabe des Celery-Prozesses befinden. Überprüfen Sie die Protokolle daher im entsprechenden Terminal.
Ich hoffe, dies war eine interessante Reise für Sie und eine gute Einführung in die Welt der parallelen / gleichzeitigen Programmierung in Python. Dies ist das Ende der Reise, und wir können einige Schlussfolgerungen ziehen:
Einfädeln
und gleichzeitige Merkmale
Bibliotheken.Multiprocessing
bietet eine sehr ähnliche Schnittstelle zu Einfädeln
aber für Prozesse und nicht für Threads.Lernen Sie Python mit unserem kompletten Python-Tutorial, egal ob Sie gerade erst anfangen oder ein erfahrener Programmierer sind, der neue Fähigkeiten erlernen möchte.