IGLib  1.5
The IGLib base library for development of numerical, technical and business applications.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Events
IG.Num.ParallelJobDispatcherBase< JobContainerType > Class Template Reference

Parallel job dispatcher. Accepts job requests and dispatches jobs to parallel job servers when available and redy to run a job. More...

+ Inheritance diagram for IG.Num.ParallelJobDispatcherBase< JobContainerType >:
+ Collaboration diagram for IG.Num.ParallelJobDispatcherBase< JobContainerType >:

Public Member Functions

 ParallelJobDispatcherBase ()
 
void AddServer (ParallelJobServerBase< JobContainerType > server)
 Adds the specified server to the current dispatcher. More...
 
void RemoveServer (ParallelJobServerBase< JobContainerType > server)
 Removes the specified server to the current dispatcher. More...
 
void SetServersSleepTimeMs (int sleepTimeMs)
 Sets sleeping time in milliseconds on all servers assigned to the current parallel job dispatcher. More...
 
void SetServersOutputLevel (int outputLevel)
 Sets output level on all servers assigned to the current parallel job dispatcher. More...
 
void SetServersIsServer (bool isServer)
 Sets the server flag on all servers assigned to the current parallel job dispatcher. More...
 
bool SendJob (JobContainerType jobData)
 Enqueues job for execution, and returns a flag indicating whether a job has been started immediately. More...
 
void SendJob (JobContainerType jobData, out bool startedImmediately)
 Enqueues job for execution. More...
 
void WaitAllJobsCompleted ()
 Waits for job completion. More...
 
bool WaitAllJobsCompleted (double timeoutInSeconds)
 Wait until all jobs that weer sent to the current dispatcher object complete, or timeout occurs (timeout specified in seconds), and returns a flag indicating whether the jobs have actually completed (i.e. stop was not due to timeout). More...
 
void StopServer ()
 Sends to the server thread command that it has to stop. More...
 
bool StopServerWhenAllJobsDone ()
 Waits until all jobs are completed, and then stops the server. More...
 
bool StopServerWhenAllJobsDone (double timeoutInSeconds)
 Waits until all jobs are completed or timeout occurs, and then stops the server. Returns a flag indicating whether all jobs are actually completed before server was ordered to stop. More...
 
void KillDispatcherThread ()
 Forces the working thread to stop. To let the working thread finish its current jobs and then stop, call StopServer More...
 
void KillServerThreads ()
 Forces all the server threads to stop, even if in the middle of eecution of a job. To let the working thread finish its current jobs and then stop, call StopServer More...
 
void KillThreads ()
 Forces all the server threads to stop, even if in the middle of eecution of a job. To let the working thread finish its current jobs and then stop, call StopServer More...
 
void RegisterSystemPriorityUpdating ()
 Registers the UpdateThreadPriorityFromSystem method as "event handler" for system priority changes. After registration, this method will be called every time the value of the UtilSystem.ThreadPriority property changes. More...
 
void UnregisterSystemPriorityUpdating ()
 Unregisters the UpdateThreadPriorityFromSystem method as "event handler" for system priority changes. More...
 
void StartServer ()
 Starts the queue server. More...
 
void NotifyJobStarted (ParallelJobServerBase< JobContainerType > server, JobContainerType job)
 Notifies the current dispatched that the specified job has started on the specified server. More...
 
void NotifyJobFinished (ParallelJobServerBase< JobContainerType > server, JobContainerType job)
 Notifies the current dispatched that the specified job has started on the specified server. More...
 
void NotifyJobAborted (ParallelJobServerBase< JobContainerType > server, JobContainerType job)
 Notifies the current dispatched that the specified job has started on the specified server. More...
 
void NotifyServerIdle (ParallelJobServerBase< JobContainerType > server)
 Notifies the current parallel job dispatcher that the specified server has become idle. More...
 
override string ToString ()
 Returns a string representation of the current job dispatcher, which contains relevent data about the server state. More...
 
- Public Member Functions inherited from IG.Num.ParallelJobDispatcherBase
void IncrementNumSentJobs ()
 Increments by one the number of sent jobs (all jobs sent to the current dispatcher for execution). More...
 
void IncrementNumStartedJobs ()
 Increments by one the number of started by the dispatcher up to this point. More...
 
void IncrementNumFinishedJobs ()
 Increments by one the number of finished jobs (of those handled by the current dispatcher) up to this point. More...
 
void IncrementNumAbortedJobs ()
 Increments by one the number of aborted jobs (of those handled by the current dispatcher) up to this point. More...
 

Protected Member Functions

void AddIdleServer (ParallelJobServerBase< JobContainerType > server)
 Adds the specified parallel job server to the idle list. More...
 
void RemoveIdleServer (ParallelJobServerBase< JobContainerType > server)
 Removes the specified parallel job server from the idle list. More...
 
ParallelJobServerBase
< JobContainerType > 
GetFirstIdleServer ()
 Returns the first idle server (last on the list of idle servers) and removes it from the idle servers list, or returns null if there are no idle servers. More...
 
void EnqueueJob (JobContainerType jobData)
 Adds the specified job container to the execution queue. More...
 
JobContainerType DequeueJob ()
 Removes the last job from the execution queue and returns it, or returns null if there are no jobs on the queue. More...
 
virtual void UpdateThreadPriorityFromSystem ()
 Updates thread priority (property ThreadPriority) to the current global thread priority (the UtilSystem.ThreadPriority property). More...
 
void Serve ()
 Method executed in the queue server thread. Excecutes eventual enqueued jobs as job servers become idle. More...
 
- Protected Member Functions inherited from IG.Num.ParallelJobDispatcherBase
void ResetNumIdleJobServers ()
 Resets number of idle job servers to 0. More...
 

Protected Attributes

readonly List
< ParallelJobServerBase
< JobContainerType > > 
_jobServers
 List of job servers contained by the current dispatcher. More...
 
readonly List
< ParallelJobServerBase
< JobContainerType > > 
_idleJobServers
 List of idle job servers dispatched by teh current dispatcher. More...
 
Queue< JobContainerType > _jobQueue = new Queue<JobContainerType>()
 Queue of jobs that could not be immediately served, scheduled for later execution. More...
 
bool _commandStopServing = false
 
bool _systemPriorityUpdatesRegistered = false
 Whether the "event" handler for system priprity changes has already been registered. More...
 
ThreadPriority _threadPriority = UtilSystem.ThreadPriority
 
Thread _workingThread
 
bool _isServerRunning = false
 
- Protected Attributes inherited from IG.Num.ParallelJobDispatcherBase
readonly object ServersLock = new object()
 Lock that is usef for locking code that can be run from servers. More...
 
volatile int _outputLevel = ParallelJobContainerBase.DefaultOutputLevel
 Output level for objects of this class. More...
 
bool _isTestMode = ParallelJobContainerBase.DefaultIsTestMode
 
int _sleepTimeMs = DefaultSleepTimeMs
 
int _numSentJobs = 0
 

Properties

int NumEnqueuedJobs [get]
 Gets the number of enqueued jobs. More...
 
int NumJobServers [get]
 number of job cervers that are currently asigned to hte dispatcher. More...
 
int NumActiveJobServers [get]
 Gets number of active job servers assigned to the dispatcher. More...
 
int NumExecutingJobs [get]
 Gets number of currently executing jobs. More...
 
bool CommandStopServing [get, protected set]
 Flag indicating whether the server should be stopped. If set to true and server thread is runing, then the server thread stops when the currently run job completes (or stops immediately if there is no job running). More...
 
ThreadPriority ThreadPriority [get, set]
 Priority of the dispatcher and contained server threads. More...
 
- Properties inherited from IG.Num.ParallelJobDispatcherBase
object Lock [get]
 This object's central lock object to be used by other object. Do not use this object for locking in class' methods, for this you should use InternalLock. More...
 
static int DefaultOutputLevel [get, set]
 Default output level for objects of this and derived types. More...
 
int OutputLevel [get, set]
 Output level the current object. More...
 
static bool DefaultIsTestMode [get, set]
 Default value of test mode flag. More...
 
bool IsTestMode [get, set]
 Whether the current job data conntainer is in test mode. In this mode, delays specified by internal variables are automatically added in job execution. More...
 
int Id [get]
 Unique ID for objects of the currnet and derived classes. More...
 
static int DefaultSleepTimeMs [get, set]
 Default sleeping time, in milliseconds, used by parallel job data objects when waiting for fulfillment of some condition in a loop that includes sleeping when condition is not met. More...
 
int SleepTimeMs [get, set]
 Sleeping time, in milliseconds, used by the current object when waiting for fulfillment of some condition in a loop that includes sleeping when condition is not met. More...
 
int NumIdleJobServers [get]
 Gets the number of idle job servers that are currently available on the dispatcher. More...
 
int NumSentJobs [get]
 Gets the number of sent jobs (all jobs sent to the current dispatcher for execution). More...
 
int NumStartedJobs [get]
 Gets the number of jobs started by the dispatcher up to this point. More...
 
int NumFinishedJobs [get]
 Gets the number of finished jobs (of those handled by the current dispatcher) up to this point. More...
 
int NumAbortedJobs [get]
 Gets the number of aborted jobs (of those handled by the current dispatcher) up to this point. More...
 
int NumUncompletedJobs [get]
 Gets the number of idle job runners that are currently available on the dispatcher. More...
 
- Properties inherited from IG.Lib.ILockable
object Lock [get]
 
- Properties inherited from IG.Lib.IIdentifiable
int Id [get]
 Returns unique ID (in the scope of a given type) of the current object. More...
 

Additional Inherited Members

- Static Protected Member Functions inherited from IG.Num.ParallelJobDispatcherBase
static int GetNextId ()
 Returns another ID that is unique for objects of the containing class its and derived classes. More...
 

Detailed Description

Parallel job dispatcher. Accepts job requests and dispatches jobs to parallel job servers when available and redy to run a job.

Template Parameters
JobContainerTypeType of the container that holds data for the job (input and output).

This dispatcher object contains a list of parallel server objects (ParallelJobServerBase and its subclasses), and sends requested jobs to these objects.

If a job is requested and no servers are in idle state, the job is enqueued and waits for execution until a server becomes available.

Notes for developers:

Dispatcher should be steer synchronization with servers.

In order to avoid deadlocks, server may not call any dispatcher's code its own lock! All dispatcher's code must be called out out of code blocks locked on server's lock.

History note:

This implementation has been transfered in June 2012 from the IOptLib.Net library and adapted to current needs in IGLib. Some features were omitted and some implementatinon details changed. Implementations will be unified in the future, therefore the class may be subject to larger changes. What is currently exposed should remain more or less the same, so users of the class should not be affected.

$A Igor Aug08 Jun12;

Type Constraints
JobContainerType :ParallelJobContainerBase 

Constructor & Destructor Documentation

Member Function Documentation

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.AddServer ( ParallelJobServerBase< JobContainerType >  server)
inline
void IG.Num.ParallelJobDispatcherBase< JobContainerType >.RemoveServer ( ParallelJobServerBase< JobContainerType >  server)
inline

Removes the specified server to the current dispatcher.

Parameters
serverServer to be removed.

References IG.Num.ParallelJobServerBase< JobContainerType >.Dispatcher, IG.Num.ParallelJobServerBase< JobContainerType >.Id, and IG.Num.ParallelJobServerBase< JobContainerType >.Lock.

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.AddIdleServer ( ParallelJobServerBase< JobContainerType >  server)
inlineprotected

Adds the specified parallel job server to the idle list.

