June 15, 2020

Querying and paging a DynamoDB Partition using .NET

The AWS documentation for working with DynamoDB using .NET can be a little confusing at first, especially given there are three different APIs you can use.

I recently had the need to return all the items in a DynamoDB partition without providing a range (sort) key. The example below demonstrates how to do this using the DynamoDB .NET Object Persistence Model, aka DynamoDBContext:

public async Task<IEnumerable<SubjectEvent>> GetEventsBySubjectAsync(Id accountId, string subjectId, CancellationToken cancellationToken)
{
    accountId.ThrowIfNull(nameof(accountId));
    subjectId.ThrowIfNullOrWhiteSpace(subjectId);

    AsyncSearch<SubjectEventDocument> search = DbContext.QueryAsync<SubjectEventDocument>(
        SubjectEventDocument.CreateHashKey(accountId, subjectId),
        new DynamoDBOperationConfig { OverrideTableName = TableName, BackwardQuery = true }
    );

    var events = new List<SubjectEventDocument>();

    do
    {
        using (_metrics.TimeDynamoOperation(TableName, "get_events_by_subject"))
        using (_logger.TimeDebug("Getting events by subject from {Table}", TableName))
        {
            List<SubjectEventDocument> nextSet = await search.GetNextSetAsync(cancellationToken);
            events.AddRange(nextSet);
        }
    }
    while (!search.IsDone);

    return events.Select(e => e.ToSubjectEvent(e)).ToList();
}

The above code will page through all of the items in the partition. According to the docs, by default DynamoDB will return as many documents as possible up to the 1MB limit. If you want to control the number of documents returned, for example, so you can implement pagination, you need to set the limit parameter which unfortunately is not available when using DynamoDBContext.

Paginating queries with the Document Model

If you want to implement pagination with DynamoDB and .NET you need to use the Document Model API. With this API you can provide the limit parameter on queries:

public async Task<PagedResult<SubjectEvent>> GetEventsBySubjectAsync(
    Id accountId,
    string subjectId,
    int pageSize,
    string paginationToken = null,
    CancellationToken cancellationToken = default)
{
    accountId.ThrowIfNull(nameof(accountId));
    subjectId.ThrowIfNullOrWhiteSpace(subjectId);

    Search search = Table.Query(new QueryOperationConfig
    {
        Limit = pageSize,
        BackwardSearch = true,
        PaginationToken = paginationToken,
        KeyExpression = new Expression
        {
            ExpressionStatement = "pk = :pk",
            ExpressionAttributeValues = 
            {
                { "pk", SubjectEventDocument.CreateHashKey(accountId, subjectId) }
            }
        }
    });

    var documents = new List<Document>();

    do
    {
        using (_metrics.TimeDynamoOperation(TableName, "get_events_by_subject"))
        using (_logger.TimeDebug("Getting events by subject from {Table}", TableName))
        {
            List<Document> nextSet = await search.GetNextSetAsync(cancellationToken);
            documents.AddRange(nextSet);
        }
    }
    while (!search.IsDone);

    return new PagedResult<SubjectEvent>
    {
        PaginationToken = search.PaginationToken,
        Data = documents.Select(doc => DbContext.FromDocument<SubjectEvent>(doc)).ToList()
    };
}

In the above example we pass back the pagination token so that it can be provided if the user wants to navigate to the next page of data. The BackwardsSearch parameter can be use to determine if the query should page forwards or backwards.

Adding query conditions

For completeness, below is an example of using the .NET Object Persistence Model to query based on a range key condition:

public async Task<IEnumerable<Action>> GetAllAsync(CancellationToken cancellationToken)
{
    var filter = new QueryFilter(DynamoFields.Pk, QueryOperator.Equal, ActionDocument.CreateHashKey);
    filter.AddCondition(DynamoFields.Sk, QueryOperator.BeginsWith, "action_");
    
    using (_metrics.TimeDynamoOperation(TableName, "get_actions"))
    using (_logger.TimeDebug("Loaded actions from DynamoDb table {Table}", TableName))
    {
        var docs = await DbContext.FromQueryAsync<ActionDocument>(
                new QueryOperationConfig { Filter = filter }, OperationConfig
            )
            .GetRemainingAsync(cancellationToken);

        return docs.Where(x => x.Enabled).Select(doc => doc.To());
    }
}

© 2022 Ben Foster