Интерфейс IMessageSink объявляет два метода, реализующие обработку соответственно синхронных и асинхронных вызовов. Это SyncProcessMessage И AsyncProcessMessage.
Таким образом, в зависимости от типа текущей работы (инкапсулированного в ней вызова), необходимо вызвать один из упомянутых вызовов на следующем перехватчике:
if (IsAsync()) {
_nextSink.AsyncProcessMessage(_reqMsg, _replySink);
}
else if (_nextSink!= null) {
_replyMsg = _nextSink.SyncProcessMessage(_reqMsg);
}
Вызов IsAsync возвращает true если текущая работа асинхронна. В этом случае на следующем перехватчике _nextSink вызывается AsyncProcessMessage и в качестве параметров передаются исходный вызов _reqMsg и ссылка на перехватчик для возвращения результата _replySink.
В случае синхронной работы вызывается SyncProcessMessage. Параметром является вызов, возвращаемое значение сохраняется в поле replyMsg.
Говоря про синхронные и асинхронные работы стоит напомнить, что в случае синхронной работы текущий поток блокируется до возврата из метода SyncProcessMessage, Т. е. до завершения прохождения вызова по всем остальным перехватчикам, собственно его выполнения и возвращения результата по той же цепочки перехватчиков. В случае асинхронной работы текущий поток не ожидает завершения вызова.
И, наконец, текущий поток должен вернуться в старый контекст и связаться со старым контекстом вызова:
CallContext.SetLogicalCallContext(oldCallCtx);
Thread.CurrentThread.ReturnToContext(ref frame);
Завершая обсуждение класса WorkItem осталось упомянуть свойство ReplyMessage, с помощью которого можно получить ответ, возвращенный для синхронного вызова:
internal virtual IMessage ReplyMessage {
get {return _replyMsg;}
}
Извлечение работы из очереди и ее выполнение
Напомним, что вызовы, перехваченные перехватчиком, ассоциированным со свойством синхронизации, инкапсулируются в работу (экземпляр класса WorkItem) и записываются в очередь работ (_workItemQueue). Эта очередь поддерживается свойством синхронизации, и мы уже говорили об ее инициализации как составной части процесса инициализации свойства синхронизации в целом. Тогда же говорилось о том, что в пуле рабочих потоков регистрируется делегат callBackDelegate типа WaitOrTimerCallBack, который будет вызываться и выполняться некоторым рабочим потоком всякий раз, когда будет установлено в состояние signaled событие _asyncWorkEvent.
Забегая вперед можно сказать, что это событие будет устанавливаться в состояние signaled всякий раз, когда можно будет начинать выполнение очередной работы из очереди работ, причем эта работа должна иметь асинхронный тип (инкапсулирует асинхронный вызов). В этом случае свободный или вновь сгенерированный рабочий поток начнет выполнять данную асинхронную работу путем вызова упомянутого делегата.
Делегат callBackDelegate содержит ссылку на функцию
DispatcherCallBack:
WaitOrTimerCallback callBackDelegate =
new WaitOrTimerCallback(this.DispatcherCallBack);
и именно эта функция будет выполняться в рабочем потоке, обрабатывая очередную асинхронную работу.
Кстати, эта же функция будет вызываться и в том случае, когда очередная работа имеет синхронный тип (инкапсулирует синхронный вызов). Правда, в этом случае она будет выполняться в потоке вызова, а не в произвольном рабочем потоке из пула потоков, как было при асинхронном вызове. Но об этом позже.
Код функции DispatcherCallBack представлен ниже:
private void DispatcherCallBack(Object statelgnored,
bool ignored) {
Workltem work;
lock (_workItemQueue) {
work = (Workltem) _workltemQueue.Dequeue();
}
ExecuteWorkltem(work);
HandleWorkCompletion();
}
В соответствии с определением типа WaitorTimerCallBack функция DispatcherCallBack имеет два параметра. Входной параметр типа Object (третий параметр при вызове ThreadPool.RegisterWaitForSingleObject. В нашем случае null) используется для задания подлежащей обработке информации. Выходной параметр типа bool принимает значение true в том случае, если вызов зарегистрированного в пуле рабочих потоков соответствующего делегата (в нашем случае callBackDelegate) произошел по причине истечения времени ожидания (в нашем случае это значение поля _timeOut). Если же вызов упомянутого делегата произошел в связи с тем, что зарегистрированное в этом же пуле событие типа AutoResetEvent (в нашем случае _asyncWorkEvent) перешло в состояние signaled, то второй параметр принимает значение false. В нашем случае оба эти параметра не учитываются.
Выполнение вызова DispatcherCallBack начинается с входа в критическую секцию, доступную при успешной блокировке объекта workItemQueue. В результате, при работе в данной критической секции с очередью работ, поддерживаемой текущим свойством синхронизации, никакой другой код не сможет заблокировать эту очередь и параллельно начать с ней работать.
Единственной операцией, выполняемой в данной критической секции, является извлечение из очереди работ очередной работы (с удалением из очереди). Здесь предполагается, что все необходимые для этого условия уже выполнены. О том, каковы эти условия, мы будем говорить позже.
После выхода из критической секции (освобождающего очередь работ для других потоков), вызываются последовательно ExecuteWorkltem(work) и HandleWorkCompletion ().
Реализация метода ExecuteWorkItem в рассматриваемом классе SynchronizationAttribute Очень проста:
internal void ExecuteWorkltem(Workitem work) {
work.Execute();
}
Тут все сводится к уже рассмотренному методу Execute класса WorkItem. Иными словами, исполняющий этот метод поток переходит в тот контекст, где должен будет выполняться вызов, с этим потоком связывается тот самый контекст вызова, который сопровождал этот вызов до его перехвата. После этого вызов передается следующему перехватчику, который будет его обрабатывать в зависимости от типа вызова (синхронный или асинхронный). В случае асинхронного вызова сразу же после этого происходит возврат потока в исходный контекст и восстанавливается его связь с исходным контекстом вызова. В случае синхронного вызова происходит тоже самое, но только после возврата ответа.
Вызов метода HandleWorkCompletion() должен выполнить некоторую подготовительную работу к выполнению следующей работы из очереди работ. Мы рассмотрим этот метод позже. Сейчас же следует начать с начала, т. е. рассмотреть весь процесс перехвата вызова и включения его в очередь (или, в некоторых случаях, исполнения без включения в очередь).
Перехват входящего вызова
Формирование перехватчика входящих вызовов
Класс SynchronizationAttribute реализует интерфейс IContributeServerContextSink. Благодаря этому факту, при формировании нового контекста синхронизации (в новом или в старом домене синхронизации) автоматически вызывается метод GetServerContextSink, объявленный в данном интерфейсе, для формирования перехватчика входящих вызовов для данного контекста. Ниже приводится код этого метода из Rotor:
public virtual IMessageSink GetserverContextsink {
IMessageSink nextSink) {