Синхронизация потоков потребителей

Основные ограничения синхронизации, необходимые для корректного функционирования системы. Добавление в код производителя и потребителя операторов синхронизации для обеспечения ее корректной работы. Сигнал конечного буфера производителя-потребителя.

Рубрика Программирование, компьютеры и кибернетика
Вид курсовая работа
Язык русский
Дата добавления 05.12.2012
Размер файла 167,0 K

Отправить свою хорошую работу в базу знаний просто. Используйте форму, расположенную ниже

Студенты, аспиранты, молодые ученые, использующие базу знаний в своей учебе и работе, будут вам очень благодарны.

Размещено на http://www.allbest.ru/

Оглавление

  • 1. Постановка задачи
  • 2. Сигнал производителя - потребителя
  • 3. Решение проблемы производителя потребителя
  • 4. Deadlock #4
  • 5. Производитель - потребитель с конечным буфером
  • 6. Сигнал конечного буфера производителя потребителя
  • 7. Конечный буфер производителя потребителя. Решение
  • 8. ОСРВ RTX
  • 9. Постановка задачи
  • 10. Листинг программы
  • 11. Результаты работы
  • Список литературы

1. Постановка задачи

В многопоточных программах часто возникает разделение труда между потоками. В общей картине некоторые потоки являются производителями, а некоторые потребителями. Производители создают данные и помещают их в структуру данных, потребители забирают данные и обрабатывают их.

Хорошим примером являются программы, управляемые событиями. "Событие" это то, что происходит для того, что бы программа ответила: пользователь нажимает кнопку или перемещает мышь, блок данных поступает с диска, пакет данных приходит по сети, заканчивается выполняемая операция.

Всякий раз, когда происходит событие, поток-производитель создает объект события и добавляет его в буфер событий. Одновременно с этим, потребительские потоки принимают события из буфера и обрабатывают их. В этом случае потребителей называют "обработчиками событий"

Есть несколько ограничений синхронизации, которые нужны нам для того, что бы система функционировала корректно.

* В то время как элемент добавляется или удаляется из буфера, буфер находится в несогласованном состоянии. Таким образом, потоки должны иметь эксклюзивный доступ к буферу.

* Если поток-потребитель приходит, в то время, когда буфер пуст, происходит блокировка, пока производитель не добавляет новый элемент.

Предположим, что производители должны выполнить следующие операции многократно:

1 event = waitForEvent()

2 buffer.add(event)

Листинг. 1 Основной код производителя

Кроме того, предполагается, что потребители выполнят следующие операции:

1 event = buffer.get()

2 event.process()

Листинг. 2 Основной код производителя потребителя

Как было указано выше, доступ к буферу должен быть эксклюзивным, но waitForEvent и event.process могут сработать одновременно.

Задача: Добавить в код производителя и потребителя операторы синхронизации, для обеспечения ее корректной работы.

2. Сигнал производителя - потребителя

Переменные, которые мы могли бы использовать:

1 mutex = Semaphore(1)

2 items = Semaphore(0)

3 local event

Листинг. 3 Инициализация производителя-потребителя

Взаимное исключение обеспечивает эксклюзивный доступ к буферу. Когда переменная положительна, это указывает число элементов в буфере, когда она отрицательна, это указывает число потребительских потоков в очереди.

Переменная event является локальной переменной, которая в данном контексте означает, что каждый поток имеет свою версию.

До сих пор мы предполагали, что у всех потоков есть доступ ко всем переменным, но мы назначим переменную к каждому потоку.

Есть несколько способов, как это может быть реализовано в различных условиях.

* Если каждый поток имеет свой собственный стек времени выполнения, то любые переменные, находящиеся в стеке, являются переменными конкретного потока.

* Если потоки представлены как объекты, мы можем добавить атрибут к каждому объекту потоку

* Если потоки имеют уникальные идентификаторы, мы можем использовать идентификаторы в качестве индекса массива или хеш-таблицы, и хранить там данные для каждого потока. В большинстве программы переменные являются локальными, однако в нашем случае переменные будут общими, если они явно не обвялены как локальные.

3. Решение проблемы производителя потребителя

Код решения:

1 event = waitForEvent()

2 mutex.wait()

