IGLib 1.4
The IGLib base library for development of numerical, technical and business applications.

IG::Num::ParallelJobDispatcherBase< JobContainerType > Class Template Reference

Parallel job dispatcher. Accepts job requests and dispatches jobs to parallel job servers when they are available and ready to run a job.


Public Member Functions

 ParallelJobDispatcherBase ()
void AddServer (ParallelJobServerBase< JobContainerType > server)
 Adds the specified server to the current dispatcher.
void RemoveServer (ParallelJobServerBase< JobContainerType > server)
 Removes the specified server from the current dispatcher.
void SetServersSleepTimeMs (int sleepTimeMs)
 Sets sleeping time in milliseconds on all servers assigned to the current parallel job dispatcher.
void SetServersOutputLevel (int outputLevel)
 Sets output level on all servers assigned to the current parallel job dispatcher.
void SetServersIsServer (bool isServer)
 Sets the server flag on all servers assigned to the current parallel job dispatcher.
bool SendJob (JobContainerType jobData)
 Enqueues job for execution, and returns a flag indicating whether a job has been started immediately.
void SendJob (JobContainerType jobData, out bool startedImmediately)
 Enqueues job for execution.
void WaitAllJobsCompleted ()
 Waits for job completion.
bool WaitAllJobsCompleted (double timeoutInSeconds)
 Waits until all jobs that were sent to the current dispatcher object complete or a timeout occurs (timeout specified in seconds), and returns a flag indicating whether the jobs have actually completed (i.e. the wait did not end due to timeout).
void StopServer ()
 Sends the server thread a command to stop.
bool StopServerWhenAllJobsDone ()
 Waits until all jobs are completed, and then stops the server.
bool StopServerWhenAllJobsDone (double timeoutInSeconds)
 Waits until all jobs are completed or a timeout occurs, and then stops the server. Returns a flag indicating whether all jobs were actually completed before the server was ordered to stop.
void KillDispatcherThread ()
 Forces the working thread to stop. To let the working thread finish its current jobs and then stop, call StopServer.
void KillServerThreads ()
 Forces all the server threads to stop, even if in the middle of execution of a job. To let the working thread finish its current jobs and then stop, call StopServer.
void KillThreads ()
 Forces all the server threads to stop, even if in the middle of execution of a job. To let the working thread finish its current jobs and then stop, call StopServer.
void RegisterSystemPriorityUpdating ()
 Registers the UpdateThreadPriorityFromSystem method as "event handler" for system priority changes. After registration, this method will be called every time the value of the UtilSystem.ThreadPriority property changes.
void UnregisterSystemPriorityUpdating ()
 Unregisters the UpdateThreadPriorityFromSystem method as "event handler" for system priority changes.
void StartServer ()
 Starts the queue server.
void NotifyJobStarted (ParallelJobServerBase< JobContainerType > server, JobContainerType job)
 Notifies the current dispatcher that the specified job has started on the specified server.
void NotifyJobFinished (ParallelJobServerBase< JobContainerType > server, JobContainerType job)
 Notifies the current dispatcher that the specified job has finished on the specified server.
void NotifyJobAborted (ParallelJobServerBase< JobContainerType > server, JobContainerType job)
 Notifies the current dispatcher that the specified job has been aborted on the specified server.
void NotifyServerIdle (ParallelJobServerBase< JobContainerType > server)
 Notifies the current parallel job dispatcher that the specified server has become idle. This increases the number of idle servers and puts the server on the list of idle servers. When an idle server is requested, the server itself is still checked to confirm that it is actually idle.
override string ToString ()
 Returns a string representation of the current job dispatcher, which contains relevant data about the server state.

Protected Member Functions

void AddIdleServer (ParallelJobServerBase< JobContainerType > server)
 Adds the specified parallel job server to the idle list.
void RemoveIdleServer (ParallelJobServerBase< JobContainerType > server)
 Removes the specified parallel job server from the idle list. If the specified server is not on the list then nothing happens.
ParallelJobServerBase< JobContainerType > GetFirstIdleServer ()
 Returns the first idle server (last on the list of idle servers) and removes it from the idle servers list, or returns null if there are no idle servers.
void EnqueueJob (JobContainerType jobData)
 Adds the specified job container to the execution queue.
JobContainerType DequeueJob ()
 Removes the last job from the execution queue and returns it, or returns null if there are no jobs on the queue.
virtual void UpdateThreadPriorityFromSystem ()
 Updates thread priority (property ThreadPriority) to the current global thread priority (the UtilSystem.ThreadPriority property).
void Serve ()
 Method executed in the queue server thread. Executes any enqueued jobs as job servers become idle.

Protected Attributes

readonly List< ParallelJobServerBase< JobContainerType > > _jobServers
 List of job servers contained by the current dispatcher.
readonly List< ParallelJobServerBase< JobContainerType > > _idleJobServers
 List of idle job servers dispatched by the current dispatcher.
Queue< JobContainerType > _jobQueue = new Queue<JobContainerType>()
 Queue of jobs that could not be immediately served, scheduled for later execution.
bool _commandStopServing = false
bool _systemPriorityUpdatesRegistered = false
 Whether the "event" handler for system priority changes has already been registered.
ThreadPriority _threadPriority = UtilSystem.ThreadPriority
Thread _workingThread
bool _isServerRunning = false

Properties

int NumEnqueuedJobs [get]
 Gets the number of enqueued jobs.
int NumJobServers [get]
 Gets the number of job servers that are currently assigned to the dispatcher.
int NumActiveJobServers [get]
 Gets number of active job servers assigned to the dispatcher. These are servers capable of serving jobs, but which may be busy at the moment.
int NumExecutingJobs [get]
 Gets number of currently executing jobs. The number is obtained by counting assigned job servers that are in the state of job execution.
bool CommandStopServing [get, set]
 Flag indicating whether the server should be stopped. If set to true and the server thread is running, then the server thread stops when the currently running job completes (or stops immediately if there is no job running).
ThreadPriority ThreadPriority [get, set]
 Priority of the dispatcher and contained server threads. Setting the priority changes the priority of the dispatcher thread and all server threads. When threads are created, they will adopt the priority.

Detailed Description

template<JobContainerType>
class IG::Num::ParallelJobDispatcherBase< JobContainerType >

Parallel job dispatcher. Accepts job requests and dispatches jobs to parallel job servers when they are available and ready to run a job.

Template Parameters:
JobContainerType: Type of the container that holds data for the job (input and output).

This dispatcher object contains a list of parallel server objects (ParallelJobServerBase and its subclasses), and sends requested jobs to these objects.

If a job is requested and no servers are in idle state, the job is enqueued and waits for execution until a server becomes available.
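
The dispatch-or-enqueue rule described above can be illustrated with a minimal, self-contained sketch. The types SketchServer and SketchDispatcher are hypothetical stand-ins and are not part of IGLib; jobs are plain strings, and the separate queue server thread of the real class is left out. The sketch only shows the decision: hand the job to an idle server if one exists, otherwise queue it until a server reports that it is idle.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a job server; NOT the IGLib class.
public class SketchServer
{
    public string Name;
    public string CurrentJob;   // null while the server is idle

    public SketchServer(string name) { Name = name; }
}

// Hypothetical stand-in for the dispatcher; NOT the IGLib class.
public class SketchDispatcher
{
    private readonly object _lock = new object();
    private readonly List<SketchServer> _idleServers = new List<SketchServer>();
    private readonly Queue<string> _jobQueue = new Queue<string>();

    public int NumEnqueuedJobs
    {
        get { lock (_lock) { return _jobQueue.Count; } }
    }

    // A newly added server starts out idle in this sketch.
    public void AddServer(SketchServer server)
    {
        lock (_lock) { _idleServers.Add(server); }
    }

    // Returns true if the job was handed to an idle server immediately,
    // false if no server was idle and the job had to be queued.
    public bool SendJob(string job)
    {
        lock (_lock)
        {
            if (_idleServers.Count > 0)
            {
                // Take the idle server from the end of the list.
                int last = _idleServers.Count - 1;
                SketchServer server = _idleServers[last];
                _idleServers.RemoveAt(last);
                server.CurrentJob = job;
                return true;
            }
            _jobQueue.Enqueue(job);
            return false;
        }
    }

    // Called when a server has finished its job: it either picks up the
    // oldest queued job or is put back on the idle list.
    public void NotifyServerIdle(SketchServer server)
    {
        lock (_lock)
        {
            if (_jobQueue.Count > 0)
            {
                server.CurrentJob = _jobQueue.Dequeue();
            }
            else
            {
                server.CurrentJob = null;
                _idleServers.Add(server);
            }
        }
    }
}
```

With a single server, the first SendJob call in this sketch returns true and the second returns false; the queued job is assigned once NotifyServerIdle is called for that server.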

Notes for developers:

The dispatcher should steer synchronization with servers.

In order to avoid deadlocks, a server must not call any dispatcher code while holding its own lock. All dispatcher code must be called from outside code blocks locked on the server's lock.
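
The locking rule above can be sketched as follows. LockSketchDispatcher and LockSketchServer are hypothetical types used only for illustration, not the IGLib implementation. The server updates its own state inside its lock, releases the lock, and only then calls the dispatcher. The dispatcher may take the server's lock while holding its own, so the lock order is always dispatcher first, server second.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical dispatcher stand-in; NOT the IGLib class.
public class LockSketchDispatcher
{
    private readonly object _lock = new object();
    private readonly List<LockSketchServer> _idle = new List<LockSketchServer>();

    public int NumIdle
    {
        get { lock (_lock) { return _idle.Count; } }
    }

    public void NotifyServerIdle(LockSketchServer server)
    {
        lock (_lock)
        {
            // The dispatcher re-checks the server state here, which takes the
            // server's lock while the dispatcher's lock is already held.
            if (server.IsIdle && !_idle.Contains(server))
            {
                _idle.Add(server);
            }
        }
    }
}

// Hypothetical server stand-in; NOT the IGLib class.
public class LockSketchServer
{
    private readonly object _lock = new object();
    private readonly LockSketchDispatcher _dispatcher;
    private bool _isIdle = true;

    public LockSketchServer(LockSketchDispatcher dispatcher)
    {
        _dispatcher = dispatcher;
    }

    public bool IsIdle
    {
        get { lock (_lock) { return _isIdle; } }
    }

    public void BeginJob()
    {
        lock (_lock) { _isIdle = false; }
    }

    public void FinishJob()
    {
        // 1. Update own state while holding only the server's lock.
        lock (_lock) { _isIdle = true; }

        // 2. Call the dispatcher after the server's lock has been released.
        //    Calling it inside the lock block above would allow the order
        //    server -> dispatcher, which can deadlock against the dispatcher's
        //    own dispatcher -> server order.
        _dispatcher.NotifyServerIdle(this);
    }
}
```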

History note:

This implementation was transferred in June 2012 from the IOptLib.Net library and adapted to current needs in IGLib. Some features were omitted and some implementation details changed. Implementations will be unified in the future, therefore the class may be subject to larger changes. What is currently exposed should remain more or less the same, so users of the class should not be affected.

$A Igor Aug08 Jun12;

Type Constraints
JobContainerType : ParallelJobContainerBase

Member Function Documentation

template<JobContainerType >
IG::Num::ParallelJobDispatcherBase< JobContainerType >::ParallelJobDispatcherBase ( ) [inline]
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::AddServer ( ParallelJobServerBase< JobContainerType >  server) [inline]

Adds the specified server to the current dispatcher.

Parameters:
server: Server to be added.
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::RemoveServer ( ParallelJobServerBase< JobContainerType >  server) [inline]

Removes the specified server from the current dispatcher.

Parameters:
server: Server to be removed.
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::AddIdleServer ( ParallelJobServerBase< JobContainerType >  server) [inline, protected]

Adds the specified parallel job server to the idle list.

Parameters:
server: Server to be added to the list.
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::RemoveIdleServer ( ParallelJobServerBase< JobContainerType >  server) [inline, protected]

Removes the specified parallel job server from the idle list. If the specified server is not on the list then nothing happens.

Parameters:
server: Server to be removed from the list.
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::SetServersSleepTimeMs ( int  sleepTimeMs) [inline]

Sets sleeping time in milliseconds on all servers assigned to the current parallel job dispatcher.

Parameters:
sleepTimeMs: The sleeping time in milliseconds to be set on servers.
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::SetServersOutputLevel ( int  outputLevel) [inline]

Sets output level on all servers assigned to the current parallel job dispatcher.

Parameters:
outputLevel: The output level to be set on servers.
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::SetServersIsServer ( bool  isServer) [inline]

Sets the server flag on all servers assigned to the current parallel job dispatcher.

Parameters:
isServer: The server flag value to be set on servers.

Specifies whether each server acts as a server (true) or runs every job in a new thread (false).

template<JobContainerType >
ParallelJobServerBase<JobContainerType> IG::Num::ParallelJobDispatcherBase< JobContainerType >::GetFirstIdleServer ( ) [inline, protected]

Returns the first idle server (last on the list of idle servers) and removes it from the idle servers list, or returns null if there are no idle servers.

Returns:
The first idle job server available.
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::EnqueueJob ( JobContainerType  jobData) [inline, protected]

Adds the specified job container to the execution queue.

Parameters:
jobData: Job container of the job that is enqueued.
template<JobContainerType >
JobContainerType IG::Num::ParallelJobDispatcherBase< JobContainerType >::DequeueJob ( ) [inline, protected]

Removes the last job from the execution queue and returns it, or returns null if there are no jobs on the queue.

Returns:
Job container of the job that is removed from the queue, or null if there are no jobs in the queue.
template<JobContainerType >
bool IG::Num::ParallelJobDispatcherBase< JobContainerType >::SendJob ( JobContainerType  jobData) [inline]

Enqueues job for execution, and returns a flag indicating whether a job has been started immediately.

Parameters:
jobData: Job data.
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::SendJob ( JobContainerType  jobData,
out bool  startedImmediately 
) [inline]

Enqueues job for execution.

Parameters:
jobData: Job data.
startedImmediately: Output flag, set to true if the job has been started by some server immediately, false otherwise.
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::WaitAllJobsCompleted ( ) [inline]

Waits for job completion.

template<JobContainerType >
bool IG::Num::ParallelJobDispatcherBase< JobContainerType >::WaitAllJobsCompleted ( double  timeoutInSeconds) [inline]

Waits until all jobs that were sent to the current dispatcher object complete or a timeout occurs (timeout specified in seconds), and returns a flag indicating whether the jobs have actually completed (i.e. the wait did not end due to timeout).

Parameters:
timeoutInSeconds: Timeout in seconds. A value less than or equal to 0 means no timeout (waits indefinitely for the condition to be fulfilled).
Returns:
A flag indicating whether the job was actually completed when the function returned. If false is returned then timeout occurred.
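
The timeout convention described above (a value less than or equal to 0 means wait indefinitely; the return value tells whether the condition was met) can be sketched with a small polling helper. WaitSketch.WaitUntil and its sleepTimeMs parameter are hypothetical names introduced for illustration. Whether IGLib polls or uses signalling internally is not stated in this documentation, so polling is an assumption here.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper; NOT part of IGLib.
public static class WaitSketch
{
    // Polls isDone until it returns true or the timeout elapses.
    // timeoutInSeconds <= 0 means there is no timeout (wait indefinitely).
    // Returns true if the condition was met, false if the timeout occurred.
    public static bool WaitUntil(Func<bool> isDone, double timeoutInSeconds, int sleepTimeMs = 5)
    {
        Stopwatch watch = Stopwatch.StartNew();
        while (!isDone())
        {
            if (timeoutInSeconds > 0 && watch.Elapsed.TotalSeconds >= timeoutInSeconds)
            {
                return false;   // gave up because of the timeout
            }
            Thread.Sleep(sleepTimeMs);   // avoid busy-waiting between checks
        }
        return true;            // the condition was fulfilled
    }
}
```

A caller would pass a condition such as "no jobs are queued and no jobs are executing" as isDone. The returned flag distinguishes real completion from a timeout, matching the Returns description above.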
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::StopServer ( ) [inline]

Sends the server thread a command to stop.

If new jobs are sent after the server has been stopped, the server is restarted automatically.

template<JobContainerType >
bool IG::Num::ParallelJobDispatcherBase< JobContainerType >::StopServerWhenAllJobsDone ( ) [inline]

Waits until all jobs are completed, and then stops the server.

If new jobs are sent after the server has been stopped, the server is restarted automatically.

template<JobContainerType >
bool IG::Num::ParallelJobDispatcherBase< JobContainerType >::StopServerWhenAllJobsDone ( double  timeoutInSeconds) [inline]

Waits until all jobs are completed or a timeout occurs, and then stops the server. Returns a flag indicating whether all jobs were actually completed before the server was ordered to stop.

Parameters:
timeoutInSeconds: Timeout in seconds. A value less than or equal to 0 means no timeout is defined (waits indefinitely for the condition to be fulfilled).

If new jobs are sent after the server has been stopped, the server is restarted automatically.

template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::KillDispatcherThread ( ) [inline]

Forces the working thread to stop. To let the working thread finish its current jobs and then stop, call StopServer.

template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::KillServerThreads ( ) [inline]

Forces all the server threads to stop, even if in the middle of execution of a job. To let the working thread finish its current jobs and then stop, call StopServer.

template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::KillThreads ( ) [inline]

Forces all the server threads to stop, even if in the middle of execution of a job. To let the working thread finish its current jobs and then stop, call StopServer.

template<JobContainerType >
virtual void IG::Num::ParallelJobDispatcherBase< JobContainerType >::UpdateThreadPriorityFromSystem ( ) [inline, protected, virtual]

Updates thread priority (property ThreadPriority) to the current global thread priority (the UtilSystem.ThreadPriority property).

template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::RegisterSystemPriorityUpdating ( ) [inline]

Registers the UpdateThreadPriorityFromSystem method as "event handler" for system priority changes. After registration, this method will be called every time the value of the UtilSystem.ThreadPriority property changes.

template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::UnregisterSystemPriorityUpdating ( ) [inline]

Unregisters the UpdateThreadPriorityFromSystem method as "event handler" for system priority changes.

See also:
RegisterSystemPriorityUpdating
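
The register/unregister pair can be pictured with the following sketch. All names here (SketchSystem, its PriorityChanged event, PrioritySketch) are hypothetical. The documentation only says that the method acts as an "event handler" for changes of UtilSystem.ThreadPriority, so the use of a C# event is an assumption. A boolean flag, analogous to _systemPriorityUpdatesRegistered, keeps repeated registration from attaching the handler twice.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a global priority holder; NOT IGLib's UtilSystem.
public static class SketchSystem
{
    private static ThreadPriority _priority = ThreadPriority.Normal;

    // Assumed notification mechanism: a plain C# event.
    public static event Action PriorityChanged;

    public static ThreadPriority Priority
    {
        get { return _priority; }
        set
        {
            if (value == _priority) return;
            _priority = value;
            Action handler = PriorityChanged;
            if (handler != null) handler();   // notify all registered objects
        }
    }
}

// Hypothetical stand-in for the dispatcher's priority handling.
public class PrioritySketch
{
    private bool _registered = false;

    public ThreadPriority Priority = SketchSystem.Priority;

    // Copies the global priority to this object.
    protected virtual void UpdateThreadPriorityFromSystem()
    {
        Priority = SketchSystem.Priority;
    }

    public void RegisterSystemPriorityUpdating()
    {
        if (_registered) return;              // already registered: do nothing
        SketchSystem.PriorityChanged += UpdateThreadPriorityFromSystem;
        _registered = true;
    }

    public void UnregisterSystemPriorityUpdating()
    {
        if (!_registered) return;             // nothing to unregister
        SketchSystem.PriorityChanged -= UpdateThreadPriorityFromSystem;
        _registered = false;
    }
}
```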
template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::Serve ( ) [inline, protected]

Method executed in the queue server thread. Executes any enqueued jobs as job servers become idle.

template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::StartServer ( ) [inline]

Starts the queue server.

template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::NotifyJobStarted ( ParallelJobServerBase< JobContainerType >  server,
JobContainerType  job 
) [inline]

Notifies the current dispatcher that the specified job has started on the specified server.

Parameters:
server: Server to which the job is assigned.
job: The job in question.

This functionality is currently not implemented and is prepared in advance, e.g. for the ability of pulsing.

template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::NotifyJobFinished ( ParallelJobServerBase< JobContainerType >  server,
JobContainerType  job 
) [inline]

Notifies the current dispatcher that the specified job has finished on the specified server.

Parameters:
server: Server to which the job is assigned.
job: The job in question.

This functionality is currently not implemented and is prepared in advance, e.g. for the ability of pulsing.

template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::NotifyJobAborted ( ParallelJobServerBase< JobContainerType >  server,
JobContainerType  job 
) [inline]

Notifies the current dispatcher that the specified job has been aborted on the specified server.

Parameters:
server: Server to which the job is assigned.
job: The job in question.

This functionality is currently not implemented and is prepared in advance, e.g. for the ability of pulsing.

template<JobContainerType >
void IG::Num::ParallelJobDispatcherBase< JobContainerType >::NotifyServerIdle ( ParallelJobServerBase< JobContainerType >  server) [inline]

Notifies the current parallel job dispatcher that the specified server has become idle. This increases the number of idle servers and puts the server on the list of idle servers. When an idle server is requested, the server itself is still checked to confirm that it is actually idle.

Parameters:
server: Server that has become idle.
template<JobContainerType >
override string IG::Num::ParallelJobDispatcherBase< JobContainerType >::ToString ( ) [inline]

Returns a string representation of the current job dispatcher, which contains relevant data about the server state.


Member Data Documentation

template<JobContainerType >
readonly List<ParallelJobServerBase<JobContainerType> > IG::Num::ParallelJobDispatcherBase< JobContainerType >::_jobServers [protected]
Initial value:
  
            new List<ParallelJobServerBase<JobContainerType>>()

List of job servers contained by the current dispatcher.

template<JobContainerType >
readonly List<ParallelJobServerBase<JobContainerType> > IG::Num::ParallelJobDispatcherBase< JobContainerType >::_idleJobServers [protected]
Initial value:
 
            new List<ParallelJobServerBase<JobContainerType>>()

List of idle job servers dispatched by the current dispatcher.

template<JobContainerType >
Queue<JobContainerType> IG::Num::ParallelJobDispatcherBase< JobContainerType >::_jobQueue = new Queue<JobContainerType>() [protected]

Queue of jobs that could not be immediately served, scheduled for later execution.

template<JobContainerType >
bool IG::Num::ParallelJobDispatcherBase< JobContainerType >::_commandStopServing = false [protected]
template<JobContainerType >
bool IG::Num::ParallelJobDispatcherBase< JobContainerType >::_systemPriorityUpdatesRegistered = false [protected]

Whether the "event" handler for system priority changes has already been registered.

template<JobContainerType >
ThreadPriority IG::Num::ParallelJobDispatcherBase< JobContainerType >::_threadPriority = UtilSystem.ThreadPriority [protected]
template<JobContainerType >
Thread IG::Num::ParallelJobDispatcherBase< JobContainerType >::_workingThread [protected]
template<JobContainerType >
bool IG::Num::ParallelJobDispatcherBase< JobContainerType >::_isServerRunning = false [protected]

Property Documentation

template<JobContainerType >
int IG::Num::ParallelJobDispatcherBase< JobContainerType >::NumEnqueuedJobs [get]

Gets the number of enqueued jobs.

template<JobContainerType >
int IG::Num::ParallelJobDispatcherBase< JobContainerType >::NumJobServers [get]

Gets the number of job servers that are currently assigned to the dispatcher.

template<JobContainerType >
int IG::Num::ParallelJobDispatcherBase< JobContainerType >::NumActiveJobServers [get]

Gets number of active job servers assigned to the dispatcher. These are servers capable of serving jobs, but which may be busy at the moment.

template<JobContainerType >
int IG::Num::ParallelJobDispatcherBase< JobContainerType >::NumExecutingJobs [get]

Gets number of currently executing jobs. The number is obtained by counting assigned job servers that are in the state of job execution.

template<JobContainerType >
bool IG::Num::ParallelJobDispatcherBase< JobContainerType >::CommandStopServing [get, set]

Flag indicating whether the server should be stopped. If set to true and the server thread is running, then the server thread stops when the currently running job completes (or stops immediately if there is no job running).

template<JobContainerType >
ThreadPriority IG::Num::ParallelJobDispatcherBase< JobContainerType >::ThreadPriority [get, set]

Priority of the dispatcher and contained server threads. Setting the priority changes the priority of the dispatcher thread and all server threads. When threads are created, they will adopt the priority.


The documentation for this class was generated from the following file: