Reaktive Programmieroperatoren in RxJava 2

Wenn Ihre Android-App diese Fünf-Sterne-Bewertungen bei Google Play sammelt, muss sie für mehrere Aufgaben geeignet sein.

Die mobilen Benutzer von heute erwarten, dass sie immer noch in der Lage sind, mit Ihrer App zu interagieren, während sie im Hintergrund arbeitet. Dies mag einfach klingen, Android ist jedoch standardmäßig ein Singlethread. Wenn Sie also die Erwartungen Ihres Publikums erfüllen, müssen Sie früher oder später ein paar zusätzliche Threads erstellen.

Im vorherigen Artikel dieser Serie haben wir eine Einführung in RxJava gegeben, eine reaktive Bibliothek für die JVM, mit deren Hilfe Sie Android-Anwendungen erstellen können, die auf Daten und Ereignisse reagieren, sobald sie auftreten. Sie können diese Bibliothek jedoch auch verwenden, um auf Daten und Ereignisse zu reagieren gleichzeitig.

In diesem Beitrag werde ich Ihnen zeigen, wie Sie die Operatoren von RxJava verwenden können, um die gleichzeitige Verwendung von Android zu einem schmerzfreien Erlebnis zu machen. Am Ende dieses Artikels werden Sie wissen, wie Sie mit RxJava-Operatoren zusätzliche Threads erstellen, die Arbeit angeben, die in diesen Threads ausgeführt werden soll, und dann die Ergebnisse in Androids wichtigstem Haupt-UI-Thread veröffentlichen - alles mit nur einem einige Codezeilen.

Und da keine Technologie perfekt ist, erzähle ich Ihnen auch über die große Gefahr, dass Sie die RxJava-Bibliothek zu Ihren Projekten hinzufügen - bevor Sie Ihnen zeigen, wie Sie Operatoren verwenden, um dieses Problem zu gewährleisten noch nie tritt in deinen eigenen Android-Projekten auf.

Operatoren vorstellen

RxJava verfügt über eine riesige Sammlung von Operatoren, die hauptsächlich dazu dienen sollen, die von Ihnen ausgegebenen Daten zu ändern, zu filtern, zusammenzuführen und umzuwandeln Beobachtbars. Die vollständige Liste der RxJava-Operatoren finden Sie in den offiziellen Dokumenten, und niemand erwartet, dass Sie sich etwas merken jeder einzelne Bediener, Es lohnt sich, diese Liste durchzulesen, um nur einen groben Überblick über die verschiedenen Datentransformationen zu erhalten, die Sie ausführen können.

Die Liste der Operatoren von RxJava ist bereits sehr umfangreich. Wenn Sie jedoch nicht den perfekten Operator für die von Ihnen beabsichtigte Datentransformation finden, können Sie immer mehrere Operatoren miteinander verketten. Anwenden eines Operators auf eine Beobachtbar gibt normalerweise einen anderen zurück Beobachtbar, Sie können also weiterhin Operatoren anwenden, bis Sie die gewünschten Ergebnisse erhalten.

Es gibt viel zu viele RxJava-Operatoren, die in einem einzigen Artikel behandelt werden können, und die offiziellen Dokumente von RxJava sind bereits gut darin, alle Operatoren, die Sie für Datentransformationen verwenden können, vorzustellen das meiste Potenzial, um Ihnen das Leben als Android-Entwickler zu erleichtern: subscribeOn () und observOn ()

Multithreading mit RxJava-Operatoren

Wenn Ihre App die bestmögliche Benutzererfahrung bieten soll, muss sie in der Lage sein, intensive oder lang andauernde Aufgaben auszuführen und gleichzeitig mehrere Aufgaben auszuführen, ohne den wichtigen Haupt-UI-Thread von Android zu blockieren.

