7.0.1685-testing

Tasks and Concurrency

Tasks

GreyCat has built-in support for asynchronous tasks. The mental model is close to what other languages provide:

  • Java’s Future
  • JavaScript’s Promise

A task is essentially a handle to a computation that may not have finished yet, allowing you to poll it for readiness or cancel it midway. Tasks are well suited to long computations over the graph or to work involving network requests.

From a language perspective, there is no difference between defining a task and defining a function.

Exposing a Task

To execute a function remotely as a task, add task as a header key when making the request. The call then returns an instance of runtime::Task containing all the relevant metadata about the spawned task.

curl -H "task:''" -X POST -d '[]' http://localhost:8080/project::long_computation
# Task{user_id:1,task_id:1,mod:"tasks",type:null,fun:"long_computation",creation:'2024-06-24T10:23:47.965517+00:00',start:null,duration:null,status:TaskStatus::waiting}

Task Object

Field Type Description
user_id int The id of the user that spawned the task
task_id int The unique id of the spawned task
mod String? The module of the spawned function
type String? The type declaring the spawned function, if any
fun String? The name of the spawned function
creation time Time when the task was spawned
start time? Time when the task started execution
duration duration? How long the task took
status TaskStatus The status of the task

To follow the task's status, call runtime::Task::info with the task's user id and task id:

curl -X POST -d '[1,1]' http://localhost:8080/runtime::Task::info
# TaskInfo{user_id:1,task_id:1,mod:"tasks",type:null,fun:"long_computation",creation:'2024-06-24T10:15:18.814828+00:00',start:'2024-06-24T10:15:18.815025+00:00',duration:8ms278us,status:TaskStatus::ended,progress:1.0,remaining:null,sub_waiting:0,sub_tasks_all:0}

TaskInfo

In addition to the Task fields, TaskInfo contains the following:

Field Type Description
progress float? The current task progress, which can be updated manually by the user
remaining duration? Estimated remaining time, based on the progress
sub_waiting int? How many subtasks are waiting to be executed
sub_tasks_all int? How many subtasks were spawned in total

Task Status

To track the state of your task, the response contains a TaskStatus enum describing the stage the task is in:

Enum Value Description
empty Task is empty
waiting Task is waiting to be executed
running Task is running
await Task is paused, awaiting the completion of its jobs
cancelled Task was cancelled
error Task failed with an error
ended Task ended successfully
ended_with_errors Task ended, but with errors
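When polling a task, it helps to know which statuses mean the task will not change anymore. The sketch below is a hypothetical helper, is_finished, that treats cancelled, error, ended and ended_with_errors as terminal; that grouping is our own reading of the table above, and the code assumes TaskStatus is in scope unqualified, as it appears in the outputs above.

```gcl
// Hypothetical helper: true once the task can no longer make progress.
// Which statuses count as terminal is an assumption based on the TaskStatus table.
fn is_finished(status: TaskStatus): bool {
  return status == TaskStatus::cancelled || status == TaskStatus::error || status == TaskStatus::ended || status == TaskStatus::ended_with_errors;
}

fn main() {
  println(is_finished(TaskStatus::running));
  println(is_finished(TaskStatus::ended));
}
```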

Retrieve Task result

In the example above, the TaskInfo reports the status ended, which means the result can be retrieved. It is stored in a result.gcb file.

In the URL below, the numbers after files and tasks are the user id and task id respectively; replace them with the values from your TaskInfo.

curl -X GET 'http://localhost:8080/files/1/tasks/1/result.gcb?json'

Concurrency using Jobs

A Job is a handle to a function and its arguments. Creating a Job does nothing on its own; it is just a GreyCat object.

Calling await on an array of jobs pauses the current task until all of its jobs have completed.

fn long_computation(max: int): int {
  var count = 0;
  for (var i = 0; i < max; i++) {
    count++;
  }
  return count;
}

fn main() {
  // Define the jobs by providing the function pointer and its arguments
  var jobs = Array<Job> {
    Job { function: project::long_computation, arguments: [100_000] },
    Job { function: project::long_computation, arguments: [100_000] }
  };

  // Pauses the current task until all jobs have ended, or fails early if an error occurs
  await(jobs);

  for (_, job in jobs) {
    // Access the value returned by each job
    var result = job.result();
    println(result);
  }
}
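Jobs do not have to be listed by hand. The sketch below, reusing long_computation from the snippet above, builds one job per input size in a loop and prints each result once they have all completed; the input sizes are arbitrary illustration values.

```gcl
fn main() {
  var jobs = Array<Job> {};
  // One job per (arbitrary) input size
  for (var i = 1; i <= 4; i++) {
    jobs.add(Job { function: project::long_computation, arguments: [i * 25_000] });
  }

  // Pauses the current task until every job has ended
  await(jobs);

  // Results are read back in the order the jobs were added
  for (i, job in jobs) {
    println("job ${i}: ${job.result()}");
  }
}
```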

Errors (fail-fast)

await throws if at least one job has raised an exception. You can wrap it in a try/catch and iterate over the jobs' results to find the error.
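A minimal sketch of this pattern, assuming GreyCat's throw / try / catch syntax as written here. checked_double is a hypothetical job that fails for negative inputs. What job.result() holds for a failed job (the error itself or another value) may depend on the GreyCat version, so the catch block simply prints every result for inspection.

```gcl
// Hypothetical job: fails for negative inputs
fn checked_double(x: int): int {
  if (x < 0) {
    throw "negative input";
  }
  return x * 2;
}

fn main() {
  var jobs = Array<Job> {
    Job { function: project::checked_double, arguments: [21] },
    Job { function: project::checked_double, arguments: [-1] }
  };
  try {
    // Throws because the second job fails
    await(jobs);
  } catch (e) {
    println("at least one job failed: ${e}");
    // Inspect every job to locate the failing one(s)
    for (i, job in jobs) {
      println("job ${i}: ${job.result()}");
    }
  }
}
```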

Parallel writes (Concurrent Writing)

You can write to the graph in parallel. The only caveat is that you may not write to the same node in parallel, since a built-in protection prevents it.

In a future update, this limitation may be lifted with an automatic best-effort merge.

The following snippet creates independent nodes in parallel and inserts them into a global node list at the end.

This model is very powerful for importing large amounts of data from separate files, or for executing heavy computations.

type Sensor {
  history: nodeTime<int>;
}

// Global list collecting the sensors created by all jobs
var sensor_list: nodeList<node<Sensor>>;

fn main() {
  var jobs = Array<Job> {};

  jobs.add(Job {function: project::import });
  jobs.add(Job {function: project::import });

  await(jobs);

  for (_, job in jobs) {
    var sensors = job.result();
    for (_, sensor: node<Sensor> in sensors) {
      sensor_list.add(sensor);
    }
  }
}

fn import(): Array<node<Sensor>> {
  var sensors = Array<node<Sensor>> {};
  for (var i = 0; i < 10; i++) {
    var sensor = node<Sensor> { Sensor { history: nodeTime<int> {} }};
    sensors.add(sensor);
  }
  return sensors;
}

Limitations

Take the following snippet as an example. It raises the error message: wrong state before await, variable contains an object stored in a node.

type Foo {
  status: String;
}

fn task(foo: node<Foo>) {
  var resolved_foo = foo.resolve();

  // awaiting jobs

  resolved_foo.status = "Done";
}

When execution reaches an await point, the current function scope is serialized. This means that any object resolved from a node before an await may become outdated after resumption, as the node’s content might have changed in the meantime. Accessing such outdated values is unsafe and will result in an error.

Scope serialization occurs because the current execution context is discarded and later reconstructed from the serialized data upon resuming execution after the await.

To avoid this, either use the arrow operator if you need to modify an object inside a node, or set the variable to null before the await (resolved_foo = null). Both methods work.
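A sketch of both workarounds applied to the Foo example. noop is a hypothetical placeholder job whose only purpose is to create an await point, and the arrow-operator form foo->status is assumed to write directly into the node's content. In the second variant, the object is resolved again after the await so the write uses fresh state.

```gcl
type Foo {
  status: String;
}

// Hypothetical no-op job, used only to create an await point
fn noop() {}

// Option 1: never hold a resolved object across the await; write through the node
fn task_with_arrow(foo: node<Foo>) {
  var jobs = Array<Job> { Job { function: project::noop } };
  await(jobs);
  foo->status = "Done";
}

// Option 2: release the resolved object before the await, then resolve again
fn task_with_reset(foo: node<Foo>) {
  var resolved_foo = foo.resolve();
  println("status before: ${resolved_foo.status}");
  resolved_foo = null;

  var jobs = Array<Job> { Job { function: project::noop } };
  await(jobs);

  var fresh_foo = foo.resolve();
  fresh_foo.status = "Done";
}
```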

Periodic tasks

How it works

Periodic tasks are like tasks, but, as the name implies, GreyCat runs them periodically once they are registered.

To register periodic tasks, use the runtime module:

fn my_task() {
  println("The current time is ${time::now()}");
}

fn main() {
  var my_task_every_day = PeriodicTask {
    user_id: 0,                 // the user associated with the execution
    arguments: null,            // the arguments to use for the execution
    every: 1_day,               // the periodicity as a duration
    function: project::my_task, // the function pointer of the task
    start: time::now(),         // the time of the first execution
  };

  // register the task in the scheduler
  PeriodicTask::set(Array<PeriodicTask>{my_task_every_day});
}

Note that, as of today, the scheduler API only provides PeriodicTask::set(...). This means you always have to pass the whole list of periodic tasks when calling it; otherwise, the previously registered ones are unregistered.
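For example, to keep two periodic tasks registered, both must be passed in the same PeriodicTask::set call. In this sketch, daily_report and weekly_cleanup are hypothetical functions, and the periodicities are arbitrary.

```gcl
fn daily_report() {
  println("daily report at ${time::now()}");
}

fn weekly_cleanup() {
  println("weekly cleanup at ${time::now()}");
}

fn main() {
  // PeriodicTask::set replaces the whole registered list,
  // so every task that should stay scheduled is passed in this single call
  PeriodicTask::set(Array<PeriodicTask> {
    PeriodicTask {
      user_id: 0,
      arguments: null,
      every: 1_day,
      function: project::daily_report,
      start: time::now(),
    },
    PeriodicTask {
      user_id: 0,
      arguments: null,
      every: 7_day,
      function: project::weekly_cleanup,
      start: time::now(),
    }
  });
}
```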

Manipulating periodic tasks

Because the API is minimalist, manipulating the list of registered PeriodicTasks is a bit tedious. Here is an example of an abstraction built on top of what the standard library currently provides:

type PeriodicTaskHelper {
  /// Schedules the given task
  static fn schedule(pTask: PeriodicTask) {
    var tasks = PeriodicTask::all();
    tasks.add(pTask);
    PeriodicTask::set(tasks);
  }

  /// Removes the first PeriodicTask from the scheduler and returns it
  ///
  /// If no tasks are scheduled, `null` is returned.
  static fn shift(): PeriodicTask? {
    var tasks = PeriodicTask::all();
    var pTask: PeriodicTask?;
    if (tasks.size() > 0) {
      pTask = tasks[0];
      tasks.remove(0);
      PeriodicTask::set(tasks);
    }
    return pTask;
  }

  /// Removes the last PeriodicTask from the scheduler and returns it
  ///
  /// If no tasks are scheduled, `null` is returned.
  static fn pop(): PeriodicTask? {
    var tasks = PeriodicTask::all();
    var len = tasks.size();
    var pTask: PeriodicTask?;
    if (len > 0) {
      // The last element is at index len - 1
      pTask = tasks[len - 1];
      tasks.remove(len - 1);
      PeriodicTask::set(tasks);
    }
    return pTask;
  }

  /// Removes the task at the given index.
  ///
  /// Returns `true` on success, otherwise `false`
  static fn remove_at(index: int): bool {
    var tasks = PeriodicTask::all();
    var len = tasks.size();
    if (index < 0 || index >= len) {
      return false;
    }
    tasks.remove(index);
    PeriodicTask::set(tasks);
    return true;
  }

  /// Removes the first task that matches `task.function == ptr`
  ///
  /// Returns `true` on success, otherwise `false`
  static fn remove_first(ptr: function): bool {
    var tasks = PeriodicTask::all();
    for (i, pTask in tasks) {
      if (pTask.function == ptr) {
        tasks.remove(i);
        PeriodicTask::set(tasks);
        return true;
      }
    }
    return false;
  }

  /// Removes the last task that matches `task.function == ptr`
  ///
  /// Returns `true` on success, otherwise `false`
  static fn remove_last(ptr: function): bool {
    var tasks = PeriodicTask::all();
    var len = tasks.size();
    for (var i = len - 1; i >= 0; i--) {
      if (tasks[i].function == ptr) {
        tasks.remove(i);
        PeriodicTask::set(tasks);
        return true;
      }
    }
    return false;
  }
}
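A usage sketch of the helper above: schedule a task without dropping the ones already registered, then unregister it by its function pointer. nightly_backup is a hypothetical function used only for illustration.

```gcl
fn nightly_backup() {
  println("backup at ${time::now()}");
}

fn main() {
  // Appends the task to the currently registered list
  PeriodicTaskHelper::schedule(PeriodicTask {
    user_id: 0,
    arguments: null,
    every: 1_day,
    function: project::nightly_backup,
    start: time::now(),
  });
  var count = PeriodicTask::all().size();
  println("scheduled tasks: ${count}");

  // Later on, unregister it again using the function pointer
  var removed = PeriodicTaskHelper::remove_first(project::nightly_backup);
  println("removed: ${removed}");
}
```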