Parameters
serverServer to be added to the list.
void IG.Num.ParallelJobDispatcherBase< JobContainerType >.RemoveIdleServer ( ParallelJobServerBase< JobContainerType >  server)
inlineprotected

Removes the specified parallel job server from the idle list.

If the specified server is not on the list then nothing happens.

Parameters
serverServer to be removed from the list.
void IG.Num.ParallelJobDispatcherBase< JobContainerType >.SetServersSleepTimeMs ( int  sleepTimeMs)
inline

Sets sleeping time in milliseconds on all servers assigned to the current parallel job dispatcher.

Parameters
sleepTimeMsThe sleeping time in milliseconds to be set on servers.

References IG.Num.ParallelJobServerBase< JobContainerType >.SleepTimeMs.

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.SetServersOutputLevel ( int  outputLevel)
inline

Sets output level on all servers assigned to the current parallel job dispatcher.

Parameters
outputLevelThe output level to be set on servers.

References IG.Num.ParallelJobServerBase< JobContainerType >.OutputLevel.

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.SetServersIsServer ( bool  isServer)
inline

Sets the server flag on all servers assigned to the current parallel job dispatcher.

Parameters
isServerThe server flag value to be set on servers.

Specified whether the server acts as server (true) or runs every job in a new thread.

References IG.Num.ParallelJobServerBase< JobContainerType >.IsServer.

Referenced by IG.Num.ParallelJobContainerGen< InputType, ResultType >.TestPerformance().

ParallelJobServerBase<JobContainerType> IG.Num.ParallelJobDispatcherBase< JobContainerType >.GetFirstIdleServer ( )
inlineprotected

Returns the first idle server (last on the list of idle servers) and removes it from the idle servers list, or returns null if there are no idle servers.

Returns
The first idle job server available.

References IG.Num.ParallelJobServerBase< JobContainerType >.Id, and IG.Num.ParallelJobServerBase< JobContainerType >.IsIdle.

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.EnqueueJob ( JobContainerType  jobData)
inlineprotected

Adds the specified job container to the execution queue.

Parameters
jobDataJob container of the job that is enqueued.
JobContainerType IG.Num.ParallelJobDispatcherBase< JobContainerType >.DequeueJob ( )
inlineprotected

Removes the last job from the execution queue and returns it, or returns null if there are no jobs on the queue.

Parameters
jobDataJob container of the job that is removed from the queue, or null if there are no jobs in the queue.
bool IG.Num.ParallelJobDispatcherBase< JobContainerType >.SendJob ( JobContainerType  jobData)
inline

Enqueues job for execution, and returns a flag indicating whether a job has been started immediately.

Parameters
jobDataJob data.

Referenced by IG.Num.ParallelJobContainerGen< InputType, ResultType >.TestPerformance().

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.SendJob ( JobContainerType  jobData,
out bool  startedImmediately 
)
inline

Enqueues job for execution.

Parameters
jobDataJob data.
startedImmediatelyOutput flag, set to true if the job has been started by some server immediately, false othwrwise.
void IG.Num.ParallelJobDispatcherBase< JobContainerType >.WaitAllJobsCompleted ( )
inline
bool IG.Num.ParallelJobDispatcherBase< JobContainerType >.WaitAllJobsCompleted ( double  timeoutInSeconds)
inline

Wait until all jobs that weer sent to the current dispatcher object complete, or timeout occurs (timeout specified in seconds), and returns a flag indicating whether the jobs have actually completed (i.e. stop was not due to timeout).

Parameters
timeoutInSecondsTimeot in seconds. Less or equal to 0 means no timeout (waiting for condition fulfilled indefinitely).
Returns
A flag indicating whether the job was actually completed when the function returned. If false is returned then timeout occurred.
void IG.Num.ParallelJobDispatcherBase< JobContainerType >.StopServer ( )
inline

Sends to the server thread command that it has to stop.

If new jobs are sent after the server has been stopped, the server is restarted automatically.

bool IG.Num.ParallelJobDispatcherBase< JobContainerType >.StopServerWhenAllJobsDone ( )
inline

