Thread-Safe und Lock-Free - Queue-Implementierung

13547
Hosam Aly

Ich habe versucht, eine schlossfreie Warteschlangenimplementierung in Java zu erstellen, hauptsächlich für das persönliche Lernen. Die Warteschlange sollte eine allgemeine Warteschlange sein, die eine beliebige Anzahl von Lesern und / oder Schreibern gleichzeitig zulässt.

Bitte überprüfen Sie es und schlagen Sie Verbesserungsvorschläge vor.

( Cross-post von StackOverflow )

import java.util.concurrent.atomic.AtomicReference;

public class LockFreeQueue<T> {
    private static class Node<E> {
        final E value;
        volatile Node<E> next;

        Node(E value) {
            this.value = value;
        }
    }

    private AtomicReference<Node<T>> head, tail;

    public LockFreeQueue() {
        // have both head and tail point to a dummy node
        Node<T> dummyNode = new Node<T>(null);
        head = new AtomicReference<Node<T>>(dummyNode);
        tail = new AtomicReference<Node<T>>(dummyNode);
    }

    /**
     * Puts an object at the end of the queue.
     */
    public void putObject(T value) {
        Node<T> newNode = new Node<T>(value);
        Node<T> prevTailNode = tail.getAndSet(newNode);
        prevTailNode.next = newNode;
    }

    /**
     * Gets an object from the beginning of the queue. The object is removed
     * from the queue. If there are no objects in the queue, returns null.
     */
    public T getObject() {
        Node<T> headNode, valueNode;

        // move head node to the next node using atomic semantics
        // as long as next node is not null
        do {
            headNode = head.get();
            valueNode = headNode.next;
            // try until the whole loop executes pseudo-atomically
            // (i.e. unaffected by modifications done by other threads)
        } while (valueNode != null && !head.compareAndSet(headNode, valueNode));

        T value = (valueNode != null ? valueNode.value : null);

        // release the value pointed to by head, keeping the head node dummy
        if (valueNode != null)
            valueNode.value = null;

        return value;
    }
}
Antworten
44
In getObject () gibt es ein Rennen. Head.get (). Next gegen putObject () prevTailNode.next = newNode. Ich wünschte, die Codereview-Site hätte Zeilennummern. Nachdem die tail.getAndSet (newNode) aufgetreten ist, aber bevor die prevTailNode.next-Zuweisung durchgebrannt wurde, ruft getObject möglicherweise head.get () auf und liest dann headNode.next, die zu diesem Zeitpunkt NULL ist . Ron vor 9 Jahren 0
@ Ron Ich spreche darüber, wie dieses Verhalten in meiner Antwort seltsam ist, aber es ist zumindest Thread-sicher. Craig vor 9 Jahren 0
@Craig Ok ich verstehe. getObject, das null zurückgibt, ist in Ordnung. Ich denke, ein Leistungsnachteil ist, ob der Aufruf von head.compareAndSet in getObject Garantien für den Fortschritt nach vorne bietet. Ron vor 9 Jahren 0
@ Ron Das stimmt. Normalerweise werden nicht blockierende Strukturen geschrieben, um eine bessere Leistung als blockierende Strukturen zu erzielen. Dies kann jedoch nicht garantiert werden. Zumal der Thread, der die CAS-Operationen ausführt, nicht dazu beitragen kann, dass andere Threads einen Fortschritt nach vorne machen. Craig vor 9 Jahren 1

5 Antworten auf die Frage

21
Craig

Ja.

  • Die Kombination aus flüchtigen Operationen und Compare-and-Swap-Operationen reicht aus, um sicherzustellen, dass die Node-Objekte sicher veröffentlicht werden.
  • Der Compare-and-Swap muss volatilein beiden Methoden vor der Zuweisung zu einer Variablen stehen, damit Sie dort gut zurechtkommen. Sie müssen nicht atomar geschehen.

Die Warteschlange zeigt ein merkwürdiges Verhalten. Nehmen wir an, Thread 1 stoppt putObject()nach dem CAS, aber vor der letzten Zuweisung. Der nächste Thread 2 wird putObject()vollständig ausgeführt. Beim nächsten Thread drei Aufrufe getObject(), und es kann keines der ersten beiden Objekte angezeigt werden, obwohl Thread 2 vollständig ausgeführt wurde. Im Speicher wird eine kleine Kette aufgebaut. Erst nach Abschluss von Thread 1 putObject()sind beide Objekte in der Warteschlange sichtbar, was etwas überraschend ist und eine schwächere Semantik aufweist als die meisten nicht blockierenden Datenstrukturen.

Eine andere Sichtweise auf die ungerade API ist, dass es nett ist, eine Nichtblockierung zu haben, put()aber es ist sehr merkwürdig, eine Nichtblockierung zu haben get(). Dies bedeutet, dass die Warteschlange bei wiederholten Abfragen und beim Schlafen verwendet werden muss.

