lesson02.md 12 KB

Каналы

Содержание

Каналы в go являются попыткой решить проблему безопасности данных в конкуретной среде. Это неидеальная попытка реализовать монитор Хоара. Неидеальность связана с тем, что подавляющая часть решений в go продиктована несколькими соображениями:

  • простой язык заставляет думать над алгоритмом, а не способом проявить свой интеллект;
  • новый человек в коллективе должен легко читать незнакомый код;
  • при необходимости -- можно использовать опасные средства для увеличения эффективности, если ситуация навязывает такое решение;

Все эти компромиссы имеют свою цену, поэтому не стоит увлекаться. Каналы именно такое средство.

Данные в конкурентной среде

Основная проблема состоит в том, что конкурентные потоки могут обращаться к одним и тем же данных.

Нет проблем, когда эти обращения только на чтение. Проблема появляется тогда, когда хотя бы один поток начинает менять данные. Без специальных мер невозможно гарантировать, что чтение будет происходить после записи, а не во время записи. Если такое состояние происходит -- вместо данных после чтения может оказаться мусор.

Эта проблема получила название гонок данных. Проблема далеко не умозрительная. [Тут] можно почитать, как аппарат лучевой терапии из-за этой ошибки убивал пациентов (должен был лечить).

Поток-1 Поток-2 Результат
Пишет Читает Мусор
Пишет Пишет Мусор
Читает Пишет Мусор
Читает Читает Норм

В 75% случаев -- будет мусор. Полагаться на случай нельзя.

Каналы

Условная схема канала представлена ниже:

flowchart LR
    subgraph channel
        mutex
        in
        out
        FIFO
        subgraph property
            len
            cap
            _isClose_
        end
    end
    flow1 --> |data|in
    mutex --> in
    in --> FIFO
    FIFO --> out
    mutex --> FIFO
    out --> |data|flow2
    mutex --> out

На уровне рантайма это:

  • мьютекс;
  • очередь;
  • заданная ёмкость;
  • длина (сколько элементов из ёмкости занято);
  • признак "закрыт ли канал".

Что не так с каналами

Канал требует строгой дисциплины из-за своих особенностей.

  • канал нельзя закрыть дважды -- будет паника;
  • нельзя записать в закрытый канал -- будет паника;
  • закрытый канал может содержать полезные данные;
  • если функция работает слишком долго -- рантайм теоретически может долго не проверять состояние каналов (особенно на машинах с одним ядром или с явным ограничением на один поток).

Как с этим жить

Простые правила помогут избежать непредсказуемого поведения программы:

  • кто канал создал -- тот в него и пишет;
  • кто канал создал -- тот его и закрывает;
  • кто канал создал -- тот и отдаёт канал, но только для чтения;
  • кто канал читает -- ничего с ним не делает;
  • при длительных операциях надо специальным вызовом давать возможность рантайму немного поработать.

При соблюдении первых трёх правил -- в других случаях ничего плохого не случится. Но часто придётся работать с каналами, которые не подчиняются первым трём правилам. Об этом стоит помнить.

Неявные правила работа с данными

Мало передать структуры и данные через каналы, чтобы избежать гонок данных.

Надо понимать, что как только данные попали в канал по ссылке -- теперь есть две ссылки, которые указывают на одни и теже данные.

Поэтому два железных правила:

  • что попало в канал -- то пропало;
  • что попало в канал, но надо иметь доступ из нескольких потоков -- структура должна иметь охрану данных собственным мьютексом.

Второе правило должно быть исключением. Если какой-либо глобальный объект разделяется между потоками -- он должен быть доступен через объект приложения, а не через каналы.

Как определить есть ли что-то в канале

К каналу, как и к срезу применима функция len:

countMsg := len(chMsg)
if countMsg == 0{
    return
}
// Что-то делаем не с пустым каналом

Что не так с длиной канала

Проблема в том, что в канал пишет один поток, а длину (обычно) проверяет другой. И пока второй поток что-то делает с каналом -- первый в это время может ещё дописать данных. Или другой поток может прочитать данные из этого же канала.

На текущую длину канала в потребителе полагаться нальзя!

Единственный надёжный метод -- итерация по каналу в цикле.

Пример

Пример показывает:

  • как правильно создать канал;
  • как правильно читать из канала;
  • как правильно закрыть канал.

    
    // Создаёт канал и пишет в него некоторое количество сообщений
    //   Этот же канал и  возвращает. Причём закрытый. Причём только для чтения.
    func runWriter()<- chan int{
    chProduct := make(chan int, 1_000)
    for i:=0; i<200; i++ {
        chProduct <- i
    }
    // Закрытие канала здесь никак не влияет на способность потом читать из канала
    close(chProduct)
    return chProduct
    }
    
    // Получает канал при вызове и читает из него, пока не закончатся данные
    //  Не играет значения, что канал закрыт. Выход из цикла произойдёт автоматически
    //  по факту исчерпания данных.
    func runReader (chProd <-chan int){
    for val := range chProd {
        log.Printf("runReader(): i=%v\n", i)
    }
    }
    
    chProd := runWriter()
    runReader(chProd)
    

Работа с select

select умеет работать не только с каналами на чтение, но и на запись.

select{
    case <-chEnd: // Канал на азкрытие
        return
    case msg:=<-chMsg: // Прилетело сообщение
        sf.useMsg(msg)
    case sysMsg, isOk:=<-chSysMsg: // Прилетело системное сообщение
        if !isOk{ // Канал закрыт
            return
        }
        sf.useSysMsg(sysMsg)
    case chSig<-1: // Условное действие на отправку, если свободен сигнал/
        sf.sendSig()
}

Пример мультиплексора

// Скорость освобождения канала зависит от скорости чтения потребителя из него
chOut := make(chan int, 10) // Канал с ограничением на ёмкость

// В коде ниже нет проверки на закрытие каналов чтения
for {
    select{
        case msg :=<- chSig1: // Взять из первого канала
            chOut <- msg
        case msg :=<- chSig2: // Взять из второго канала
            chOut <- msg
    }
}

Пример демультиплексора

// Скорость освобождения канала зависит от скорости чтения потребителя из него
chOut1 := make(chan int, 10) // Канал с ограничением на ёмкость
chOut2 := make(chan int, 10) // Канал с ограничением на ёмкость

func run(chIn <-chan int){
    for msgIn :=range chIn{
        switch msgIn.OutChan {
            case 1: // Отправить в первый канал
                chOut1 <- msg.PayLoad
            case 2: // Отправить во второй канал
                chOut <- msg.PayLoad2
        }
    }
}

Пример семафора

chEnd := make(chan struct{}, 1)

go func(){
    defer close(chEnd)
    time.Sleep(time.Second * 1)
}()

<-chEnd

log.Println("End work")

Примеры использования каналов:

Статья на Хабре Видосик на Ютубе