3 buffer.add(event)

4 items.signal()

5 mutex.signal()

Листинг. 4 Решение производителя

Производитель не должен получить эксклюзивный доступ к буферу, пока не произойдет событие. Несколько потоков могут выполнить waitForEvent одновременно. Items семафоры отслеживают число элементов в буфере. Каждый раз, когда производитель добавляет элемент, он сигнализирует items, постепенно увеличивая его на один.

Код потребителя аналогичен.

1 items.wait()

2 mutex.wait()

3 event = buffer.get()

4 mutex.signal()

5 event.process()

Листинг. 5 Решение потребителя

Буферные операции защищены мьютексами, но прежде чем потребитель доберется до него, он должен уменьшить items. Если items равен нулю или отрицательный, потребитель блокируется, пока производитель не подаст сигнал.

Хотя это решение правильное, есть возможность внести в него небольшое улучшение. Представим себе, что по крайне мере один потребитель стоит в очереди, когда производитель сигнализирует items. Если диспетчер позволит работать потребителю, что произойдет дальше? Произойдет немедленная блокировка мьютексом, который пока еще захвачен производителем.

Блокировка и активация являются средне затратными операциями, их излишнее выполнение может ухудшить качество программы. Таким образом, вероятно, было бы лучше перестроить производителя вот так:

1 event = waitForEvent()

2 mutex.wait()

3 buffer.add(event)

4 mutex.signal()

5 items.signal()

Листинг. 6 Улучшенное решение производителя

Теперь мы можем не беспокоиться разблокировать потребителя, пока не будем знать, что работа может быть продолжена (кроме того редкого случая, когда другой производитель захватит мьютекс).

Однако в этом решении есть проблема. Семафор items отслеживает число элементов в очереди. Взглянув на код потребителя, мы видимо, что есть ситуация, в которой несколько потребителей постепенно могут уменьшить items, прежде чем один из них захватит мьютекс и удалит переменную из буфера. По крайне мере, на некоторое время items была бы неточной.

Мы могли бы попытаться решить эту проблему по средствам проверки буфера внутри мьютекса.

1 mutex.wait()

2 items.wait()

3 event = buffer.get()

4 mutex.signal()

5 event.process()

Листинг. 7 Нарушенное решение потребителя

Однако, это решение плохое.

4. Deadlock #4

Если потребитель начнет выполнение кода, это может привести к блокировке.

1 mutex.wait()

2 items.wait()

3 event = buffer.get()

4 mutex.signal()

5

6 event.process()

Листинг. 8 Нарушенное решение потребителя

Представим себе, что буфер пуст. Потребитель приходит, захватывает мьютекс, а затем блокируется на items. Когда придет производитель, он заблокируется на мьютексе, что приведет к остановке системы.

Это общая ошибка в коде синхронизации: если всегда ожидать захвата семафора, когда удерживается мьютекс, то существует опасность наступления deadlock. При решении проблемы синхронизации нужно обязательно учитывать этот нюанс.

5. Производитель - потребитель с конечным буфером

В примере выше описан метод обработки событий потоками с использованием бесконечного буфера (размер ограничен только физическими ресурсами).

Однако в ядре операционной системы существует лимит на свободное место. Для таких вещей, как запросы к дискам, сетевые пакеты, буфер как правило имеет фиксированный размер. В подобных ситуациях у нас есть дополнительное ограничение синхронизации.

* Если производитель приходит, когда буфер полон, он блокируется, пока потребитель не удаляет элемент.

Предположим мы знаем размер буфера. Назовем его BufferSize. Теперь мы имеем семафор, который отслеживает число элементов. Это можно попробовать представить так:

1 if items >= bufferSize:

2 block()

Листинг. 9 Нарушенное решение с конечным буфером

Но мы не можем это сделать, так как мы не можем узнать текущее значение семафора, есть только операции сигнала и ожидания.

Задача: написать код, который обрабатывает ограничение конечного буфера потребителя - производителя.

6. Сигнал конечного буфера производителя потребителя

Добавим второй семафор, чтобы отслеживать число доступного места в буфер.

1 mutex = Semaphore(1)

2 items = Semaphore(0)

3 spaces = Semaphore(buffer.size())