Danke für deine Kommentare. Ich habe das Feld "value" nicht als "final" definiert, um es auf "null" setzen zu können, um einen potenziellen Speicherverlust zu vermeiden. (Überprüfen Sie die letzte if-Anweisung in getObject ()). Hosam Aly vor 9 Jahren 0
Was das seltsame Verhalten betrifft, stimme ich Ihnen zu, aber dies war eine Entwurfsentscheidung, um den Code zu vereinfachen und die Leistung zu verbessern. Ich denke nicht, dass es wichtig wäre, wenn es für einfache Szenarien von Produzenten und Verbrauchern verwendet wird. Was denkst du? Hosam Aly vor 9 Jahren 0
@ Hosam Ich denke, es ist problematisch auf der Verbraucherseite. Craig vor 9 Jahren 0
@Hosam: Meine Faustregel lautet: entweder "final" oder "volatile". Wenn Sie es nicht "final" machen, machen Sie es zumindest "volatil". (Dies ist genau das, was Craig gesagt hat, und ich möchte es verstärken.) Chris Jester-Young vor 9 Jahren 0
@Craig: Danke. Entschuldigung für meinen späten Kommentar, aber Sie wissen wahrscheinlich, wie heiß es in Ägypten in der vergangenen Zeit war. Es ist wahr, dass diese Warteschlange wiederholtes Abfragen und Einschlafen erfordern würde. Dies ist (IMHO) in Szenarien angemessen, in denen ein einfaches Verhalten wie bei einer Nachrichtenwarteschlange erforderlich ist. Trotzdem mag ich Ihre Idee eines nicht-blockierenden `get ()`. Ich werde darüber nachdenken, wie ich es umsetzen kann. Danke noch einmal. Hosam Aly vor 9 Jahren 0
@ Chris Jester-Young: Ich hätte es vorgezogen, es "final" zu machen, aber ich wollte ein potenzielles Speicherverlust vermeiden. Ich glaube nicht, dass es wirklich wichtig ist, weil es nur zweimal gesetzt wird: 1) im Konstruktor und 2) wenn es nicht mehr zugänglich ist. Ist diese Begründung für Sie sinnvoll? Hosam Aly vor 9 Jahren 0
4
supercat

Ich mag deine 'Put'-Routine nicht. Das "ungerade Verhalten", das andere bemerkt haben, bedeutet, dass der Algorithmus einen der Hauptvorteile des "Lock-Free" -Verfahrens verliert: Immunität gegen Prioritätsinversion oder asynchron terminierte Threads. Wenn ein Thread mitten in eine Warteschlangenschreiboperation eingelegt wird, wird die Warteschlange vollständig unterbrochen, es sei denn, bis die Warteschlange ihre Arbeit beendet hat.

Die Lösung besteht darin, CompareAndSet den Zeiger "next" des letzten Knotens zu setzen, bevor der Zeiger "tail" aktualisiert wird. Wenn der "nächste" Zeiger des letzten Knotens nicht null ist, bedeutet dies, dass er nicht mehr der letzte Knoten ist, und man muss Verknüpfungen durchlaufen, um den echten letzten Knoten zu finden. Wiederholen Sie das CompareAndSet darauf und hoffen Sie, dass es immer noch der letzte Knoten ist .

Danke @supercat. Ihr Punkt zu asynchron beendeten Threads ist sicherlich wertvoll, aber ich bin nicht sicher, wie Ihr Vorschlag ihn behebt. Könnten Sie es bitte mit etwas Code näher erläutern? Hosam Aly vor 9 Jahren 0
@Hosam: Der letzte Eintrag der Warteschlange ist derjenige, dessen "nächster" Zeiger Null ist. Um ein Element zur Warteschlange hinzuzufügen, machen Sie das eigentliche letzte Element, dh das Element, dessen "nächster" Zeiger den Wert null hat, auf das neue Element. Obwohl die Warteschlange einen Zeiger für das letzte Element enthält, zeigt sie nicht immer auf das letzte Element. Es kann auf ein Element verweisen, das früher das letzte Element war, aber nicht mehr. Solange der Code, der versucht, ein Element hinzuzufügen, bereit ist, nach dem tatsächlichen letzten Element zu suchen, ist für die Richtigkeit nur der Zeiger "letzter Artikel" irgendwo in der Warteschlange erforderlich. supercat vor 9 Jahren 0
@Hosam: Das Hinzufügen eines Elements zur Warteschlange führt zwei Dinge aus: -1- Der "nächste" Zeiger des letzten letzten Elements zeigt auf das neue Element. -2- Es wird versucht, den Zeiger "letzter Eintrag" der Warteschlange zu aktualisieren. Wenn es zwei gleichzeitige Versuche gibt, Schritt 1 auszuführen, ist der CompareExchange von einem erfolgreich, und der andere schlägt fehl. Derjenige, dessen CompareExchange fehlschlägt, sucht nach dem neuen Ende der Warteschlange und fügt sich danach selbst hinzu. Beachten Sie, dass der Versuch, den Zeiger für das Ende der Warteschlange zu aktualisieren, gewissermaßen optional ist. Wenn es nie aktualisiert wurde, wurde die Warteschlange immer langsamer, aber gelegentliche Fehler sind kein Problem. supercat vor 9 Jahren 0
4
Tino

