Hoe controleer je of een kanaal gesloten is of niet zonder het te lezen?

Dit is een goed voorbeeld van arbeiders & controller-modus in Go geschreven door @Jimt, in antwoord op
Is er een elegante manier om een ​​andere goroutine in golang te pauzeren en te hervatten?

package main
import (
    "fmt"
    "runtime"
    "sync"
    "time"
)
// Possible worker states.
const (
    Stopped = 0
    Paused  = 1
    Running = 2
)
// Maximum number of workers.
const WorkerCount = 1000
func main() {
    // Launch workers.
    var wg sync.WaitGroup
    wg.Add(WorkerCount + 1)
    workers := make([]chan int, WorkerCount)
    for i := range workers {
        workers[i] = make(chan int)
        go func(i int) {
            worker(i, workers[i])
            wg.Done()
        }(i)
    }
    // Launch controller routine.
    go func() {
        controller(workers)
        wg.Done()
    }()
    // Wait for all goroutines to finish.
    wg.Wait()
}
func worker(id int, ws <-chan int) {
    state := Paused // Begin in the paused state.
    for {
        select {
        case state = <-ws:
            switch state {
            case Stopped:
                fmt.Printf("Worker %d: Stopped\n", id)
                return
            case Running:
                fmt.Printf("Worker %d: Running\n", id)
            case Paused:
                fmt.Printf("Worker %d: Paused\n", id)
            }
        default:
            // We use runtime.Gosched() to prevent a deadlock in this case.
            // It will not be needed of work is performed here which yields
            // to the scheduler.
            runtime.Gosched()
            if state == Paused {
                break
            }
            // Do actual work here.
        }
    }
}
// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
    // Start workers
    for i := range workers {
        workers[i] <- Running
    }
    // Pause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Paused
    }
    // Unpause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Running
    }
    // Shutdown workers.
    <-time.After(1e9)
    for i := range workers {
        close(workers[i])
    }
}

Maar deze code heeft ook een probleem: als u een worker-kanaal in workerswilt verwijderen wanneer worker()wordt afgesloten, treedt er een dead lock op.

Als je close(workers[i])doet, zal de volgende keer dat de controller erin schrijft paniek veroorzaken omdat go niet naar een gesloten kanaal kan schrijven. Als je wat mutex gebruikt om het te beschermen, dan blijft het hangen op workers[i] <- Runningaangezien de workerniets van het kanaal leest en schrijven zal worden geblokkeerd, en mutex zal een dead lock veroorzaken. Je kunt als tijdelijke oplossing ook een grotere buffer geven om te kanaliseren, maar dat is niet goed genoeg.

Dus ik denk dat de beste manier om dit op te lossen is worker()het kanaal sluiten bij het verlaten, als de controller een gesloten kanaal vindt, springt hij eroverheen en doet niets. Maar ik kan in deze situatie niet vinden hoe ik kan controleren of een kanaal al gesloten is of niet. Als ik het kanaal in de controller probeer te lezen, is de controller mogelijk geblokkeerd. Dus ik ben momenteel erg in de war.

PS: Het herstellen van de opgewekte paniek is wat ik heb geprobeerd, maar het zal de goroutine sluiten die de paniek veroorzaakte. In dit geval zal het de controller zijn, dus het heeft geen zin.

Toch denk ik dat het nuttig is voor het Go-team om deze functie in de volgende versie van Go te implementeren.


Antwoord 1, autoriteit 100%

Er is geen manier om een ​​veilige applicatie te schrijven waarbij je moet weten of een kanaal open is zonder er interactie mee te hebben.

De beste manier om te doen wat je wilt doen, is met twee kanalen: een voor het werk en een om aan te geven dat je de staat wilt veranderen (en de voltooiing van die verandering van staat als dat belangrijk is).

Kanalen zijn goedkoop. Complexe ontwerpen die de semantiek overbelasten, is dat niet.

[ook]

<-time.After(1e9)

is een erg verwarrende en niet voor de hand liggende manier van schrijven

time.Sleep(time.Second)

Houd dingen eenvoudig en iedereen (inclusief jij) kan ze begrijpen.


Antwoord 2, autoriteit 90%

Op een hacky manier kan het worden gedaan voor kanalen waarnaar men probeert te schrijven door de opgewekte paniek te herstellen. Maar je kunt niet controleren of een leeskanaal is gesloten zonder ervan te lezen.

Of je doet het

  • lees uiteindelijk de “true” waarde ervan (v <- c)
  • lees de “true” waarde en ‘not closed’ indicator (v, ok <- c)
  • lees een nulwaardeen de ‘gesloten’ indicator (v, ok <- c) (voorbeeld)
  • blokkeert het voor altijd gelezen kanaal (v <- c)

Alleen de laatste leest technisch niet van het kanaal, maar dat heeft weinig zin.


Antwoord 3, autoriteit 9%

Ik weet dat dit antwoord zo laat is, ik heb deze oplossing geschreven, Hacking Ga runtime, het is geen veiligheid, het kan crashen:

import (
    "unsafe"
    "reflect"
)
func isChanClosed(ch interface{}) bool {
    if reflect.TypeOf(ch).Kind() != reflect.Chan {
        panic("only channels!")
    }
    // get interface value pointer, from cgo_export 
    // typedef struct { void *t; void *v; } GoInterface;
    // then get channel real pointer
    cptr := *(*uintptr)(unsafe.Pointer(
        unsafe.Pointer(uintptr(unsafe.Pointer(&ch)) + unsafe.Sizeof(uint(0))),
    ))
    // this function will return true if chan.closed > 0
    // see hchan on https://github.com/golang/go/blob/master/src/runtime/chan.go 
    // type hchan struct {
    // qcount   uint           // total data in the queue
    // dataqsiz uint           // size of the circular queue
    // buf      unsafe.Pointer // points to an array of dataqsiz elements
    // elemsize uint16
    // closed   uint32
    // **
    cptr += unsafe.Sizeof(uint(0))*2
    cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))
    cptr += unsafe.Sizeof(uint16(0))
    return *(*uint32)(unsafe.Pointer(cptr)) > 0
}

Antwoord 4, autoriteit 2%

Ik heb dit probleem vaak gehad met meerdere gelijktijdige goroutines.

Het kan al dan niet een goed patroon zijn, maar ik definieer een a-structeur voor mijn werknemers met een stopkanaal en een veld voor de werkstatus:

type Worker struct {
    data chan struct
    quit chan bool
    stopped bool
}

Dan kun je een controller een stopfunctie laten aanroepen voor de werknemer:

func (w *Worker) Stop() {
    w.quit <- true
    w.stopped = true
}
func (w *Worker) eventloop() {
    for {
        if w.Stopped {
            return
        }
        select {
            case d := <-w.data:
                //DO something
                if w.Stopped {
                    return
                }
            case <-w.quit:
                return
        }
    }
}

Dit geeft je een redelijk goede manier om een ​​duidelijke stop op je werkers te krijgen zonder dat er iets blijft hangen of fouten genereert, wat vooral goed is als je in een container draait.


Antwoord 5, autoriteit 2%

Nou, je kunt de default-tak gebruiken om het te detecteren, want een gesloten kanaal wordt geselecteerd, bijvoorbeeld: de volgende code selecteert default, channel, channel, de eerste selectie is niet geblokkeerd.

func main() {
    ch := make(chan int)
    go func() {
        select {
        case <-ch:
            log.Printf("1.channel")
        default:
            log.Printf("1.default")
        }
        select {
        case <-ch:
            log.Printf("2.channel")
        }
        close(ch)
        select {
        case <-ch:
            log.Printf("3.channel")
        default:
            log.Printf("3.default")
        }
    }()
    time.Sleep(time.Second)
    ch <- 1
    time.Sleep(time.Second)
}

Afdrukken

