Skip to main content

Lambda Tasks

Lambda tasks are executed by the AWS Lambda service. Unlike XINA Run tasks, Lambda tasks are registered on the XINA server with a mapping of XINA task name to lambda function name. The XINA server maintains a queue of lambda tasks and executes each in the order they are received.

To execute a lambda task, the server first creates the lambda payload as a JSON object with the following properties:

Property Type Value
conf JSON object task configuration
task_id integer the unique numeric ID assigned to the task by the XINA server
parent_id integer the unique numeric ID of the parent task (optional, if provided in RUN action)
bucket string the AWS S3 bucket used by the current XINA instance
queue string the AWS SQS queue URL used by the current XINA instance

The lambda function is then invoked synchronously by the XINA server to capture any logged output. If an error occurs, either in the act of invocation or as reported by the function, the task will be marked as failed and include any log information that is available. Otherwise, the behavior is determined by whether the open flag has been set. If open is false, the task will be marked as finished and include any logging output. If open is true, the task will be updated but remain incomplete.

XINA Queue

The purpose of the open flag and parent_id is to allow chained operations to be associated with a single task. Unlike XINA Run tasks, which can communicate directly with the XINA server through the XINA Tunnel application, lambda tasks do not currently have a way to directly access the server. To solve this problem lambda tasks can use the XINA Queue workflow, which builds on the AWS SQS (Simple Queue Service) to process a FI-FO queue of XINA API actions. Actions can be queued at any point during a lambda task execution by invoking the xina-queue lambda function, which will then be executed asynchronously by the XINA server. If a lambda task requires the response of an API action, it can be accessed from S3 by a subsequent lambda task.

The open flag indicates that is a task is not necessarily complete when an initial lambda function execution completes successfully, and may be followed by subsequent XINA API actions through the XINA queue, and possibly additional lambda functions as well. Therefore tasks which set the open flag must invoke a CONCLUDE API action through the XINA queue at ultimate end of their execution, with the task ID of the originating task. This has the simple format:

 {
  "action" : "conclude",
  "task"   : <task ID>
 }

If the initial lambda function launches additional lambda function(s) through chained RUN actions, the originating task ID should be passed to future functions with the parent property. This provides a way to associate all subsequent API actions to the originating task.

To invoke the xina-queue function, a payload must be provided as a JSON object with the following properties:

Property Type Value
source string "default", "s3", or "url" (optional, default "default")
objects JSON object binary objects required by action (optional)
action string or JSON object the API action to queue
task_id integer the current task ID
parent_id integer the parent task ID, if available
bucket string the AWS S3 bucket (provided by XINA lambda payload)
queue string the AWS SQS queue URL (provided by XINA lambda payload)
ignore boolean indicates if failure should be ignored (optional, default false)

The API action may be provided directly in the payload, as an S3 object, or from an arbitrary URL. If source is "default", the action will be loaded as the JSON object of the action property. If source is "s3", action must be a JSON object specifying the S3 bucket and key. If source is "url", the action must be a string of the URL from which to load the action. Synchronous lambda invocation has a maximum payload size of 6MB, so any action close to this size should use the S3 or URL source settings.

Actions requiring additional files can specify them through the objects property. Each value of the objects object must either be a string representation of a URL from which to load the file, or a JSON object specifying a bucket and key property to load the file from S3. The property name is used to reference the file in the action, but it must be preceeded by a $ character. For example,

 {
  "objects": {
   "big_csv" : { "bucket": "foo", "key": "bar" }
  },
  "action": {
   "action": "load",
   "database": "some.database",
   "object_id": "$big_csv"
  }
 }

If the action is queued successfully, xina-queue will return a JSON object with the property queue_id, which will be a 40 character string representation of a UUID assigned to the action. This ID can be passed to subsequent API actions to reference the results of the queued action, once complete.

Note that, by default, if any queued API action fails, or a chained task fails in execution, the originating task will automatically be marked as failed, and any subsequent queued actions will not be run by the server. This approach has two intended benefits. Firstly, it removes the need for chained tasks to check if actions ran successfully (because if the task is executing, it can assume any previous actions were successful). Secondly, it prevents unintended consequences, as in many cases there may be multiple actions relying on serial completion, and correcting an issue may be more difficult if only some actions complete successfully. If the outcome of an action does not imply success of the originating task, the ignore flag may be set on the xina-queue invocation, which will attempt to execute the action regardless of the state of the task.

Example

The following example demonstrates many of the features of the XINA lambda queue workflow. It is implemented as two JavaScript lambda functions, xina-queue-demo-a and xina-queue-demo-b. The task will load some data from the server, then post that data to the XINA wall along with a text message provided in the configuration.

Note that some details of AWS API implementation and Lambda integration are JavaScript-specific in these examples. For different languages consult the Lambda developer guide.

xina_queue_demo.json

This is the API action used to run the task in our example. The task name queue_demo is registered on the XINA server to the lambda function xina-queue-demo-a.

 {
  "action" : "run",
  "tasks"  : [
   {
    "name" : "queue_demo",
    "conf" : { "message": "Hello world!" },
    "open" : true
   }
  ]
 }

