Литмир - Электронная Библиотека

Интерфейс IMessageSink объявляет два метода, реализующие обработку соответственно синхронных и асинхронных вызовов. Это SyncProcessMessage И AsyncProcessMessage.

Таким образом, в зависимости от типа текущей работы (инкапсулированного в ней вызова), необходимо вызвать один из упомянутых вызовов на следующем перехватчике:

if (IsAsync()) {

        _nextSink.AsyncProcessMessage(_reqMsg, _replySink);

}

else if (_nextSink!= null) {

       _replyMsg = _nextSink.SyncProcessMessage(_reqMsg);

}

Вызов IsAsync возвращает true если текущая работа асинхронна. В этом случае на следующем перехватчике _nextSink вызывается AsyncProcessMessage и в качестве параметров передаются исходный вызов _reqMsg и ссылка на перехватчик для возвращения результата _replySink.

В случае синхронной работы вызывается SyncProcessMessage. Параметром является вызов, возвращаемое значение сохраняется в поле replyMsg.

Говоря про синхронные и асинхронные работы стоит напомнить, что в случае синхронной работы текущий поток блокируется до возврата из метода SyncProcessMessage, Т. е. до завершения прохождения вызова по всем остальным перехватчикам, собственно его выполнения и возвращения результата по той же цепочки перехватчиков. В случае асинхронной работы текущий поток не ожидает завершения вызова.

И, наконец, текущий поток должен вернуться в старый контекст и связаться со старым контекстом вызова:

CallContext.SetLogicalCallContext(oldCallCtx);

Thread.CurrentThread.ReturnToContext(ref frame);

Завершая обсуждение класса WorkItem осталось упомянуть свойство ReplyMessage, с помощью которого можно получить ответ, возвращенный для синхронного вызова:

internal virtual IMessage ReplyMessage {

get {return _replyMsg;}

}

Извлечение работы из очереди и ее выполнение

Напомним, что вызовы, перехваченные перехватчиком, ассоциированным со свойством синхронизации, инкапсулируются в работу (экземпляр класса WorkItem) и записываются в очередь работ (_workItemQueue). Эта очередь поддерживается свойством синхронизации, и мы уже говорили об ее инициализации как составной части процесса инициализации свойства синхронизации в целом. Тогда же говорилось о том, что в пуле рабочих потоков регистрируется делегат callBackDelegate типа WaitOrTimerCallBack, который будет вызываться и выполняться некоторым рабочим потоком всякий раз, когда будет установлено в состояние signaled событие _asyncWorkEvent.

Забегая вперед можно сказать, что это событие будет устанавливаться в состояние signaled всякий раз, когда можно будет начинать выполнение очередной работы из очереди работ, причем эта работа должна иметь асинхронный тип (инкапсулирует асинхронный вызов). В этом случае свободный или вновь сгенерированный рабочий поток начнет выполнять данную асинхронную работу путем вызова упомянутого делегата.

Делегат callBackDelegate содержит ссылку на функцию

DispatcherCallBack:

    WaitOrTimerCallback callBackDelegate =

                  new WaitOrTimerCallback(this.DispatcherCallBack);

и именно эта функция будет выполняться в рабочем потоке, обрабатывая очередную асинхронную работу.

Кстати, эта же функция будет вызываться и в том случае, когда очередная работа имеет синхронный тип (инкапсулирует синхронный вызов). Правда, в этом случае она будет выполняться в потоке вызова, а не в произвольном рабочем потоке из пула потоков, как было при асинхронном вызове. Но об этом позже.

Код функции DispatcherCallBack представлен ниже:

private void DispatcherCallBack(Object statelgnored,

       bool ignored) {

       Workltem work;

       lock (_workItemQueue) {

            work = (Workltem) _workltemQueue.Dequeue();

       }

       ExecuteWorkltem(work);

       HandleWorkCompletion();

}

В соответствии с определением типа WaitorTimerCallBack функция DispatcherCallBack имеет два параметра. Входной параметр типа Object (третий параметр при вызове ThreadPool.RegisterWaitForSingleObject. В нашем случае null) используется для задания подлежащей обработке информации. Выходной параметр типа bool принимает значение true в том случае, если вызов зарегистрированного в пуле рабочих потоков соответствующего делегата (в нашем случае callBackDelegate) произошел по причине истечения времени ожидания (в нашем случае это значение поля _timeOut). Если же вызов упомянутого делегата произошел в связи с тем, что зарегистрированное в этом же пуле событие типа AutoResetEvent (в нашем случае _asyncWorkEvent) перешло в состояние signaled, то второй параметр принимает значение false. В нашем случае оба эти параметра не учитываются.

Выполнение вызова DispatcherCallBack начинается с входа в критическую секцию, доступную при успешной блокировке объекта workItemQueue. В результате, при работе в данной критической секции с очередью работ, поддерживаемой текущим свойством синхронизации, никакой другой код не сможет заблокировать эту очередь и параллельно начать с ней работать.

Единственной операцией, выполняемой в данной критической секции, является извлечение из очереди работ очередной работы (с удалением из очереди). Здесь предполагается, что все необходимые для этого условия уже выполнены. О том, каковы эти условия, мы будем говорить позже.

После выхода из критической секции (освобождающего очередь работ для других потоков), вызываются последовательно ExecuteWorkltem(work) и HandleWorkCompletion ().

Реализация метода ExecuteWorkItem в рассматриваемом классе SynchronizationAttribute Очень проста:

internal void ExecuteWorkltem(Workitem work) {

       work.Execute();

}

Тут все сводится к уже рассмотренному методу Execute класса WorkItem. Иными словами, исполняющий этот метод поток переходит в тот контекст, где должен будет выполняться вызов, с этим потоком связывается тот самый контекст вызова, который сопровождал этот вызов до его перехвата. После этого вызов передается следующему перехватчику, который будет его обрабатывать в зависимости от типа вызова (синхронный или асинхронный). В случае асинхронного вызова сразу же после этого происходит возврат потока в исходный контекст и восстанавливается его связь с исходным контекстом вызова. В случае синхронного вызова происходит тоже самое, но только после возврата ответа.

Вызов метода HandleWorkCompletion() должен выполнить некоторую подготовительную работу к выполнению следующей работы из очереди работ. Мы рассмотрим этот метод позже. Сейчас же следует начать с начала, т. е. рассмотреть весь процесс перехвата вызова и включения его в очередь (или, в некоторых случаях, исполнения без включения в очередь).

Перехват входящего вызова

Формирование перехватчика входящих вызовов

Класс SynchronizationAttribute реализует интерфейс IContributeServerContextSink. Благодаря этому факту, при формировании нового контекста синхронизации (в новом или в старом домене синхронизации) автоматически вызывается метод GetServerContextSink, объявленный в данном интерфейсе, для формирования перехватчика входящих вызовов для данного контекста. Ниже приводится код этого метода из Rotor:

public virtual IMessageSink GetserverContextsink {

         IMessageSink nextSink) {

232
{"b":"870522","o":1}