Waits until all jobs are completed, and then stops the server.

If new jobs are sent after the server has been stopped, the server is restarted automatically.

Referenced by IG.Num.ParallelJobContainerGen< InputType, ResultType >.TestPerformance().

bool IG.Num.ParallelJobDispatcherBase< JobContainerType >.StopServerWhenAllJobsDone ( double  timeoutInSeconds)
inline

Waits until all jobs are completed or timeout occurs, and then stops the server. Returns a flag indicating whether all jobs are actually completed before server was ordered to stop.

Parameters
timeoutInSecondsTimeot in seconds. Less or equal to 0 means no timeout defined (waiting for condition fulfilled indefinitely).

If new jobs are sent after the server has been stopped, the server is restarted automatically.

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.KillDispatcherThread ( )
inline

Forces the working thread to stop. To let the working thread finish its current jobs and then stop, call StopServer

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.KillServerThreads ( )
inline

Forces all the server threads to stop, even if in the middle of eecution of a job. To let the working thread finish its current jobs and then stop, call StopServer

References IG.Num.ParallelJobServerBase< JobContainerType >.KillServerThread().

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.KillThreads ( )
inline

Forces all the server threads to stop, even if in the middle of eecution of a job. To let the working thread finish its current jobs and then stop, call StopServer

virtual void IG.Num.ParallelJobDispatcherBase< JobContainerType >.UpdateThreadPriorityFromSystem ( )
inlineprotectedvirtual

Updates thread priority (property ThreadPriority) to the current global thread priority (the UtilSystem.ThreadPriority property).

References IG.Lib.UtilSystem.ThreadPriority.

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.RegisterSystemPriorityUpdating ( )
inline

Registers the UpdateThreadPriorityFromSystem method as "event handler" for system priority changes. After registration, this method will be called every time the value of the UtilSystem.ThreadPriority property changes.

References IG.Lib.UtilSystem.AddOnThreadPriorityChange().

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.UnregisterSystemPriorityUpdating ( )
inline

Unregisters the UpdateThreadPriorityFromSystem method as "event handler" for system priority changes.

See also
RegisterSystemPriorityUpdating

References IG.Lib.UtilSystem.RemoveOnThreadPriorityChange().

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.Serve ( )
inlineprotected

Method executed in the queue server thread. Excecutes eventual enqueued jobs as job servers become idle.

References IG.Num.ParallelJobServerBase< JobContainerType >.Id, and IG.Num.ParallelJobServerBase< JobContainerType >.StartJob().

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.StartServer ( )
inline

Starts the queue server.

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.NotifyJobStarted ( ParallelJobServerBase< JobContainerType >  server,
JobContainerType  job 
)
inline

Notifies the current dispatched that the specified job has started on the specified server.

Parameters
serverServer to which the job is assigned.
jobThe job in question.

This functionality is currently not implemented and is prepared in advanced e.g. for the ability of pulsing.

Referenced by IG.Num.ParallelJobServerBase< JobContainerType >.NotifyJobStarted().

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.NotifyJobFinished ( ParallelJobServerBase< JobContainerType >  server,
JobContainerType  job 
)
inline

Notifies the current dispatched that the specified job has started on the specified server.

Parameters
serverServer to which the job is assigned.
jobThe job in question.

This functionality is currently not implemented and is prepared in advanced e.g. for the ability of pulsing.

Referenced by IG.Num.ParallelJobServerBase< JobContainerType >.NotifyJobFinished().

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.NotifyJobAborted ( ParallelJobServerBase< JobContainerType >  server,
JobContainerType  job 
)
inline

Notifies the current dispatched that the specified job has started on the specified server.

Parameters
serverServer to which the job is assigned.
jobThe job in question.

This functionality is currently not implemented and is prepared in advanced e.g. for the ability of pulsing.

Referenced by IG.Num.ParallelJobServerBase< JobContainerType >.NotifyJobAborted().