xina-queue-demo-a

First we initialize the required AWS libraries and create the handler function. The basic task information can be loaded from the event object.

const aws = require('aws-sdk');
const lambda = new aws.Lambda();

exports.handler = async(event) => {
    // get metadata from the event
    const task_id = event.task_id;
    const bucket = event.bucket;
    const queue = event.queue;
    
    // get the message from the event
    const message = event.conf.message;

Now we create the API action (in this case, the SCHEMA action, which will return a JSON representation of the XINA environment), the payload for the xina-queue function, and the invocation parameters.

Note that if queuing multiple actions from a single lambda function it is important to use the RequestResponse invocation type, as this will execute the queue operation synchronously. Otherwise they may be queued in a different order than intended.

    // the action to queue
    let action = {
        action: "schema"
    }
    
    // the payload for the xina-queue function
    let payload = {
	    parent_id : task_id, // provide task ID as parent ID to start event chain
        task_id,
        bucket,
        queue,
        action
    };
    
    // the parameters to invoke the xina-queue function
    let params = {
        FunctionName   : 'xina-queue',
        InvocationType : 'RequestResponse', // run synchronously
        LogType        : 'Tail',            // include log
        Payload        : JSON.stringify(payload)
    };

Now we invoke the xina-queue lambda function. If it completes successfully, load the queue ID (UUID string) from the response JSON object.

    return lambda.invoke(params).promise().then((data) => {
        // check for errors reported by xina-queue
        if (data.FunctionError) return Promise.reject(data.FunctionError);
        
        // parse the JSON object from the response payload
        let response = JSON.parse(data.Payload.toString());
        
        // get the UUID from the response, use this to reference the result
        let queue_id = response.queue_id;

Finally, invoke xina-queue again with a RUN action for a queue_demo_b task, which will map the xina-queue-demo-b lambda function. Note we pass the task_id of the current task as the parent here, and the generated queue_id of the previous xina-queue invocation, so we can load the results when xina-queue-demo-b executes.

        // action to run the next lambda function
        action = {
            action : 'run',
            parent : task_id, // assign the parent ID from the current task ID
            tasks  : [{
                name: 'queue_demo_b',
                conf: { message, data: queue_id } // next fn uses queue_id to access results
            }]
        }
        
        payload.action = action;
        
        params.Payload = JSON.stringify(payload);
        
        return lambda.invoke(params).promise();
    }).then((data) => {
        // check for errors reported by xina-queue
        if (data.FunctionError) return Promise.reject(data.FunctionError);
        
        console.log('executed successfully');
    });
};

xina-queue-demo-b

First we initialize the required AWS libraries and create the handler function, as before. The S3 library is now required to access the results of the previous query.

const aws = require('aws-sdk');
const lambda = new aws.Lambda();
const s3 = new aws.S3();

exports.handler = async(event) => {
	// get metadata from the event
	const parent_id = event.parent_id;
	const task_id = event.task_id;
	const bucket = event.bucket;
	const queue = event.queue;

	// get the conf from the event
	const conf = event.conf;
	const message = conf.message;
	const queue_id = conf.data;

Request the object from S3, using the provided bucket and standard key as defined by xina-queue. In the real world we would do something interesting with the result, here we just grab the length to include in our post.

	let params = {
	    Bucket: bucket,
	    Key: "queue/" + queue_id + "/out/content.json"
	};
	
	s3.getObject(params).promise().then((data) => {
	    const length = data.Body.length;

Now we build our POST action using the message and result of the query, and pass along to xina-queue. Note we don't bother getting the queue_id as we won't need to load the result of this API call.

	    let action = {
	        action: "post",
	        wall: "$",
	        post: { text: message + ' (schema is this long: ' + length + ')' }
	    };
	    
    	let payload = {
    	    parent_id,
    		task_id,
    		bucket,
    		queue,
    		action
    	};
    
    	// the parameters to invoke the xina-queue function
    	params = {
    		FunctionName: 'xina-queue',
    		InvocationType: 'RequestResponse',
    		LogType: 'Tail',
    		Payload: JSON.stringify(payload)
    	};
    	
    	return lambda.invoke(params).promise();
	}).then((data) => {
		// check for errors reported by xina-queue
		if (data.FunctionError) return Promise.reject(data.FunctionError);

Last we pass the CONCLUDE action to xina-queue with the parent_id, to notify the XINA server that the task and all subsequent API calls have completed.

		// action to run the next lambda function
		let action = {
			action : 'conclude',
			task   : parent_id
		};
		
		let payload = {
    	    parent_id,
    		task_id,
    		bucket,
    		queue,
    		action
    	};

		params.Payload = JSON.stringify(payload);

		return lambda.invoke(params).promise();
	}).then((data) => {
		// check for errors reported by xina-queue
		if (data.FunctionError) return Promise.reject(data.FunctionError);

		console.log('executed successfully');
	});
};