Контейнер concurrent_queue в Visual Studio 2010

by Alexei 5. December 2009 19:15

В Visual Studio 2010 версия Beta 2 которая вышла более месяца назад появились 2 новых параллельных контейнера: concurrent_queue<T> и concurrent_vector<T>. Оба этих контейнера являются потокобезопасными и созданы для избежания узких мест в параллельных алгоритмах.

От queue к concurrent_queue

Полезно будет сравнить реализацию concurrent_queue со стандартной queue из С++, которая не является потокобезопасной. Основное API вставки/удаления из очереди у std::queue выглядит следующим образом:

template<class T>
class queue
{
public:

    void push(const T&); 

    size_type size() const;

    T& front();
    void pop();
};

Забыв на минуту о том что реализация std::queue не является потокобезопасной можно отметить что та часть кода, которая выполняет вставку в очередь может быть потокобезопасной. Добавление одного элемента в очередь может быть выполнено при помощи единственного вызова метода push(). И эта операция является атомарной поскольку она выполняется при помощи единственного вызова метода, что значит что добавляемый элемент будет добавлен в очередь к тому времени, когда push() закончит свою работу.

Однако операция выборки из очереди не является и не может являться атомарной потому что требует вызова 3-х различных методов: size(), front(), и pop().

if (q.size() > 0)
{

 

    T x = q.front(); 
    q.pop();

}

Чтобы совершить безопасную выборку из очереди необходимо во-первых убедиться что очередь не пуста, чтобы избежать удаления элемента из пустой очереди. Затем нужно получить значение первого элемента в очереди. После того как значение первого элемента получено, его надо удалить из очереди вызвав метод pop(). Параллельные вставка и удаление из очереди могут привести к непредсказуемому результату. Здесь приведены 3 потенциальных опасности такого поведения:

  • Внутренняя реализация std::queue может быть повреждена, что приведёт к неопределённому поведению, скорее всего к аварийному завершению работы программы.
  • Утверждению (q.size() > =0) нельзя доверять если сразу после него будут вызваны push() или pop(). Предположим что очередь была пуста – (q.size() == 0), если произойдёт параллельный вызов push() сразу после этой проверки, то остаток выборки из очереди будет пропущен несмотря на то, что в очереди есть ровно один элемент. Если в очереди есть ровно один элемент (q.size() == 1) и если произойдёт параллельно попытка вызвать метод pop() сразу после этой проверки, то она сделает очередь пустой. Этот вызов pop() закончится неудачей, т.к. очередь уже пуста, что в отладочном режиме приведёт к появлению окошка assert.
  • Два потока могут выбирать из очереди одновременно. Первый поток может получить первый элемент очереди и затем оказаться прерванным. В это время второй поток начнёт выполняться и получит точно такое же значение первого элемента. Если ничего плохого больше не произойдёт, то оба эти потока выберут из очереди один и тот же элемент.

Почему std::queue реализовано именно таким образом? Предположим что front() и pop() были объединены в единый метод и вызваны следующим образом:

T x = q.pop(); // Гипотетический API

Если оператор присваивания копии типа T вызовет исключение, удаляемое из очереди значение будет потеряно. Разделяя возвращение элемента из очереди от его удаления можно избежать подобной ситуации. Это разделение делает API std::queue безопасным для исключений, но не потокобезопасным.

Потокобезопасные операции в concurrent_queue

Контейнер concurrent_queue является потокобезопасным по отношению к следующим операциям:

  • Добавление параллельно с добавлением
  • Удаление параллельно с удалением
  • Добавление параллельно с удалением

Эти операции внутренне синхронизированы с использованием  безопасного алгоритма.

Добавление в очередь: операция добавления в очередь аналогична подобной из std::queue

void push(const T& source)

Этот код добавит исходное значение в очередь синхронизировано с другими операциями вставки/удаления.

Удаление из очереди: вместо вызова трёх методов как в std::queue метод dequeue контейнера concurrent_queue инкапсулирован в один-единственный метод try_pop():

bool try_pop(T& destination);

Обратите внимание на имя метода, который пытается удалить элемент из начала очереди. Если удаление прошло удачно, то удалённое значение помещается в специальный параметр и метод возвращает “true”.

Ранее мы убедились в том, что проверка очереди на пустоту перед удалением элемента могла привести к ошибкам. Метод try_pop() решает эту проблему возвращая “false” если произошла попытка удалить элемент из пустой очереди.

В отличие от реализации std::queue реализация concurrent_queue не является безопасной для исключений. Если оператор присваивания типа “T” бросит исключение, удаляемое из очереди значение будет потеряно.

Потоконебезопасные операции в concurrent_queue

Методы, описываемые здесь могут привести к ошибкам при вызове параллельно с методами push() и try_pop(). Их стоит использовать только после завершения всех параллельных операций.

empty: этот метод возвращает true если очередь пуста:

bool empty() const;

Хотя технически этот метод является потокобезопасным(т.к. состояние очереди он не изменяет),  значение которое он возвращает может быть опасно использовать т.к. оно сразу же может стать некорректным, если параллельно будут вызваны push() или try_pop().

unsafe_size: название метода говорит само за себя

size_type unsafe_size() const;

Данный метод может привести к некорректным результатам если будет вызван параллельно с push() или try_pop(). Чтобы понять почему это так, необходимо детально понять как работает concurrent_queue. Существуют две пользовательские переменные в concurrent_queue, которые используются для подсчёта добавленных и удалённых элементов за время существования объекта: _Tail_counter и _Head_counter. При добавлении элемента в очередь увеличивается _Tail_counter. При удалении элемента из очереди увеличивается _Head_counter. Таким образом количество элементов в очереди можно вычислить по формуле: (_Tail_counter - _Head_counter), именно так и вычисляется unsafe_size(). Теперь рассмотрим следующую ситуацию:

  1. Предположим что _Tail_counter = 12 и _Head_counter is 10, т.е. в очереди ровно 2 элемента.
  2. Поток 1 вызывает unsafe_size(). Поток начинает выполнять метод, и считывает одну из переменных, которая имеет значение 12.
  3. Поток 2 выполняет 5 операций push(), затем 5 операций try_pop(). _Tail_counter теперь равен 17, а _Head_counter 15. В очереди по прежнему 2 элемента.
  4. Поток 1 продолжает выполнение unsafe_size(). Метод получает значение _Head_counter, которое равно 15. Теперь он вычтет новое значение _Head_counter(15) из старого  значения _Tail_counter(12) и вернёт (12-15)=-3 как размер очереди. Однако *size_type* это безразмерный тип, поэтому реальная длина очереди станет равной 4294967293.

clear: Теперь очистка всех элементов очереди не является потокобезопасной во время выполнения других параллельных операций:

void clear();

Если после завершения всех параллельных операций вы хотите убедиться, что очередь пуста, вы можете вызвать этот метод. Деструктор concurrent_queue тоже неявно удалит все элементы из очереди.

Iretation: итерация по concurrent_queue не является потокобезопасной, поэтому ко всем методам, возвращающим итераторы добавлен префикс “unsafe_”.

iterator unsafe_begin();
iterator unsafe_end();
const_iterator unsafe_begin() const;
const_iterator unsafe_end() const;

Итерация одновременно с операциями push() и try_pop() приведёт к неопределённым результатам. Тем не менее в целях отладки эти методы могут пригодиться чтобы вывести все элементы concurrent_queue.

Производитель-потребитель с помощью concurrent_queue

concurrent_queue это идеальный контейнер для сценария производитель-потребитель. В приведённом примере производитель добавляет в очередь 1000 элементов, в то время как главный поток удаляет их:

// Task that produces 1000 items
void ProducerTask(void* p) {
    concurrent_queue<int>* pq = (concurrent_queue<int>*)p;

    for (int i = 0; i < 1000; ++i)
        pq->push(i);
} 

void Producer_Consumer() {
    concurrent_queue<int> q; 

    // Schedule a task to produce 1000 items
    CurrentScheduler::ScheduleTask(ProducerTask, &q); 

    // Consume 1000 items
    for (int i = 0; i < 1000; ++i) {
        int result = -1;

        while (!q.try_pop(result))
           
Context::Yield();

        assert(result == i);
    }
}

Такая реализация гарантирует что производитель добавит 1000 элементов в очередь. Неудача при удалении из очереди означает что потребитель опережает производителя, в конечном итоге производитель догонит его и добавит элемент в очередь. Таким образом, очень важно вызывать метод Context::Yield() внутри цикла, который совместно пропустит все выполняющиеся потоки, позволяя производителю стартовать. Чтобы понять почему это важно достаточно представить этот код выполняющимся на однопроцессорной машине. Без Context::Yield() потребитель занял бы место в ожидании появления элемента в очереди, что привело бы к ситуации “голодания”(starvation).

Отладка

Внутренние структуры данных concurrent_queue очень сложны. Во время отладки разработчики хотят увидеть содержание очереди, а не её внутренних структур данных. В Visual Studio 2010 Beta 2 появилась возможность смотреть содержание этих контейнеров (concurrent_vector и concurrent_queue) как содержание обычных контейнеров STL.

image5

Ссылки

Страница загрузки Visual Studio 2010 Beta 2

Visual C++ на MSDN

Оригинал(Англ.)

Progg it

Tags: , ,

Comments

12/5/2009 7:22:58 PM #

trackback

Контейнер concurrent_queue в Visual Studio 2010

Thank you for submitting this cool story - Trackback from progg.ru

progg.ru

12/7/2009 10:17:09 AM #

Alexander

> 2.Поток 1 вызывает usafe_size(). Метод возвращает значение 12.
Неверно переведено! Поток начинает выполнять метод, и считывает одну из переменных, которая имеет значение 12. Вторая переменная будет считьана на шаге 4. А на 4 шаге поток ничего больше не вызывает, а продолжает выполнение unsafe_size()... Иначе все приведенные шаги описания конкуренции становятся непонятны.

Alexander Russia

12/7/2009 2:14:46 PM #

Alexei

Спасибо за пояснения, я действительно не так перевёл!

Alexei Russia

Add comment




  Country flag

biuquote
  • Comment
  • Preview
Loading



Powered by BlogEngine.NET 1.6.1.0
Theme by Extensive SEO