2018/05/24 08:00:00 1.default
2018/05/24 08:00:01 2.channel
2018/05/24 08:00:01 3.channel

Let op, zie commentaar van @Angad onder dit antwoord:

Het werkt niet als je een gebufferd kanaal gebruikt en het bevat
ongelezen gegevens


Antwoord 6

Je kunt je kanaal ook op nul zetten en het sluiten. Op die manier kun je controleren of het nihil is.

voorbeeld in de speeltuin:
https://play.golang.org/p/v0f3d4DisCz

bewerken:
Dit is eigenlijk een slechte oplossing zoals aangetoond in het volgende voorbeeld,
omdat het instellen van het kanaal op nul in een functie het zou breken:
https://play.golang.org/p/YVE2-LV9TOp


Antwoord 7

ch1 := make(chan int)
ch2 := make(chan int)
go func(){
    for i:=0; i<10; i++{
        ch1 <- i
    }
    close(ch1)
}()
go func(){
    for i:=10; i<15; i++{
        ch2 <- i
    }
    close(ch2)
}()
ok1, ok2 := false, false
v := 0
for{
    ok1, ok2 = true, true
    select{
        case v,ok1 = <-ch1:
        if ok1 {fmt.Println(v)}
        default:
    }
    select{
        case v,ok2 = <-ch2:
        if ok2 {fmt.Println(v)}
        default:
    }
    if !ok1 && !ok2{return}
}

}


Antwoord 8

Uit de documentatie:

Een kanaal kan worden afgesloten met de ingebouwde functie close. Het meerwaardige toewijzingsformulier van de ontvangstoperator meldt of een ontvangen waarde werd verzonden voordat het kanaal werd gesloten.

https://golang.org/ref/spec#Receive_operator

Voorbeeld van Golang in Action toont dit geval:

// This sample program demonstrates how to use an unbuffered
// channel to simulate a game of tennis between two goroutines.
package main
import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)
// wg is used to wait for the program to finish.
var wg sync.WaitGroup
func init() {
    rand.Seed(time.Now().UnixNano())
}
// main is the entry point for all Go programs.
func main() {
    // Create an unbuffered channel.
    court := make(chan int)
    // Add a count of two, one for each goroutine.
    wg.Add(2)
    // Launch two players.
    go player("Nadal", court)
    go player("Djokovic", court)
    // Start the set.
    court <- 1
    // Wait for the game to finish.
    wg.Wait()
}
// player simulates a person playing the game of tennis.
func player(name string, court chan int) {
    // Schedule the call to Done to tell main we are done.
    defer wg.Done()
    for {
        // Wait for the ball to be hit back to us.
        ball, ok := <-court
        fmt.Printf("ok %t\n", ok)
        if !ok {
            // If the channel was closed we won.
            fmt.Printf("Player %s Won\n", name)
            return
        }
        // Pick a random number and see if we miss the ball.
        n := rand.Intn(100)
        if n%13 == 0 {
            fmt.Printf("Player %s Missed\n", name)
            // Close the channel to signal we lost.
            close(court)
            return
        }
        // Display and then increment the hit count by one.
        fmt.Printf("Player %s Hit %d\n", name, ball)
        ball++
        // Hit the ball back to the opposing player.
        court <- ball
    }
}

Antwoord 9

het is gemakkelijker om eerst te controleren of het kanaal elementen heeft, die ervoor zorgen dat het kanaal leeft.

func isChanClosed(ch chan interface{}) bool {
    if len(ch) == 0 {
        select {
        case _, ok := <-ch:
            return !ok
        }
    }
    return false 
}

Antwoord 10

Als u dit kanaal luistert, kunt u altijd dat kanaal gesloten zijn.

case state, opened := <-ws:
    if !opened {
         // channel was closed 
         // return or made some final work
    }
    switch state {
        case Stopped:

Maar onthoud, u kunt één kanaal twee keer niet sluiten. Dit zal paniek verhogen.

Other episodes