Листинг. 10 Инициализация конечного буфера производителя - потребителя. Когда потребитель удаляет элемент, то он должен подать сигнал об освобождении памяти. Когда приходит производитель, он должен декрементировать свободное место, в этой точке возможна блокировка до следующего сигнала потребителя.

7. Конечный буфер производителя потребителя. Решение

Ниже приведено конечное решение:

1 items.wait()

2 mutex.wait()

3 event = buffer.get()

4 mutex.signal()

5 spaces.signal()

6

7 event.process()

Листинг. 11 Решение потребителя с конечным буфером

Код производителя симметричен:

1 event = waitForEvent()

2

3 spaces.wait()

4 mutex.wait()

5 buffer.add(event)

6 mutex.signal()

7 items.signal()

Листинг. 12 Решение производителя с конечным буфером

Для того, что бы избежать deadlock производителей и потребителей, нужно проверить наличие свободных мест до захвата мьютекса. Для лучшей производительности мьютекс освобождают до подачи сигнала.

8. ОСРВ RTX

Расширение IntervalZero RTX - программное средство, предназначенное для добавления функциональности "жёсткого" реального времени в системы под управлением операционных систем Microsoft Windows. Программный продукт RTX был с успехом опробован в тысячах различных автоматизированных систем управления, оборонных и аэрокосмических системах, контрольно-измерительной аппаратуре, роботах и т.д. Он позволил добиться повышения их эффективности, возможностей, степени масштабируемости и надёжности функционирования при одновременном сокращении сроков и стоимости разработки новой продукции.

9. Постановка задачи

Заправочную станцию обслуживает один заправщик (производитель) и заправляется один потребитель. В единицу времени можно либо сливать топливо, либо заправляться, размер бака потребителя и размер бака заправки конечны. Реализовать синхронизацию между потребителем и производителем.

10. Листинг программы

синхронизация производитель сигнал буфер

//////////////////////////////////////////////////////////////////

//

// RtxApp1.c - C file

//

//////////////////////////////////////////////////////////////////

#include "RtxApp1.h"

HANDLE mutex;

HANDLE items;

HANDLE spaces;

INT GasStation = 20;

INT lReleaseCount = 1;

INT Bubble;

INT BubbleM;

INT Plus = 3;

INT Minus = 4;

#define MSGSTR_SEM "Sem"

#define MUTEX_ENABLED

INT RTFCNDCL Thread1Cycle (PVOID unused)

{

while(1)

{

RtWaitForSingleObject(mutex, INFINITE);

{

Bubble = GasStation + Plus;

if (Bubble < 20)

{

GasStation += Plus;

RtPrintf("(PRODUCER)GIVE = %d ", Plus);

RtPrintf("(Fuel level = %d\n", GasStation);

}

if (Bubble > 20)

{

RtPrintf("(PRODUCER)want GIVE = %d ", Plus);

RtPrintf("Fuel level will be FULL, wait = %d\n", GasStation);

}

if (Bubble == 20)

{

GasStation += Plus;

RtPrintf("(PRODUCER)GIVE = %d ", Plus);

RtPrintf("Fuel level is FULL = %d\n", GasStation);

}

RtSleep(3000);

RtReleaseSemaphore(mutex, lReleaseCount, NULL);

}

}

return NO_ERRORS;

}

INT RTFCNDCL Thread2Cycle (PVOID unused)

{

while(1)

{

RtWaitForSingleObject(mutex, INFINITE);

{

srand(10);

Minus = rand()%10;

BubbleM = GasStation - Minus;

if (BubbleM > 0)

{

GasStation -= Minus;

RtPrintf("(CONSUMER)TAKE = %d ", Minus);

RtPrintf("Fuel level = %d\n", GasStation);

}

if (BubbleM < 0)

{

RtPrintf("(CONSUMER)want TAKE = %d ", Minus);

RtPrintf("(CONSUMER)Need more GAS, wait = %d\n", GasStation);

}

if (BubbleM == 0)

{

GasStation -= Minus;

RtPrintf("(CONSUMER)TAKE = %d ", Minus);

RtPrintf("Fuel level is EMPTY = %d\n", GasStation);

}

RtSleep(3000);

RtReleaseSemaphore(mutex, lReleaseCount, NULL);

}

}

return NO_ERRORS;

}

VOID _cdecl wmain(int argc, wchar_t **argv, wchar_t **envp)

{

// for periodic timer code

LARGE_INTEGER liPeriod; // timer period

LARGE_INTEGER time;

HANDLE hTimer = NULL; // timer handle

DWORD threadId;

DWORD dwStackSize = 0;

ULONG stackSize = 0;

ULONG sleepTime = 30000;

DWORD dwExitCode = 0;

RtPrintf("\n");

//

// Create the thread.

//

hThread1 = CreateThread(

NULL,

dwStackSize, //default

Thread1Cycle, //function

NULL, //parameters

CREATE_SUSPENDED,

&threadId

);

if(hThread1 == NULL)

{

RtPrintf("Error: could not create thread. GetLastError = %d\n", GetLastError());

return ERROR_OCCURED;

}

if(!RtSetThreadPriority( hThread1, RT_PRIORITY_MAX))

{

RtPrintf("Error: could not set thread priority. GetLastError = %d\n", GetLastError());

TerminateThread( hThread1, dwExitCode);

return ERROR_OCCURED;

}

//

// Create the CPU HOG thread.

//

hThread2 = CreateThread(

NULL,

dwStackSize, //default

Thread2Cycle, //function

NULL, //parameters

CREATE_SUSPENDED,

&threadId

);

if(hThread2 == NULL)

{

RtPrintf("Error: could not create thread. GetLastError = %d\n", GetLastError());

return ERROR_OCCURED;

}

if(!RtSetThreadPriority(hThread2, RT_PRIORITY_MIN))

{

RtPrintf("Error: could not set thread priority. GetLastError = %d\n", GetLastError());

TerminateThread( hThread2, dwExitCode);

return ERROR_OCCURED;

}

mutex = RtCreateSemaphore( NULL, 1, 1, MSGSTR_SEM);

items = RtCreateSemaphore( NULL, 1, 1, MSGSTR_SEM);

spaces = RtCreateSemaphore( NULL, 1, 1, MSGSTR_SEM);

if (mutex==NULL)

{

RtPrintf("RtCreateSemaphore failed.");

}

if (items==NULL)

{

RtPrintf("RtCreateSemaphore failed.");

}

if (spaces==NULL)

{

RtPrintf("RtCreateSemaphore failed.");

}

RtReleaseSemaphore(mutex, lReleaseCount, NULL);

if(

(ResumeThread(hThread1) == RESUME_ERROR)

||

(ResumeThread(hThread2) == RESUME_ERROR)

)

{

RtPrintf("Error: could not resume thread. GetLastError = %d\n", GetLastError());

TerminateThread(hThread1, dwExitCode);

TerminateThread(hThread2, dwExitCode);

return ERROR_OCCURED;

}

RtSleep(150000);

//

// Stop thread.

if( !TerminateThread(hThread1, dwExitCode) || !TerminateThread(hThread2, dwExitCode))

{

RtPrintf("Error: could not terminate thread. GetLastError = %d\n", GetLastError());

return ERROR_OCCURED;

}

RtCloseHandle(mutex);

RtCloseHandle(items);

RtCloseHandle(spaces);

ExitProcess(0);

}

#include "RtxApp1.h"

//

// RTX periodic timer handler function

//

void RTFCNDCL TimerHandler(PVOID context)

{

PINT a = (PINT)context;

(*a)++;

}

//////////////////////////////////////////////////////////////////

//

// RtxApp1.h - header file

//

//////////////////////////////////////////////////////////////////

#include <windows.h>

#include <wchar.h>

#include <rtapi.h>

#include <time.h>

#include <stdio.h>

//#include <string.h>

//#include <ctype.h>

//#include <conio.h>

#include <stdlib.h>

//#include <math.h>

//#include <errno.h>

// Add DEFINES Here

//**** error codes ****//

#define NO_ERRORS

#define ERROR_OCCURED -1

#define RESUME_ERRORxFFFFFFFF

#define SUPPEND_ERRORxFFFFFFFF

HANDLE hThread1; // handle to the thread

HANDLE hThread2; // handle to the thread

// Add Function prototypes Here

