Los geht's Golang Parallelität, Teil 2

Überblick

Eine der einzigartigen Funktionen von Go ist die Verwendung von Kanälen, um sicher zwischen Goroutines zu kommunizieren. In diesem Artikel erfahren Sie, was Kanäle sind, wie sie effektiv verwendet werden können, und einige allgemeine Muster. 

Was ist ein Kanal??

Ein Kanal ist eine synchronisierte In-Memory-Warteschlange, die von Goroutines und regulären Funktionen zum Senden und Empfangen typisierter Werte verwendet werden kann. Die Kommunikation wird über den Kanal serialisiert.

Sie erstellen einen Kanal mit machen() und geben Sie die Art der Werte an, die der Kanal akzeptiert:

ch: = make (chan int)

Go bietet eine nette Pfeilsyntax zum Senden und Empfangen von Kanälen:

 // Wert an einen Kanal senden ch <- 5 // receive value from a channel x := <- ch

Sie müssen den Wert nicht verbrauchen. Es ist in Ordnung, nur einen Wert aus einem Kanal aufzurufen:

<-ch

Channels sind standardmäßig gesperrt. Wenn Sie einen Wert an einen Kanal senden, werden Sie blockiert, bis jemand ihn empfängt. Wenn Sie von einem Kanal empfangen, blockieren Sie auf ähnliche Weise, bis jemand einen Wert an den Kanal sendet.  

Das folgende Programm zeigt dies. Das Main() function macht einen Kanal und startet eine Go-Routine, die "start" druckt, einen Wert aus dem Kanal liest und auch druckt. Dann Main() startet eine weitere Goroutine, die jede Sekunde einen Strich ("-") druckt. Dann schläft es für 2,5 Sekunden, sendet einen Wert an den Kanal und schläft 3 weitere Sekunden, um alle Goroutines zu beenden.

import ("fmt" "time") func main () ch: = make (chan int) // Startet eine Goroutine, die einen Wert aus einem Kanal liest und ihn druckt. go func (ch chan int) fmt.Println (" start ") fmt.Println (<-ch) (ch) // Start a goroutine that prints a dash every second go func()  for i := 0; i < 5; i++  time.Sleep(time.Second) fmt.Println("-")  () // Sleep for two seconds time.Sleep(2500 * time.Millisecond) // Send a value to the channel ch <- 5 // Sleep three more seconds to let all goroutines finish time.Sleep(3 * time.Second) 

Dieses Programm demonstriert sehr gut die Sperrung des Kanals. Die ersten Goroutinendrucke "beginnen" sofort, werden jedoch beim Versuch, vom Kanal zu empfangen, bis zum blockiert Main() Funktion, die 2,5 Sekunden schläft und den Wert sendet. Die andere Goroutine bietet lediglich eine visuelle Anzeige des Zeitflusses, indem regelmäßig jede Sekunde ein Strich gedruckt wird. 

Hier ist die Ausgabe:

Start - - 5 - - -

Gepufferte Kanäle

Dieses Verhalten verbindet die Absender eng mit den Empfängern und ist manchmal nicht das, was Sie wollen. Go bietet verschiedene Mechanismen, um das anzugehen.

Gepufferte Kanäle sind Kanäle, die eine bestimmte (vordefinierte) Anzahl von Werten enthalten können, so dass Sender nicht blockieren, bis der Puffer voll ist, selbst wenn niemand empfängt. 

Um einen gepufferten Kanal zu erstellen, fügen Sie einfach eine Kapazität als zweites Argument hinzu:

ch: = make (chan int, 5)

Das folgende Programm veranschaulicht das Verhalten gepufferter Kanäle. Das Main() Das Programm definiert einen gepufferten Kanal mit einer Kapazität von 3. Dann startet es eine Goroutine, die jede Sekunde einen Puffer aus dem Kanal liest und druckt, und eine weitere, die jede Sekunde einen Strich druckt, um einen visuellen Hinweis auf den Fortschritt der Zeit zu geben. Dann sendet es fünf Werte an den Kanal. 

import ("fmt" "time") func main () ch: = make (chan int, 3) // Eine Goroutine starten, die jede Sekunde einen Wert aus dem Kanal liest und druckt, dass sie func (ch chan int) for time.Sleep (time.Second) fmt.Printf ("Goroutine erhielt:% d \ n"), <-ch)  (ch) // Start a goroutine that prints a dash every second go func()  for i := 0; i < 5; i++  time.Sleep(time.Second) fmt.Println("-")  () // Push values to the channel as fast as possible for i := 0; i < 5; i++  ch <- i fmt.Printf("main() pushed: %d\n", i)  // Sleep five more seconds to let all goroutines finish time.Sleep(5 * time.Second) 

Was passiert zur Laufzeit? Die ersten drei Werte werden vom Kanal sofort gepuffert und die Main() Funktionsblöcke. Nach einer Sekunde erhält die Goroutine einen Wert und die Main() Funktion kann einen anderen Wert drücken. Eine weitere Sekunde vergeht, die Goroutine erhält einen anderen Wert und die Main() Funktion kann den letzten Wert drücken. Zu diesem Zeitpunkt empfängt die Goroutine jede Sekunde Werte vom Kanal. 

Hier ist die Ausgabe:

main () gedrückt: 0 main () gedrückt: 1 main () gedrückt: 2 - Goroutine erhalten: 0 main () gedrückt: 3 - Goroutine erhalten: 1 main () gedrückt: 4 - Goroutine erhalten: 2 - Goroutine erhalten: 3 - Goroutine erhielt: 4

Wählen

Gepufferte Kanäle (sofern der Puffer groß genug ist) können das Problem temporärer Schwankungen beheben, wenn nicht genügend Empfänger vorhanden sind, um alle gesendeten Nachrichten zu verarbeiten. Es gibt aber auch das umgekehrte Problem, dass blockierte Empfänger auf die Verarbeitung von Nachrichten warten. Go hat dich bedeckt. 

Was ist, wenn Sie möchten, dass Ihre Goroutine noch etwas tut, wenn in einem Kanal keine Nachrichten verarbeitet werden sollen? Ein gutes Beispiel ist, wenn Ihr Receiver auf Nachrichten von mehreren Kanälen wartet. Sie möchten nicht auf Kanal A sperren, wenn Kanal B gerade Nachrichten enthält. Das folgende Programm versucht, die Summe von 3 und 5 mit der vollen Leistung der Maschine zu berechnen. 

Die Idee ist, eine komplexe Operation (z. B. eine Fernabfrage an eine verteilte Datenbank) mit Redundanz zu simulieren. Das Summe() function (beachten Sie, wie sie als verschachtelte Funktion definiert ist.) Main()) akzeptiert zwei int-Parameter und gibt einen int-Kanal zurück. Eine interne anonyme Goroutine schläft eine zufällige Zeit bis zu einer Sekunde und schreibt dann die Summe in den Kanal, schließt sie und gibt sie zurück.

Jetzt die wichtigsten Anrufe Summe (3, 5) viermal und speichert die resultierenden Kanäle in den Variablen ch1 bis ch4. Die vier Anrufe an Summe() kehren Sie sofort zurück, weil der zufällige Schlaf innerhalb der Goroutine passiert Summe() Funktion ruft auf.

Hier kommt der coole Teil. Das wählen Aussage lässt die Main() Funktion wartet auf alle Kanäle und reagiert auf den ersten, der zurückkehrt. Das wählen Anweisung funktioniert ein bisschen wie die Schalter Aussage.

func main () r: = rand.New (rand.NewSource (time.Now (). UnixNano ())) sum: = func (a int, b int) <-chan int  ch := make(chan int) go func()  // Random time up to one second delay := time.Duration(r.Int()%1000) * time.Millisecond time.Sleep(delay) ch <- a + b close(ch) () return ch  // Call sum 4 times with the same parameters ch1 := sum(3, 5) ch2 := sum(3, 5) ch3 := sum(3, 5) ch4 := sum(3, 5) // wait for the first goroutine to write to its channel select  case result := <-ch1: fmt.Printf("ch1: 3 + 5 = %d", result) case result := <-ch2: fmt.Printf("ch2: 3 + 5 = %d", result) case result := <-ch3: fmt.Printf("ch3: 3 + 5 = %d", result) case result := <-ch4: fmt.Printf("ch4: 3 + 5 = %d", result)  

