Каналы в go
являются попыткой решить проблему безопасности данных в конкуретной среде.
Это неидеальная попытка реализовать монитор Хоара. Неидеальность связана с тем, что подавляющая часть решений в go
продиктована несколькими соображениями:
Все эти компромиссы имеют свою цену, поэтому не стоит увлекаться. Каналы именно такое средство.
Основная проблема состоит в том, что конкурентные потоки могут обращаться к одним и тем же данных.
Нет проблем, когда эти обращения только на чтение. Проблема появляется тогда, когда хотя бы один поток начинает менять данные. Без специальных мер невозможно гарантировать, что чтение будет происходить после записи, а не во время записи. Если такое состояние происходит -- вместо данных после чтения может оказаться мусор.
Эта проблема получила название гонок данных. Проблема далеко не умозрительная. [Тут] можно почитать, как аппарат лучевой терапии из-за этой ошибки убивал пациентов (должен был лечить).
Поток-1 | Поток-2 | Результат |
---|---|---|
Пишет | Читает | Мусор |
Пишет | Пишет | Мусор |
Читает | Пишет | Мусор |
Читает | Читает | Норм |
В 75% случаев -- будет мусор. Полагаться на случай нельзя.
Условная схема канала представлена ниже:
flowchart LR
subgraph channel
mutex
in
out
FIFO
subgraph property
len
cap
_isClose_
end
end
flow1 --> |data|in
mutex --> in
in --> FIFO
FIFO --> out
mutex --> FIFO
out --> |data|flow2
mutex --> out
На уровне рантайма это:
Канал требует строгой дисциплины из-за своих особенностей.
Простые правила помогут избежать непредсказуемого поведения программы:
При соблюдении первых трёх правил -- в других случаях ничего плохого не случится. Но часто придётся работать с каналами, которые не подчиняются первым трём правилам. Об этом стоит помнить.
Мало передать структуры и данные через каналы, чтобы избежать гонок данных.
Надо понимать, что как только данные попали в канал по ссылке -- теперь есть две ссылки, которые указывают на одни и теже данные.
Поэтому два железных правила:
Второе правило должно быть исключением. Если какой-либо глобальный объект разделяется между потоками -- он должен быть доступен через объект приложения, а не через каналы.
К каналу, как и к срезу применима функция len
:
countMsg := len(chMsg)
if countMsg == 0{
return
}
// Что-то делаем не с пустым каналом
Проблема в том, что в канал пишет один поток, а длину (обычно) проверяет другой. И пока второй поток что-то делает с каналом -- первый в это время может ещё дописать данных. Или другой поток может прочитать данные из этого же канала.
На текущую длину канала в потребителе полагаться нальзя!
Единственный надёжный метод -- итерация по каналу в цикле.
Пример показывает:
как правильно закрыть канал.
// Создаёт канал и пишет в него некоторое количество сообщений
// Этот же канал и возвращает. Причём закрытый. Причём только для чтения.
func runWriter()<- chan int{
chProduct := make(chan int, 1_000)
for i:=0; i<200; i++ {
chProduct <- i
}
// Закрытие канала здесь никак не влияет на способность потом читать из канала
close(chProduct)
return chProduct
}
// Получает канал при вызове и читает из него, пока не закончатся данные
// Не играет значения, что канал закрыт. Выход из цикла произойдёт автоматически
// по факту исчерпания данных.
func runReader (chProd <-chan int){
for val := range chProd {
log.Printf("runReader(): i=%v\n", i)
}
}
chProd := runWriter()
runReader(chProd)
select
умеет работать не только с каналами на чтение, но и на запись.
select{
case <-chEnd: // Канал на азкрытие
return
case msg:=<-chMsg: // Прилетело сообщение
sf.useMsg(msg)
case sysMsg, isOk:=<-chSysMsg: // Прилетело системное сообщение
if !isOk{ // Канал закрыт
return
}
sf.useSysMsg(sysMsg)
case chSig<-1: // Условное действие на отправку, если свободен сигнал/
sf.sendSig()
}
// Скорость освобождения канала зависит от скорости чтения потребителя из него
chOut := make(chan int, 10) // Канал с ограничением на ёмкость
// В коде ниже нет проверки на закрытие каналов чтения
for {
select{
case msg :=<- chSig1: // Взять из первого канала
chOut <- msg
case msg :=<- chSig2: // Взять из второго канала
chOut <- msg
}
}
// Скорость освобождения канала зависит от скорости чтения потребителя из него
chOut1 := make(chan int, 10) // Канал с ограничением на ёмкость
chOut2 := make(chan int, 10) // Канал с ограничением на ёмкость
func run(chIn <-chan int){
for msgIn :=range chIn{
switch msgIn.OutChan {
case 1: // Отправить в первый канал
chOut1 <- msg.PayLoad
case 2: // Отправить во второй канал
chOut <- msg.PayLoad2
}
}
}
chEnd := make(chan struct{}, 1)
go func(){
defer close(chEnd)
time.Sleep(time.Second * 1)
}()
<-chEnd
log.Println("End work")
Примеры использования каналов: