Работа с потоками в C#

1. Начало работы

Обзор и ключевые понятия

C# поддерживает параллельное выполнение кода через многопоточность. Поток – это независимый путь исполнения, способный выполняться одновременно с другими потоками.
Программа на C# запускается как единственный поток, автоматически создаваемый CLR и операционной системой (“главный” поток), и становится многопоточной при помощи создания дополнительных потоков. Вот простой пример и его вывод:
ПРИМЕЧАНИЕ
Все примеры предполагают, что импортируются следующие пространства имен (если этот момент специально не оговаривается):
using System;
using System.Threading;
class ThreadTest 
{
  static void Main()
  {
    Thread t = new Thread(WriteY);
    t.Start();            // Выполнить WriteY в новом потоке
    while (true) 
      Console.Write("x"); // Все время печатать 'x'
  }
 
  static void WriteY() 
  {
    while (true) 
      Console.Write("y"); // Все время печатать 'y'
  }
}
Вывод:
xxxxxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyyyyyyyyyxxxxxxxxxxxxxxxxxxxxxxyyy
yyyyyyyyyyyyyxxxxyyyyyyyyyyyyyyyyyyxxxxxxxxxxxxyyyyyyyyyyyyyyyxxxxxxxxxxxxxxxx
xxxxxxxxxyyyyyyyyyyyyyyyyyyyyyxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyyyyyxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyx
xxxxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyyyyyyyyyxxxxxxxxxxxxxxxxxxxxxxyyyy
yyyyyyyyyyyyxxxxy...
В главном потоке создается новый поток t, исполняющий метод, который непрерывно печатает символ ‘y’. Одновременно главный поток непрерывно печатает символ ‘x’.
CLR назначает каждому потоку свой стек, так что локальные переменные хранятся раздельно. В следующем примере мы определяем метод с локальной переменной, а затем выполняем его одновременно в главном и во вновь созданном потоках:
static void Main() 
{
  new Thread(Go).Start();      // Выполнить Go() в новом потоке
  Go();                         // Выполнить Go() в главном потоке
}
 
static void Go() 
{
  // Определяем и используем локальную переменную 'cycles'
  for (int cycles = 0; cycles < 5; cycles++)
    Console.Write('?');
}
Консольный вывод:
??????????
Отдельный экземпляр переменной cycles создается в стеке каждого потока, так что выводится, как и ожидалось, десять знаков ‘?’.
Вместе с тем потоки разделяют данные, относящиеся к тому же экземпляру объекта, что и сами потоки:
class ThreadTest 
{
  bool done;
  
  static void Main()
  {
    ThreadTest tt = new ThreadTest(); // Создаем общий объект
    new Thread(tt.Go).Start();
    tt.Go();
  }
  
  // Go сейчас – экземплярный метод
  void Go() 
  {
    if (!done) { done = true; Console.WriteLine("Done"); }
  }
}
Так как оба потока вызывают метод Go() одного и того же экземпляра ThreadTest, они разделяют поле done. Результат – “Done”, напечатанное один раз вместо двух:
Done
Для статических полей работает другой способ разделения данных между потоками. Вот тот же самый пример, но со статическим полем done:
class ThreadTest 
{
  static bool done;    // Статическое поле, разделяемое потоками
  
  static void Main() 
  {
    new Thread(Go).Start();
    Go();
  }
  
  static void Go() 
  {
    if (!done) { done = true; Console.WriteLine("Done"); }
  }
}
Оба примера демонстрируют также другое ключевое понятие – потоковую безопасность (или скорее её отсутствие). Фактически результат исполнения программы не определен: возможно (хотя и маловероятно), "Done" будет напечатано дважды. Однако если мы поменяем порядок вызовов в методе Go(), шансы увидеть “Done” напечатанным два раза повышаются радикально:
static void Go() 
{
  if (!done)
  {
    Console.WriteLine("Done");
    done = true;
  }
}
Консольный вывод:
Done
Done (появляется в большинстве случаев!)
Проблема состоит в том, что один поток может выполнить оператор if, пока другой поток выполняет WriteLine, т.е. до того как done будет установлено в true.
Лекарство состоит в получении эксклюзивной блокировки на время чтения и записи разделяемых полей. C# обеспечивает это при помощи оператора lock:
class ThreadSafe 
{
  static bool done;
  static object locker = new object();
 
  static void Main() 
  {
    new Thread(Go).Start();
    Go();
  }
 
  static void Go() 
  {
    lock (locker) 
    {
      if (!done)
      {
        Console.WriteLine("Done");
        done = true;
      }
    }
  }
}
Когда два потока одновременно борются за блокировку (в нашем случае объекта locker), один поток переходит к ожиданию (блокируется), пока блокировка не освобождается. В данном случае это гарантирует, что только один поток может одновременно исполнять критическую секцию кода, и "Done" будет напечатано только один раз. Код, защищенный таким образом от неопределённости в плане многопоточного исполнения, называется потокобезопасным.
Временная приостановка (блокирование) – основной способ координации, или синхронизации действий потоков. Ожидание эксклюзивной блокировки – это одна из причин, по которым поток может блокироваться. Другая причина – если поток приостанавливается (Sleep) на заданный промежуток времени:
Thread.Sleep(TimeSpan.FromSeconds(30)); // Блокировка на 30 секунд
Также поток может ожидать завершения другого потока, вызывая его метод Join:
Thread t = new Thread(Go);     // Go – статический метод
t.Start();
t.Join();                       // Ожидаем завершения потока
Будучи блокированным, поток не потребляет ресурсов CPU.

Как работает многопоточность

Управление многопоточностью осуществляет планировщик потоков, эту функцию CLR обычно делегирует операционной системе. Планировщик потоков гарантирует, что активным потокам выделяется соответствующее время на выполнение, а потоки, ожидающие или блокированные, к примеру, на ожидании эксклюзивной блокировки, или пользовательского ввода – не потребляют времени CPU.
На однопроцессорных компьютерах планировщик потоков использует квантование времени – быстрое переключение между выполнением каждого из активных потоков. Это приводит к непредсказуемому поведению, как в самом первом примере, где каждая последовательность символов ‘X’ и ‘Y’ соответствует кванту времени, выделенному потоку. В Windows XP типичное значение кванта времени – десятки миллисекунд – выбрано как намного большее, чем затраты CPU на переключение контекста между потоками (несколько микросекунд).
На многопроцессорных компьютерах многопоточность реализована как смесь квантования времени и подлинного параллелизма, когда разные потоки выполняют код на разных CPU. Необходимость квантования времени все равно остается, так как операционная система должна обслуживать как свои собственные потоки, так и потоки других приложений.
Говорят, что поток вытесняется, когда его выполнение приостанавливается из-за внешних факторов типа квантования времени. В большинстве случаев поток не может контролировать, когда и где он будет вытеснен.

Потоки vs. процессы

Все потоки одного приложения логически содержатся в пределах процесса – модуля операционной системы, в котором исполняется приложение.
В некоторых аспектах потоки и процессы схожи – например, время разделяется между процессами, исполняющимися на одном компьютере, так же, как между потоками одного C#-приложения. Ключевое различие состоит в том, что процессы полностью изолированы друг от друга. Потоки разделяют память (кучу) с другими потоками этого же приложения. Благодаря этому один поток может поставлять данные в фоновом режиме, а другой – показывать эти данные по мере их поступления.

Когда использовать потоки

Типовое приложение с многопоточностью выполняет длительные вычисления в фоновом режиме. Главный поток продолжает выполнение, в то время как рабочий поток выполняет фоновую задачу. В приложениях Windows Forms, когда главный поток занят длительными вычислениями, он не может обрабатывать сообщения клавиатуры и мыши, и приложение перестает откликаться. По этой причине следует запускать отнимающие много времени задачи в рабочем потоке, даже если главный поток в это время демонстрирует пользователю модальный диалог с надписью “Работаю... Пожалуйста, ждите”, так как программа не может перейти к следующей операции, пока не закончена текущая. Такое решение гарантирует, что приложение не будет помечено операционной системой как “Не отвечающее”, соблазняя пользователя с горя прикончить процесс. Опять же, в этом случае модальный диалог может предоставить кнопку “Отмена”, так как форма продолжает получать сообщения, пока задача выполняется в фоновом потоке. Класс BackgroundWorker наверняка пригодится вам при реализации такой модели.
В случае приложений без UI, например, служб Windows, многопоточность имеет смысл, если выполняемая задача может занять много времени, так как требуется ожидание ответа от другого компьютера (сервера приложений, сервера баз данных или клиента). Запуск такой задачи в отдельном рабочем потоке означает, что главный поток немедленно освобождается для других задач.
Другое применение многопоточность находит в методах, выполняющих интенсивные вычисления. Такие методы могут выполняться быстрее на многопроцессорных компьютерах, если рабочая нагрузка разнесена по нескольким потокам (количество процессоров можно получить через свойство Environment.ProcessorCount).
C#-приложение можно сделать многопоточным двумя способами: либо явно создавая дополнительные потоки и управляя ими, либо используя возможности неявного создания потоков .NET Framework – BackgroundWorker, пул потоков, потоковый таймер, Remoting-сервер, Web-службы или приложение ASP.NET. В двух последних случаях альтернативы многопоточности не существует. Однопоточный web-сервер не просто плох, он попросту невозможен! К счастью, в случае серверов приложений, не хранящих состояние (stateless), многопоточность реализуется обычно довольно просто, сложности возможны разве что в синхронизации доступа к данным в статических переменных.

Когда потоки не нужны

Многопоточность наряду с достоинствами имеет и свои недостатки. Самое главный из них – значительное увеличение сложности программ. Сложность увеличивают не дополнительные потоки сами по себе, а необходимость организации их взаимодействия. От того, насколько это взаимодействие является преднамеренным, зависит продолжительность цикла разработки, а также количество спорадически проявляющихся и трудноуловимых ошибок в программе. Таким образом, нужно либо поддерживать дизайн взаимодействия потоков простым, либо не использовать многопоточность вообще, если только вы не имеете противоестественной склонности к переписыванию и отладке кода.
Кроме того, чрезмерное использование многопоточности отнимает ресурсы и время CPU на создание потоков и переключение между потоками. В частности, когда используются операции чтения/записи на диск, более быстрым может оказаться последовательное выполнение задач в одном или двух потоках, чем одновременное их выполнение в нескольких потоках. Далее будет описана реализация очереди Поставщик/Потребитель, предоставляющей такую функциональность.

Создание и запуск потоков

Для создания потоков используется конструктор класса Thread, принимающий в качестве параметра делегат типа ThreadStart, указывающий метод, который нужно выполнить. Делегат ThreadStart определяется так:
public delegate void ThreadStart();
Вызов метода Start начинает выполнение потока. Поток продолжается до выхода из исполняемого метода. Вот пример, использующий полный синтаксис C# для создания делегата ThreadStart:
class ThreadTest 
{
  static void Main() 
  {
    Thread t = new Thread(new ThreadStart(Go));
    t.Start();   // Выполнить Go() в новом потоке.
    Go();        // Одновременно запустить Go() в главном потоке.
  }

  static void Go() { Console.WriteLine("hello!"); }
В этом примере поток выполняет метод Go() одновременно с главным потоком. Результат – два почти одновременных «hello»:
hello!
hello!
Поток можно создать, используя для присваивания значений делегатам более удобный сокращенный синтаксис C#:
static void Main() 
{
  Thread t = new Thread(Go); // Без явного использования ThreadStart
  t.Start();
  ...
}

static void Go() { ... }
В этом случае делегат ThreadStart выводится компилятором автоматически. Другой вариант сокращенного синтаксиса использует анонимный метод для создания потока:
static void Main() 
{
  Thread t = new Thread(delegate() { Console.WriteLine("Hello!"); });
  t.Start();
}
Поток имеет свойство IsAlive, возвращающее true после вызова Start() и до завершения потока.
Поток, который закончил исполнение, не может быть начат заново.

Передача данных в ThreadStart

Допустим, что в рассматриваемом выше примере мы захотим более явно различать вывод каждого из потоков, например, по регистру символов. Можно добиться этого, передавая соответствующий флаг в метод Go(), но в этом случае нельзя использовать делегат ThreadStart, так он не принимает аргументов. К счастью, .NET Framework определяет другую версию делегата – ParameterizedThreadStart, которая может принимать один аргумент:
public delegate void ParameterizedThreadStart(object obj);
Предыдущий пример можно переписать так:
class ThreadTest 
{
  static void Main() 
  {
    Thread t = new Thread(Go);
    t.Start(true);             // == Go(true) 
    Go(false);
  }

  static void Go(object upperCase) 
  {
    bool upper = (bool)upperCase;
    Console.WriteLine(upper ? "HELLO!" : "hello!");
  }
}
Консольный вывод:
hello!
HELLO!
В этом примере компилятор автоматически выводит делегат ParameterizedThreadStart, так как метод Go() принимает в качестве параметра один object. С тем же успехом можно было написать:
Thread t = new Thread(new ParameterizedThreadStart(Go));
t.Start(true);
Особенность использования ParameterizedThreadStart состоит в том, что перед использованием нужно привести аргумент из типа object к нужному типу (в данном случае bool). К тому же существует только версия, принимающая единственный аргумент.
В качестве альтернативы можно использовать анонимный метод:
static void Main() 
{
  Thread t = new Thread(delegate(){ WriteText("Hello"); });
  t.Start();
}

static void WriteText(string text) { Console.WriteLine(text); }
Удобство состоит в том, что нужный метод (в данном случае WriteText) можно вызвать с любым количеством аргументов и безо всякого приведения типов. Однако нужно принять во внимание особенность семантики анонимных методов, связанную с внешней переменной, которая становится очевидной в следующем примере:
static void Main() 
{
  string text = "Before";
  Thread t = new Thread(delegate() { WriteText(text); });
  text = "After";
  t.Start();
}

static void WriteText(string text) { Console.WriteLine(text); }
Консольный вывод:
After
ПРЕДУПРЕЖДЕНИЕ
Анонимные методы открывают причудливые возможности непреднамеренного взаимодействия через внешние переменные, если они изменяются кем-либо после старта потока. Планового взаимодействия (обычно через поля класса) как правило более чем достаточно! Лучше всего, как только началось исполнение потока, рассматривать внешние переменные как переменные только для чтения – за исключением разве что реализаций с соответствующими блокировками на обеих сторонах.
Другой способ передачи данных в поток состоит в запуске в потоке метода определенного экземпляра объекта, а не статического метода. Тогда свойства выбранного экземпляра объекта будут определять поведение потока, как в следующем варианте оригинального примера:
class ThreadTest 
{
  bool upper;
 
  static void Main() 
  {
    ThreadTest instance1 = new ThreadTest();
    instance1.upper = true;
    Thread t = new Thread(instance1.Go);
    t.Start();
    ThreadTest instance2 = new ThreadTest();
    instance2.Go();  // Запуск в главном потоке - с upper=false
  }
 
  void Go(){ Console.WriteLine(upper ? "HELLO!" : "hello!"); }

Именование потоков

Поток можно поименовать, используя свойство Name. Это предоставляет большое удобство при отладке: имена потоков можно вывести в Console.WriteLine и увидеть в окне Debug – Threads в Microsoft Visual Studio. Имя потоку может быть назначено в любой момент, но только один раз – при попытке изменить его будет сгенерировано исключение.
Главному потоку приложения также можно назначить имя – в следующем примере доступ к главному потоку осуществляется через статическое свойство CurrentThread класса Thread:
class ThreadNaming 
{
  static void Main() 
  {
    Thread.CurrentThread.Name = "main";
    Thread worker = new Thread(Go);
    worker.Name = "worker";
    worker.Start();
    Go();
  }

  static void Go() 
  {
    Console.WriteLine("Hello from " + Thread.CurrentThread.Name);
  }
}
Консольный вывод:
Hello from main
Hello from worker

Основные и фоновые потоки

По умолчанию потоки создаются как основные, что означает, что приложение не будет завершено, пока один из таких потоков будет исполняться. C# также поддерживает фоновые потоки, они не продлевают жизнь приложению, а завершаются сразу же, как только все основные потоки будут завершены.
ПРИМЕЧАНИЕ
Изменение статуса потока с основного на фоновый не изменяет его приоритет или статус в планировщике потоков.
Статус потока переключается с основного на фоновый при помощи свойства IsBackground, как показано в следующем примере:
class PriorityTest 
{
  static void Main(string[] args) 
  {
    Thread worker = new Thread(delegate() { Console.ReadLine(); });

    if (args.Length > 0)
      worker.IsBackground = true;

    worker.Start();
  }
}
Если программа вызывается без аргументов, рабочий поток выполняется по умолчанию как основной поток и ожидает на ReadLine, пока пользователь не нажмет Enter. Тем временем главный поток завершается, но приложение продолжает исполняться, так как рабочий поток еще жив.
Если же программу запустить с аргументами командной строки, рабочий поток получит статус фонового и программа завершится практически сразу после завершения главного потока, с уничтожением потока, ожидающего ввода пользователя с помощью метода ReadLine.
Когда фоновый поток завершается таким способом, все блоки finally внутри потока игнорируются. Поскольку невыполнение кода в finally обычно нежелательно, будет правильно ожидать завершения всех фоновых потоков перед выходом из программы, назначив нужный таймаут (при помощи Thread.Join). Если по каким-то причинам рабочий поток не завершается за выделенное время, можно попытаться аварийно завершить его (Thread.Abort), а если и это не получится, позволить умереть ему вместе с процессом (также не помешает записать информацию о проблеме в лог).
Превращение рабочего потока в фоновый может быть последним шансом завершить приложение, так как не умирающий основной поток не даст приложению завершиться. Зависший основной поток особенно коварен в приложениях Windows Forms, так как приложение завершается, когда завершается его главный поток (по крайней мере, для пользователя), но его процесс продолжает выполняться. В диспетчере задач оно исчезнет из списка приложений, хотя имя его исполняемого файла останется в списке исполняющихся процессов. Пока пользователь не найдет и не прибьет его, процесс продолжит потреблять ресурсы и, возможно, будет препятствовать запуску или нормальному функционированию вновь запущенного экземпляра приложения.
ПРЕДУПРЕЖДЕНИЕ
Обычная причина появления приложений, которые не могут завершиться должным образом – это такие “забытые” основные потоки.

Приоритеты потоков

Свойство Priority определяет, сколько времени на исполнение будет выделено потоку относительно других потоков того же процесса. Существует 5 градаций приоритета потока:
enum ThreadPriority { Lowest, BelowNormal, Normal, AboveNormal, Highest }
Значение приоритета становится существенным, когда одновременно исполняются несколько потоков.
Установка приоритета потока на максимум еще не означает работу в реальном времени (real-time), так как существуют еще приоритет процесса приложения. Чтобы работать в реальном времени, нужно использовать класс Process из пространства имен System.Diagnostics для поднятия приоритета процесса (если что, я вам этого не говорил):
Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.High;
От ProcessPriorityClass.High один шаг до наивысшего приоритета процесса – Realtime. Устанавливая приоритет процесса в Realtime, вы говорите операционной системе, что хотите, чтобы ваш процесс никогда не вытеснялся. Если ваша программа случайно попадет в бесконечный цикл, операционная система может быть полностью заблокирована. Спасти вас в этом случае сможет только кнопка выключения питания. По этой причине ProcessPriorityClass.High считается максимальным приоритетом процесса, пригодным к употреблению.
Если real-time приложение имеет пользовательский интерфейс, может быть не желательно поднимать приоритет его процесса, так как обновление экрана будет съедать чересчур много времени CPU – тормозя весь компьютер, особенно если UI достаточно сложный. (Хотя, когда я это пишу, программа интернет-телефонии Skype не дает такого эффекта, может быть, потому, что ее интерфейс достаточно прост.) Уменьшение приоритета главного потока в сочетании с повышением приоритета процесса гарантирует, что real-time поток не будет вытесняться перерисовкой экрана, но не спасает от тормозов весь компьютер, так как операционная система все еще будет выделять много времени CPU всему процессу в целом. Идеальное решение состоит в том, чтобы держать работу в реальном времени и пользовательский интерфейс в различных процессах (с разными приоритетами), поддерживающих связь через Remoting или shared memory. Разделяемая память требует обращения к Win32 API (погуглите про CreateFileMapping и MapViewOfFile).

Обработка исключений

Обрамление кода создания и запуска потока блоками try/catch/finally имеет мало смысла. Посмотрите следующий пример:
public static void Main()
{
  try 
  {
    new Thread(Go).Start();
  }
  catch(Exception ex) 
  {
    // Сюда мы никогда не попадем!
    Console.WriteLine("Исключение!");
  }
 
  static void Go() { throw null; }
}
try/catch здесь фактически совершенно бесполезны, и NullReferenceException во вновь созданном потоке обработано не будет. Вы поймете почему, если вспомните, что поток имеет свой независимый путь исполнения. Решение состоит в добавлении обработки исключений непосредственно в метод потока:
public static void Main() 
{
  new Thread(Go).Start();
}
 
static void Go() 
{
  try 
  {
    ...
    throw null;      // это исключение будет поймано ниже
    ...
  }
  catch(Exception ex) 
  {
    Логирование исключения и/или сигнал другим потокам
    ...
  }
}
Начиная с .NET 2.0, необработанное исключение в любом потоке приводит к закрытию всего приложения, а значит игнорирование исключений – это не наш метод. Следовательно, блок try/catch необходим в каждом методе потока – по крайней мере, в приложениях не для собственного употребления – чтобы избежать закрытия приложения из-за необработанного исключения. Это может быть довольно обременительно, особенно для программистов Windows Forms, которые используют глобальный перехватчик исключений, как показано ниже:
using System;
using System.Threading;
using System.Windows.Forms;
 
static class Program 
{
  static void Main() 
  {
    Application.ThreadException += HandleError;
    Application.Run(new MainForm());
  }
 
  static void HandleError(object sender, ThreadExceptionEventArgs e) 
  {
    Логирование исключения, завершение или продолжение работы
  }
}
Событие Application.ThreadException возникает, когда исключение генерируется в коде, который был вызван (возможно, по цепочке) из обработчика сообщения Windows (например, от клавиатуры, мыши и т.д.) – короче говоря, практически из любого кода приложения Windows Forms. Поскольку это замечательно работает, появляется чувство ложной безопасности, - что все исключения будут обработаны этим центральным обработчиком. Исключения, возникающие в рабочих потоках – хороший пример исключений, которые не ловятся в Application.ThreadException (код в методе Main – другой такой пример, включая конструктор MainForm, отрабатывающий до запуска цикла обработки сообщений).
.NET Framework предоставляет низкоуровневое событие для глобальной обработки исключений - AppDomain.UnhandledException. Это событие происходит, когда есть необработанное исключение в любом потоке и для любых типов приложений (с пользовательским интерфейсом или без него). Однако, хотя это и хороший способ регистрации необработанных исключений, он не предоставляет никакого способа предотвратить закрытие приложения или подавить сообщение .NET о необработанном исключении.
Итак, явная обработка исключений требуется во всех потоковых методах. Упростить работу можно, используя классы-обертки, например, BackgroundWorker (рассматриваемый в 3-й части).

2. Базовые сведения о синхронизации

Важнейшие средства синхронизации

В следующих таблицах приведена информация об инструментах .NET для координации (синхронизации) потоков:
КонструкцияНазначение
SleepБлокировка на указанное время
JoinОжидание окончания другого потока
Простейшие методы блокировки
КонструкцияНазначениеДоступна из других процессов?Скорость
lockГарантирует, что только один поток может получить доступ к ресурсу или секции кода.нетбыстро
MutexГарантирует, что только один поток может получить доступ к ресурсу или секции кода. Может использоваться для предотвращения запуска нескольких экземпляров приложения.дасредне
SemaphoreГарантирует, что не более заданного числа потоков может получить доступ к ресурсу или секции кода.дасредне
Блокировочные конструкции
(Для автоматической блокировки также могут использоваться контексты синхронизации.)
КонструкцияНазначениеДоступна из других процессов?Скорость
EventWaitHandleПозволяет потоку ожидать сигнала от другого потока.дасредне
Wait and Pulse*Позволяет потоку ожидать, пока не выполнится заданное условие блокировки.нетсредне
Сигнальные конструкции
КонструкцияНазначениеДоступна из других процессов?Скорость
Interlocked*Выполнение простых не блокирующих атомарных операций.Да – через разделяемую памятьочень быстро
volatile*Для безопасного не блокирующего доступа к полям.Да – через разделяемую памятьочень быстро
Не блокирующие конструкции синхронизации
* - Рассматриваются в части 4

Блокировка

Когда поток остановлен в результате использования конструкций, перечисленных в вышеприведенных таблицах, говорят, что он блокирован. Будучи блокированным, поток немедленно перестает получать время CPU, устанавливает свойство ThreadState в WaitSleepJoin и остается в таком состоянии, пока не разблокируется. Разблокировка может произойти в следующих четырех случаях (кнопка выключения питания не считается!):
  • выполнится условие разблокировки;
  • истечет таймаут операции (если он был задан);
  • по прерыванию через Thread.Interrupt;
  • по аварийному завершению через Thread.Abort.
Поток не считается блокированным, если его выполнение приостановлено нерекомендуемым методом Suspend.

Sleeping и Spinning

Вызов Thread.Sleep блокирует текущий поток на указанное время (либо до прерывания):
static void Main()
{
  Thread.Sleep(0);   // отказаться от одного кванта времени CPU
  Thread.Sleep(1000);                   // заснуть на 1000 миллисекунд
  Thread.Sleep(TimeSpan.FromHours(1));  // заснуть на 1 час
  Thread.Sleep(Timeout.Infinite);       // заснуть до прерывания
}
Если быть более точным, Thread.Sleep отпускает CPU и сообщает, что потоку не должно выделяться время в указанный период. Thread.Sleep(0) отпускает CPU для выделения одного кванта времени следующему потоку в очереди на исполнение.
Уникальность Thread.Sleep среди других методов блокировки в том, что он приостанавливает прокачку сообщений Windows в приложениях Windows Forms или COM-окружении потока в однопоточном апартаменте. Из-за этого продолжительная блокировка главного (UI) потока приложения Windows Forms приводит к тому что приложение перестает откликаться – и следовательно, использования Thread.Sleep нужно избегать независимо от того, действительно ли прокачка очереди сообщений технически приостановлена. В старой COM-среде ситуация сложнее, там иногда может быть желательна блокировка при помощи Sleep с одновременной прокачкой очереди сообщений. Крис Брумм (Chris Brumme) из Microsoft подробно обсуждает это в своем блоге.
Класс Thread также предоставляет метод SpinWait, который не отказывается от времени CPU, а наоборот, загружает процессор в цикле на заданное количество итераций. 50 итераций эквивалентны паузе примерно в микросекунду, хотя это зависит от скорости и загрузки CPU. Технически SpinWait – не блокирующий метод: ThreadState такого потока не устанавливается в WaitSleepJoin, и поток не может быть прерван из другого потока. SpinWait редко используется – его главное применение это ожидание ресурса, который должен освободится очень скоро (в течении микросекунд) без вызова Sleep и траты процессорного времени на переключение потока. Однако эта методика выгодна только на многопроцессорных компьютерах, на однопроцессорном компьютере у ресурса нет никакого шанса освободиться, пока ожидающий на SpinWait поток не растратит остаток кванта времени, а значит, требуемый результат недостижим изначально. А частые или продолжительные вызовы SpinWait впустую растрачивает время CPU.

Блокирование против ожидания в цикле

Поток может ожидать выполнения некоторого условия, непосредственно прокручивая цикл проверки, например:
while (!proceed)
  ;
или:
while (DateTime.Now < nextStartTime)
  ;
Однако это очень расточительная трата процессорного времени: поскольку CLR и операционная система убеждены, что поток выполняет важные вычисления, ему выделяются соответствующие ресурсы. Поток, который крутится в таком состоянии, не считается заблокированным, в отличие от потока, ожидающего на EventWaitHandle (конструкции, обычно используемой для таких задач).
Иногда используется гибрид блокирования и ожидания в цикле:
while (!proceed)
  Thread.Sleep(x);    // "Spin-Sleeping!"
Чем больше x, тем эффективнее используется CPU. Платой за компромисс становится увеличение латентности. При превышении 20 мс накладные расходы незначительны – если условие в while не особенно сложное.
За исключением незначительной задержки эта комбинация блокирования и периодических опросов может работать весьма неплохо (вопросы параллельного доступа к флагу proceed рассматриваются в 4-й части). Возможно, самое частое её использование – когда программист уже потерял надежду запустить в работу более продвинутые сигнальные конструкции!

Ожидание завершения потока

Поток можно заблокировать до завершения другого потока вызовом метода Join:
class JoinDemo 
{
  static void Main() 
  {
    Thread t = new Thread(delegate() { Console.ReadLine(); });
    t.Start();
    t.Join();    // ожидать, пока поток не завершится
    Console.WriteLine("Thread t's ReadLine complete!");
  }
}
Метод Join может также принимать в качестве аргумента timeout - в миллисекундах или как TimeSpan. Если указанное время истекло, а поток не завершился, Join возвращает false. Join с timeout функционирует как Sleep – фактически следующие две строки кода приводят к одинаковому результату:
Thread.Sleep(1000);
Thread.CurrentThread.Join(1000);
(Отличие заметно только в однопоточных COM-апартаментах, и состоит в отношении к прокачке очереди сообщений Windows: Join не затрагивает прокачку сообщений, а Sleep ее приостанавливает.)

Блокирование и потоковая безопасность

Блокировка обеспечивает монопольный доступ и используется, чтобы обеспечить выполнение одной секции кода только одним потоком одновременно. Для примера рассмотрим следующий класс:
class ThreadUnsafe 
{
  static int val1, val2;
 
  static void Go() 
  {
    if (val2 != 0)
      Console.WriteLine(val1 / val2);
    val2 = 0;
  }
}
Он не является потокобезопасным: если бы метод Go вызывался двумя потоками одновременно, можно было бы получить ошибку деления на 0, так как переменная val2 могла быть установлена в 0 в одном потоке, в то время когда другой поток находился бы между if и Console.WriteLine.
Вот как при помощи блокировки можно решить эту проблему:
class ThreadSafe 
{
  static object locker = new object();
  static int val1, val2;
 
  static void Go() 
  {
    lock (locker) 
    {
      if (val2 != 0)
        Console.WriteLine(val1 / val2);

      val2 = 0;
    }
  }
}
Только один поток может единовременно заблокировать объект синхронизации (в данном случае locker), а все другие конкурирующие потоки будут приостановлены, пока блокировка не будет снята. Если за блокировку борются несколько потоков, они ставятся в очередь ожидания – "ready queue" – и обслуживаются, как только это становится возможным, по принципу “первым пришел – первым обслужен”. Эксклюзивная блокировка, как уже говорилось, обеспечивает последовательный доступ к тому, что она защищает, так что выполняемые потоки уже не могут наложиться друг на друга. В данном случае мы защитили логику внутри метода Go, так же, как и поля val1 и val2.
Поток, заблокированный на время ожидания освобождения блокировки, имеет свойство ThreadState, установленное в WaitSleepJoin. Позже мы обсудим, как поток, заблокированный в таком состоянии, может быть принудительно освобожден из другого потока вызовом методов Interrupt или Abort. Это достаточно мощная возможность, используемая обычно для завершения рабочего потока.
Оператор lock языка C# фактически является синтаксическим сокращением для вызовов методов Monitor.Enter и Monitor.Exit в рамках блоков try-finally. Вот во что фактически разворачивается реализация метода Go из предыдущего примера:
Monitor.Enter(locker);
try 
{
  if (val2 != 0)
    Console.WriteLine(val1 / val2);

  val2 = 0;
}

finally { Monitor.Exit(locker); }  
Вызов Monitor.Exit без предшествующего вызова Monitor.Enter для того же объекта синхронизации вызовет исключение.
Monitor также предоставляет метод TryEnter, позволяющий задать время ожидания в миллисекундах или в виде TimeSpan. Метод возвращает true, если блокировка была получена, и false, если блокировка не была получена за заданное время. TryEnter может также быть вызван без параметров и в этом случае возвращает управление немедленно.

Выбор объекта синхронизации

Любой объект, видимый взаимодействующим потокам, может быть использован как объект синхронизации, если это объект ссылочного типа. Также строго рекомендуется, чтобы объект синхронизации был private-полем класса, во избежание случайного воздействия внешнего кода, блокирующего этот объект. Согласно этим правилам, объектом синхронизации вполне может стать сам защищаемый объект, как, например, список в следующем примере:
class ThreadSafe() 
{
  List <string> list = new List <string>();
 
  void Test() 
  {
    lock (list) 
    {
      list.Add("Item 1");
      ...
Обычно используется выделенное поле (как locker в предыдущих примерах), так как это позволяет точнее контролировать область видимости и степень детализации блокировки. Использование объекта или типа в качестве объекта синхронизации, то есть:
lock (this)
{
  ...
}
или:
lock (typeof(Widget)) // Для защиты статических данных
{
  ...
}
не одобряется, так как предполагает public-область видимости объекта синхронизации.
Блокировка не запрещает вообще любой доступ к объекту. Другими словами, вызов x.ToString() не будет заблокирован из-за того, что другой поток вызвал lock(x) – чтобы произошла блокировка, оба потока должны вызвать lock(x).

Вложенные блокировки

Поток может неоднократно блокировать один и тот же объект многократными вызовами Monitor.Enter или вложенными lock-ами. Объект будет освобожден, когда будет выполнено соответствующее количество раз Monitor.Exit или произойдет выход из самой внешней конструкции lock. Поэтому допустима естественная семантика, когда один метод вызывает другой следующим образом:
static object x = new object();
 
static void Main() 
{
  lock (x) 
  {
     Console.WriteLine(“I have the lock”);
     Nest();
     Console.WriteLine(“I still have the lock”);
  } // Здесь блокировка будет снята!
}
 
static void Nest() 
{
  lock (x) 
  {
    ... 
  } 

  //Блокировка еще не снята!
}
Поток может быть блокирован только на самом первом, внешнем lock-е.

Когда блокировать

Как правило, любое поле, доступное нескольким потокам, должно читаться и записываться с блокировкой. Даже в самом простом случае, операции присваивания одиночному полю, необходима синхронизация. В следующем классе ни приращение, ни присваивание не потокобезопасны:
class ThreadUnsafe 
{
  static int x;
  static void Increment() { x++; }
  static void Assign()    { x = 123; }
}
А вот их потокобезопасные варианты:
class ThreadUnsafe 
{
  static object locker = new object();
  static int x;
 
  static void Increment()
  {
    lock (locker)
      x++;
  }

  static void Assign()
  {
    lock (locker)
      x = 123;
  }
}
В качестве альтернативы блокировке в таких простых случаях можно использовать неблокирующие конструкции синхронизации. Это рассматривается в части 4 (наряду с причинами, по которым в данном случае требуется синхронизация).
Блокировки и атомарность
Если группа переменных всегда читается и записывается в пределах одной блокировки, можно сказать, что переменные читаются и пишутся атомарно. Предположим, что поля x и y всегда читаются и пишутся с блокировкой на объекте locker:
lock (locker)
{
  if (x != 0)
    y /= x;
}
Можно сказать, что доступ к x и y атомарный, так как данный кусок кода не может быть прерван действиями другого потока, которые бы изменили x, y или результат операции. Невозможно получить ошибку деления на ноль, если обращение к x и y производится в эксклюзивной блокировке.

Соображения о производительности

Блокировка сама по себе очень быстра: она требует десятков наносекунд, если собственно блокирования не происходит. Если требуется блокирование, то последующее переключение задач занимает уже микросекунды или даже миллисекунды на перепланировку потоков. Однако сравните это с часами, которые вы должны будете потратить, не поставив lock там, где надо.
При неправильном использовании у блокировки могут быть и негативные последствия – уменьшение возможности параллельного исполнения потоков, взаимоблокировки, гонки блокировок. Возможности для параллельного исполнения уменьшаются, когда слишком много кода помещено в конструкцию lock, заставляя другие потоки простаивать все время, пока этот код исполняется. Взаимоблокировка наступает, когда каждый из двух потоков ожидает на блокировке другого потока и, таким образом, ни тот, ни другой не могут двинуться дальше. Гонкой блокировок называется ситуация, когда любой из двух потоков может первым получить блокировку, однако программа ломается, если первым это сделает “неправильный” поток.
Взаимоблокировка – общий синдром многих объектов синхронизации. Хорошее правило, помогающее избегать взаимоблокировок, состоит в том, чтобы начинать с блокировки минимального количества объектов, и увеличивать степень детализации блокировок, когда размер кода в блокировке чрезмерно увеличивается.

Потоковая безопасность

Потокобезопасный код – это код, не имеющий никаких неопределенностей при любых сценариях многопоточного исполнения. Потокобезопасность достигается прежде всего блокировками и сокращением возможностей взаимодействия между потоками.
Метод, который является потокобезопасным при любых сценариях, называется реентерабельным. Типы общего назначения редко являются полностью потокобезопасными по следующим причинам:
  • разработка с учетом полной потоковой безопасности может быть очень трудоемкой, особенно если тип имеет много полей (каждое поле потенциально может участвовать в многопоточном взаимодействии)
  • потоковая безопасность может сказаться на производительности (независимо от того, используется ли реально многопоточность)
  • потокобезопасный тип не обязательно делает использующую его программу потокобезопасной, а дальнейшая работа по ее обеспечению может сделать потокобезопасность типа избыточной.
Поэтому потокобезопасность реализуется обычно только там, где она действительно требуется в многопоточном сценарии.
Есть, однако, несколько обходных путей для получения больших и сложных классов, безопасных в многопоточном окружении. Один из них – пожертвовать деталировкой, блокируя большие секции кода, вплоть до целого объекта, и принуждая к последовательному доступу к нему на высоком уровне. Эта тактика также является ключевой при использовании непотокобезопасных объектов в потокобезопасном коде – точно так же обеспечивается эксклюзивная блокировка при доступе к любому свойству, методу или полю непотокобезопасного объекта.
ПРЕДУПРЕЖДЕНИЕ
Исключая примитивные типы, очень немногие типы .NET Framework безопасны для чего-то большего, чем доступ только для чтения. Потокобезопасноть обеспечивает разработчик – обычно используя эксклюзивные блокировки.
Другой обходной путь состоит в минимизацию взаимодействия потоков через минимизацию общих данных. Это превосходный подход, который используется в не имеющих состояния приложениях среднего звена и web-серверах. Поскольку запросы множества клиентов могут прийти одновременно, каждый запрос обрабатывается в своем собственном потоке (в соответствии с архитектурой ASP.NET, Web-служб и Remoting), и это означает, что вызываемые при этом методы должны быть потокобезопасны. Дизайн без использования состояния (популярный по причине универсальности) действительно ограничивает взаимодействие, так как классы не хранят данные между запросами. Взаимодействие потоков ограничено только статическими полями, созданными, например, для кэширования часто используемых данных, и предоставляемыми инфраструктурой сервисами типа аутентификации и аудита.
Потокобезопасность и типы .NET Framework
Для преобразования кода в потокобезопасный можно использовать блокировки. Хороший пример - почти все непримитивные типы .NET Framework непотокобезопасны, и все же они могут использоваться в многопоточном коде, если любой доступ к любому объекту защищен блокировкой. Вот пример, в котором два потока одновременно добавляют элементы в один и тот же список, а затем перечисляют все элементы списка:
class ThreadSafe() 
{
  static List <string> list = new List <string>();
 
  static void Main() 
  {
    new Thread(AddItems).Start();
    new Thread(AddItems).Start();
  }
 
  static void AddItems() 
  {
    for (int i = 0; i < 100; i++)
      lock (list)
        Add("Item " + list.Count);
 
    string[] items;

    lock (list)
      items = list.ToArray();

    foreach (string s in items)
      Console.WriteLine(s);
  }
}
В данном случае блокировка происходит на самом объекте-списке, что прекрасно работает в этом простом сценарии. В случае двух взаимодействующих списков блокировку пришлось бы делать на одном общем объекте, возможно, выделенном поле, если бы один из списков не проявил себя как явный кандидат.
Перечисление .NET-коллекций также не является потокобезопасной операцией, так как если другой поток меняет список в процессе перечисления, генерируется исключение. Чтобы не ставить блокировку на весь процесс перечисления, в данном примере элементы копируются в массив. Это позволяет избежать чрезмерной блокировки в том случае, если действия с элементами при перечислении отнимают слишком много времени.
А вот интересный вопрос: если бы класс List был полностью потокобезопасным, что это изменило бы? Потенциально очень немногое! Для примера рассмотрим добавление элемента к нашему гипотетическому потокобезопасному списку:
if (!myList.Contains(newItem)) myList.Add(newItem);
Независимо от потокобезопасности собственно списка данная конструкция определенно не потокобезопасна! Заблокирован должен быть весь этот код целиком, чтобы предотвратить вытеснение потока между проверкой и добавлением нового элемента. Также блокировка должна быть использована везде, где изменяется список. К примеру, следующая конструкция должна быть обернута в блокировку:
myList.Clear();
для гарантии, что ее исполнение не будет прервано. Другими словами блокировки пришлось бы использовать точно так же, как с существующими непотокобезопасными классами. Встроенная потокобезопасность фактически была бы бесполезной тратой времени!
Этот момент может быть спорным при написании заказных компонентов – зачем нужна встроенная потокобезопасность, если она, скорее всего, окажется избыточной?
Есть и контраргумент: внешняя блокировка объекта работает, только если все конкурирующие потоки знают о ее необходимости и используют ее – а это может быть не так при широком использовании объекта. Хуже всего дела обстоят со статическими полями в публичных типах. Для примера представьте, что статическое свойство структуры DateTime – DateTime.Now – непотокобезопасное, и два параллельных запроса могут привести к неправильным результатам или исключению. Единственная возможность исправить положение с использованием внешней блокировки - lock(typeof(DateTime)) при каждом обращении к DateTime.Now – сработала бы, если бы все программисты согласились делать так и только так. Но это вряд ли возможно, потому что многие считают блокировку типа Плохой Штукой.
По этой причине статические поля структуры DateTime гарантированно потокобезопасны. Это обычное поведение типов в .NET Framework – статические члены потокобезопасны, нестатические – нет. Так же следует проектировать и собственные типы – во избежание неразрешимых загадок потокобезопасности.
СОВЕТ
При создании компонентов для широкого использования хорошая политика состоит в том, чтобы программировать, по крайней мере, не препятствуя потокобезопасности. Это означает, что нужно быть особенно осторожным со статическими типами - неважно, используются ли они только приватно или доступны снаружи.

Interrupt и Abort

Заблокированный поток может быть преждевременно разблокирован двумя путями:
  • С помощью Thread.Interrupt.
  • С помощью Thread.Abort.
Это должно быть сделано из другого потока; ожидающий поток бессилен что-либо сделать в блокированном состоянии.

Interrupt

Вызов Interrupt для блокированного потока принудительно освобождает его с генерацией исключения ThreadInterruptedException, как показано в следующем примере:
class Program 
{
  static void Main() 
  {
    Thread t = new Thread(delegate() 
      {
        try 
        {
          Thread.Sleep(Timeout.Infinite);
        }
        catch(ThreadInterruptedException) 
        {
          Console.Write("Forcibly ");
        }

        Console.WriteLine("Woken!");
      });
 
    t.Start();
    t.Interrupt();
  }
}
Консольный вывод:
Forcibly Woken!
Прерывание потока освобождает его только от текущего (или следующего) ожидания, но не завершает поток (если, конечно, ThreadInterruptedException не останется необработанным).
Если Interrupt вызывается для неблокированного потока, поток продолжает исполнение до точки следующей блокировки, в которой и генерируется исключение ThreadInterruptedException. Это поведение освобождает от необходимости вставлять проверки:
if ((worker.ThreadState & ThreadState.WaitSleepJoin) > 0)
  worker.Interrupt();
которые не являются потокобезопасными, так как могут быть прерваны другим потоком между оператором if и worker.Interrupt.
Вызов Interrupt без должных на то оснований таит в себе опасность, так как любой метод framework-а, или другой сторонний метод в стеке вызовов может получить его раньше, чем ваш код, которому он предназначался. Все, что для этого требуется – чтобы поток хотя бы кратковременно встал на простой блокировке или синхронизации доступа к ресурсу, и любой ждущий своего часа Interrupt тут же сработает. Если метод изначально не разрабатывался с учетом возможности такого прерывания (с соответствующим кодом очистки в блоках finally), объекты могут остаться в неработоспособном состоянии, или ресурсы будут освобождены не полностью.
Прерывать исполнение потока безопасно, если вы точно знаете, чем сейчас занят поток. Позже мы рассмотрим сигнальные конструкции, обеспечивающие такую возможность.

Abort

Блокированный поток также может быть принудительно освобожден при помощи метода Abort. Эффект аналогичен Interrupt, только вместо ThreadInterruptedException генерируется ThreadAbortException. Кроме того, это исключение будет повторно сгенерировано в конце блока catch (в попытке успокоить поток навеки), если только в блоке catch не будет вызван Thread.ResetAbort. До вызова Thread.ResetAbort ThreadState будет иметь значение AbortRequested.
Большое отличие между Interrupt и Abort состоит в том, что происходит, если их вызвать для неблокированного потока. Если Interrupt ничего не делает, пока поток не дойдет до следующей блокировки, то Abort генерирует исключение непосредственно в том месте, где сейчас находится поток – может быть, даже не в вашем коде. Аварийное завершение неблокированного потока может иметь существенные последствия, которые подробнее рассматриваются ниже, в разделе "Аварийное завершение потоков".

Состояния потока


Рисунок 1: Диаграмма состояний потока
Запросить состояние потока можно с помощью его свойства ThreadState. Рисунок 1 демонстрирует “уровни” перечисления ThreadState. ThreadState спроектирован ужасно, это комбинация трех уровней состояний с использованием битовых флагов, члены перечисления в пределах каждого уровня являются взаимоисключающими. Вот эти три уровня:
  • running/blocking/aborting (как показано на рисунке 1).
  • Background/foreground (ThreadState.Background).
  • Запрос приостановки/приостановка с использованием не рекомендуемого метода Suspend (ThreadState.SuspendRequested и ThreadState.Suspended).
В результате ThreadState – битовая комбинация из членов каждого уровня! Вот примеры ThreadState:
  • Unstarted.
  • Running.
  • WaitSleepJoin.
  • Background, Unstarted.
  • SuspendRequested, Background, WaitSleepJoin.
Перечисление также имеет два члена, которые никогда не используются в текущей реализации CLR: StopRequested и Aborted.
И это еще не все. ThreadState.Running имеет значение 0, так что следующее выражение работать не будет:
if ((t.ThreadState & ThreadState.Running) > 0) ...
и вместо этого нужно проверять наличие ThreadState.Running путем исключения или в качестве альтернативы, использовать свойство IsAlive. Свойство IsAlive, однако, может вам не подойти, так как возвращает true, если поток блокирован или приостановлен (false возвращается только до того, как поток начался, и после того, как он завершится).
Если не принимать во внимание нерекомендуемые методы Suspend и Resume, можно написать обертку, скрывающую почти все члены перечисления первого уровня, и делающую возможным простой тест ThreadState:
public static ThreadState SimpleThreadState(ThreadState ts)
{
  return ts & ThreadState.Aborted & ThreadState.AbortRequested
            & ThreadState.Stopped & ThreadState.Unstarted
            & ThreadState.WaitSleepJoin;
}
ThreadState неоценим при отладке или поиске узких мест в производительности. Однако он плохо подходит для координации действий нескольких потоков, так как не существует механизма, который бы позволил протестировать ThreadState и затем действовать на основе этой информации без потенциального изменения ThreadState в этот промежуток времени.

Wait Handles

Оператор lock (aka Monitor.Enter/Monitor.Exit) – один из примеров конструкций синхронизации потоков. Lock является самым подходящим средством для организации монопольного доступа к ресурсу или секции кода, но есть задачи синхронизации (типа подачи сигнала начала работы ожидающему потоку), для которых lock будет не самым адекватным и удобным средством.
В Win32 API имеется богатый набор конструкций синхронизации, и они доступны в .NET Framework в виде классов EventWaitHandleMutex и Semaphore. Некоторые из них практичнее других: Mutex, например, по большей части дублирует возможности lock, в то время как EventWaitHandle предоставляет уникальные возможности сигнализации.
Все три класса основаны на абстрактном классе WaitHandle, но весьма отличаются по поведению. Одна из общих особенностей – это способность именования, делающая возможной работу с потоками не только одного, но и разных процессов.
EventWaitHandle имеет два производных класса – AutoResetEvent и ManualResetEvent (не имеющие никакого отношения к событиям и делегатам C#). Обоим классам доступны все функциональные возможности базового класса, единственное отличие состоит в вызове конструктора базового класса с разными параметрами.
В части производительности, все WaitHandle обычно исполняются в районе нескольких микросекунд. Это редко имеет значение с учетом контекста, в котором они применяются.
AutoResetEvent – наиболее часто используемый WaitHandle-класс и основная конструкция синхронизации, наряду с lock.

AutoResetEvent

AutoResetEvent очень похож на турникет – один билет позволяет пройти одному человеку. Приставка “auto” в названии относится к тому факту, что открытый турникет автоматически закрывается или “сбрасывается” после того, как позволяет кому-нибудь пройти. Поток блокируется у турникета вызовом WaitOne (ждать (wait) у данного (one) турникета, пока он не откроется), а билет вставляется вызовом метода Set. Если несколько потоков вызывают WaitOne, за турникетом образуется очередь. Билет может “вставить” любой поток – другими словами, любой (неблокированный) поток, имеющий доступ к объекту AutoResetEvent, может вызвать Set, чтобы пропустить один блокированный поток.
Если Set вызывается, когда нет ожидающих потоков, хэндл будет находиться в открытом состоянии, пока какой-нибудь поток не вызовет WaitOne. Эта особенность помогает избежать гонок между потоком, подходящим к турникету, и потоком, вставляющим билет (“опа, билет вставлен на микросекунду раньше, очень жаль, но вам придется подождать еще сколько-нибудь!”). Однако многократный вызов Set для свободного турникета не разрешает пропустить за раз целую толпу – сможет пройти только один человек, все остальные билеты будут потрачены впустую.
WaitOne принимает необязательный параметр timeout – метод возвращает false, если ожидание заканчивается по таймауту, а не по получению сигнала. WaitOne также можно обучить выходить из текущего контекста синхронизации для продолжения ожидания (если используется режим с автоматической блокировкой) во избежание чрезмерного блокирования.
Метод Reset обеспечивает закрытие открытого турникета, безо всяких ожиданий и блокировок.
AutoResetEvent может быть создан двумя путями. Во-первых, с помощью своего конструктора:
EventWaitHandle wh = new AutoResetEvent(false);
Если аргумент конструктора true, метод Set будет вызван автоматически сразу после создания объекта.
Другой метод состоит в создании объекта базового класса, EventWaitHandle:
EventWaitHandle wh = new EventWaitHandle(false, EventResetMode.Auto);
Конструктор EventWaitHandle также может использоваться для создания объекта ManualResetEvent (если задать в качестве параметра EventResetMode.Manual).
Метод Close нужно вызывать сразу же, как только WaitHandle станет не нужен – для освобождения ресурсов операционной системы. Однако если WaitHandle используется на протяжении всей жизни приложения (как в большинстве примеров этого раздела), этот шаг можно опустить, так как он будет выполнен автоматически при разрушении домена приложения.
В следующем примере запускается рабочий поток, который просто ожидает сигнала от другого потока.
class BasicWaitHandle 
{
  static EventWaitHandle wh = new AutoResetEvent(false);
 
  static void Main() 
  {
    new Thread(Waiter).Start();
    Thread.Sleep(1000);                 // Подождать некоторое время...
    wh.Set();                            // OK – можно разбудить
  }

  static void Waiter() 
  {
    Console.WriteLine("Ожидание...");
    wh.WaitOne();                        // Ожидать сигнала
    Console.WriteLine("Получили сигнал");
  }
}
Консольный вывод:
Ожидание... (пауза) Получили сигнал
Создание межпроцессных EventWaitHandle
Конструктор EventWaitHandle также позволяет создавать именованные EventWaitHandle, способные действовать через границы процессов. Имя задается обыкновенной строкой и может быть любым. Если задаваемое имя уже используется на компьютере, возвращается ссылка на существующий EventWaitHandle, в противном случае операционная система создает новый. Вот пример:
EventWaitHandle wh = new EventWaitHandle(false, EventResetMode.Auto,
  "MyCompany.MyApp.SomeName");
Исполнив этот код, два приложения получили бы возможность сигнализировать друг другу из любого потока обоих процессов.
Получите и распишитесь
Предположим, мы хотим исполнять задачи в фоновом режиме без затрат на создание каждый раз нового потока для новой задачи. Этой цели можно достигнуть, используя единственный рабочий поток с постоянным циклом, ожидающим появление задачи. Получив задачу, он приступает к ее выполнению. После окончания выполнения поток снова переходит в режим ожидания. Это обычный многопоточный сценарий. Наряду с избавлением от накладных расходов на создание потоков мы получаем последовательно исполняющиеся задачи, устраняя потенциальные проблемы взаимодействия между потоками и чрезмерное потребление ресурсов.
Однако нужно решить что делать, если рабочий поток еще занят исполнением предыдущей задачи, а уже появилась следующая. Можно, например, блокировать исполнение, пока не завершена предыдущая задача. Это можно реализовать, используя два объекта типа AutoResetEvent – ready, который открывается (устанавливается путем вызова метода Set) рабочим потоком, когда он готов к работе, и go, который открывается вызывающим потоком, когда появляется новая задача. В следующем примере для демонстрации задачи используется простое строковое поле (объявленное с ключевым словом volatile для гарантии того, что оба потока будут видеть его в одном и том же состоянии):
class AcknowledgedWaitHandle 
{
  static EventWaitHandle ready = new AutoResetEvent(false);
  static EventWaitHandle go = new AutoResetEvent(false);
  static volatile string task;
 
  static void Main() 
  {
    new Thread(Work).Start();
 
    // Сигнализируем рабочему потоку 5 раз
    for (int i = 1; i <= 5; i++) 
    {
      ready.WaitOne();  // Сначала ждем, когда рабочий поток будет готов
      task = "a".PadRight(i, 'h'); // Назначаем задачу
      go.Set();         // Говорим рабочему потоку, что можно начинать
    }
 
    // Сообщаем о необходимости завершения рабочего потока,
    // используя null-строку
    ready.WaitOne();
    task = null;
    go.Set();
  }
 
  static void Work() 
  {
    while (true) 
    {
      ready.Set();                  // Сообщаем о готовности
      go.WaitOne();                 // Ожидаем сигнала начать...

      if (task == null)
        return;                     // Элегантно завершаемся

      Console.WriteLine(task);
    }
  }
}
Консольный вывод:
ah
ahh
ahhh
ahhhh
Обратите внимание, что для завершения рабочего потока используется задача со значением null. Для рабочего потока в данном случае успешно можно было бы использовать вызов Interrupt или Abort – но только сразу после ready.WaitOne, так как в этом случае нам известно состояние рабочего потока – непосредственно перед go.WaitOne или на этом вызове – и можно избежать осложнений при прерывании неизвестного кода. Использование Interrupt или Abort также потребовало бы добавить обработку исключений в рабочем потоке.
Очередь Поставщик/Потребитель (Producer/Consumer)
Еще один распространенный сценарий работы с потоками – фоновая обработка задач из очереди. Это называется очередью Поставщик/Потребитель: Поставщик ставит задачи в очередь, Потребитель извлекает задачи из очереди в рабочем потоке. Очень похоже на предыдущий пример, за исключением того, что вызывающий поток не блокируется, если рабочий все еще занят предыдущей задачей.
Очередь Поставщик/Потребитель масштабируема – потребителей может быть несколько, каждый обслуживает одну и ту же очередь, но в своем потоке. Это хороший способ использования преимуществ многопроцессорных систем, но оно все же ограничивает количество параллельно работающих потоков, чтобы избежать известных подводных камней (издержек на переключение контекста и борьбы за ресурсы).
В следующем примере единственный AutoResetEvent используется для сигнализации рабочему потоку, который приостанавливается, только если ему больше нечего исполнять (очередь пуста). Для очереди используется Queue<>, доступ к ней защищен блокировкой для обеспечения потоковой безопасности. Рабочий поток завершается, если встречает null-задачу:
using System;
using System.Threading;
using System.Collections.Generic;
 
class ProducerConsumerQueue : IDisposable 
{
  EventWaitHandle wh = new AutoResetEvent(false);
  Thread worker;
  object locker = new object();
  Queue<string> tasks = new Queue<string>();
 
  public ProducerConsumerQueue() 
  {
    worker = new Thread(Work);
    worker.Start();
  }
 
  public void EnqueueTask(string task)
  {
    lock (locker)
      tasks.Enqueue(task);

    wh.Set();
  }
 
  public void Dispose() 
  {
    EnqueueTask(null);      // Сигнал Потребителю на завершение
    worker.Join();          // Ожидание завершения Потребителя
    wh.Close();             // Освобождение ресурсов
  }
 
  void Work() 
  {
    while (true) 
    {
      string task = null;

      lock (locker)
      {
        if (tasks.Count > 0) 
        {
          task = tasks.Dequeue();
          if (task == null)
            return;
        }
      }

      if (task != null) 
      {
        Console.WriteLine("Выполняется задача: " + task);
        Thread.Sleep(1000); // симуляция работы...
      }
      else
        wh.WaitOne();       // Больше задач нет, ждем сигнала...
    }
  }
}
А это код тестирования очереди:
class Test 
{
  static void Main() 
  {
    using(ProducerConsumerQueue q = new ProducerConsumerQueue()) 
    {
      q.EnqueueTask("Привет!");

      for (int i = 0; i < 10; i++)
        q.EnqueueTask("Сообщение " + i);

      q.EnqueueTask("Пока!");
    }
    // Выход из using приводит к вызову Dispose, который ставит
    // в очередь null-задачу и ожидает, пока Потребитель не завершится.
  }
}
Консольный вывод:
Выполняется задача: Привет!
Выполняется задача: Сообщение 0
Выполняется задача: Сообщение 1
Выполняется задача: Сообщение 2
Выполняется задача: Сообщение 3
Выполняется задача: Сообщение 4
Выполняется задача: Сообщение 5
Выполняется задача: Сообщение 6
Выполняется задача: Сообщение 7
Выполняется задача: Сообщение 8
Выполняется задача: Сообщение 9
Выполняется задача: Пока!
Обратите внимание, что WaitHandle явно закрывается, когда для ProducerConsumerQueue вызывается Dispose(), так как в течении жизни приложения возможно создание и разрушение многих объектов типа ProducerConsumerQueue.

ManualResetEvent

ManualResetEvent – это разновидность AutoResetEvent. Отличие состоит в том, что он не сбрасывается автоматически, после того как поток проходит через WaitOne, и действует как шлагбаум – Set открывает его, позволяя пройти любому количеству потоков, вызвавших WaitOneReset закрывает шлагбаум, потенциально накапливая очередь ожидающих следующего открытия.
Эту функциональность можно эмулировать при помощи булевой переменной "gateOpen" (объявленной как volatile) в комбинации со "spin-sleeping" – повторением проверок флага и ожидания в течении короткого промежутка времени.
ManualResetEvent может использоваться для сигнализации о завершении какой-либо операции или инициализации потока и готовности к выполнению работы.

Mutex

Мьютекс обеспечивает те же самые функциональные возможности, что и оператор lock в C#, что делает его не очень востребованным. Единственное преимущество состоит в том, что Mutex доступен из разных процессов, обеспечивая блокировку на уровне компьютера, в отличии от оператора lock, который действует только на уровне приложения.
Mutex относительно быстр, но lock быстрее в сотни раз. Получение мьютекса занимает несколько микросекунд, вызов lock – десятки наносекунд (если не происходит собственно блокировки).
Метод WaitOne для Mutex получает исключительную блокировку, блокируя поток, если это необходимо. Исключительная блокировка может быть снята вызовом метода ReleaseMutex. Точно также как оператор lock в C#, Mutex может быть освобожден только из того же потока, что его захватил.
Типовое использование мьютекса для взаимодействия процессов – обеспечение возможности запуска только одного экземпляра программы единовременно. Вот как это делается:
class OneAtATimePlease 
{
  // Используем уникальное имя приложения,
  // например, с добавлением имени компании
  static Mutex mutex = new Mutex(false, "oreilly.com OneAtATimeDemo");
  
  static void Main() 
  {
    // Ожидаем получения мьютекса 5 сек – если уже есть запущенный
    // экземпляр приложения - завершаемся.
    if (!mutex.WaitOne(TimeSpan.FromSeconds(5), false)) 
    {
      Console.WriteLine("В системе запущен другой экземпляр программы!");
      return;
    }

    try 
    {
      Console.WriteLine("Работаем - нажмите Enter для выхода...");
      Console.ReadLine();
    }
    finally { mutex.ReleaseMutex(); }
  }
}
Полезное свойство Mutex-а – если приложение завершается без вызова ReleaseMutex, CLR освобождает мьютекс автоматически.

Semaphore

Semaphore похож на ночной клуб – он имеет определенную вместимость, которую обеспечивает вышибала. После заполнения никто уже не может войти в ночной клуб, очередь образуется снаружи. Далее, если один человек покидает клуб, один из начала очереди может пройти внутрь. Конструктор Semaphore принимает минимум два параметра – число еще доступных мест и общую вместимость ночного клуба.
Semaphore с емкостью, равной единице, подобен Mutex или lock, за исключением того, что он не имеет потока-хозяина. Любой поток может вызвать Release для Semaphore, в то время как в случае с Mutex или lock только поток, захвативший ресурс, может его освободить.
В следующем примере по очереди запускаются десять потоков, выполняющих вызов SleepSemaphore гарантирует, что не более трех потоков могут вызвать Sleep одновременно:
class SemaphoreTest 
{
  static Semaphore s = new Semaphore(3, 3);  // Available=3; Capacity=3
 
  static void Main() 
  {
    for (int i = 0; i < 10; i++)
      new Thread(Go).Start();
  }
 
  static void Go() 
  {
    while (true) 
    {
      s.WaitOne();
      // Только 3 потока могут находиться здесь одновременно
      Thread.Sleep(100);
      s.Release();
    }
  }
}

WaitAny, WaitAll и SignalAndWait

Кроме Set и WaitOne, есть еще несколько статических методов класса WaitHandle для более крепких орешков синхронизации.
Методы WaitAnyWaitAll и SignalAndWait облегчают взаимодействие нескольких WaitHandle, возможно разных типов.
SignalAndWait, возможно, самый полезный метод – он в рамках единой атомарной операции вызывает WaitOne для одного WaitHandle, и Set – для другого.
Классическим вариантом использования этого метода является использование с парой EventWaitHandle для подготовки встречи двух потоков в нужной точке в нужное время. Подойдут и AutoResetEvent и ManualResetEvent. Первый поток делает следующее:
WaitHandle.SignalAndWait(wh1, wh2);
в то время как второй поток – наоборот:
WaitHandle.SignalAndWait(wh2, wh1);
WaitHandle.WaitAny ожидает освобождения одного (любого) WaitHandle из переданного ему списка, WaitHandle.WaitAll ожидает освобождения сразу всех переданных ему WaitHandle. Используя аналогию с турникетом метро, эти методы организуют общую очередь одновременно для всех турникетов – с прохождением через первый открывшийся турникет (WaitAny) или с ожиданием, пока они не откроются все (WaitAll).
На самом деле, значение WaitAll сомнительно из-за странной связи с потоковыми апартаментами, унаследованными от COM-архитектуры. WaitAll требует, чтобы вызывающий поток находился в многопоточном апартаменте, в то время как эта модель может быть наименее подходящей, особенно для приложений Windows Forms, которые должны выполнять столь мирские задачи, как взаимодействие с буфером обмена.
К счастью, .NET Framework обеспечивает более продвинутый сигнальный механизм для случаев, когда WaitHandle являются неудобными или неподходящими – Monitor.Wait и Monitor.Pulse.

Контексты синхронизации

Вместо ручной блокировки можно осуществлять блокировки декларативно. Используя наследование от ContextBoundObject и применяя атрибут Synchronization, можно поручить CLR делать блокировки автоматически. Вот пример:
using System;
using System.Threading;
using System.Runtime.Remoting.Contexts;
 
[Synchronization]
public class AutoLock : ContextBoundObject 
{
  public void Demo() 
  {
    Console.Write("Старт...");
    Thread.Sleep(1000);        // Поток не может быть вытеснен здесь
    Console.WriteLine("стоп"); // спасибо автоматической блокировке!
  } 
}
 
public class Test 
{
  public static void Main() 
  {
    AutoLock safeInstance = new AutoLock ();
    new Thread(safeInstance.Demo).Start(); // Запустить метод 
    new Thread(safeInstance.Demo).Start(); // Demo 3 раза
    safeInstance.Demo();                   // одновременно.
  }
}
Консольный вывод:
Старт...стоп
Старт...стоп
Старт...стоп
В этом случае CLR гарантирует, что только один поток сможет одновременно исполнять код в safeInstance. CLR делает это, создавая отдельный объект синхронизации и блокируя его при каждом вызове метода или свойства safeInstance. Область блокировки, в данном случае – объект safeInstance, называют контекстом синхронизации.
Как это работает? Ответ находится в пространстве имен атрибута Synchronization – System.Runtime.Remoting.Contexts. О ContextBoundObject можно думать как об “удаленном” объекте, перехватывающем все вызовы методов. Чтобы сделать этот перехват возможным, CLR, когда создается AutoLock, фактически возвращает прокси-объект со всеми методами и свойствами AutoLock, который работает как посредник. Именно через это посредничество и работает автоблокировка. В целом перехват добавляет около микросекунды к вызову каждого метода.
Автоматическая синхронизация не может быть использована для защиты членов статических типов и классов, не являющихся наследниками ContextBoundObject (например, Windows Form).
Для внутренних вызовов блокировка работает так же, как и для вызовов извне. Казалось бы, можно ожидать, что следующий пример будет работать точно так же, как и предыдущий:
[Synchronization]
public class AutoLock : ContextBoundObject 
{
  public void Demo()
  {
    Console.Write("Start...");
    Thread.Sleep(1000);
    Console.WriteLine("end");
  }
 
  public void Test()
  {
    new Thread(Demo).Start();
    new Thread(Demo).Start();
    new Thread(Demo).Start();
    Console.ReadLine();
  }
 
  public static void Main() 
  {
    new AutoLock ().Test();
  }
}
(Обратите внимание, куда вкрался вызов Console.ReadLine). Поскольку в некий момент времени только один поток может выполнять код данного объекта, три новых потока блокированы на вызове метода Demo, пока метод Test не будет завершен – а для этого нужно, чтобы завершился вызов Console.ReadLine. Следовательно, результат будет такой же, как в предыдущем примере, но только после нажатия клавиши Enter. Этот потокобезопасный молоток достаточно велик, чтобы воспрепятствовать любой полезной многопоточности внутри класса!
Кроме того, мы не решили проблему, описанную ранее: если бы AutoLock был классом коллекции, например, все еще требовался бы lock вокруг следующего выражения, вызванного из другого класса:
if (safeInstance.Count > 0)
  safeInstance.RemoveAt(0);
за исключением случая, когда этот класс сам синхронизирован при помощи ContextBoundObject.
Контекст синхронизации может распространяться за пределы одиночного объекта. По умолчанию, если синхронизированный объект создается в коде другого объекта, оба разделяют один контекст (другими словами, одну большую блокировку!) Это поведение можно изменить, задавая флаг в конструкторе атрибута Synchronization с использованием констант, определенных в классе SynchronizationAttribute:
КонстантаЗначение
NOT_SUPPORTEDЭквивалентно неиспользованию атрибутов синхронизации.
SUPPORTEDПрисоединиться к существующему контексту синхронизации, если создание происходит из синхронизированного объекта, иначе не использовать синхронизацию.
REQUIRED(default)Присоединиться к существующему контексту синхронизации, если создание происходит из синхронизированного объекта, иначе создать новый контекст.
REQUIRES_NEWВсегда создавать новый контекст синхронизации.
Так, если объект класса SynchronizedA создает объект класса SynchronizedB, у них будут разные контексты синхронизации, если SynchronizedB декларирован следующим образом:
[Synchronization(SynchronizationAttribute.REQUIRES_NEW)]
public class SynchronizedB : ContextBoundObject
{
  ...
Чем больше расширяется контекст синхронизации, тем проще управление, но и тем меньше возможностей для полезного параллелизма. С другой стороны, отдельные контексты синхронизации чреваты взаимоблокировками. Вот пример:
using System;
using System.Runtime.Remoting.Contexts;
using System.Threading;

[Synchronization]
public class Deadlock : ContextBoundObject
{
  public Deadlock Other;

  public void Demo()
  {
    Thread.Sleep(1000);
    Other.Hello();
  }

  void Hello() { Console.WriteLine("hello"); }
}

public class Test
{
  static void Main()
  {
    Deadlock dead1 = new Deadlock();
    Deadlock dead2 = new Deadlock();
    dead1.Other = dead2;
    dead2.Other = dead1;
    new Thread(dead1.Demo).Start();
    dead2.Demo();
  }
}
Поскольку каждый экземпляр Deadlock создается внутри несинхронизированного класса Test, каждый экземпляр получает свой контекст синхронизации, и, следовательно, свою собственную блокировку. Когда оба объекта вызывают методы друг друга, взаимоблокировки долго ждать не приходится (одну секунду, если быть точным!). Проблема особенно коварна, когда классы Deadlock и Test написаны разными группами программистов. Неразумно ждать, что ответственные за класс Test хотя бы знают о том, что они где-то неправы, не говоря уж о решении проблемы. В этом отличие контекстов от использования явных блокировок, где взаимоблокировки обычно более очевидны.

Реентерабельность

Потокобезопасные методы иногда называют реентерабельными, так как выполнение их может быть заморожено и начато снова в другом потоке без побочных эффектов. В широком смысле, термины потокобезопасный и реентерабельный можно считать синонимами или очень близкими по значению.
Реентерабельность, однако, имеет другой, более мрачный смысл в режиме автоматической блокировки. Если атрибут Synchronization применяется с аргументом reEntrant, установленным в true:
[Synchronization(true)]
блокировка контекста синхронизации будет временно освобождена, когда исполнение покидает контекст. В предыдущем примере это помешало бы возникновению взаимоблокировки, что хорошо. Есть, однако, и побочный эффект – в это время любой поток может вызвать любой метод первоначального объекта (повторный вход в контекст синхронизации) и вернуть нам таким образом все “прелести” многопоточности, которых мы пытались избежать. В этом проблема реентерабельности.
Поскольку [Synchronization(true)] применяется на уровне класса, этот атрибут превращает каждый вызов метода, покидающего созданный классом контекст, в троянского коня реентерабельности.
Реентерабельность может быть опасна, но возможны и другие варианты. Предположим, что необходимо реализовать многопоточность внутри синхронизированного класса, делегируя логику рабочим потокам, исполняемым в отдельных контекстах. Этим рабочим потокам было бы неразумно препятствовать во взаимодействии с другими рабочими потоками или исходным объектом без реентерабельности.
Здесь выявляется основное слабое место автоматической синхронизации: широта области, к которой применяется блокировка, создает сложности, которые в других случаях, возможно, никогда и не возникли бы. Эти сложности – взаимоблокировки, реентерабельность и кастрированный параллелизм – могут сделать ручную блокировку более приемлемой в случаях, отличных от простейших сценариев.

3. Работа с потоками

Апартаменты и Windows Forms

Потоковые апартаменты – это автоматический потокобезопасный режим, тесно связанный с COM, предыдущей технологией Microsoft. В то время как .NET в основном свободен от унаследованных потоковых моделей, временами они все еще необходимы, из-за потребности работать с устаревшими API. Потоковые апартаменты особенно важны в Windows Forms, так как Windows Forms по большей части используют или оборачивают Win32 API вместе с его апартаментами.
Апартамент – логический “контейнер” для потоков. Апартаменты бывают двух видов – “single” (однопоточные) и “multi” (многопоточные). Однопоточный апартамент может содержать только один поток, многопоточный – любое количество потоков. Однопоточная модель используется чаще и имеет большие возможности для взаимодействия.
Так же, как потоки, апартаменты могут содержать в себе объекты. Когда объект создается в каком-либо апартаменте, он остается там на всю жизнь, навсегда прикованный к апартаменту и его потоку(-ам). Это напоминает объект в контексте синхронизации .NET, за исключением того, что контекст синхронизации не владеет и не содержит потоков. Любой поток может вызвать любой объект в любом контексте синхронизации – с обязательным ожиданием эксклюзивной блокировки. Объекты же, входящие в какой-либо апартамент, могут быть вызваны только потоком этого апартамента.
Вообразите себе библиотеку, где каждая книга представляет собой объект. Выносить книги из библиотеки нельзя, они должны оставаться там всю жизнь. Добавим сюда человека, который будет представлять собой поток.
Библиотека-контекст синхронизации позволит войти любому человеку, но только одному одновременно. Если людей больше – перед библиотекой образуется очередь.
Библиотека-апартамент имеет свой штат – одного библиотекаря для однопоточной библиотеки или целую группу для многопоточной. Никто, кроме штатных библиотекарей, не может войти в библиотеку. Клиент, желающий провести исследование, должен посигналить библиотекарю, а затем попросить его выполнить задание! Вызов библиотекаря называется маршалингом – клиент выполняет маршалинг своего вызова штатному сотруднику. Маршалинг выполняется автоматически и реализуется через прокачку сообщений – в Windows Forms этот механизм постоянно следит за событиями клавиатуры и мыши от операционной системы. Если обработка не успевает за новыми сообщениями, создается очередь сообщений, обрабатываемая в порядке поступления.

Назначение типа апартамента

Потоку .NET автоматически назначается апартамент при переходе к использующему апартаменты Win32-коду или унаследованному COM-коду. По умолчанию он помещается в многопоточный апартамент, если только не запрашивает однопоточный следующим образом:
Thread t = new Thread(...);
t.SetApartmentState(ApartmentState.STA);
Можно также запросить, чтобы главный поток приложения вошел в однопоточный апартамент, используя атрибут STAThread для метода main:
class Program 
{
  [STAThread]
  static void Main() 
  {
    ...
Апартаменты никак не влияют на исполнение чистого .NET-кода. Другими словами, два потока из STA могут одновременно вызвать один и тот же метод одного и того же объекта, и при этом не будет никакого маршалинга или блокировок. Это может случиться, только когда дело дойдет до неуправляемого кода.
Типы из пространства имен System.Windows.Forms интенсивно вызывают Win32-код, разработанный в расчете на работу в однопоточном апартаменте. По этой причине main-метод программы Windows Forms должен быть помечен как [STAThread], иначе при вызове Win32 UI-кода произойдет одно из двух:
  • Произойдет маршалинг к STA.
  • Все накроется медным тазом.

Control.Invoke

В многопоточном приложении Windows Forms запрещено вызывать методы и свойства элементов управления из потоков, отличных от того, в котором они были созданы. Для всех межпоточных вызовов должен быть явно выполнен маршалинг в поток, в котором был создан элемент управления с использованием методов Control.Invoke или Control.BeginInvoke. Нельзя полагаться на автоматический маршалинг, так как он происходит слишком поздно, когда дело уже дошло до неуправляемого кода, в то время как предшествующий .NET-код уже отработал в “неправильном” потоке – и такой код не будет потокобезопасным.
Превосходное решение для управления рабочими потоками в приложениях Windows Forms состоит в использовании BackgroundWorker. Этот класс-обертка для рабочих потоков умеет уведомлять о ходе выполнения операции и ее завершении, и автоматически вызывает Control.Invoke там, где это нужно.

BackgroundWorker

BackgroundWorker – класс-обертка из пространства имен System.ComponentModel для управления рабочими потоками. Он обеспечивает следующие возможности:
  • Флаг отмены операции для завершения потока без использования Abort.
  • Стандартный протокол для сообщений о ходе выполнения операции, ее завершении и отмене.
  • Реализация интерфейса IComponent для размещения в дизайнере Visual Studio.
  • Обработка исключений в рабочем потоке.
  • Возможность обновления элементов управления Windows Forms в процессе выполнения или при завершении операции.
Последние две возможности особенно полезны – они означают, что можно не добавлять try/catch в рабочий метод и обновлять элементы управления без использования Control.Invoke.
BackgroundWorker использует пул многократно используемых потоков, чтобы не создавать их под каждую новую задачу. Это означает, что нельзя вызывать Abort для потока BackgroundWorker.
Вот минимально необходимые действия для использования BackgroundWorker:
  • Создайте экземпляр BackgroundWorker и назначьте обработчик для события DoWork.
  • Вызовите RunWorkerAsync, если необходимо – с нужным object в качестве аргумента.
После этих действий начнется исполнение. Аргумент, переданный в RunWorkerAsync, будет перенаправлен в обработчик DoWork как свойство Argument параметра DoWorkEventArgs. Вот пример:
class Program 
{
  static BackgroundWorker bw = new BackgroundWorker();
  static void Main() 
  {
    bw.DoWork += bw_DoWork;
    bw.RunWorkerAsync("Message to worker");     
    Console.ReadLine();
  }
 
  static void bw_DoWork(object sender, DoWorkEventArgs e) 
  {
    // Это будет вызвано рабочим потоком
    Console.WriteLine(e.Argument); // напечатает "Message to worker"
    // Выполняем длительную операцию...
  }
BackgroundWorker также предоставляет событие RunWorkerCompleted, которое генерируется после завершения трудов обработчика события DoWorkОбработка RunWorkerCompleted необязательна, но обычно применяется, хотя бы для того, чтобы получить информацию о возможных исключениях в DoWork. Кроме того, код в обработчике RunWorkerCompleted может обновлять элементы управления Windows Forms без явного маршалинга, а код в DoWork – нет.
Чтобы добавить отображение выполнения операции:
  • Установите свойство WorkerReportsProgress в true.
  • Периодически вызывайте ReportProgress из обработчика DoWork с указанием количества “выполненных процентов” и, возможно, с user-state объектом.
  • Обрабатывайте событие ProgressChanged, запрашивая свойство ProgressPercentage его аргумента.
Код в обработчике ProgressChanged может свободно обращаться к элементам управления UI так же, как и в RunWorkerCompleted. Обычно это нужно для обновления индикатора прогресса.
Чтобы иметь возможность отмены операции:
  • Установите свойство WorkerSupportsCancellation в true.
  • Периодически проверяйте свойство CancellationPending в обработчике DoWork – если оно установлено в true, установите свойство Cancel аргумента DoWorkEventArgs в true и сделайте return (можно устанавливать Cancel в true и без опроса CancellationPending – если выполнение задания не может быть продолжено).
  • Для запроса отмены операции вызывайте CancelAsync.
Вот пример, реализующий обе описанные возможности:
using System;
using System.Threading;
using System.ComponentModel;

class Program
{
  static BackgroundWorker bw;

  static void Main()
  {
    bw = new BackgroundWorker();
    bw.WorkerReportsProgress = true;
    bw.WorkerSupportsCancellation = true;
    bw.DoWork += bw_DoWork;
    bw.ProgressChanged += bw_ProgressChanged;
    bw.RunWorkerCompleted += bw_RunWorkerCompleted;

    bw.RunWorkerAsync(null);

    Console.WriteLine(
      "Нажмите Enter в течении следующих пяти секунд, чтобы прервать работу");
    Console.ReadLine();

    if (bw.IsBusy)
    {
      bw.CancelAsync();
      Console.ReadLine();
    }
  }

  static void bw_DoWork(object sender, DoWorkEventArgs e)
  {
    for (int i = 0; i <= 100; i += 20)
    {
      if (bw.CancellationPending)
      {
        e.Cancel = true;
        return;
      }

      bw.ReportProgress(i);
      Thread.Sleep(1000);
    }

    e.Result = 123;    // будет передано в RunWorkerComрleted
  }

  static void bw_RunWorkerCompleted(object sender,
    RunWorkerCompletedEventArgs e)
  {
    if (e.Cancelled)
      Console.WriteLine(
        "Работа BackgroundWorker была прервана пользователем!");
    else if (e.Error != null)
      Console.WriteLine("Worker exception: " + e.Error);
    else
      Console.WriteLine("Работа закончена успешно. Результат - "
        + e.Result + ". ");

    Console.WriteLine("Нажмите Enter для выхода из программы...");
  }

  static void bw_ProgressChanged(object sender,
    ProgressChangedEventArgs e)
  {
    Console.WriteLine("Обработано " + e.ProgressPercentage + "%");
  }
}
Консольный вывод (Enter нажат во время работы BackgroundWorker):
Нажмите Enter в течении следующих пяти секунд, чтобы прервать работу...
Обработано 0%
Обработано 20%
Обработано 40%

Работа BackgroundWorker была прервана пользователем!
Нажмите Enter для выхода из программы...
Консольный вывод (работа BackgroundWorker не прерывалась):
Нажмите Enter в течении следующих пяти секунд, чтобы прервать работу...
Обработано 0%
Обработано 20%
Обработано 40%
Обработано 60%
Обработано 80%
Обработано 100%
Работа закончена успешно. Результат - 123.
Нажмите Enter для выхода из программы...

Наследование от BackgroundWorker

BackgroundWorker не запечатан (sealed) и предоставляет виртуальный метод OnDoWork, давая возможность реализовать его по-своему. Когда разрабатывается потенциально долго выполняемый метод, можно использовать наследника BackgroundWorker, предназначенного для асинхронного выполнения работы. Использующий его код должен будет только обрабатывать события RunWorkerCompleted и ProgressChanged. Предположим, что у нас есть долго выполняющийся метод GetFinancialTotals:
public class Client 
{
  Dictionary <string,int> GetFinancialTotals(int foo, int bar) { ... }
  ...
}
Можно реорганизовать его следующим образом:
public class Client 
{
  public FinancialWorker GetFinancialTotalsBackground(int foo, int bar)
  {
    return new FinancialWorker(foo, bar);
  }
}
 
public class FinancialWorker : BackgroundWorker 
{
  // Можно добавить типизированные поля.
  public Dictionary <string,int> Result; 
  // Можно выставить их наружу как свойства с блокировками!
  public volatile int Foo;
  public volatile int Bar;

  public FinancialWorker() 
  {
    WorkerReportsProgress = true;
    WorkerSupportsCancellation = true;
  }
 
  public FinancialWorker(int foo, int bar) : this() 
  {
    Foo = foo;
    Bar = bar;
  }
 
  protected override void OnDoWork(DoWorkEventArgs e) 
  {
    ReportProgress(0, "Вкалываем... над отчетом...в поте лица...");
    Initialize financial report data
 
    while (!finished report ) 
    {
      if (CancellationPending) 
      {
        e.Cancel = true;
        return;
      }

      Perform another calculation step

      ReportProgress(percentCompleteCalc, "Продолжаем работу...");
    }      

    ReportProgress(100, "Готово!");
    e.Result = Result = completed report data;
  }
}
Вызвавший GetFinancialTotalsBackground получит FinancialWorker – практичную обертку, управляющую фоновой операцией. Она может сообщать о прогрессе операции, поддерживает отмену и совместима с Windows Forms без использования Control.Invoke. Кроме того, она обрабатывает возможные исключения и использует стандартный протокол (попробуйте-ка получить все это без использования BackgroundWorker!).
Возможность использования BackgroundWorker хоронит старую “асинхронную модель, основанную на событиях”.

ReaderWriterLock

Обычно экземпляры типов потокобезопасны при параллельных операциях чтения, но не при параллельных обновлениях, или параллельном чтении и обновлении. То же самое справедливо для ресурсов, например файлов. Если в основном выполняется чтение и только изредка – запись, защита экземпляров таких типов простой эксклюзивной блокировкой для всех режимов доступа может необоснованно ограничить параллелизм. В качестве примера можно привести сервер приложений, в котором часто используемые данные кэшируются в статических полях для быстрого доступа. Класс ReaderWriterLock разработан специально под такой сценарий работы.
ReaderWriterLock предоставляет отдельные методы для блокировки на чтение и на запись – AcquireReaderLock и AcquireWriterLock. Оба метода принимают аргумент-таймаут и генерируют исключение ApplicationException, если этот таймаут истекает (вместо возвращения false, как это делают остальные аналогичные методы, связанные с потоками). Таймаут может быть легко превышен, если ресурс пользуется популярностью.
Блокировка снимается при помощи методов ReleaseReaderLock или ReleaseWriterLock. Эти методы поддерживают вложенные блокировки. Предоставляется также метод ReleaseLock, снимающий все вложенные блокировки за один вызов. (Далее можно вызвать RestoreLock для восстановления состояния всех блокировок, предшествовавшего вызову ReleaseLock – в подражание поведению Monitor.Wait).
Можно начать с блокировки на чтение вызовом AcquireReaderLock, затем превратить ее в блокировку на запись, используя UpgradeToWriterLock. Этот метод возвращает cookie для последующего вызова DowngradeFromWriterLock. Такая система позволяет читателю запрашивать временный доступ для записи без необходимости повторного ожидания в очереди.
В следующем примере стартуют четыре потока – один непрерывно добавляет элементы в список, другой их удаляет, а два оставшихся постоянно сообщают о количестве элементов в списке. Первые два устанавливают блокировку на запись, два оставшихся – только на чтение. При каждой блокировке используется таймаут в 10 секунд (обработка исключений в данном примере опущена для краткости).
class Program 
{
  static ReaderWriterLock rw = new ReaderWriterLock();
  static List<int> items = new List<int>();
  static Random rand = new Random();
 
  static void Main(string[] args) 
  {
    new Thread(delegate() { while (true) AppendItem(); }).Start();
    new Thread(delegate() { while (true) RemoveItem(); }).Start();
    new Thread(delegate() { while (true) WriteTotal(); }).Start();
    new Thread(delegate() { while (true) WriteTotal(); }).Start();
  }
 
  static int GetRandNum(int max) { lock (rand) return rand.Next(max); }
 
  static void WriteTotal() 
  {
    rw.AcquireReaderLock(10000);
    int tot = 0;

    foreach (int i in items)
      tot += i;

    Console.WriteLine(tot);
    rw.ReleaseReaderLock();
  }
 
  static void AppendItem() 
  {
    rw.AcquireWriterLock(10000);
    items.Add(GetRandNum(1000));
    Thread.SpinWait(400);
    rw.ReleaseWriterLock();
  }
 
  static void RemoveItem() 
  {
    rw.AcquireWriterLock(10000);

    if (items.Count > 0)
      items.RemoveAt(GetRandNum(items.Count));

    rw.ReleaseWriterLock();
  }
}
Поскольку добавление элементов в список происходит быстрее, чем удаление, в этом примере в метод AppendItem добавлен вызов SpinWait для сохранения баланса.

Пулы потоков

Если в вашем приложении так много потоков, что большая часть их времени тратится на ожидание на WaitHandle, вы можете уменьшить накладные расходы, используя пул потоков. Экономия достигается за счет объединения WaitHandle нескольких потоков.
Для использования пула потоков нужно зарегистрировать WaitHandle и делегат, который должен быть исполнен, когда WaitHandle будет установлен. Это делается вызовом ThreadPool.RegisterWaitForSingleObject, как в следующем примере:
class Test 
{
  static ManualResetEvent starter = new ManualResetEvent(false);
 
  public static void Main() 
  {
    ThreadPool.RegisterWaitForSingleObject(starter, Go, "привет", -1, true);
    Thread.Sleep(5000);
    Console.WriteLine("Запускается рабочий поток...");
    starter.Set();
    Console.ReadLine();
  }
 
  public static void Go(object data, bool timedOut) 
  {
    Console.WriteLine("Запущено: " + data);
    // Выполнение задачи...
  }
}
Консольный вывод (после пятисекундной задержки):
Запускается рабочий поток...
Запущено: привет
В дополнение к WaitHandle и делегату RegisterWaitForSingleObject принимает объект-“черный ящик”, который будет передан в ваш делегат (как это было для ParameterizedThreadStart), таймаут в миллисекундах (-1, если таймаут не нужен) и флаг, указывающий, является запрос одноразовым или повторяющимся.
Все потоки пула являются фоновыми, они уничтожаются автоматически, когда завершается основной поток(-и) приложения. Однако ожидание выполнения важной работы в потоках пула перед завершением приложения на вызове Join бессмысленно, так как потоки пула после выполнения работы не завершаются, они используются повторно до завершения родительского процесса. Поэтому чтобы узнать, когда исполняемая работа будет закончена, необходимо посигналить, например, специальным WaitHandle.
Вызов Abort для потоков пула – это тоже плохая идея. Потоки пула должны повторно использоваться в течение всей жизни приложения.
Можно использовать пул потоков и без WaitHandle, вызывая метод QueueUserWorkItem и задавая делегат для немедленного вызова. При этом вы не сохраните потоки для повторного использования, но получите выгоду в другом: пул потоков поддерживает максимальное количество потоков (по умолчанию 25), автоматически выстраивая задачи в очередь, когда их количество превышает эту цифру. Это скорее походит на очередь поставщик/потребитель с 25-ю потребителями! В следующем примере 100 заданий ставятся в очередь к пулу потоков, и только 25 одновременно исполняются. Главный поток ждет, пока все задания не будут выполнены, используя Wait и Pulse:
class Test
{
  static object workerLocker = new object();
  static int runningWorkers = 100;

  public static void Main()
  {
    for (int i = 0; i < runningWorkers; i++)
      ThreadPool.QueueUserWorkItem(Go, i);

    Console.WriteLine("Ожидаем завершения работы потоков...");

    lock (workerLocker)
      while (runningWorkers > 0)
        Monitor.Wait(workerLocker);

    Console.WriteLine("Готово!");
    Console.ReadLine();
  }

  public static void Go(object instance)
  {
    Console.WriteLine("Запущен:  " + instance);
    Thread.Sleep(1000);
    Console.WriteLine("Завершен: " + instance);

    lock (workerLocker)
    {
      runningWorkers--;
      Monitor.Pulse(workerLocker);
    }
  }
}
Консольный вывод:
Ожидаем завершения работы потоков..
Запущен:  0
Завершен: 0
Запущен:  2
Запущен:  1
Запущен:  3
Завершен: 2
Запущен:  5
Завершен: 1
Запущен:  6
Запущен:  4
...
Завершен: 95
Завершен: 96
Завершен: 97
Завершен: 99
Завершен: 98
Готово!
Если необходимо передать в целевой метод более одного объекта, можно или создать объект с нужными свойствами, или использовать анонимный метод. Например, если метод Go требует два целочисленных параметра, можно запустить его следующим образом:
ThreadPool.QueueUserWorkItem(delegate(object notUsed) { Go(23,34); });
Другой путь в пул потоков – через асинхронные делегаты.

Асинхронные делегаты

В первой части было описано, как передать данные в поток, используя ParameterizedThreadStart. Иногда же нужно, наоборот, вернуть данные из потока, когда он завершит выполнение. Асинхронные делегаты предлагают для этого удобный механизм, позволяя передавать в обоих направлениях любое количество параметров с контролем их типа. Кроме того, необработанные исключения из асинхронных делегатов удобно перебрасываются в исходный поток и, таким образом, не требуют отдельной обработки. Асинхронные делегаты также предоставляют альтернативный путь к пулу потоков.
Цена, которую нужно заплатить – асинхронная модель. Чтобы понять, что это значит, сначала рассмотрим обычную, синхронную модель программирования. Скажем, нужно сравнить две web-страницы. Можно последовательно загрузить каждую страницу, а затем сравнить их примерно так:
static void ComparePages() 
{
  WebClient wc = new WebClient();
  string s1 = wc.DownloadString("http://www.rsdn.ru");
  string s2 = wc.DownloadString("http://rsdn.ru");
  Console.WriteLine(s1 == s2 ? "Одинаковые" : "Различные");
}
Конечно, было бы быстрее, если бы обе страницы загружались одновременно. Возможный взгляд на эту проблему – возложить вину на DownloadString, которая блокирует вызывающий метод на время загрузки страницы. Было бы хорошо вызвать DownloadString не блокирующим, асинхронным способом, другими словами:
  1. Говорим DownloadString начать выполнение.
  2. Исполняем другие задачи, пока она работает, например загружаем вторую страницу
  3. Запрашиваем у DownloadString результаты.
Класс WebClient предлагает встроенный метод DownloadStringAsync, предоставляющий возможности, аналогичные асинхронным. Однако пока мы его проигнорируем и сосредоточимся на механизме, который позволяет вызывать асинхронно любые методы.
Третий шаг – это то, что делает асинхронные делегаты полезными. Вызывающий поток стыкуется с рабочим, чтобы получить результаты и чтобы позволить повторную генерацию исключений. Без этого шага мы имеем нормальную многопоточность. Однако использование асинхронных делегатов без этого заключительного рандеву немногим более полезно, чем ThreadPool.QueueWorkerItem или BackgroundWorker.
Вот как можно использовать асинхронные делегаты для загрузки двух web-страниц, с одновременным выполнением других вычислений:
delegate string DownloadString(string uri);
 
static void ComparePages() 
{
  // создаем два экземпляра делегата DownloadString:
  DownloadString download1 = new WebClient().DownloadString;
  DownloadString download2 = new WebClient().DownloadString;
  
  // Стартуем загрузку:
  IAsyncResult cookie1 = download1.BeginInvoke(uri1, null, null);
  IAsyncResult cookie2 = download2.BeginInvoke(uri2, null, null);
  
  // Выполняем какие-то вычисления:
  double seed = 1.23;

  for (int i = 0; i < 1000000; i++)
    seed = Math.Sqrt(seed + 1000);
  
  // Получаем результат загрузки, ожидая в случае необходимости.
  // Если были исключения, они будут сгенерированы здесь:
  string s1 = download1.EndInvoke(cookie1);
  string s2 = download2.EndInvoke(cookie2);
  
  Console.WriteLine(s1 == s2 ? "Одинаковые" : "Различные");
}
Мы начинаем с объявления и создания делегатов для методов, которые хотим исполнить асинхронно. В этом примере нам нужны два делегата – каждый для отдельного объекта WebClient (WebClient не допускает параллельного доступа, если бы это было возможно, мы использовали бы один единственный делегат).
Далее вызываем BeginInvoke. Методы WebClient начинают исполняться, а управление немедленно возвращается в вызывающий код. В соответствии с сигнатурой делегата в BeginInvoke передается строка, а EndInvoke возвращает строку.
BeginInvoke нужны два дополнительных параметра – метод обратного вызова и объект с данными; обычно в них нет необходимости и можно передать в них null. BeginInvoke возвращает объект IASynchResult, используемый как cookie для вызова EndInvoke. У объекта IASynchResult есть также свойство IsCompleted, которое можно использовать для проверки завершения операции.
Далее мы вызываем для делегатов EndInvoke, так нам нужны их результаты. При необходимости EndInvoke будет ожидать завершения операции, а затем вернет значение, указанное в типе делегата (в нашем случае строку). Удобная особенность EndInvoke – если бы в DownloadString были ref- или out-параметры, они были бы добавлены в сигнатуру EndInvoke, позволяя возвратить таким образом несколько значений.
Если при исполнении асинхронного метода возникает необработанное исключение, оно будет повторно сгенерировано при вызове EndInvoke. Это позволяет аккуратно передавать исключения вызывающему коду.
Даже если вызываемый асинхронно метод не возвращает никакого значения, чисто технически все равно необходимо вызывать EndInvoke. Однако на практике возможны варианты. MSDN выдает противоречивые рекомендации на этот счет. Если вы решите не вызывать EndInvoke, вам необходимо будет разрешить проблему обработки исключений в вызываемом методе.

Асинхронные методы

Некоторые типы .NET Framework предлагают асинхронные версии своих методов, с именами, начинающимися с "Begin" и "End". Они называются асинхронными методами и имеют сигнатуры, подобные асинхронным делегатам, но предназначены они для решения другой проблемы – выполнять больше асинхронных манипуляций, чем у вас есть потоков. Например, Web-сервер или сервер на TCP-сокетах могут параллельно обрабатывать несколько сотен запросов на горстке потоков из пула, используя NetworkStream.BeginRead и NetworkStream.BeginWrite.
Старайтесь однако, избегать асинхронных методов, если вы не пишете специализированное приложение, которое должно обрабатывать много параллельных запросов, по следующим причинам:
  • В отличие от асинхронных делегатов, асинхронные методы фактически не могут выполняться параллельно с вызывающим кодом.
  • Выгода от асинхронных методов уменьшается или исчезает вообще, если не следовать дотошно правилам их использования.
  • Зато простые вещи быстро станут сложными, если вы строго следуете правилам.
Если вам просто нужно параллельное выполнение, лучше вызовите синхронную версию метода (например NetworkStream.Read) через асинхронный делегат. Другая возможность – использовать ThreadPool.QueueUserWorkItem или BackgroundWorker — либо просто создать новый поток.

Асинхронные события

Существует и другой сценарий, по которому типы могут предоставить асинхронные версии своих методов. Он называется “асинхронностью на основе событий”. Названия таких методов заканчиваются на “Async” и “Completed”. Класс WebClient применяет такой способ в методе DownloadStringAsync. Чтобы использовать его, добавьте обработчик для события “Completed” (в данном случае DownloadStringCompleted), и затем вызывайте метод "Async" (т.е. DownloadStringAsync). Когда метод завершит работу, будет вызван ваш обработчик события. К сожалению, реализация этих методов в WebClient испорчена – DownloadStringAsync блокирует вызывающий код на часть времени загрузки.
Сценарий на основе событий также поддерживает реализованные дружественным для Windows Forms-приложений способом события, информирующие о ходе продвижения асинхронной операции и ее отмене. Если вам нужны эти возможности, а тип не поддерживает асинхронность на основе событий (или реализует ее некорректно), не старайтесь самостоятельно реализовать именно этот паттерн, все это проще реализуется как обертка для BackgroundWorker.

Таймеры

Самый простой способ выполнять метод периодически – использовать таймер, например класс Timer из пространства имен System.Threading. Этот таймер использует пул потоков, допуская создание множества таймеров без накладных расходов в виде такого же количества потоков. Timer – довольно простой класс с конструктором и парой методов (просто наслаждение для минималистов и авторов книг!).
public sealed class Timer : MarshalByRefObject, IDisposable
{
  public Timer(TimerCallback tick, object state, 1st, subsequent);
  public bool Change(1st, subsequent);  // Для изменения периода
  public void Dispose();                // Для удаления
}
// 1st = время до первого срабатывания в миллисекундах или как TimeSpan
// subsequent = следующие интервалы в миллисекундах или как TimeSpan 
// (используйте Timeout.Infinite для одноразового срабатывания)
В следующем примере таймер вызывает метод Tick, который печатает "tick..." по истечении 5 секунд и далее каждую секунду, пока пользователь не нажмет Enter:
using System;
using System.Threading;
 
class Program 
{
  static void Main() 
  {
    Timer tmr = new Timer(Tick, "tick...", 5000, 1000);
    Console.ReadLine();
    tmr.Dispose();           // Остановка таймера
  }

  static void Tick(object data) 
  {
    // Этот код выполняется на потоке из пула
    Console.WriteLine(data); // Печать: "tick..."
  }
}
.NET Framework предоставляет также другой класс таймера с тем же самым именем в пространстве имен System.Timers. Это простая обертка System.Threading.Timer, с тем же самым основным механизмом, обеспечивающая дополнительные удобства при использовании пула потоков. Вот основные дополнительные возможности:
  • Реализация в виде компонента, позволяющая размещать его в дизайнере Visual Studio.
  • Свойство Interval вместо метода Change.
  • Событие Elapsed вместо делегата обратного вызова.
  • Свойство Enabled для запуска и остановки таймера (значение по умолчанию – false).
  • Методы Start и Stop на случай, если вам не нравится Enabled.
  • Флаг AutoReset для указания необходимости периодических срабатываний (значение по умолчанию – true).
Вот пример:
using System;
using System.Timers;   // Пространство имен Timers вместо Threading
 
class SystemTimer 
{
  static void Main()
  {
    Timer tmr = new Timer();       // Конструктор без параметров
    tmr.Interval = 500;
    tmr.Elapsed += tmr_Elapsed;    // Событие вместо делегата
    tmr.Start();                   // Запустить таймер
    Console.ReadLine();

    tmr.Stop();                    // Остановить таймер
    Console.ReadLine();

    tmr.Start();                   // Продолжить
    Console.ReadLine();

    tmr.Dispose();                 // Остановить навсегда
  }
 
  static void tmr_Elapsed(object sender, EventArgs e)
  {
    Console.WriteLine("Tick");
  }
}
.NET Framework предоставляет еще и третий вид таймера – в пространстве имен System.Windows.Forms. Похожий на System.Timers.Timer по интерфейсу, он радикально отличается от него по функциональности. Таймер Windows Forms не использует пула потоков, вместо этого вызывая событие “Tick” всегда в том же самом потоке, в котором был создан таймер. При условии, что таймер создается в главном потоке, там же, где все формы и элементы управления приложения Windows Forms, обработчик события срабатывания таймера может взаимодействовать с формой и элементами управления без нарушения потоковой безопасности и необходимости вызовов Control.Invoke.
Таймер Windows Forms предназначен для заданий, которые могут привести к обновлению пользовательского интерфейса, и которые должны выполняться достаточно быстро. Быстрота выполнения важна, так как событие Tick вызывается в главном потоке, а значит, во время его выполнения интерфейс не будет отвечать на действия пользователя.

Локальные хранилища

Каждый поток получает область данных, изолированную от всех других потоков. Это необходимо для хранения специфических данных инфраструктуры выполнения, типа передачи сообщений, транзакций или токенов безопасности. Передавать такие данные через параметры методов было бы слишком неудобно, хранение же информации в статических полях означает доступность ее для всех потоков.
Метод Thread.GetData читает из изолированной области данных потока, Thread.SetData пишет в нее. Оба метода требуют в качестве параметра объект LocalDataStoreSlot (на самом деле это только обертка для строки с именем слота) для идентификации слота. Один и тот же объект LocalDataStoreSlot может быть использован из любого потока для получения им своих локальных данных. Вот пример:
class ... 
{
  // Этот объект LocalDataStoreSlot может быть использован из любого потока.
  LocalDataStoreSlot secSlot = Thread.GetNamedDataSlot("securityLevel");
 
  // Это свойство будет иметь своё значение для каждого потока
  int SecurityLevel 
  {
    get 
    {
      object data = Thread.GetData(secSlot);
      return data == null ? 0 : (int)data;    // null == uninitialized
    }

    set { Thread.SetData(secSlot, value); }
  }
  ...
Thread.FreeNamedDataSlot освобождает соответствующий слот данных всех потоков, но только тогда, когда все объекты LocalDataStoreSlot с одним и тем же именем выйдут из области видимости и будут уничтожены сборщиком мусора. Это гарантирует, что слот данных не будет отобран у потока, пока он сохраняет ссылку на соответствующий объект LocalDataStoreSlot.

4. Дополнительные материалы

Неблокирующая синхронизация

Ранее было сказано, что синхронизация необходима даже в простых случаях присвоения значения или увеличения значения поля. Хотя эксклюзивная блокировка в данном случае и может помочь, в результате борьбы за блокировку поток может быть заблокирован, что чревато соответствующими накладными расходами. Конструкции неблокирующей синхронизации .NET Framework позволяют выполнить простые операции без блокирования, приостановок и ожидания. При этом используются атомарные операции, а также чтение и запись с семантикой “volatile”. Иногда проще использовать такие конструкции, а не блокировки.

Атомарность и Interlocked

Инструкция является атомарной, если она выполняется как единая, неделимая команда. Строгая атомарность препятствует любой попытке вытеснения. В C# простое чтение или присвоение значения полю в 32 бита или менее является атомарным (для 32-битных CPU). Операции с большими полями не атомарны, так как являются комбинацией более чем одной операции чтения/записи:
class Atomicity 
{
  static int x;
  static int y;
  static long z;
  
  static void Test()
  {
    long myLocal;
    x = 3;             // Атомарная операция
    z = 3;             // Не атомарная (z – 64-битная переменная)
    myLocal = z;       // Не атомарная (z is 64 bits)
    y += x;            // Не атомарная (операции чтения и записи)
    x++;               // Не атомарная (операции чтения и записи)
  }
}
Чтение и запись 64-битных полей не атомарны на 32-битных CPU, так при этом используются два 32-битных участка памяти. Если поток A читает 64-битное значение, в то время как поток B обновляет его, поток A может получить битовую комбинацию из старого и нового значений.
Унарные операторы типа x++ сначала читают переменную, затем обрабатывают ее, а потом записывают новое значение. Рассмотрим следующий класс:
class ThreadUnsafe 
{
  static int x = 1000;

  static void Go()
  {
    for (int i = 0; i < 100; i++)
      x--;
  }
}
Вы могли бы подумать, что если бы десять потоков одновременно выполняли Go, в результате x было бы равно 0. Однако такой гарантии нет, потому что возможна ситуация, когда один поток вытеснит другой после получения им текущего значения x, уменьшит его и запишет его назад (в результате первый поток продолжит работу с устаревшим значением x).
Один из путей решения таких проблем – обернуть неатомарные операции в блокировку. Блокировка фактически моделирует атомарность. Однако класс Interlocked предлагает более простое и быстрое решение для простых атомарных операций:
class Program 
{
  static long sum;
 
  static void Main()                                            // sum
  {
    // Простой increment/decrement:
    Interlocked.Increment(ref sum);                             // 1
    Interlocked.Decrement(ref sum);                             // 0
 
    // Сложение/вычитание:
    Interlocked.Add(ref sum, 3);                                // 3
 
    // Чтение 64-битного поля:
    Console.WriteLine(Interlocked.Read(ref sum));               // 3
 
    // Запись 64-битного поля после чтения предыдущего значения:
    Console.WriteLine(Interlocked.Exchange(ref sum, 10));       // 10
 
    // Обновление поля только если оно соответствует
    // определенному значению(10):
    Interlocked.CompareExchange(ref sum, 123, 10);              // 123
  }
}
Использование Interlocked вообще более эффективно, чем lock, так как при этом в принципе отсутствует блокировка – и соответствующие накладные расходы на временную приостановку потока.
Interlocked аналогично действует и при использовании из разных процессов – в отличие от оператора lock, который эффективен только в рамках потоков текущего процесса. Это может быть использовано, например, при чтении и записи в разделяемую память (shared memory).

Барьеры в памяти и асинхронная изменчивость (volatility)

Рассмотрим следующий класс:
class Unsafe 
{
  static bool endIsNigh;
  static bool repented;

  static void Main()
  {
    // Запустить поток, ждущий изменения флага в цикле...
    new Thread(Wait).Start();
    Thread.Sleep(1000); // Дадим секунду на «прогрев»!
    repented = true;
    endIsNigh = true;
    Console.WriteLine("Понеслась...");
  }
  
  static void Wait()
  {
    while (!endIsNigh) // Крутимся в ожидании изменения значения endIsNigh
      ;

    Console.WriteLine("Готово, " + repented);
  }
}
Внимание, вопрос: насколько существенная задержка может разделять "Понеслась..." от "Готово" – другими словами, может ли цикл в методе Wait продолжать крутиться после того, как флаг endIsNigh был установлен в true? И еще, может ли метод Wait напечатать "Готово, false"?
Ответ на оба вопроса – теоретически да, может, на многопроцессорной машине, если планировщик потоков назначит эти два потока на разные CPU. Поля repented и endIsNigh могут кэшироваться в регистрах CPU для повышения производительности и записываться назад в память с некоторой задержкой. И порядок, в каком регистры записываются в память, необязательно совпадет с порядком обновления полей.
Это кэширование можно обойти, используя статические методы Thread.VolatileRead и Thread.VolatileWrite для чтения и записи полей. VolatileRead – это способ “читать последнее значение”; VolatileWrite означает “записать немедленно в память”. Того же эффекта можно достичь более изящно, объявлением полей с модификатором volatile:
class ThreadSafe 
{
  // используйте семантику чтения/записи volatile:
  volatile static bool endIsNigh;
  volatile static bool repented;
  ...
Если ключевое слово volatile используется как замена методов VolatileRead и VolatileWrite, можно просто думать, что оно означает “не использовать кэш потока для этого поля!”.
Тот же эффект может быть достигнут оборачиванием доступа к repented и endIsNigh в оператор lock. Это работает, так как побочный (но необходимый) эффект блокировки состоит в создании барьера в памяти – для гарантии, что асинхронная изменчивость полей, используемых внутри конструкции lock, не выходит за ее пределы. Другими словами, значения полей будут самими свежими при входе в lock (volatile-чтение) и будут записаны в память перед выходом из lock (volatile-запись).
Использование оператора lock было бы необходимо, если бы нужно было получить доступ к полям repented и endIsNigh атомарно, например, выполнить что-то типа такого:
lock (locker)
{
  if (endIsNigh)
    repented = true; 
}
lock также может оказаться предпочтительнее там, где поля много раз используются в цикле (при этом lock сделан на весь цикл). Хотя volatile-чтение/запись превосходит lock в производительности, маловероятно, что тысяча операций volatile-чтения/записи окажется выгоднее одной блокировки.
Асинхронная изменчивость присуща только примитивным интегральным типам (и unsafe-указателям) – другие типы не кэшируются в регистрах CPU и не могут быть объявлены с ключевым словом volatile. Семантика volatile-чтения и записи автоматически применяется к полям, когда доступ осуществляется через класс Interlocked.
Если ваша политика предполагает доступ к полям из разных потоков в операторе lockvolatile и Interlocked вам не нужны.

Wait и Pulse

Ранее мы рассматривали EventWaitHandle – простой сигнальный механизм блокировки потока до получения уведомления от другого потока.
Более мощная сигнальная конструкция предоставляется классом Monitor при помощи двух статических методов – Wait и Pulse. Принцип состоит в том, что вы пишете сигнальную логику сами, используя флаги и поля (вместе с оператором lock), а затем вводите команды Wait и Pulseдля уменьшения нагрузки на CPU. Преимущество такого низкоуровневого подхода в том, что используя только WaitPulse и lock можно получить функциональность AutoResetEventManualResetEvent и Semaphore, а также статических методов WaitHandle – WaitAll и WaitAny. Кроме того, Wait и Pulse можно применить в ситуациях, где любой WaitHandle бросает вызов бережливости.
Проблема Wait и Pulse – скудная документация, особенно в отношении необходимости их применения. И что еще хуже, Wait и Pulse испытывают особенное отвращение к дилетантам: если вы вызываете их без полного понимания, они это узнают – и будут счастливы найти и замучать вас до смерти! К счастью, есть простой образец, которому можно следовать, и который обеспечивает надежное решение в каждом случае.

Определение Wait и Pulse

Назначение Wait и Pulse – обеспечить простой сигнальный механизм: Wait блокирует, пока не получено уведомление от другого потока, Pulse реализует это уведомление.
Чтобы сигнализация сработала, Wait должен выполняться перед Pulse. Если Pulse выполнится первым, его сигнал будет потерян, и вызванный после него Wait должен будет ожидать следующего сигнала или остаться навсегда заблокированным. Это поведение отличается от AutoResetEvent, у которого метод Set имеет эффект “защелки” и работает, даже если вызван до WaitOne.
Для вызова Wait или Pulse необходимо определить объект синхронизации. Если два потока используют один и тот же объект, они способны посигналить друг другу. Объект синхронизации должен быть заблокирован перед вызовом Wait или Pulse.
Например, если x объявлен следующим образом:
class Test 
{
  // Любой объект ссылочного типа может быть объектом синхронизации
  object x = new object();
}
то следующий код заблокирует поток на вызове Monitor.Wait:
lock (x)
  Monitor.Wait(x);
А этот код (если он выполнен позже в другом потоке) освободит блокированный поток:
lock (x)
  Monitor.Pulse(x);

Переключение блокировки

Чтобы выполнить эту работу, Monitor.Wait временно освобождает или отключает базовый lock на время ожидания, чтобы другой поток (который будет вызывать Pulse) тоже мог получить блокировку. Метод Wait можно представить в виде следующего псевдокода:
Monitor.Exit(x);             // освободить блокировку
Ожидать вызова pulse для x
Monitor.Enter(x);            // восстановить блокировку
Следовательно, Wait может заблокировать поток дважды: один раз при ожидании Pulse, и еще раз – при восстановлении эксклюзивной блокировки. Это также означает, что Pulse не полностью разблокирует ожидающий поток: только когда сигнализирующий поток покидает конструкцию lock, ожидающий поток действительно может идти дальше.
Переключение блокировки методом Wait эффективно независимо от уровня вложенности блокировок. Например, если Wait вызывается из двух вложенных выражений блокировки:
lock (x)
  lock (x)
    Monitor.Wait(x);
Wait логически разворачивается следующим образом:
Monitor.Exit(x); Monitor.Exit(x);    // Exit дважды для освобождения lock
Ожидать вызова pulse для x
Monitor.Enter(x); Monitor.Enter(x);  // Восстановление уровней вложенности
Согласно нормальной семантике блокировки, только первый вызов Monitor.Enter действительно может произвести блокировку.
Зачем нужен lock?
Почему Wait и Pulse разработаны так, что могут работать только в пределах lock? Основная причина – дать возможность вызвать Wait по условию, без нарушений потоковой безопасности. В качестве простого примера предположим, что нужно вызвать Wait, только если булево поле равно false. Следующий код будет потокобезопасным:
lock (x) 
{
  if (!available)
    Monitor.Wait(x);

  available = false;
}
Несколько потоков могут выполнять этот код одновременно, но ни один не может быть вытеснен между проверкой поля и вызовом Monitor.Wait. Эти две инструкции являются атомарными. Аналогично, генерация уведомления также будет потокобезопасной:
lock (x)
{
  if (!available)
  {
    available = true;
    Monitor.Pulse(x);
  }
  ...
Задание таймаута
Таймаут можно задать при вызове Wait как число миллисекунд или как TimeSpanWait возвращает false, если ожидание завершается по таймауту. Таймаут участвует только в фазе ожидания сигнала (Pulse), после ее окончания Wait должен будет вновь заблокировать x, и будет пытаться сделать это столько, сколько потребуется. Вот пример:
lock (x) 
{
  if (!Monitor.Wait(x, TimeSpan.FromSeconds(10)))
    Console.WriteLine("Не дождалися!");

  Console.WriteLine("А 'x'-то все еще заблокирован!");
}
Это поведение объясняется тем, что в правильно разработанном Wait/Pulse-приложении объект, на котором вызываются Wait и Pulse, блокируется на короткий промежуток времени, так что переключение блокировки должно быть почти мгновенной операцией.
Сигнализация и подтверждения
Важная особенность Monitor.Pulse – этот вызов выполняется асинхронно, без блокировок или других задержек. Если другой поток ждет на сигнальном объекте, он получит уведомление, если нет, вызов Pulse будет тихо проигнорирован.
Pulse обеспечивает только одностороннюю коммуникацию в направлении ожидающего потока. Никакого встроенного механизма подтверждений не существует. Pulse не возвращает значения, указывающего, был ли получен сигнальный импульс. Кроме того, когда сигнализирующий поток пошлет сигнал и освободит блокировку, нет никаких гарантий, что ждущий сигнала поток вернется к жизни немедленно. Возможна произвольная задержка, определяемая планировщиком потоков, в течении которой никакой поток не владеет блокировкой. Это еще более затрудняет определение момента, когда ожидающий поток действительно возобновляет исполнение, если только не предусмотрено специального подтверждения, например в виде флага.
Если требуется подтверждение, оно должно добавляться явным образом, например, в виде флага, связывающего потоки, вызывающие Pulse и Wait.
Полагаясь на своевременные действия потока, ожидающего сигнала, без специального механизма подтверждений вы проиграете!
Очередь ожидания и PulseAll
Вызвать Wait на одном и том же объекте могут сразу несколько потоков – в этом случае за объектом синхронизации образуется очередь ожидания, "waiting queue" (не путать c очередью ожидания на lock – "ready queue"). Каждый Pulse освобождает один поток из головы очереди ожидания, "waiting queue", после чего он переходит к "ready queue" для переустановки блокировки. Можно провести аналогию с автоматической парковкой для машин – сначала вы стоите в очереди к автомату для проверки билетов ("waiting queue"), а потом ждете снова перед шлагбаумом на входе ("ready queue").

Рисунок 2: Waiting Queue и Ready Queue
Однако часто упорядоченность потоков, присущая очередям, не нужна в Wait/Pulse-приложениях, и в таких случаях проще представить себе некий "пул" ожидающих потоков. Каждый Pulse освобождает один поток из пула.
Класс Monitor предоставляет также метод PulseAll, освобождающий всю очередь или пул потоков. Потоки, однако, все равно стартуют не все сразу, а в определенной последовательности, так как каждый Wait пытается переустановить одну и ту же блокировку. Так что PulseAll просто перемещает потоки из "waiting queue" в "ready queue", после чего они могут продолжить исполнение в дежурном порядке.

Использование Pulse и Wait

Итак, начнем. И для начала условимся о следующем:
  • Из всех конструкций синхронизации мы будем использовать только lock aka Monitor.Enter/Monitor.Exit.
  • Нет никаких ограничений на загрузку CPU!
Имея в виду эти два правила, рассмотрим простой пример: рабочий поток, который приостанавливается, пока не получит уведомление от главного потока:
class SimpleWaitPulse 
{
  bool go;
  object locker = new object();
 
  void Work() 
  {
    Console.Write("Ждем... ");

    lock (locker)
    {
      while (!go) 
      {
        // Освободим блокировку, чтобы другой поток мог изменить флаг go
        Monitor.Exit(locker); 
        // Снова заблокируем перед проверкой go в while
        Monitor.Enter(locker);
      }
    }

    Console.WriteLine("Оповещен!");
  }
 
  void Notify()// вызывается из другого потока
  {
    lock (locker) 
    {
      Console.Write("Оповещаем... ");
      go = true;
    }
  }
}
Вот метод Main, приводящий все это в движение:
static void Main() 
{
  SimpleWaitPulse test = new SimpleWaitPulse();
 
  // Запускаем метод Work в отдельном потоке
  new Thread(test.Work).Start(); // "Ждем..."
 
  // Подождем секунду и уведомим рабочий поток из главного:
  Thread.Sleep(1000);
  test.Notify(); // "Оповещаем... Оповестили!"
}
Метод Work, где мы крутимся в цикле, постоянно потребляет ресурсы CPU, пока флаг go установлен в true! В цикле нужно постоянно переключать блокировку при помощи Monitor.Enter и Monitor.Exit – чтобы другой поток мог получить блокировку и модифицировать флаг go. Доступ к полю go должен всегда осуществляться только изнутри lock, чтобы избежать проблем с асинхронной изменчивостью (volatility) (помните, что по правилам, о которых мы условились, другие конструкции синхронизации, в том числе и ключевое слово volatile нам недоступны!).
Теперь запустим пример, чтобы убедиться, что он действительно работает. Вот что он выводит:
Ждем... (пауза) Оповещаем... Оповестили!
Добавим Wait и Pulse. Сделаем это так:
  • Заменим переключение блокировки (Monitor.Enter/Monitor.Exit) на Monitor.Wait.
  • Вставим вызов Monitor.Pulse после установки флага go.
Вот модифицированный класс, с опущенными для краткости вызовами Console:
class SimpleWaitPulse 
{
  bool go;
  object locker = new object();
 
  void Work() 
  {
    lock (locker)
      while (!go)
        Monitor.Wait(locker);
  }
 
  void Notify() 
  {
    lock (locker) 
    {
      go = true;
      Monitor.Pulse(locker);
    }
  }
}
Класс работает так же, как и раньше, только постоянная прокрутка цикла устранена. Wait неявно исполняет код, который был удален – Monitor.Enter после Monitor.Exit, но с одним дополнительным шагом в середине: пока блокировка отпущена, он ожидает вызова Pulse из другого потока. Именно это и делает метод Notifier после установки флага go в true. Работа сделана.

Обобщение модели использования Wait и Pulse

Давайте доработаем наш шаблон использования Wait и Pulse. В предыдущем примере внутри блокировки использовалось только одно булево поле – флаг go. В другом сценарии мог бы потребоваться дополнительный флаг, показывающий состояние ожидающего потока – готов или завершен. Экстраполировав эту ситуацию, и предполагая, что набор полей, вовлеченных в блокирование, может быть любым, представим программу в виде следующего псевдокода:
class X 
{
  Блокировочные поля:  один или более объектов, 
  участвующих в условии блокировки, например:
  bool go;   bool ready;   int semaphoreCount;   Queue <Task> consumerQ...
 
  object locker = new object();  // защищает все перечисленные выше поля!
 
  ... SomeMethod 
  {
    ... всякий раз когда нужно блокировать, 
        основываясь на наших блокировочных полях:
    lock (locker)
    {
      while (! Некий набор блокировочных полей ) 
      {
        // Дадим шанс другим потокам изменить блокировочные поля!
        Monitor.Exit(locker);
        Monitor.Enter(locker);
      }
    }
    ... всякий раз когда нужно изменить одно 
        или несколько блокировочных полей:
    lock (locker) 
    {
      изменяем поле(поля)
    }
  }
}
Теперь вставим в наш шаблон Wait и Pulse так же, как и в прошлый раз:
  • Заменим в цикле ожидания переключение блокировки на Monitor.Wait.
  • При каждом изменении условий блокировки вызываем Pulse.
Вот модифицированный псевдокод:
Wait/Pulse шаблон #1: Основной вариант использования Wait/Pulse
class X 
{
  < Блокировочные поля ... >
  object locker = new object();

  ... SomeMethod
  {
    ...
    ... всякий раз когда нужно блокировать, 
        основываясь на наших блокировочных полях:
    lock (locker)
      while (!Некий набор блокировочных полей )
        Monitor.Wait(locker);

    ... всякий раз когда нужно изменить 
        одно или несколько блокировочных полей:
    lock (locker) 
    {
      изменяем поле(поля)
      Monitor.Pulse(locker);
    }     
  }
}
Такой подход дает надежную модель использования Wait и Pulse. Вот её главные особенности:
  • Условие блокировки реализовано с использованием некоторого набора полей (это работает и без Wait и Pulse, просто с ожиданием в цикле).
  • Wait всегда вызывается внутри цикла с проверкой условия блокирования (и внутри оператора lock).
  • Для всех Wait и Pulse и для защиты доступа ко всем блокировочным полям используется единый объект синхронизации (в примере выше – locker).
  • Блокировки кратковременны.
Важно также, что Pulse не принуждает ожидающий поток к возобновлению исполнения. Скорее Pulse только уведомляет его, что кое-что изменилось, рекомендуя перепроверить условие блокировки. Не сигнализирующий, а сам ожидающий поток определяет (в следующей итерации цикла), нужно ли ему продолжать работу или остаться в заблокированном состоянии. Преимущество такого подхода – возможность использования сложных условий блокировки без замысловатой логики синхронизации.
Другое полезное свойство этой модели – устойчивость логики к пропущенным Pulse. Пропуск импульсов сигнализации может произойти, когда Pulse вызывается раньше Wait, например, из-за гонок между ожидающим и сигнализирующим потоками. Поскольку в этой модели каждый сигнал означает “перепроверить условие блокировки” (а не “продолжить работу”), слишком ранний Pulse может быть безопасно проигнорирован, так как условие блокировки проверяется в while до вызова Wait.
Такой дизайн позволяет определить несколько блокировочных полей, составить из них сложное условие блокировки, но при этом использовать единственный объект синхронизации (в предыдущем примере – locker). Обычно это лучше, чем несколько объектов синхронизации, используемых в lockWait и Pulse, так как помогает избежать взаимоблокировок. Кроме того, с одним объектом синхронизации все блокировочные поля читаются и записываются как единое целое, тем самым исключая тонкие ошибки атомарности. Хорошей идеей, однако, будет не использовать объект синхронизации вне необходимой области видимости (объявив как private и собственно объект синхронизации, и все блокировочные поля).

Очередь поставщик/потребитель

Наше простое Wait/Pulse-приложение по сути – очередь поставщик/потребитель, реализованная нами ранее с использованием AutoResetEvent. Поставщик ставит в очередь задачи (обычно в главном потоке), в то время как один или более потребителей в рабочих потоках разбирают и выполняют их одну за другой.
В следующем примере для представления задачи будет использоваться строка. А очередь задач будет, соответственно, такой:
Queue<string> taskQ = new Queue<string>();
Поскольку очередь будет использоваться из нескольких потоков, необходимо обернуть в lock все операции чтения и записи в очередь. Так будет выглядеть постановка задачи в очередь:
lock (locker) 
{
  taskQ.Enqueue("my task");
  Monitor.PulseAll(locker);   // Мы изменили условие блокировки
}
Поскольку изменяется потенциальное условие блокировки, необходимо выдать сигнал. Мы вызываем PulseAll, а не Pulse, так как нужно учитывать, что потребителей может быть несколько, соответственно, ожидать будут несколько потоков.
Далее, требуется, чтобы рабочие потоки были блокированы, когда им нечего делать, другими словами, когда очередь пуста. Следовательно, наше условие блокировки – taskQ.Count==0. Вот выполняющее это выражение Wait:
lock (locker)
  while (taskQ.Count == 0)
    Monitor.Wait(locker);
Следующий шаг – рабочий поток удаляет задачу из очереди и исполняет её:
lock (locker)
  while (taskQ.Count == 0)
    Monitor.Wait(locker);
 
string task;

lock (locker)
  task = taskQ.Dequeue();
То, что получилось, однако, не является потокобезопасным: во время удаления задачи информация о состоянии очереди может быть уже устаревшей, так как она получена в предыдущем выражении lock. Посмотрите, что получится, если запустить одновременно два потока-потребителя с одним-единственным элементом в очереди. Возможно, ни один из них не вошел бы в ожидание в цикле – оба увидели бы в очереди один элемент. Далее они оба попытались бы удалить из очереди этот элемент, с генерацией исключения в потоке, который будет делать это вторым. Для исправления ситуации будем удерживать lock немного дольше – до окончания взаимодействия с очередью:
string task;

lock (locker) 
{
  while (taskQ.Count == 0)
    Monitor.Wait(locker);

  task = taskQ.Dequeue();
}
(Нет необходимости вызывать Pulse после удаления из очереди, так как никакой потребитель не может быть разблокирован имеющимися немногими элементами в очереди).
Как только задача удалена из очереди, нет никаких причин сохранять блокировку. Снятие блокировки в этом месте позволяет потребителю выполнять продолжительную задачу без ненужного блокирования других потоков.
Вот полный текст программы. Как и в версии с AutoResetEvent, постановка в очередь задачи со значением null будет сигналом потребителю на завершение (после завершения других недовыполненных задач). Поскольку потребителей может быть несколько, необходимо будет добавить по null-задаче на каждого, чтобы полностью завершить очередь:
Wait/Pulse шаблон #2: Очередь поставщик/потребитель
using System;
using System.Threading;
using System.Collections.Generic;

public class TaskQueue : IDisposable 
{
  object locker = new object();
  Thread[] workers;
  Queue<string> taskQ = new Queue<string>();

  public TaskQueue(int workerCount) 
  {
    workers = new Thread[workerCount];

    // Создать и запустить отдельный поток на каждого потребителя
    for (int i = 0; i < workerCount; i++)
      (workers [i] = new Thread(Consume)).Start();
  }

  public void Dispose() 
  {
    // Добавить по null-задаче на каждого завершаемого потребителя
    foreach (Thread worker in workers)
      EnqueueTask(null);

    foreach (Thread worker in workers)
      worker.Join();
  }

  public void EnqueueTask(string task) 
  {
    lock (locker) 
    {
      taskQ.Enqueue(task);
      Monitor.PulseAll(locker);
    }
  }

  void Consume()
  {
    while (true) 
    {
      string task;

      lock (locker)
      {
        while (taskQ.Count == 0)
          Monitor.Wait(locker);

        task = taskQ.Dequeue();
      }

      if (task == null)
        return;  // Сигнал на выход

      Console.Write(task);
      Thread.Sleep(1000);       // Имитация длительной работы
    }
  }
}
Вот метод Main, в котором создается очередь задач, задаются два потока-потребителя, для которых в очередь ставится 10 задач:
  static void Main()
  {
    using(TaskQueue q = new TaskQueue(2))
    {
      Console.WriteLine("Помещаем в очередь 10 задач");
      Console.WriteLine("Ожидаем завершения задач...");

      for (int i = 0; i < 10; i++)
        q.EnqueueTask(" Задача" + i);
    }

    // Выход из using приводит к вызову метода Dispose двух TaskQueue,
    // завершая потребителей после выполнения всех задач.
    Console.WriteLine("\r\nВсе задачи выполнены!");
  }
Консольный вывод:
Помещаем в очередь 10 задач
Ожидаем завершения задач...
 Задача0 Задача1 (пауза...) Задача2 Задача3 (пауза...) 
 Задача4 Задача5 (пауза...) Задача6 Задача7 (пауза...)
 Задача8 Задача9 (пауза...)
Все задачи выполнены!
В соответствии с нашим шаблоном проектирования, если удалить PulseAll и заменить Wait на переключение блокировок, мы получим тот же самый результат.
Про экономию сигналов
Посмотрим еще раз на добавление задачи в очередь:
lock (locker) 
{
  taskQ.Enqueue(task);
  Monitor.PulseAll(locker);
}
Вообще говоря, можно было сэкономить на выдаче сигналов, сигналя только тогда, когда действительно есть возможность освободить блокированного исполнителя:
lock (locker) 
{
  taskQ.Enqueue(task);

  if (taskQ.Count <= workers.Length)
    Monitor.PulseAll(locker);
}
Сэкономим мы, однако, немного, так как выдача сигнала занимает менее микросекунды и не влечет за собой накладных расходов для занятых потребителей, поскольку они этот сигнал все равно игнорируют. Правильной практикой для многопоточного кода будет удаление любой необязательной логики: плохо воспроизводимый дефект из-за глупой ошибки – слишком большая цена за экономию одной микросекунды! Вот пример внесения блуждающей ошибки типа “застрявший потребитель”, которая наверняка не будет выявлена при первоначальном тестировании (найди одно отличие):
lock (locker) 
{
  taskQ.Enqueue(task);

  if (taskQ.Count < workers.Length)
    Monitor.PulseAll(locker);
}
Сигнализация без всяких условий защитит вас от этого типа ошибок.
СОВЕТ
Сомневаешься – сигналь! В рассмотренном шаблоне проектирования сигнализация редко может повредить.
Pulse или PulseAll?
В нашем примере есть еще одна возможность для экономии сигналов. После добавления задачи в очередь можно вызвать Pulse, а не PulseAll, и ничего не сломается.
Вспомним отличие: при использовании Pulse может пробудиться максимум один поток (и перепроверить условие блокирования в while); в случае PulseAll пробудятся все ждущие потоки (и перепроверят условие блокирования). Если в очередь добавляется одна задача, для обработки нужен только один потребитель, так что нужно разбудить только одного вызовом Pulse. Это походит на класс спящих детей – если есть только одно мороженое, нет смысла будить их всех и ставить в очередь.
В нашем примере запускаются только два потока-потребителя, так что большой выгоды не извлечь. Если бы потоков-потребителей было десять, некоторый выигрыш от выбора Pulse вместо PulseAll появился бы. Если в очередь добавляются несколько задач, необходимо вызвать Pulseнесколько раз. Это можно сделать в рамках одной конструкции lock, например, так:
lock (locker) 
{
  taskQ.Enqueue("task 1");
  taskQ.Enqueue("task 2");
  Monitor.Pulse(locker);    // "Сигналим двум 
  Monitor.Pulse(locker);    //  ожидающим потокам."
}
Цена одного невызванного Pulse – застрявший поток-потребитель. Эта ошибка будет блуждающей, так как вызов Pulse производит эффект, только когда потребитель находится в состоянии ожидания. Следовательно, можно расширить наш предыдущий лозунг “Сомневаешься – сигналь” до “Сомневаешься – сигналь всем!”
Исключение из этого правила возможно, только когда вычисление условия блокировки занимает слишком много времени.

Использование таймаутов для Wait

Иногда может быть нежелательно или невозможно вызывать Pulse всякий раз, когда изменяется условие блокировки. В качестве примера можно представить условие блокировки, вызывающее метод, периодически запрашивающий информацию из базы данных. Если время ожидания – не проблема, решение простое: определить таймаут при вызове Wait следующим образом:
lock (locker) 
{
  while ( blocking condition )
    Monitor.Wait(locker, timeout);
  ...
Условие блокировки будет перепроверяться как минимум регулярно с интервалом, заданным в таймауте, а также немедленно по получении сигнала. Чем проще условие блокировки, тем меньшее значение таймаута можно использовать без снижения эффективности.
Такой подход, кроме того, хорошо работает, если сигнал будет пропущен из-за ошибки в программе! Возможно, стоит добавлять таймаут ко всем командам Wait в программах, где синхронизация особенно сложна – в качестве страховки от особо загадочных ошибок сигнализации. Также это обеспечивает дополнительную устойчивость к ошибкам, если программа изменяется позже кем-то несведущим.

Гонки и подтверждения

Допустим, мы хотим посигналить рабочему потоку пять раз подряд:
class Race 
{
  static object locker = new object();
  static bool go;
 
  static void Main() 
  {
    new Thread(SaySomething).Start();
 
    for (int i = 0; i < 5; i++)
    {
      lock (locker)
      {
        go = true;
        Monitor.Pulse(locker);
      }
    }
  }
 
  static void SaySomething()
  {
    for (int i = 0; i < 5; i++)
    {
      lock (locker)
      {
        while (!go)
          Monitor.Wait(locker);

        go = false;
      }

      Console.WriteLine("Wassup?");
    }
  }
}
Ожидаемый вывод:
Wassup?
Wassup?
Wassup?
Wassup?
Wassup?
Реальный вывод:
Wassup?
(зависание)
ПРИМЕЧАНИЕ
При тестировании этого примера мы получили «ожидаемый результат», запуская программу, скомпилированную в debug-режиме (не из под отладчика), и «реальный результат» (то есть, зависание) в release-версии – прим.ред.
Эта программа с дефектом: цикл for в главном потоке может прокрутить все пять своих итераций в то время, когда рабочий поток не блокирован. Возможно, даже до того, как он вообще стартует! Наш пример с очередью Поставщик/Потребитель не страдал от этой проблемы, так как если главный поток забегал вперёд рабочего, каждый запрос просто ставился в очередь. Но в данном случае требуется блокировка главного потока на каждой итерации, если рабочий все еще занят предыдущей задачей.
В качестве простого решения можно в каждой итерации цикла for ожидать, пока флаг go не будет сброшен рабочим потоком. Рабочий поток после сброса флага должен вызвать Pulse:
class Acknowledged 
{
  static object locker = new object();
  static bool go;
 
  static void Main() 
  {
    new Thread(SaySomething).Start();
 
    for (int i = 0; i < 5; i++) 
    {
      lock (locker) 
      {
        go = true;
        Monitor.Pulse(locker);
      }

      lock (locker) 
      {
        while (go)
          Monitor.Wait(locker);
      }
    }
  }
 
  static void SaySomething() 
  {
    for (int i = 0; i < 5; i++) 
    {
      lock (locker) 
      {
        while (!go)
          Monitor.Wait(locker);

        go = false;
        Monitor.Pulse(locker); // Надо посигналить
      }

      Console.WriteLine("Wassup?");
    }
  }
}
Консольный вывод:
Wassup?
Wassup?
Wassup?
Wassup?
Wassup? (пять повторов)
Важная особенность такой программы заключается в том, что рабочий поток освобождает блокировку перед выполнением длительного задания (которое должно быть на месте вызова Console.WriteLine). Это гарантирует, что инициатор не будет заблокирован все время, пока рабочий поток исполняет задачу, о которой ему только что посигналили (и будет блокирован, только если рабочий поток занят предыдущей задачей).
В данном примере только один поток (главный) сигналит рабочему потоку о необходимости выполнить задачу. Если несколько потоков начнут сигналить рабочему – используя текущую логику из метода Main – у нас начнутся проблемы. Два сигналящих потока могли бы последовательно исполнить следующую строку кода:
lock (locker) 
{
  go = true;
  Monitor.Pulse(locker);
}
что привело бы к потере второго сигнала, если рабочий поток в это время не до конца отработал по первому сигналу. Можно учесть такую ситуацию, использовав пару флажков – “ready” и “go”. Флажок “ready” показывает, что рабочий поток готов принять новую задачу, “go”, как и раньше, – сигнал начинать. Решение аналогично предыдущему примеру, который делал то же самое, используя два AutoResetEvent, за исключением лучшей расширяемости. Вот переработанный шаблон, с нестатическими полями:
Wait/Pulse шаблон #3: Двусторонняя сигнализация
public class Acknowledged 
{
  object locker = new object();
  bool ready;
  bool go;  

  public void NotifyWhenReady() 
  {
    lock (locker)
    {
      // ожидать, если рабочий поток занят предыдущей задачей
      while (!ready)
        Monitor.Wait(locker);

      ready = false;
      go = true;
      Monitor.PulseAll(locker);
    }
  }

  public void AcknowledgedWait()
  { 
    // Отобразить готовность принять запрос
    lock (locker) 
    {
      ready = true;
      Monitor.Pulse(locker);
    }

    lock (locker)
    {
      while (!go)
        Monitor.Wait(locker);     // Ожидать установки "go"

      go = false;
      Monitor.PulseAll(locker);   // Подтвердить сигнал
    }
      
    Console.WriteLine("Wassup?"); // Выполнить задачу
  }
}
Для проверки запустим два параллельных потока, каждый из которых посигналит рабочему потоку пять раз. Тем временем главный поток будет ожидать десяти уведомлений:
public class Test
{
  static Acknowledged a = new Acknowledged();
 
  static void Main()
  {
    new Thread(Notify5).Start();     // Запустить два параллельных
    new Thread(Notify5).Start();     // "уведомляльщика"...
    Wait10();                         // ... и одного ожидающего
  }
 
  static void Notify5()
  {
    for (int i = 0; i < 5; i++)
      a.NotifyWhenReady();
  }
 
  static void Wait10()
  {
    for (int i = 0; i < 10; i++)
      a.AcknowledgedWait();
  }
}
Консольный вывод:
Wassup?
Wassup?
...
Wassup? (десять повторов)
В методе Notify флаг ready очищается перед выходом из lock. Это жизненно важно: таким образом предотвращается последовательная сигнализация двумя уведомляющими потоками без перепроверки флага. Для простоты установка флага go и вызов PulseAll выполняются в той же самой конструкции lock, однако можно поместить эти две инструкции в отдельные конструкции lock, и ничего не сломается.

Имитация Wait Handle

Возможно, вы заметили шаблонный код в предыдущем примере: оба цикла ожидания имеют следующую структуру:
lock (locker) 
{
  while (!flag) 
    Monitor.Wait(locker);

  flag = false;
 ...
}
где flag устанавливается в true в другом потоке. В действительности это имитация AutoResetEvent. А если опустить flag=false, то ManualResetEvent. Используя целочисленное поле, Pulse и Wait, можно имитировать Semaphore. Фактически единственный WaitHandle, который нельзя имитировать при помощи Pulse и Wait – это Mutex, так как эта функциональность предоставляется оператором lock.
Имитация статических методов, которые работают с несколькими WaitHandle, в большинстве случаев проста. Эквивалент вызова WaitAll с несколькими WaitHandle – не что иное, как условие блокировки, включающее все флаги, используемые вместо WaitHandle:
lock (locker) 
{
  while (!flag1 && !flag2 && !flag3...)
    Monitor.Wait(locker);
Это может быть особенно полезно, так как WaitAll в большинстве случаев не пригоден к использованию из-за проблем с унаследованным COM-кодом. Имитация WaitAny – просто вопрос замены оператора && оператором ||.
С имитацией SignalAndWait сложнее. При вызове он сигналит на одном хендле при ожидании на другом в атомарной операции. Ситуация аналогична транзакциям в распределенной базе данных – необходим двухфазный commit! Если, например, необходимо посигналить флагом flagAпри ожидании на флаге flagB, придется разделить каждый флаг на два, получив в результате код типа такого:
lock (locker)
{
  flagAphase1 = true;
  Monitor.Pulse(locker);

  while (!flagBphase1)
    Monitor.Wait(locker);
 
  flagAphase2 = true;
  Monitor.Pulse(locker);

  while (!flagBphase2)
    Monitor.Wait(locker);
}
возможно, с дополнительной rollback-логикой для отката flagAphase1, если первый Wait сгенерирует исключение в результате прерывания по Interrupt или аварийного завершения потока по Abort. В этой ситуации использовать WaitHandle гораздо проще. В действительности, однако, атомарная сигнализация-и-ожидание – это редкое требование.
Ожидание стыковки
Wait и Pulse можно использовать так же, как для стыковки двух потоков используется WaitHandle.SignalAndWait. В следующем примере, можно сказать, имитируются два ManualResetEvent (другими словами, мы определяем два булевых флажка!), а затем выполняется взаимная сигнализация-и-ожидание установкой одного флага при ожидании другого. В данном случае нет необходимости в истинной атомарности сигнализации-и-ожидания, так что нет нужды и в двухфазном commit. Пока мы устанавливаем наш флаг и вызываем Wait в пределах одного lock-а, стыковка будет работать:
class Rendezvous 
{
  static object locker = new object();
  static bool signal1, signal2;
 
  static void Main() 
  {
    // Заставим каждый поток бездействовать в течение 
    // случайного промежутка времени
    Random r = new Random();
    new Thread(Mate).Start(r.Next(10000));
    Thread.Sleep(r.Next(10000));
 
    lock (locker)
    {
      signal1 = true;
      Monitor.Pulse(locker);

      while (!signal2)
        Monitor.Wait(locker);
    }

    Console.Write("Одновременно! ");
  }
 
  // Этот метод вызывается в потоке
  static void Mate(object delay) 
  {
    Thread.Sleep((int) delay);

    lock (locker) 
    {
      signal2 = true;
      Monitor.Pulse(locker);

      while (!signal1)
        Monitor.Wait(locker);
    }

    Console.Write("Одновременно! ");
  }
}
Консольный вывод:
Одновременно! Одновременно! (почти одновременно)

Wait и Pulse vs. Wait Handles

Поскольку Wait и Pulse являются наиболее гибкой конструкцией сигнализации, они могут использоваться практически в любой ситуации. У WaitHandle, однако, есть два преимущества:
  • Они могут взаимодействовать из разных процессов.
  • Их проще понять и тяжелее сломать.
В дополнение к этому с WaitHandle проще взаимодействовать – их можно передавать через параметры методов. В пулах потоков это свойство с пользой применяется.
В смысле производительности Wait и Pulse имеют небольшой перевес, если следовать вот такому образцу дизайна:
lock (locker)
  while (blocking condition)
    Monitor.Wait(locker);
и условие блокировки ложно с самого начала. Единственные накладные расходы – это выход из lock (десятки наносекунд) против нескольких микросекунд на вызов WaitHandle.WaitOne. Конечно, все это при условии, что борьбы за блокировку не происходит; даже самой короткой борьбы за блокировку было бы достаточно, чтобы выровнять результаты; а частая борьба за блокировку сделала бы WaitHandle быстрее!
С учетом потенциальных различий разных CPU, операционных систем, версий CLR и программной логики, несколько микросекунд вряд ли могут быть причиной для выбора между WaitHandle и Wait/Pulse.
Правильно будет использовать WaitHandle, когда одна из конструкций естественно соответствует требуемой работе, а если такой конструкции нет – использовать Wait и Pulse.

Suspend и Resume

Поток может быть явно приостановлен и продолжен с помощью методов Thread.Suspend и Thread.Resume. Это механизм никак не пересекается с блокировками, обсуждаемыми ранее. Обе системы независимы и работают параллельно.
Поток может приостановить себя или другой поток. Вызов Suspend переводит поток на короткое время в состояние SuspendRequested, а затем, при достижении безопасной точки для сбора мусора – в состояние Suspended. Из этого состояния поток может продолжить выполнение только с помощью другого потока, который вызовет для него метод ResumeResume работает только для приостановленных, но не для заблокированных потоков.
В .NET 2.0 Suspend и Resume объявлены не рекомендованными к применению из-за опасности произвольной приостановки другого потока. Если поток, удерживающий блокировку на критическом ресурсе, будет приостановлен, может зависнуть целое приложение (или компьютер). Это намного опаснее вызова Abort – который привел бы к освобождению всех блокировок – по крайней мере, теоретически – на основании кода в блоках finally.
Можно, однако, безопасно вызывать Suspend для текущего потока, реализовав при этом простой механизм синхронизации для рабочего потока в цикле выполнение задачи/вызов Suspend для себя/ожидание вызова Resume (“побудки”) главным потоком, когда будет готова следующая задача. Сложность заключается в определении, действительно ли рабочий поток сейчас приостановлен. Посмотрите следующий код:
worker.NextTask = "MowTheLawn";

if ((worker.ThreadState & ThreadState.Suspended) > 0)
  worker.Resume();
else
  // Нельзя вызывать Resume, так как поток уже выполняется.
  // Посигналим рабочему потоку флагом:
  worker.AnotherTaskAwaits = true;
Это грубейшее нарушение потоковой безопасности – код может быть вытеснен в любой точке этих пяти строк, и пока он будет ожидать своего кванта времени, рабочий поток будет исполняться и может изменить свое состояние. Несмотря на то, что разрулить эту ситуацию можно, решение будет более сложным, чем его альтернатива – использование конструкций сигнализации, таких как AutoResetEvent или Monitor.Wait. Это делает Suspend и Resume совершенно бесполезными.
Нерекомендуемые методы Suspend и Resume имеют два режима – опасный и бесполезный.

Аварийное завершение потоков

Поток может быть аварийно завершен при помощи метода Abort:
class Abort 
{
  static void Main()
  {
    Thread t = new Thread(delegate() { while (true) ; }); // Бесконечный цикл
    t.Start();
    Thread.Sleep(1000);  // Пусть поработает секунду...
    t.Abort();           // после чего принудительно завершим его.
  }
}
Аварийно завершаемый поток немедленно переходит в состояние AbortRequested. Если завершение проходит как ожидалось, поток переходит в состояние Stopped. Поток, вызвавший Abort, может ожидать этого, вызвав Join:
class Abort 
{
  static void Main()
  {
    Thread t = new Thread(delegate() { while (true) ; });  // Бесконечный цикл
    Console.WriteLine(t.ThreadState);     // Unstarted
 
    t.Start();
    Thread.Sleep(1000);
    Console.WriteLine(t.ThreadState);     // Running
 
    t.Abort();
    Console.WriteLine(t.ThreadState);     // AbortRequested
 
    t.Join();
    Console.WriteLine(t.ThreadState);     // Stopped
  }
}
Вызов Abort вызывает генерацию исключения ThreadAbortException в завершаемом потоке, в большинстве случаев прямо там, где поток находится в это время. Завершаемый поток может обработать это исключение, но оно будет автоматически выброшено снова в конце блока catch(чтобы содействовать завершению потока, как это и предполагалось). Можно, однако, предотвратить автоматический повторный выброс исключения с помощью вызова Thread.ResetAbort в блоке catch. В этом случае поток возвращается в состояние Running (из которого его можно опять попытаться принудительно прекратить). В следующем примере рабочий поток воскресает из мертвых всякий раз после вызова Abort:
class Terminator 
{
  static void Main()
  {
    Thread t = new Thread(Work);
    t.Start();

    Thread.Sleep(1000);
    t.Abort();

    Thread.Sleep(1000);
    t.Abort();

    Thread.Sleep(1000);
    t.Abort();
  }
 
  static void Work()
  {
    while (true)
    {
      try
      {
        while (true)
          ;
      }
      catch(ThreadAbortException) { Thread.ResetAbort(); }

      Console.WriteLine("Я не умру!");
    }
  }
}
Необработанное ThreadAbortException, однако, не приводит к аварийному завершению приложения, в отличие от других типов исключений.
Abort будет воздействовать на поток в любом состоянии – рабочем, заблокированном, приостановленном или остановленном. Если, однако, принудительно завершается приостановленный поток, генерируется ThreadStateException – на сей раз в вызывающем потоке – и принудительное завершение не может быть произведено, пока поток не возобновит работу. Вот как можно принудительно завершить приостановленый поток:
try { suspendedThread.Abort(); }
catch(ThreadStateException) { suspendedThread.Resume(); }
// Сейчас suspendedThread будет принудительно прекращен.

Сложности с Thread.Abort

Предположив, что аварийно завершаемый поток не будет вызывать ResetAbort, можно было бы ожидать, что убийство произойдет довольно быстро. Но, как это обычно случается, с хорошим адвокатом можно провести в камере смертников довольно долгое время! Вот несколько факторов, которые могут задержать поток в состоянии AbortRequested:
  • Статические конструкторы классов не могут быть принудительно прерваны на середине (чтобы не испортить класс, необходимый для дальнейшей работы домена приложения).
  • Все блоки catch/finally не могут быть принудительно завершены на полпути.
  • Если принудительно завершаемый поток исполняет в это время неуправляемый код, его выполнение продолжается до первой инструкции управляемого кода.
Последний фактор особенно неприятен, так как .NET Framework часто вызывает неуправляемый код, иногда оставаясь там в течение долгого времени. Примером может быть работа с сетью или базами данных. Если сетевой ресурс или сервер баз данных умирают или не спешат с ответом, выполнение может оставаться в неуправляемом коде в течение минут, в зависимости от используемой реализации. В этих случаях было бы неприятно зависнуть на Join, ожидая завершения потока, особенно без использования таймаута.
Принудительное завершение работы чистого .NET-кода менее проблематично, если для гарантии надлежащей очистки после выброса ThreadAbortException используются блоки try/finally или конструкция using. Однако и в этом случае нужно быть готовым к неприятным сюрпризам. Посмотрите, например, на следующий код:
using(StreamWriter w = File.CreateText("myfile.txt"))
  w.Write("Abort-Safe?");
C#-оператор using – на самом деле просто сокращенная запись для следующего кода:
StreamWriter w;
w = File.CreateText("myfile.txt");
try     { w.Write("Abort-Safe"); }
finally { w.Dispose();            }  
Abort может случиться после создания StreamWriter, но перед началом блока try. Фактически, углубляясь в IL, можно увидеть, что это может произойти даже между созданием StreamWriter и присвоением значения w:
IL_0001:  ldstr      "myfile.txt"
IL_0006:  call       class [mscorlib]System.IO.StreamWriter
                     [mscorlib]System.IO.File::CreateText(string)
IL_000b:  stloc.0
.try
{
  ...
В этом случае до вызова Dispose в блоке finally дело не дойдет, хэндл открытого файла зависнет, пресекая любые попытки создать myfile.txt в течение оставшейся жизни домена приложения.
В действительности ситуация еще хуже, так как Abort может иметь место внутри реализации File.CreateText. Исходные коды этого метода официально не открыты, но к счастью, в .NET трудно что-либо действительно закрыть, можно снова обратиться к ILDASM, а лучше к Lutz Roeder's Reflector – и заглянув соответствующую сборку, увидеть, что там вызывается конструктор StreamWriter, который имеет следующую логику:
public StreamWriter(string path, bool append, ...)
{
  ...
  ...
  Stream stream1 = StreamWriter.CreateFile(path, append);
  this.Init(stream1, ...);
}
Нигде в этом конструкторе нет блоков try/catch, а это значит, что вызов Abort во время выполнения нетривиального метода Init подвесит только что созданный StreamWriter безо всякого способа закрыть принадлежащий ему файловый хэндл.
Поскольку дизассемблирование каждого используемого вызова CLR, очевидно, непрактично, встает вопрос, как же писать методы, которые можно без проблем аварийно завершать. Самый очевидный выход из ситуации – не завершать аварийно другие потоки вообще, а использовать специальное булево поле, через которое сигнализировать потоку о необходимости завершения. Рабочий поток должен периодически проверять это поле, элегантно завершаясь, если оно установлено в true. Как ни странно, самым элегантным завершением для рабочего потока будет вызов Abort для себя самого, подойдет также явный выброс исключения. Это гарантирует правильную очистку потоков в процессе выполнения блоков catch/finally – аналогично вызову Abort из другого потока, за исключением того, что исключение генерируется в выбранном нами месте:
class ProLife 
{
  public static void Main()
  {
    RulyWorker w = new RulyWorker();
    Thread t = new Thread(w.Work);
    t.Start();
    Thread.Sleep(500);
    w.Abort();
  }
 
  public class RulyWorker 
  {
    // Ключевое слово volatile гарантирует, что abort не будет
    // кешироваться потоком
    volatile bool abort;   
 
    public void Abort() { abort = true; }
 
    public void Work() 
    {
      while (true)
      {
        CheckAbort();
        // Делаем что-то полезное...
        try      { OtherMethod(); }
        finally  { /* требуемая очистка */ }
      }
    }
 
    void OtherMethod()
    {
      // Делаем что-то полезное...
      CheckAbort();
    }
 
    void CheckAbort() 
    {
      if (abort)
        Thread.CurrentThread.Abort();
    }
  }
}
Вызов Abort для текущего потока – один из вариантов, при которых Abort будет полностью безопасен. Другой вариант – точное знание, что прерываемый извне поток находится в определенной секции кода, обычно достигаемое использованием механизмов синхронизации типа WaitHandle или Monitor.Wait. Третий безопасный вариант вызова Abort – если вслед за ним последует завершение домена приложений или процесса.

Завершение домена приложений

Еще один способ безопасно прикончить рабочий поток – заставить поток работать в собственном домене приложений. После вызова Abort просто сносится весь домен, что приводит к освобождению всех зависших ресурсов.
Строго говоря, первый шаг – принудительное завершение потока – на самом деле не нужен, так как когда домен приложений выгружается, все потоки, исполняющие код в этом домене, автоматически принудительно завершаются. Однако если потоки при этом не завершаются своевременно (возможно, из-за кода в блоках finally, или по причинам, обсуждавшимся выше), домен приложений не выгружается, и выбрасывается исключение CannotUnloadAppDomainException. По этой причине лучше явно вызвать Abort для рабочих потоков, а затем вызвать Join с заданным таймаутом (который вы можете контролировать) перед выгрузкой домена приложений.
В следующем примере рабочий поток в бесконечном цикле создает и закрывает файл, используя небезопасный в случае аварийного завершения метод File.CreateText. Главный поток занимается тем, что запускает и завершает рабочие потоки. Обычно это заканчивается IOException в течении одной-двух итераций из-за реализации File.CreateText, оставляющей незакрытые файловые хэндлы при аварийном завершении потока:
using System;
using System.IO;
using System.Threading;
 
class Program 
{
  static void Main()
  {
    while (true)
    {
      Thread t = new Thread(Work);
      t.Start();
      Thread.Sleep(100);
      t.Abort();
      Console.WriteLine("Aborted");
    }
  }
 
  static void Work()
  {
    while (true)
      using(StreamWriter w = File.CreateText("myfile.txt"))
         ...
  }
}
Консольный вывод:
Aborted
Aborted
System.IO.IOException: The process cannot access the file '...myfile.txt' 
because it is being used by another process.
А вот модифицированная программа, в ней рабочий поток исполняется в собственном домене, который выгружается после принудительного завершения потока. Она бесконечно выполняется без ошибок, так как домен приложений, выгружаясь, освобождает файловый хэндл:
class Program 
{
  static void Main(string [] args)
  {
    while (true)
    {
      AppDomain ad = AppDomain.CreateDomain("worker");
      Thread t = new Thread(delegate() { ad.DoCallBack(Work); });
      t.Start();
      Thread.Sleep(100);
      t.Abort();

      if (!t.Join(2000)) 
      {
        // Поток не завершился – можно предпринять что-либо,
        // если есть что. К счастью, в данном случае можно ожидать,
        // что поток будет завершаться *всегда*.
      }

      AppDomain.Unload(ad); // Выгружаем загрязненный домен!
      Console.WriteLine("Aborted");
    }
  }
 
  static void Work()
  {
    while (true)
      using(StreamWriter w = File.CreateText("myfile.txt")) 
        ...
  }
}
Консольный вывод:
Aborted
Aborted
Aborted
Aborted
...
Создание и разрушение доменов приложений классифицируется в мире потоков как операция, отнимающая относительно много времени (несколько миллисекунд), так что, наверное, не стоит делать это в цикле. Кроме того, разделение, привносимое доменами приложений, добавляет другой элемент, который может быть и полезным, и вредным, в зависимости от назначения многопоточной программы. В контексте unit-тестирования, например, исполнение потоков в отдельных доменах может быть очень полезным.

Завершение процессов

Еще один вариант, при котором поток может завершиться – завершение родительского процесса. Пример – рабочий поток с установленным в true свойством IsBackground, и главный поток, завершающийся, когда рабочий еще исполняется. Фоновый поток не способен продлить жизнь приложения, так что процесс завершается, унося с собой фоновый поток.
Когда поток завершается из-за родительского процесса, он останавливается намертво, никакие блоки finally не выполняются.
Так же выглядит ситуация с завершением пользователем зависшего приложения через Диспетчер задач Windows, или с процессом, завершаемым программно с помощью Process.Kill.

источник

Комментарии