Während die Lösung von OP für mich richtig aussieht, erfindet Rons Verbesserung eine Rennbedingung:

Thread 1 (in getObject ()):

} while (!refHead.compareAndSet(head, next));

T value = next.value;

Thread 1 ist hier ausgesetzt, wurde also noch nicht ausgeführt

next.value = null;

Wir wissen, dass value! = Null und refHead jetzt als nächstes kommt, also refHead.get (). Value! = Null

Thread 2 (in getObject ()):

    head = refHead.get();
    assert head.value == null;

Hier beißt die Behauptung sogar, dass alles wieder in Ordnung ist, nachdem Thread 1 fortgefahren ist.

3
Ron

Ich habe Ihren Code ein wenig angepasst, aber ich denke, Ihr Ansatz ist gut.

Wie ich in dem Kommentar anspielte, gibt es keine Garantie für die Fairness zwischen den Threads compareAndSetting des Kopfes, so dass ein wirklich unglücklicher Thread für eine Weile hängen bleiben könnte, wenn es viele Verbraucher gibt.

Ich denke nicht, dass diese Datenstruktur Nullen enthalten sollte, da es keine Möglichkeit gibt, zwischen dem Erhalten einer Null und dem Abrufen aus einer leeren Warteschlange zu unterscheiden, also werfe ich die NPE.

Ich habe die Logik in getObject ein wenig überarbeitet, um redundante Nullprüfungen zu entfernen.

Und ich habe einige Vars umbenannt, um ungarisch zu sein.

import java.util.concurrent.atomic.AtomicReference;

public class LockFreeQueue<T> {
private static class Node<E> {
    E value;
    volatile Node<E> next;

    Node(E value) {
        this.value = value;
    }
}

private final AtomicReference<Node<T>> refHead, refTail;
public LockFreeQueue() {
    // have both head and tail point to a dummy node
    Node<T> dummy = new Node<T>(null);
    refHead = new AtomicReference<Node<T>>(dummy);
    refTail = new AtomicReference<Node<T>>(dummy);
}

/**
 * Puts an object at the end of the queue.
 */
public void putObject(T value) {
    if (value == null) throw new NullPointerException();

    Node<T> node = new Node<T>(value);
    Node<T> prevTail = refTail.getAndSet(node);
    prevTail.next = node;
}

/**
 * Gets an object from the beginning of the queue. The object is removed
 * from the queue. If there are no objects in the queue, returns null.
 */
public T getObject() {
    Node<T> head, next;

    // move head node to the next node using atomic semantics
    // as long as next node is not null
    do {
        head = refHead.get();
        next = head.next;
        if (next == null) {
            // empty list
            return null;
        }
        // try until the whole loop executes pseudo-atomically
        // (i.e. unaffected by modifications done by other threads)
    } while (!refHead.compareAndSet(head, next));

    T value = next.value;

    // release the value pointed to by head, keeping the head node dummy
    next.value = null;

    return value;
}
}
Das gleiche Problem wie der ursprüngliche Code ... muss flüchtig sein. Craig vor 9 Jahren 0
Ich behaupte, dass es nicht flüchtig sein muss. Bei Threads, die in das Wertefeld schreiben und aus diesem lesen, sind Speicherzäune über AtomicReferences aufgetreten. Ron vor 9 Jahren 0
Oh ja! Ich muss JCIP noch einmal lesen! Craig vor 9 Jahren 0
Vielen Dank. Das Werfen der NPE ist eine gute Idee, und das Entfernen der redundanten "Null" -Kontrollen ist eine willkommene Ergänzung. Wie @Tino jedoch darauf hinweist, könnte die "assert" in einem Rennen ausfallen (http://codereview.stackexchange.com/questions/224/is-this-lock-free-queue-implementation-thread-safe/446#). 446). Hosam Aly vor 9 Jahren 0
2
Andriy Plokhotnyuk

Similar structure and operations for it to get lock free multi-producer single-consumer (MPSC) queue described by Dmitry Vyukov.

I have used it as an internal queue for Scalaz actors to work with it in MPSC mode.