// function prototype for periodic timer function

void

RTFCNDCL

TimerHandler(

void * nContext

);

// Interrupt handler prototype

void

RTFCNDCL

InterruptHandler(

void * nContext

);

11. Результаты работы

(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 20

(CONSUMER)TAKE = 1 Fuel level = 19

(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 19

(CONSUMER)TAKE = 1 Fuel level = 18

(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 18

(CONSUMER)TAKE = 1 Fuel level = 17

(PRODUCER)GIVE = 3 Fuel level is FULL = 20

(CONSUMER)TAKE = 1 Fuel level = 19

(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 19

(CONSUMER)TAKE = 1 Fuel level = 18

(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 18

(CONSUMER)TAKE = 1 Fuel level = 17

Листинг. 13 Результат исполнения

Рис. 1 Результат работы

Список литературы

1. The Little Book of SEMAPHORES, Allen B. Downey, 2009

Размещено на Allbest.ru


Подобные документы

  • Функции программного интерфейса операционной системы Windows, предназначенные для работы с семафорами. Средства синхронизации Win32 АРI, основанные на использовании объектов исполнительной системы с дескрипторами. Проблемы при использовании семафоров.

    реферат [67,4 K], добавлен 06.10.2010

  • Разработка приложения, автоматизирующего процесс синхронизации файлов между сменным носителем и каталогом на другом диске. Классы для работы с файловой системой. Интерфейс программы и способы взаимодействия пользователя с ним. Создание новой синхропары.

    курсовая работа [632,0 K], добавлен 21.10.2015

  • Взаимодействие процессов и потоков в операционной системе, основные алгоритмы и механизмы синхронизации. Разработка школьного курса по изучению процессов в операционной системе Windows для 10-11 классов. Методические рекомендации по курсу для учителей.

    дипломная работа [3,2 M], добавлен 29.06.2012

  • Исследование принципа действия поэлементной синхронизации с добавлением и вычитанием импульсов. Характеристика кодирования в системах ПДС, классификации кодов, построения кодера и декодера циклического кода. Расчет параметров системы с ОС и ожиданием.

    курсовая работа [2,8 M], добавлен 08.12.2011

  • Классификация компьютерной памяти. Использование оперативной, статической и динамической оперативной памяти. Принцип работы DDR SDRAM. Форматирование магнитных дисков. Основная проблема синхронизации. Теория вычислительных процессов. Адресация памяти.

    курсовая работа [1,5 M], добавлен 28.05.2016

  • Общая характеристика потока как последовательности инструкций, которые считывает и исполняет процессор. Анализ преимуществ одно- и многопроцессорности. Особенности реализации потоков и технологий взаимосвязей с микроархитектурой. Средства синхронизации.

    курсовая работа [27,4 K], добавлен 18.05.2013

  • Поддержание целостности общих данных, используемые методы и приемы. Проблема критической секции и направления ее разрешения. Аппаратная поддержка синхронизации, классические проблемы и разрешение. Критические области. Синхронизация в Solaris и в Windows.

    презентация [1,5 M], добавлен 24.01.2014

  • Обзор операционных систем, обеспечивающих взаимную синхронизацию процессов и потоков. Понятие критической секции и критических данных, описание приема взаимного исключения. Использование блокирующих переменных и семафоров. Объекты-взаимоисключения.

    доклад [26,7 K], добавлен 27.12.2013

  • Устройство управления и синхронизации в структуре микропроцессора. Порядок синтеза конечного автомата (КА) для устройства управления ЭВМ. Алгоритм функционирования КА, заданный с помощью графа, функции переходов. Состояние триггеров в микросхеме.

    методичка [1019,0 K], добавлен 28.04.2009

  • Понятие репликации (синхронизации) базы данных, ее назначение и механизмы, классификация по различным признакам, выгоды от ее внедрения. Функциональные требования к серверу репликации. Основные принципы, правила построения и функционирования РБД.

    курсовая работа [29,2 K], добавлен 22.04.2011

Работы в архивах красиво оформлены согласно требованиям ВУЗов и содержат рисунки, диаграммы, формулы и т.д.
PPT, PPTX и PDF-файлы представлены только в архивах.
Рекомендуем скачать работу.