Stellen Sie sich zum Beispiel vor, dass Ihre App einige Informationen aus zwei verschiedenen Datenbanken abrufen muss. Wenn Sie diese beiden Aufgaben nacheinander im Android-Haupt-Thread ausführen, wird dies nicht nur viel Zeit in Anspruch nehmen, sondern die Benutzeroberfläche reagiert nicht, bis Ihre App alle Informationen aus beiden Datenbanken abgerufen hat . Nicht gerade eine großartige Benutzererfahrung!

Eine weitaus bessere Lösung ist die Erstellung von zwei zusätzlichen Threads, bei denen Sie beide Aufgaben gleichzeitig ausführen können, ohne dass der Haupt-UI-Thread blockiert wird. Dieser Ansatz bedeutet, dass die Arbeit doppelt so schnell abgeschlossen wird, und Der Benutzer kann weiterhin mit der Benutzeroberfläche Ihrer App interagieren. Möglicherweise wissen Ihre Benutzer möglicherweise nicht einmal, dass Ihre App im Hintergrund einige intensive und lang andauernde Arbeit leistet. Alle Datenbankinformationen werden einfach wie von Zauberhand in der Benutzeroberfläche Ihrer Anwendung angezeigt!

Im Lieferumfang von Android sind einige Tools enthalten, mit denen Sie zusätzliche Threads erstellen können, einschließlich Bedienungs und IntentServices, aber diese Lösungen sind schwierig zu implementieren und können schnell zu komplexem, ausführlichem Code führen. Wenn Sie das Multithreading nicht korrekt implementieren, kann es vorkommen, dass Sie eine Anwendung finden, die Speicher verliert und alle möglichen Fehler auslöst.

Um das Multithreading auf Android noch mehr Kopfschmerzen zu verursachen, ist der Haupt-UI-Thread von Android der einzige Thread, der die Benutzeroberfläche Ihrer App aktualisieren kann. Wenn Sie die Benutzeroberfläche Ihrer App mit dem Ergebnis der durchgeführten Arbeit aktualisieren möchten jeder andere Thread, dann müssen Sie normalerweise eine erstellen Handler im Haupt-UI-Thread, und verwenden Sie dann diese Handler um Daten von Ihrem Hintergrundthread zum Hauptthread zu übertragen. Dies bedeutet mehr Code, mehr Komplexität und mehr Möglichkeiten, dass sich Fehler in Ihr Projekt einschleichen.

RxJava bietet jedoch zwei Operatoren, mit denen Sie diese Komplexität und das Fehlerpotenzial weitgehend vermeiden können.

Beachten Sie, dass Sie diese Operatoren in Verbindung mit verwenden Scheduler, Dies sind im Wesentlichen Komponenten, mit denen Sie angeben können Fäden. Für jetzt denke nur an Planer als gleichbedeutend mit dem Wort Faden.

  • subscribeOn (Scheduler): Standardmäßig ein Beobachtbar gibt seine Daten in dem Thread aus, in dem das Abonnement deklariert wurde, d. h .abonnieren Methode. In Android ist dies im Allgemeinen der Haupt-UI-Thread. Du kannst den ... benutzen subscribeOn () Operator, um einen anderen zu definieren Planer bei dem die Beobachtbar sollte seine Daten ausführen und ausgeben.
  • observOn (Scheduler): Mit diesem Operator können Sie Ihre umleiten BeobachtbarEmissionen zu einem anderen Planer, den Thread effektiv wechseln, wo die BeobachtbarEs werden Benachrichtigungen gesendet und der Thread, in dem die Daten verbraucht werden.

RxJava enthält eine Reihe von Schedulern, die Sie zum Erstellen verschiedener Threads verwenden können, darunter:

  • Scheduler.io (): Entwickelt für E / A-bezogene Aufgaben. 
  • Schedulers.computation (): Entwickelt für Rechenaufgaben. Standardmäßig ist die Anzahl der Threads im Berechnungszeitplaner auf die Anzahl der auf Ihrem Gerät verfügbaren CPUs beschränkt.
  • Schedulers.newThread (): Erstellt einen neuen Thread.