Manchmal willst du das nicht Main() Diese Funktion blockiert das Warten, bis die erste Goroutine beendet ist. In diesem Fall können Sie einen Standardfall hinzufügen, der ausgeführt wird, wenn alle Kanäle gesperrt sind.

Ein Web Crawler-Beispiel

In meinem vorherigen Artikel habe ich eine Lösung für die Web-Crawler-Übung der Tour of Go gezeigt. Ich habe Goroutinen und eine synchronisierte Karte verwendet. Ich habe die Übung auch mit Hilfe von Kanälen gelöst. Der vollständige Quellcode für beide Lösungen ist auf GitHub verfügbar.

Betrachten wir die relevanten Teile. Zunächst wird hier eine Struktur an einen Kanal gesendet, wenn eine Goroutine eine Seite analysiert. Es enthält die aktuelle Tiefe und alle auf der Seite gefundenen URLs.

Typverknüpfungen struct urls [] string tie int

Das fetchURL () Die Funktion akzeptiert eine URL, eine Tiefe und einen Ausgabekanal. Es verwendet den von der Übung bereitgestellten Abruf, um die URLs aller Links auf der Seite abzurufen. Es sendet die Liste der URLs als einzelne Nachricht an den Kanal des Kandidaten als Links Struktur mit einer dekrementierten Tiefe. Die Tiefe gibt an, wie weit wir weiter kriechen sollten. Wenn die Tiefe 0 erreicht, sollte keine weitere Verarbeitung erfolgen.

func fetchURL (URL-String, Tiefe int, Kandidaten-Channel-Links) body, urls, err: = fetcher.Fetch (url) fmt.Printf ("gefunden:% s% q \ n", URL, body), wenn err! nil fmt.Println (err) Kandidaten <- linksurls, depth - 1 

Das ChannelCrawl () Funktion koordiniert alles. Es verfolgt alle URLs, die bereits in einer Karte abgerufen wurden. Es ist nicht erforderlich, den Zugriff zu synchronisieren, da sich keine andere Funktion oder Goroutine berührt. Es definiert auch einen Kandidaten-Kanal, in den alle Goroutinen ihre Ergebnisse schreiben.

Dann beginnt es aufzurufen parseUrl als Goroutinen für jede neue URL. Die Logik verfolgt, wie viele Goroutinen durch die Verwaltung eines Zählers gestartet wurden. Immer wenn ein Wert aus dem Kanal gelesen wird, wird der Zähler dekrementiert (weil die sendende Goroutine nach dem Senden beendet wird), und wenn eine neue Goroutine gestartet wird, wird der Zähler inkrementiert. Wenn die Tiefe Null wird, werden keine neuen Goroutinen gestartet, und die Hauptfunktion liest so lange aus dem Kanal, bis alle Goroutinen abgeschlossen sind.

// ChannelCrawl crawlt Links von einer Seed-URL-Funktion. ChannelCrawl (URL-String, Tiefe int, Abrufabruf) Anfangs-URL zum Start des Kandidaten-Kanals go fetchURL (URL, Tiefe, Kandidaten) für Zähler> 0 candentLinks: = <-candidates counter-- depth = candidateLinks.depth for _, candidate := range candidateLinks.urls  // Already fetched. Continue… if fetched[candidate]  continue  // Add to fetched mapped fetched[candidate] = true if depth > 0 counter ++ go fetchURL (Kandidat, Tiefe, Kandidaten)

Fazit

Die Kanäle von Go bieten viele Optionen für die sichere Kommunikation zwischen den Goroutinen. Die Syntaxunterstützung ist sowohl prägnant als auch illustrativ. Es ist ein echter Segen, gleichzeitige Algorithmen auszudrücken. Es gibt viel mehr Kanäle als ich hier vorgestellt habe. Ich möchte Sie dazu ermutigen, in die verschiedenen Nebenläufigkeitsmuster einzutauchen, die sie ermöglichen.