void IG.Num.ParallelJobDispatcherBase< JobContainerType >.NotifyServerIdle ( ParallelJobServerBase< JobContainerType >  server)
inline

Notifies the current parallel job dispatcher that the specified server has become idle.

This increases the number of idle servers and puts the server to the list of idle serves.

When an idle server is requested, it is still checked on the server whether it is actually idle or not.

Parameters
serverServer that has become idle.

References IG.Num.ParallelJobServerBase< JobContainerType >.Id.

Referenced by IG.Num.ParallelJobServerBase< JobContainerType >.NotifyServerIdle().

override string IG.Num.ParallelJobDispatcherBase< JobContainerType >.ToString ( )
inline

Returns a string representation of the current job dispatcher, which contains relevent data about the server state.

References IG.Num.ParallelJobServerBase< JobContainerType >.Id, and IG.Num.ParallelJobServerBase< JobContainerType >.ToString().

Referenced by IG.Num.ParallelJobContainerGen< InputType, ResultType >.TestPerformance().

Member Data Documentation

readonly List<ParallelJobServerBase<JobContainerType> > IG.Num.ParallelJobDispatcherBase< JobContainerType >._jobServers
protected
Initial value:
=
new List<ParallelJobServerBase<JobContainerType>>()

List of job servers contained by the current dispatcher.

readonly List<ParallelJobServerBase<JobContainerType> > IG.Num.ParallelJobDispatcherBase< JobContainerType >._idleJobServers
protected
Initial value:
=
new List<ParallelJobServerBase<JobContainerType>>()

List of idle job servers dispatched by teh current dispatcher.

Queue<JobContainerType> IG.Num.ParallelJobDispatcherBase< JobContainerType >._jobQueue = new Queue<JobContainerType>()
protected

Queue of jobs that could not be immediately served, scheduled for later execution.

bool IG.Num.ParallelJobDispatcherBase< JobContainerType >._commandStopServing = false
protected
bool IG.Num.ParallelJobDispatcherBase< JobContainerType >._systemPriorityUpdatesRegistered = false
protected

Whether the "event" handler for system priprity changes has already been registered.

ThreadPriority IG.Num.ParallelJobDispatcherBase< JobContainerType >._threadPriority = UtilSystem.ThreadPriority
protected
Thread IG.Num.ParallelJobDispatcherBase< JobContainerType >._workingThread
protected
bool IG.Num.ParallelJobDispatcherBase< JobContainerType >._isServerRunning = false
protected

Property Documentation

int IG.Num.ParallelJobDispatcherBase< JobContainerType >.NumEnqueuedJobs
get

Gets the number of enqueued jobs.

Referenced by IG.Num.ParallelJobContainerGen< InputType, ResultType >.TestPerformance().

int IG.Num.ParallelJobDispatcherBase< JobContainerType >.NumJobServers
get

number of job cervers that are currently asigned to hte dispatcher.

int IG.Num.ParallelJobDispatcherBase< JobContainerType >.NumActiveJobServers
get

Gets number of active job servers assigned to the dispatcher.

These are servers capable of serving jobs, but which may be busy at the moment.

int IG.Num.ParallelJobDispatcherBase< JobContainerType >.NumExecutingJobs
get

Gets number of currently executing jobs.

The number is obtained by counting assigned job servers that are in the state of job execution.

Referenced by IG.Num.ParallelJobContainerGen< InputType, ResultType >.TestPerformance().

bool IG.Num.ParallelJobDispatcherBase< JobContainerType >.CommandStopServing
getprotected set

Flag indicating whether the server should be stopped. If set to true and server thread is runing, then the server thread stops when the currently run job completes (or stops immediately if there is no job running).

ThreadPriority IG.Num.ParallelJobDispatcherBase< JobContainerType >.ThreadPriority
getset

Priority of the dispatcher and contained server threads.

Setting priority changes priority of the dispatcher thread and all server threads.

When threads are created, they will adopt the priority.


The documentation for this class was generated from the following file: