Tasks and Concurrency
Tasks
GreyCat comes with built-in asynchronous tasks. The mental model for tasks is close to what other languages offer:
- Java’s Future
- JavaScript’s Promise
A task is a handle to a computation that may not have finished yet, allowing you to poll it for completion or cancel it midway. Tasks are well suited to long computations over the graph or computations involving network requests.
From a language perspective, there is no difference between defining a task and defining a function.
Exposing a Task
To execute a function remotely as a task, add a task header to the HTTP request. Instead of the function's result, the server then returns an instance of runtime::Task containing all the relevant metadata about the spawned task.
curl -H "task:''" -X POST -d '[]' http://localhost:8080/project::long_computation
# Task{user_id:1,task_id:1,mod:"tasks",type:null,fun:"long_computation",creation:'2024-06-24T10:23:47.965517+00:00',start:null,duration:null,status:TaskStatus::waiting}
Task Object
Field | Type | Description |
---|---|---|
user_id | int | The id of the user that spawned the task |
task_id | int | The unique id of the spawned task |
mod | String? | The module of the spawned function |
type | String? | The type the spawned function belongs to, if any |
fun | String? | The name of the spawned function |
creation | time | Time when the task was spawned |
start | time? | Time when the task started execution |
duration | duration? | How long the task took |
status | TaskStatus | The status of the task |
To follow the task's status, call runtime::Task::info with the task's user_id and task_id:
curl -X POST -d '[1,1]' http://localhost:8080/runtime::Task::info
# TaskInfo{user_id:1,task_id:1,mod:"tasks",type:null,fun:"long_computation",creation:'2024-06-24T10:15:18.814828+00:00',start:'2024-06-24T10:15:18.815025+00:00',duration:8ms278us,status:TaskStatus::ended,progress:1.0,remaining:null,sub_waiting:0,sub_tasks_all:0}
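The same two calls can be made from Python with only the standard library. The sketch below builds both requests with urllib.request; the helper names and the BASE_URL constant are hypothetical, and the header value simply copies the curl example above.

```python
import json
import urllib.request

# Hypothetical constant: the local server used throughout this page.
BASE_URL = "http://localhost:8080"


def spawn_task_request(function_path: str, args: list, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build a POST that runs `function_path` as a task (note the `task` header)."""
    return urllib.request.Request(
        f"{base_url}/{function_path}",
        data=json.dumps(args, separators=(",", ":")).encode(),
        headers={"task": "''"},  # same header as curl -H "task:''"
        method="POST",
    )


def task_info_request(user_id: int, task_id: int, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build a POST to runtime::Task::info with [user_id, task_id] as arguments."""
    return urllib.request.Request(
        f"{base_url}/runtime::Task::info",
        data=json.dumps([user_id, task_id], separators=(",", ":")).encode(),
        method="POST",
    )
```

Either request can be sent with urllib.request.urlopen(req); the body is the Task or TaskInfo object shown above. This sketch deliberately does not parse it.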
TaskInfo
In addition to the Task fields, TaskInfo contains the following fields:
Field | Type | Description |
---|---|---|
progress | float? | The current progress of the task; it can be updated manually from user code |
remaining | duration? | Estimated remaining time, based on the progress |
sub_waiting | int? | Number of subtasks waiting to be executed |
sub_tasks_all | int? | Total number of subtasks spawned |
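The table does not say how remaining is computed. One simple estimator, assumed here for illustration and not taken from GreyCat, is linear extrapolation: if a fraction p of the work took elapsed time t, the rest should take about t(1 - p)/p. The function name is hypothetical.

```python
from datetime import timedelta
from typing import Optional


def estimate_remaining(elapsed: timedelta, progress: Optional[float]) -> Optional[timedelta]:
    """Linear extrapolation of the remaining time from a progress value in [0, 1].

    Assumption for illustration only; GreyCat's own estimator may differ.
    """
    if progress is None or progress <= 0.0:
        return None  # nothing reported yet, so there is nothing to extrapolate from
    progress = min(progress, 1.0)  # clamp in case progress overshoots
    return elapsed * ((1.0 - progress) / progress)
```

Under this assumption an estimate only exists once progress is above zero, which fits both fields being nullable.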
Task Status
To track the state of your task, the response contains a TaskStatus enum describing the stage the task is in:
Enum Value | Description |
---|---|
empty | Task is empty |
waiting | Task is waiting |
running | Task is running |
await | Task is paused at an await, waiting for its jobs to complete |
cancelled | Task is cancelled |
error | Task encountered an error |
ended | Task ended successfully |
ended_with_errors | Task ended with errors |
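A client usually polls runtime::Task::info until the task reaches a final status. The sketch below assumes that ended, ended_with_errors, error and cancelled are the final states (the table does not say so explicitly). It takes a hypothetical fetch_status callable that performs the info request and returns the bare status name, for example "ended" for TaskStatus::ended; sleep and clock are injectable so the loop can be exercised without a server.

```python
import time
from typing import Callable

# Assumption: these statuses are final; the table above does not say so explicitly.
TERMINAL_STATUSES = {"ended", "ended_with_errors", "error", "cancelled"}


def wait_for_task(
    fetch_status: Callable[[], str],
    interval: float = 1.0,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll `fetch_status` until it reports a final status, then return that status."""
    deadline = clock() + timeout
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"task still {status!r} after {timeout} s")
        sleep(interval)  # waiting, running, await (or empty): try again later
```

Only once the returned status is ended does the next step, retrieving the result, apply.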
Retrieve Task result
In the example above, the task info shows TaskStatus::ended, which means the result can be retrieved. It is stored in a result.gcb file.
The two numbers after files and tasks in the URL are the user id and the task id; replace them with the values from your TaskInfo.
curl -X GET 'http://localhost:8080/files/1/tasks/1/result.gcb?json'
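A small helper can build the result URL from the ids in a TaskInfo. The function name and the default base URL are assumptions for illustration; the path layout follows the curl example above.

```python
def task_result_url(user_id: int, task_id: int,
                    base_url: str = "http://localhost:8080", as_json: bool = True) -> str:
    """URL of a task's result file: files/<user_id>/tasks/<task_id>/result.gcb."""
    url = f"{base_url}/files/{user_id}/tasks/{task_id}/result.gcb"
    # `?json` as in the curl example; as_json=False leaves the query parameter off.
    return url + "?json" if as_json else url
```

The URL can then be fetched with urllib.request.urlopen(task_result_url(user_id, task_id)).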
Concurrency using Jobs
A Job is a handle to a function and its arguments. Creating a Job does nothing on its own: it is just a GreyCat object.
Calling await on an array of jobs runs them and pauses the current task until all of them have completed.
fn long_computation(max: int): int {
var count = 0;
for (var i = 0; i < max; i++) {
count++;
}
return count;
}
fn main() {
// Define the jobs by providing the function pointer and its arguments
var jobs = Array<Job> {
Job {function: project::long_computation , arguments: [100_000] },
Job {function: project::long_computation , arguments: [100_000] }
};
// Pauses until every job has finished, or fails early if one of them throws
await(jobs);
for(_, job in jobs){
// accessing the result
var result = job.result();
}
}
Errors (fail-fast)
await throws if at least one job raised an exception. You can wrap it in a try/catch and iterate over the jobs' results to find which one failed.
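GreyCat jobs are not Python, but the fail-fast behaviour has a close analogue in Python's concurrent.futures: wait with return_when=FIRST_EXCEPTION returns as soon as one job fails, after which the caller can inspect every job. This is a swapped-in illustration of the semantics described above, not GreyCat code; await_jobs, JobError and failing_job are hypothetical names.

```python
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait


class JobError(Exception):
    """Raised when at least one job failed; keeps every job for inspection."""

    def __init__(self, futures: list):
        super().__init__("at least one job raised an exception")
        self.futures = futures


def long_computation(max_: int) -> int:
    count = 0
    for _ in range(max_):
        count += 1
    return count


def failing_job(x: int) -> int:
    raise ValueError(f"cannot process {x}")


def await_jobs(executor: ThreadPoolExecutor, jobs: list) -> list:
    """Run (function, args) pairs concurrently; return their results or fail fast."""
    futures = [executor.submit(fn, *args) for fn, args in jobs]
    # Returns when all jobs are done, or as soon as one of them raises.
    done, _ = wait(futures, return_when=FIRST_EXCEPTION)
    if any(f.exception() is not None for f in done):
        raise JobError(futures)
    return [f.result() for f in futures]


if __name__ == "__main__":
    with ThreadPoolExecutor() as executor:
        try:
            await_jobs(executor, [(long_computation, (100_000,)), (failing_job, (3,))])
        except JobError as err:
            # Like iterating over the jobs after a failed await: find the failing ones.
            for i, f in enumerate(err.futures):
                if f.exception() is not None:
                    print(f"job {i} failed: {f.exception()}")
```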
Parallel writes (Concurrent Writing)
You can write to the graph in parallel. The only caveat is that you may not write to the same node from several jobs in parallel; a built-in protection prevents it.
In a future update, this limitation may be relaxed with an automatic best-effort merge.
The following code snippet shows an example of creating independent nodes in parallel and inserting them into a global index at the end.
This pattern is especially useful for importing large amounts of data from separate files or for executing heavy computations.
type Sensor {
history: nodeTime<int>;
}
var sensor_list: nodeList<node<Sensor>>;
fn main() {
var jobs = Array<Job> {};
jobs.add(Job {function: project::import });
jobs.add(Job {function: project::import });
await(jobs);
for (_, job in jobs) {
var sensors = job.result();
for (_, sensor: node<Sensor> in sensors) {
sensor_list.add(sensor);
}
}
}
fn import(): Array<node<Sensor>> {
var sensors = Array<node<Sensor>> {};
for (var i = 0; i < 10; i++) {
var sensor = node<Sensor> { Sensor { history: nodeTime<int> {} }};
sensors.add(sensor);
}
return sensors;
}
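The same shape (independent jobs that each build their own objects, then a single merge into the shared index once every job has finished) can be sketched in Python. Sensor and the plain lists below are stand-ins for node<Sensor>, nodeTime and nodeList, and run_import is a hypothetical name; this illustrates the pattern, not GreyCat's write protection.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field


@dataclass
class Sensor:
    history: list = field(default_factory=list)  # stand-in for nodeTime<int>


def import_sensors(count: int = 10) -> list:
    # Each job only creates its own sensors, so jobs never write to shared state.
    return [Sensor() for _ in range(count)]


def run_import(job_count: int = 2) -> list:
    sensor_list = []  # stand-in for the global nodeList index
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(import_sensors) for _ in range(job_count)]
        batches = [f.result() for f in futures]  # the equivalent of await(jobs)
    # A single writer merges everything into the index after all jobs finished.
    for batch in batches:
        sensor_list.extend(batch)
    return sensor_list
```

Keeping every write to the shared index after the await means no two jobs ever touch the same object at the same time.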
Limitations
The following snippet raises the error "wrong state before await, variable contains an object stored in a node":
type Foo {
status: String;
}
fn task(foo: node<Foo>) {
var resolved_foo = foo.resolve();
// an await point here, e.g. await(jobs);
resolved_foo.status = "Done";
}
When execution reaches an await point, the current function scope is serialized. This means that any object resolved from a node before an await may become outdated after resumption, as the node’s content might have changed in the meantime. Accessing such outdated values is unsafe and will result in an error.
Scope serialization occurs because the current execution context is discarded and later reconstructed from the serialized data upon resuming execution after the await.
To avoid this, either use the arrow operator to modify the object directly inside the node (foo->status = "Done";), or set the variable to null before the await (resolved_foo = null;). Both approaches work.
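To see why a resolved object cannot safely survive an await, consider a Python analogy in which copy.deepcopy stands in for serializing and rebuilding the scope. Unlike GreyCat, Python raises no error here: the write silently lands on a detached copy, which is the kind of stale write the GreyCat error guards against. All names are illustrative.

```python
import copy
from dataclasses import dataclass


@dataclass
class Foo:
    status: str


def suspend_and_resume(scope: dict) -> dict:
    # Stand-in for an await point: the scope is serialized, then rebuilt.
    return copy.deepcopy(scope)


def unsafe_update(graph: dict) -> None:
    scope = {"resolved_foo": graph["foo"]}  # resolved before the await
    scope = suspend_and_resume(scope)
    scope["resolved_foo"].status = "Done"  # writes to a detached copy


def safe_update(graph: dict) -> None:
    scope = {"foo_ref": "foo"}  # keep only the handle across the await
    scope = suspend_and_resume(scope)
    graph[scope["foo_ref"]].status = "Done"  # resolve after resuming


graph = {"foo": Foo(status="Pending")}  # the key "foo" plays the role of node<Foo>
unsafe_update(graph)
print(graph["foo"].status)  # Pending
safe_update(graph)
print(graph["foo"].status)  # Done
```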
Periodic tasks
How it works
Periodic tasks are like tasks, but, as the name implies, GreyCat runs them periodically once they are registered.
To register periodic tasks, use the runtime module:
fn my_task() {
println("The current time is ${time::now()}");
}
fn main() {
var my_task_every_day = PeriodicTask {
user_id: 0, // the user associated with the execution
arguments: null, // the arguments to use for the execution
every: 1_day, // the periodicity as a duration
function: project::my_task, // the function pointer of the task
start: time::now(), // the time of the first execution
};
// register the task in the scheduler
PeriodicTask::set(Array<PeriodicTask>{my_task_every_day});
}
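Given start and every, the upcoming executions can be worked out ahead of time. The sketch assumes runs fall exactly on start + k * every for k = 0, 1, 2, and so on; the text does not describe how the scheduler handles drift or missed runs, so treat this as an approximation. next_runs is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def next_runs(start: datetime, every: timedelta, now: datetime, count: int = 3) -> list:
    """The next `count` run times at or after `now`, assuming runs at start + k * every."""
    if every <= timedelta(0):
        raise ValueError("every must be a positive duration")
    if now <= start:
        k = 0  # the first execution has not happened yet
    else:
        # smallest k with start + k * every >= now, i.e. ceil((now - start) / every)
        k = -((start - now) // every)
    return [start + (k + i) * every for i in range(count)]


start = datetime(2024, 6, 24, 10, 0, tzinfo=timezone.utc)
for run in next_runs(start, timedelta(days=1), start + timedelta(hours=25)):
    print(run.isoformat())
```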
Note that, as of today, the scheduler API only provides PeriodicTask::set(...) to register tasks. It replaces the whole schedule, so you must always pass the complete list of periodic tasks when calling PeriodicTask::set(...); any previously registered task missing from that list is unregistered.
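The full-replacement behaviour is easy to trip over. The toy Python model below is not GreyCat's API (Scheduler, all_tasks, set_tasks and schedule are hypothetical names); it shows what happens when the list is set with only a new task, and the read-modify-write pattern that avoids it.

```python
class Scheduler:
    """Toy model of a scheduler whose only write operation replaces the whole list."""

    def __init__(self) -> None:
        self._tasks: list = []

    def all_tasks(self) -> list:
        return list(self._tasks)  # a copy of the registered tasks

    def set_tasks(self, tasks: list) -> None:
        self._tasks = list(tasks)  # anything not passed in is unregistered


def schedule(scheduler: Scheduler, task) -> None:
    # Read-modify-write: keep what is registered and append the new task.
    tasks = scheduler.all_tasks()
    tasks.append(task)
    scheduler.set_tasks(tasks)


s = Scheduler()
s.set_tasks(["daily_report"])
s.set_tasks(["hourly_sync"])  # replaces: daily_report is gone
schedule(s, "daily_report")   # appends: both are registered
print(s.all_tasks())          # ['hourly_sync', 'daily_report']
```

In this model the read and the write are separate calls, so two callers doing this at the same time could still overwrite each other's changes.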
Manipulating periodic tasks
Because the API is minimal, manipulating the list of registered PeriodicTasks is a bit tedious, so here is an example of an abstraction on top of what the standard library currently provides:
type PeriodicTaskHelper {
/// Schedules the given task
static fn schedule(pTask: PeriodicTask) {
var tasks = PeriodicTask::all();
tasks.add(pTask);
PeriodicTask::set(tasks);
}
/// Removes the first PeriodicTask from the scheduler and returns it
///
/// If no task is scheduled, `null` is returned.
static fn shift(): PeriodicTask? {
var tasks = PeriodicTask::all();
var pTask: PeriodicTask?;
if (tasks.size() > 0) {
pTask = tasks[0];
tasks.remove(0);
PeriodicTask::set(tasks);
}
return pTask;
}
/// Removes the last PeriodicTask from the scheduler and returns it
///
/// If no task is scheduled, `null` is returned.
static fn pop(): PeriodicTask? {
var tasks = PeriodicTask::all();
var len = tasks.size();
var pTask: PeriodicTask?;
if (len > 0) {
pTask = tasks[len - 1];
tasks.remove(len - 1);
PeriodicTask::set(tasks);
}
return pTask;
}
/// Removes the task at the given index.
///
/// Returns `true` on success, otherwise `false`
static fn remove_at(index: int): bool {
var tasks = PeriodicTask::all();
var len = tasks.size();
if (index < 0 || index >= len) {
return false;
}
tasks.remove(index);
PeriodicTask::set(tasks);
return true;
}
/// Removes the first task that matches `task.function == ptr`
///
/// Returns `true` on success, otherwise `false`
static fn remove_first(ptr: function): bool {
var tasks = PeriodicTask::all();
for (i, pTask in tasks) {
if (pTask.function == ptr) {
tasks.remove(i);
PeriodicTask::set(tasks);
return true;
}
}
return false;
}
/// Removes the last task that matches `task.function == ptr`
///
/// Returns `true` on success, otherwise `false`
static fn remove_last(ptr: function): bool {
var tasks = PeriodicTask::all();
var len = tasks.size();
for (var i = len - 1; i >= 0; i--) {
if (tasks[i].function == ptr) {
tasks.remove(i);
PeriodicTask::set(tasks);
return true;
}
}
return false;
}
}