Nun haben Sie einen Überblick über alle beweglichen Teile. Schauen wir uns einige Beispiele an subscribeOn () und observOn () werden verwendet und sehen einige Scheduler in Aktion.

subscribeOn ()

In Android werden Sie normalerweise verwendet subscribeOn () und eine begleitende Planer Um den Thread zu wechseln, wenn langwierige oder intensive Arbeit ausgeführt wird, besteht kein Risiko, dass der Haupt-UI-Thread blockiert wird. Beispielsweise können Sie sich entscheiden, eine große Datenmenge auf dem Computer zu importieren io () Scheduler oder führen Sie einige Berechnungen an Berechnung() Planer.

Im folgenden Code erstellen wir einen neuen Thread, in dem Beobachtbar führt seine Operationen aus und gibt die Werte aus 1, 2, und 3.

Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer); 

Dies ist alles, was Sie benötigen, um einen Thread zu erstellen und mit dem Ausgeben von Daten in diesem Thread zu beginnen. Möglicherweise möchten Sie jedoch eine Bestätigung, dass dieses Observable tatsächlich mit einem neuen Thread arbeitet. Eine Methode ist, den Namen des Threads, den Ihre Anwendung derzeit verwendet, in Android Studio auszudruckenLogcat Monitor.

Praktischerweise haben wir im vorherigen Beitrag Get Started With RxJava eine Anwendung erstellt, die in verschiedenen Phasen des Observable-Lebenszyklus Nachrichten an Logcat Monitor sendet, sodass wir diesen Code vielfach wiederverwenden können.

Öffnen Sie das Projekt, das Sie in diesem Beitrag erstellt haben, und passen Sie den Code so an, dass er den oben genannten verwendet Beobachtbar als Quelle Beobachtbar. Dann füge das hinzu subscribeOn () Operator und geben Sie an, dass die Nachrichten, die an Logcat gesendet werden, den Namen des aktuellen Threads enthalten sollen.

Ihr fertiges Projekt sollte ungefähr so ​​aussehen:

import android.support.v7.app.AppCompatActivity; import android.os.Bundle; import android.util.Log; import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; importieren io.reactivex.schedulers.Schedulers; public class MainActivity erweitert AppCompatActivity public static final String TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer);  Beobachter Observer = neuer Observer() @Override public void onSubscribe (Disposable d) Log.e (TAG, "onSubscribe") + Thread.currentThread (). GetName ());  @Override public void onNext (Integer-Wert) Log.e (TAG, "onNext:" + value + Thread.currentThread (). GetName ());  @Override public void onError (Throwable e) Log.e (TAG, "onError:");  @Override public void onComplete () Log.e (TAG, "onComplete: All Done!") + Thread.currentThread (). GetName ()); ; 

Vergewissern Sie sich, dass der Logcat-Monitor von Android Studio geöffnet ist (durch Auswahl von Android-Monitor tab, gefolgt von Logcat) und führen Sie Ihr Projekt dann entweder auf einem physischen Android-Gerät oder einem AVD aus. Sie sollten die folgende Ausgabe im Logcat-Monitor sehen:

Hier können Sie das sehen .abonnieren wird auf dem Haupt-UI-Thread aufgerufen, aber das Observable arbeitet mit einem völlig anderen Thread.

Das subscribeOn () Der Operator hat den gleichen Effekt, unabhängig davon, wo Sie ihn in der beobachtbaren Kette platzieren. Sie können jedoch nicht mehrere verwenden subscribeOn () Betreiber in derselben Kette. Wenn Sie mehr als eine angeben subscribeOn (), dann wird deine Kette nur benutze die subscribeOn () das ist das der Quelle am nächsten beobachtbare.

observOn ()

nicht wie subscribeOn (), wo du platzierst observOn () in deiner Kette tut Dies ist von Bedeutung, da dieser Operator nur den Thread ändert, der von den angezeigten Observables verwendet wird stromabwärts

Wenn Sie beispielsweise Folgendes in Ihre Kette einfügen, wird für jedes Beobachtbare, das ab diesem Zeitpunkt in der Kette erscheint, der neue Thread verwendet.

.observOn (Schedulers.newThread ())

Diese Kette wird im neuen Thread weiter ausgeführt, bis sie auf einen anderen trifft observOn () Operator, zu diesem Zeitpunkt wechselt es zu dem von diesem Operator angegebenen Thread. Sie können den Thread steuern, an den bestimmte Observables Benachrichtigungen senden, indem Sie mehrere einfügen observOn () Betreiber in Ihre Kette.

Bei der Entwicklung von Android-Apps werden Sie im Allgemeinen verwendet observOn () Um das Ergebnis der Arbeit an Hintergrund-Threads an den Haupt-UI-Thread von Android zu senden. Der einfachste Weg, Emissionen auf den Haupt-UI-Thread von Android umzuleiten, ist die Verwendung von AndroidSchedulers.mainThread Scheduler, Dies ist Teil der RxAndroid-Bibliothek und nicht der RxJava-Bibliothek. 

Die RxAndroid-Bibliothek enthält Android-spezifische Bindungen für RxJava 2 und ist damit eine wertvolle zusätzliche Ressource für Android-Entwickler (und etwas, das wir im nächsten Beitrag dieser Serie genauer betrachten werden)..

Öffnen Sie die Modulebene, um RxAndroid zu Ihrem Projekt hinzuzufügen build.gradle Datei und fügen Sie die neueste Version der Bibliothek zum Abschnitt Abhängigkeiten hinzu. Zum Zeitpunkt des Schreibens war die neueste Version von RxAndroid 2.0.1. Daher füge ich Folgendes hinzu:

Abhängigkeiten … kompilieren 'io.reactivex.rxjava2: rxandroid: 2.0.1'

Nachdem Sie diese Bibliothek zu Ihrem Projekt hinzugefügt haben, können Sie festlegen, dass die Ergebnisse eines Observable mit einer einzigen Codezeile an den Haupt-UI-Thread Ihrer App gesendet werden:

.observOn (AndroidSchedulers.mainThread ())

Wenn man bedenkt, dass die Kommunikation mit dem Haupt-UI-Thread Ihrer App eine vollständige Seite der offiziellen Android-Dokumente belegt, ist dies eine enorme Verbesserung, die Sie beim Erstellen von Multithread-Android-Anwendungen möglicherweise viel Zeit sparen kann.

RxJavas großer Nachteil

Während RxJava Android-Entwicklern viel zu bieten hat, ist keine Technologie perfekt und RxJava hat eine große Gefahr, die Ihre App zum Absturz bringen kann.

Standardmäßig führt RxJava einen Push-basierten Workflow aus: Die Daten werden von einem Upstream erzeugt Beobachtbar, und wird dann dem zugewiesenen nachgeschoben Beobachter. Das Hauptproblem bei einem Push-basierten Workflow ist, wie einfach es für den Hersteller ist (in diesem Fall der Beobachtbar) um Gegenstände für den Verbraucher zu schnell abzugeben (Beobachter) herstellen.

Ein geschwätziger Beobachtbar und langsam Beobachter kann schnell zu einem Rückstand an nicht verbrauchten Artikeln führen, was Systemressourcen verschlingt und möglicherweise sogar zu einem Fehler führt OutOfMemoryException. Dieses Problem ist bekannt als Gegendruck.

Wenn Sie vermuten, dass in Ihrer App ein Gegendruck auftritt, gibt es einige mögliche Lösungen, einschließlich der Verwendung eines Bedieners, um die Anzahl der produzierten Elemente zu reduzieren.

Musterperioden erstellen mit Probe() und Gasfirst ()

Wenn ein Beobachtbar Wenn eine große Anzahl von Elementen ausgegeben wird, ist es möglicherweise nicht für die zugewiesenen Elemente erforderlich Beobachter bekommen jeden einzelne dieser Elemente.

Wenn Sie sicher einige ignorieren können BeobachtbarDann gibt es einige Operatoren, mit denen Sie Stichprobenzeiträume erstellen und dann bestimmte Werte auswählen können, die während dieser Zeiträume ausgegeben werden:

  • Das Probe() Der Bediener prüft die Ausgabe des Observable in von Ihnen festgelegten Intervallen und nimmt dann das letzte Element, das während dieses Stichprobenzeitraums ausgegeben wurde. Zum Beispiel, wenn Sie einschließen .Probe (5, SEKUNDEN) In Ihrem Projekt erhält der Observer dann den letzten Wert, der in jedem 5-Sekunden-Intervall ausgegeben wurde. 
  • Das throttleFirst () Der Bediener nimmt den ersten Wert, der während des Abtastzeitraums ausgegeben wurde. Zum Beispiel, wenn Sie einschließen .Gasfirst (5, SEKUNDEN) Der Observer empfängt dann den ersten Wert, der in jedem 5-Sekunden-Intervall ausgegeben wird.  

Emissionen dosieren mit Puffer()

Wenn Sie Emissionen nicht sicher überspringen können, können Sie möglicherweise noch einen gewissen Druck von einem Kampf nehmen Beobachter Emissionen in Chargen gruppieren und dann weiterleiten en masse. Die Verarbeitung von Chargenemissionen ist in der Regel effizienter als die Verarbeitung mehrerer Emissionen getrennt. Daher sollte dieser Ansatz die Verbrauchsrate verbessern.

Sie können mit dem Emissionstransfer Emissionen erzeugen Puffer() Operator. Hier verwenden wir Puffer() alle über einen Zeitraum von drei Sekunden ausgestrahlten Sendungen zu stapeln:

Observable.range (0, 10) .buffer (3, SECONDS) .subscribe (System.out :: println);

Alternativ können Sie verwenden Puffer() um eine Charge zu erstellen, die aus einer bestimmten Anzahl von Emissionen besteht. Zum Beispiel erzählen wir hier Puffer() Emissionen in Vierergruppen bündeln:

Observable.range (0, 10) .buffer (4) .subscribe (System.out :: println);

Observables durch Flowables ersetzen

Eine alternative Methode zur Verringerung der Anzahl von Emissionen besteht darin, die Abgasnorm zu ersetzen Beobachtbar Das verursacht Probleme mit einem Fließfähig.

In RxJava 2 beschloss das RxJava-Team, den Standard zu teilen Beobachtbar Es gibt zwei Typen: die reguläre Art, die wir uns in dieser Serie angesehen haben, und Fließfähigs.

Fließfähigs funktionieren ähnlich wie Beobachtbars, aber mit einem großen Unterschied: Fließfähigs senden nur so viele Objekte, wie der Beobachter dies wünscht. Wenn du einen hast Beobachtbar Wenn dabei mehr Elemente ausgegeben werden, als von seinem zugewiesenen Beobachter verbraucht werden können, sollten Sie in Betracht ziehen, zu einem zu wechseln Fließfähig stattdessen.

Bevor Sie mit der Verwendung beginnen können Fließfähigs in Ihren Projekten müssen Sie die folgende Importanweisung hinzufügen:

import io.reactivex.Flowable;

Sie können dann erstellen Fließfähigs verwendet genau die gleichen Techniken, die zum Erstellen verwendet wurden Beobachtbars. Beispielsweise wird für jedes der folgenden Codeausschnitte ein erstellt Fließfähig das ist in der Lage, Daten auszusenden:

Fließfähig flowable = Flowable.fromArray (neuer String [] "Süden", "Norden", "Westen", "Osten");… flowable.subscribe ()
Fließfähig flowable = Flowable.range (0, 20);… flowable.subscribe ()

An diesem Punkt fragen Sie sich vielleicht: Warum sollte ich jemals Gebrauch machen? Beobachtbars, wenn ich nur verwenden kann Fließfähigs und müssen sich nicht um den Gegendruck kümmern? Die Antwort lautet: a Fließfähig verursacht mehr Aufwand als ein regulärer Beobachtbar, Wenn Sie also eine leistungsfähige App erstellen möchten, sollten Sie bei bleiben Beobachtbars, es sei denn, Sie vermuten, dass Ihre Anwendung mit Gegendruck zu kämpfen hat.

Einzel

EIN Fließfähig ist nicht die einzige Variation Beobachtbar das finden Sie in RxJava, da die Bibliothek auch die Single Klasse.

Einzel sind nützlich, wenn Sie nur einen Wert ausgeben müssen. In diesen Szenarien erstellen Sie ein Beobachtbar kann sich wie übertrieben fühlen, aber a Single ist so konzipiert, dass nur ein einzelner Wert ausgegeben und dann abgeschlossen wird, entweder durch Aufruf von:

  • onSuccess (): Das Single gibt seinen einzigen Wert aus.  
  • onError ()Wenn das Single ist nicht in der Lage, sein Element auszugeben, dann wird diese Methode das Ergebnis übergeben Wurffähig.

EIN Single ruft nur eine dieser Methoden auf und wird sofort beendet.

Schauen wir uns ein Beispiel an Single wieder in Aktion, um Zeit zu sparen, verwenden wir Code wieder:

import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import io.reactivex.Single; import io.reactivex.SingleObserver; import io.reactivex.disposables.Disposable; public class MainActivity erweitert AppCompatActivity public static final String TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Single.just ("Hello World") .subscribe (getSingleObserver ());  privater SingleObserver getSingleObserver () gibt einen neuen SingleObserver zurück() @Override public void onSubscribe (Disposable d) Log.e (TAG, "onSubscribe");  @Override public void onSuccess (String-Wert) Log.e (TAG, "onSuccess:" + -Wert);  @Override public void onError (Throwable e) Log.e (TAG, "onError:"); ; 

Führen Sie Ihr Projekt auf einem AVD-Gerät oder einem physischen Android-Gerät aus. In Logcat Monitor von Android Studio wird die folgende Ausgabe angezeigt:

Wenn Sie Ihre Meinung ändern und eine konvertieren möchten Single In ein Beobachtbar RxJava hat zu jedem Zeitpunkt alle Operatoren, die Sie benötigen, einschließlich:

  • verbinden mit(): Führt mehrere zusammen Einzel in einem einzigen Beobachtbar
  • concatWith (): Kettet die Elemente, die von mehreren Elementen ausgegeben werden Einzel zusammen, um eine zu bilden Beobachtbar Emission. 
  • toObservable (): Konvertiert a Single In ein Beobachtbar Dadurch wird das ursprünglich von der Single ausgegebene Element ausgegeben und anschließend abgeschlossen.

Zusammenfassung

In diesem Beitrag haben wir einige RxJava-Operatoren untersucht, mit denen Sie mehrere Threads erstellen und verwalten können, ohne die Komplexität und das Fehlerpotenzial, die mit Multithreading auf Android traditionell verbunden sind. Wir haben auch gesehen, wie Sie die RxAndroid-Bibliothek verwenden können, um mit dem wichtigsten UI-Thread von Android über eine einzige Codezeile zu kommunizieren, und wie Sie sicherstellen, dass der Gegendruck in Ihrer Anwendung kein Problem darstellt.

Wir haben die RxAndroid-Bibliothek in dieser Serie einige Male angesprochen, aber diese Bibliothek ist mit Android-spezifischen RxJava-Bindungen gepackt, die für die Arbeit mit RxJava auf der Android-Plattform von unschätzbarem Wert sein können Schauen Sie sich die RxAndroid-Bibliothek genauer an.

Bis dahin findest du einige unserer anderen Beiträge zur Kodierung für Android!