I've been playing with building a service that runs containers via Amazon's Elastic Container Service (ECS). This evening I finally got to the point where it was time to start experimenting with schedulers and other such fun-ness.

After a few quick tests, it became clear that the scheduler I'm working on might need more information to intelligently schedule tasks in the cluster. Let me try to illustrate what I mean:

  • Task definition: 100 MB of RAM, less than 1 core, takes 5 minutes to complete
  • Container instance: 512 MB of RAM / 1 core

I queue 100 of these tasks in SQS. The scheduler quickly pulls the messages off the queue and tries to schedule the tasks to run in the ECS cluster. After 5 tasks are active (5 × 100 MB = 500 MB of the instance's 512 MB), all of the RAM on the container instance is effectively consumed and no new tasks can be placed until the existing tasks finish.

When ECS rejects a task, the scheduler takes no further action on the SQS message, so the message is effectively requeued: after the default visibility timeout of 30 seconds it becomes visible again and gets retried. This sounded like an easy way to do things, but if the cluster is full and high-priority tasks get rejected, lower-priority tasks can end up running first while the higher-priority messages are still invisible (waiting to be requeued) in SQS.
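A minimal sketch of that naive loop, just to make the failure mode concrete (the queue URL, cluster name, and task definition below are made-up placeholders, and this isn't my actual scheduler code):

import boto3

sqs = boto3.client('sqs')
ecs = boto3.client('ecs')

queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/ecs-tasks'  # placeholder

while True:
    # Received messages become invisible for the default 30-second visibility timeout
    messages = sqs.receive_message(QueueUrl=queue_url,
                                   MaxNumberOfMessages=10).get('Messages', [])
    for message in messages:
        result = ecs.run_task(cluster='default', taskDefinition='my-task')
        if result['failures']:
            # Cluster is full; do nothing and the message becomes visible
            # again after the timeout, regardless of its priority
            continue
        # Task was placed, so remove the message from the queue
        sqs.delete_message(QueueUrl=queue_url,
                           ReceiptHandle=message['ReceiptHandle'])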

So ideally the scheduler would know the present state of the cluster before actually calling RunTask or StartTask. Unfortunately it's kind of a pain to query all of the cluster metadata every time we want to run a task: gathering all of the pertinent information about a given cluster takes 4 HTTP calls (ListClusters, DescribeClusters, ListContainerInstances, and DescribeContainerInstances) and a fair amount of JSON mangling.

I started thinking about this a little and searching for how other people handle it, and I stumbled upon http://williamthurston.com/2015/08/20/create-custom-aws-ecs-schedulers-with-ecs-state.html and https://github.com/jhspaybar/ecs_state

Basically it is a proto-scheduler that keeps cluster state in an in-memory SQLite database, and it provides a nice example of how this sort of thing could be used to build a completely customized scheduler on top of the StartTask API: https://github.com/jhspaybar/ecs_state/blob/6686cdfc418385e8db76d6bc719c7c278a10b471/ecs_state.go#L373

But to get started I really just want to use the RunTask API, and all I really need to know is "is there space in the cluster right now?" So I wrote a simple tool that caches cluster metadata in Redis, which is super easy and fast to query and make simple decisions from.

import boto3
import redis


def summarize_resources(resource_list):
    # Flatten ECS's resource list into a {name: integerValue} dict
    # (CPU and MEMORY are the integer resources we care about)
    response = {}
    for r in resource_list:
        response[r['name']] = r['integerValue']
    return response


def cluster_remaining_resources(cluster_name, instance_arns):
    # Sum the cached per-instance remaining CPU/memory into cluster-wide totals
    remaining_cpu = 0
    remaining_memory = 0

    for arn in instance_arns:
        instance_key = '%s:instance:%s' % (cluster_name, arn)
        remaining_cpu += int(redis_client.hget(instance_key, 'remaining_cpu'))
        remaining_memory += int(redis_client.hget(instance_key, 'remaining_memory'))
    return dict(cpu=remaining_cpu, memory=remaining_memory)


# Redis is the cache; ECS is the source of truth
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
redis_client = redis.Redis(connection_pool=pool)

ecs = boto3.client('ecs')

# Discover every cluster in the account/region
cluster_arns = ecs.list_clusters()['clusterArns']
clusters = ecs.describe_clusters(clusters=cluster_arns)['clusters']

for cluster in clusters:
    cluster_arn = cluster['clusterArn']
    container_instance_list = ecs.list_container_instances(cluster=cluster_arn)
    container_instance_arns = container_instance_list['containerInstanceArns']
    if not container_instance_arns:
        # describe_container_instances errors on an empty list, so skip empty clusters
        continue
    instances = ecs.describe_container_instances(cluster=cluster_arn,
                                                 containerInstances=container_instance_arns)

    # Cache each container instance's registered and remaining resources as a Redis hash
    for i in instances['containerInstances']:
        registered_resources = summarize_resources(i['registeredResources'])
        remaining_resources = summarize_resources(i['remainingResources'])

        instance_state = {
            'status': i['status'],
            'active_tasks': i['runningTasksCount'],
            'registered_cpu': registered_resources['CPU'],
            'registered_memory': registered_resources['MEMORY'],
            'remaining_cpu': remaining_resources['CPU'],
            'remaining_memory': remaining_resources['MEMORY']
        }

        instance_key = '{cluster_arn}:instance:{instance_arn}'
        key = instance_key.format(cluster_arn=cluster_arn,
                                  instance_arn=i['containerInstanceArn'])
        redis_client.hmset(key, instance_state)

    # Roll the per-instance numbers up into cluster-level totals
    cluster_resources = cluster_remaining_resources(cluster_arn,
                                                    container_instance_arns)

    cluster_rm_key = '%s:remaining_memory' % cluster_arn
    cluster_rcpu_key = '%s:remaining_cpu' % cluster_arn

    redis_client.set(cluster_rcpu_key, cluster_resources['cpu'])
    redis_client.set(cluster_rm_key, cluster_resources['memory'])
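One caveat: this is a point-in-time snapshot, so the cached numbers drift as tasks start and stop. The simplest thing I can think of is to just re-run the sync on a short interval; a minimal sketch, assuming the code above is wrapped in a hypothetical sync_cluster_state() function and that 15 seconds is frequent enough:

import time

while True:
    sync_cluster_state()  # hypothetical wrapper around the caching code above
    time.sleep(15)        # assumed refresh interval, tune to taste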

Once that has run, you can check whether a cluster has enough resources for a task with something like

if int(redis_client.get('%s:remaining_memory' % cluster_arn)) < task_definition_memory_requirement:
    # unable to run the task right now, don't bother calling RunTask
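Putting it together, the scheduler's pre-flight check could look something like the sketch below; cluster_has_capacity is a hypothetical helper, the task definition name and resource numbers are placeholders, and since the cache can be slightly stale it still checks RunTask's failures list:

def cluster_has_capacity(cluster_arn, required_memory, required_cpu):
    # Redis returns strings/bytes, so cast before comparing
    remaining_memory = int(redis_client.get('%s:remaining_memory' % cluster_arn))
    remaining_cpu = int(redis_client.get('%s:remaining_cpu' % cluster_arn))
    return remaining_memory >= required_memory and remaining_cpu >= required_cpu


# 100 MB of memory and 128 CPU units are just the example task's requirements
if cluster_has_capacity(cluster_arn, required_memory=100, required_cpu=128):
    result = ecs.run_task(cluster=cluster_arn, taskDefinition='my-task')
    if result['failures']:
        # The cache was stale and ECS rejected the placement anyway;
        # leave the SQS message alone so it can be retried later
        pass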