BufferedTask

BufferedTask is an adapter which executes another task (called child task) in an asynchronous fashion. The data are sent to ingest pipeline asynchronously and it takes a significant fraction of a cycle to just assemble the data structure. Some tasks require all data to be present (e.g. writing the data to disk by MSSink) but do not alter the data in any way. This task creates a copy of the data and buffers them for the service thread to execute the child task on these data. Provided the child fundamentally keeps up with data rate and there is no issues related to memory bandwidth, etc slowing down the rest of the ingest pipeline, this approach gives more time for the child task to finish. In particular, it is found useful to wrap the MS writing this way. Any task which does not alter data or data distribution pattern (except on the first iteration which is executed in the main thread as normal to allow automatic distribution patterns) can be used. If the first condition is not the case and the data are altered by the child task, the modifications are simply lost. If the latter condition is not fulfilled - the result is unpredictable and can cause MPI lock-ups or crashes. It, therefore, requires an expert user to make the decision about ingest configuration when it comes to buffering.

Configuration Parameters

Ingest pipeline requires a configuration file be provided on the command line. This section describes the valid parameters applicable to this particular task. These parameters need to be defined only if this task is used. As for all tasks, parameters are taken from keys with tasks.name.params prefix (not shown in the table below) where name is an arbitrary name assigned to this task and used in tasklist (this allows us to run the same task more than once with different parameters). The type of the task defined by tasks.name.type should be set to BufferedTask.

Parameter

Type

Default

Description

child

string

None

The (logical) name of the child task used with this instance of buffered task (there could be more than one buffered task in the tasklist wrapping around different tasks, e.g. MSSink and TCPSink). Any name which can be used in tasklist is acceptable.

lossless

boolean

true

Flag showing what to do if the child task is found to not keep up and the timeout defined by maxwait has expired. If true, an exception is thrown and data acquision is stopped. Otherwise, the new data chunk is ingored and processing continues (with an error message in the log).

size

unsigned int

1

Size of the buffer. Although, in principle, having a longer buffer might help in the case of very variable execution times for the child, if it fundamentally keeps up, the general buffering mechanism of ingest pipeline’s Source task should be sufficient.

maxwait

unsigned int

30

Maximum waiting time before triggering the timeout condition (see also lossless), if the buffer is full.

Example

########################## BufferedTask ##############################

# the following line would force ingest to use orginary unbuffered MSSink
#tasks.tasklist = [MergedSource, CalcUVWTask, MSSink]
# and this task list triggers buffering
tasks.tasklist = [MergedSource, CalcUVWTask, BufferedMSSink]


# type of the task
tasks.BufferedMSSink.type = BufferedMSSink
# child to be wrapped around - MSSink in this instance
tasks.BufferedMSSink.params.child = MSSink

# ordinary setup of MSSink task

# the measurement sets will be created in the current directory and named like 2015-12-22_162617.ms
tasks.MSSink.params.filename = %d_%t.ms
# pointing table will be written
tasks.MSSink.params.pointingtable.enable = true
# type of the task
tasks.MSSink.type = MSSink