Advanced Recipes


This mixin keeps a simple history of changes that have been made to a resource, storing them in a database table with a JSON field. HistoryMixin is a drop-in addition to any ModelResource.

ChangeSet = fields.Object({
    "updated_at": fields.DateTime(),
    "changes": fields.List(fields.Object({
        "attribute": fields.String(),
        "old": fields.Any(nullable=True),
        "new": fields.Any(nullable=True)

class HistoryRecord(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    object_type = db.Column(db.String(20), index=True, nullable=False)
    object_id = db.Column(db.Integer, index=True, nullable=False)
    updated_at = db.Column(db.DateTime,, nullable=False)
    changes = db.Column(postgresql.JSONB)

    __mapper_args__ = {
        "order_by": "updated_at"

class HistoryMixin(object):
    @ItemRoute.GET('/history', rel="history")
    def history(self, item) -> fields.List(ChangeSet):
        history = HistoryRecord.query \
                       object_id=getattr(item, self.meta.get('id_attribute', 'id'))) \

        return history

@before_update.connect_via(ANY, weak=False)
def history_on_update(resource, item, changes):
    if issubclass(resource, HistoryMixin):
        history = HistoryRecord(object_type=item.__tablename__,
                                object_id=getattr(item, resource.meta.get('id_attribute', 'id')),

        fields_by_attribute = {
            field.attribute or key: field for key, field in resource.schema.fields.items()

        for attribute, change in changes.items():
            field = fields_by_attribute[attribute]
                "attribute": attribute,
                "old": field.output(attribute, item),
                "new": field.output(attribute, changes)



Sometimes soft-deletion is preferable over full deletion. This custom ModelResource and Manager does not delete items, instead it archives them, removing them from the main instances route. Archived items can be viewed in the archive route from where they can be restored but not updated.

Replace RelationalManager with an appropriate base class, such as SQLAlchemyManager. PrincipalManager can also be used as the base class for the manager with some minor changes.

class Location(Enum):
    BOTH = 3

class ArchiveManager(RelationalManager):
    def _query(self, source=Location.INSTANCES_ONLY):
        query = super()._query(self)

        if source == Location.BOTH:
            return query
        elif source == Location.ARCHIVE_ONLY:
            return query.filter(getattr(self.model, 'is_archived') == True)
            return query.filter(getattr(self.model, 'is_archived') == False)

    def instances(self, where=None, sort=None, source=Location.INSTANCES_ONLY):
        query = self._query(source)
        if where:
            expressions = [self._expression_for_condition(condition) for condition in where]
            query = self._query_filter(query, self._and_expression(expressions))
        if sort:
            query = self._query_order_by(query, sort)
        return query

    def archive_instances(self, page, per_page, where=None, sort=None):
        return self\
            .instances(where=where, sort=sort, source=Location.ARCHIVE_ONLY)\
            .paginate(page=page, per_page=per_page)

    def read(self, id, source=Location.INSTANCES_ONLY):
        query = self._query(source)
        if query is None:
            raise ItemNotFound(self.resource, id=id)
        return self._query_filter_by_id(query, id)

class ArchivingResource(ModelResource):
    class Meta:
        manager = ArchiveManager
        exclude_routes = ['destroy'] # we're using rel="archive" instead.

    class Schema:
        is_archived = fields.Boolean(io='r')

    @Route.GET('/<int:id>', rel="self", attribute="instance")
    def read(self, id) -> fields.Inline('self'):
        return, source=Location.BOTH)

    def update(self, properties, id):
        item =, source=Location.INSTANCES_ONLY)
        updated_item = self.manager.update(item, properties)
        return updated_item

    update.response_schema = update.request_schema = fields.Inline('self', patch_instance=True)

    def destroy(self, id):
        item =, source=Location.INSTANCES_ONLY)
        self.manager.update(item, {"is_archived": True})
        return None, 204

    def archive_instances(self, **kwargs):
        return self.manager.archive_instances(**kwargs)

    archive_instances.request_schema = archive_instances.response_schema = Instances()

    @Route.GET('/archive/<int:id>', rel="readArchived")
    def read_archive(self, id) -> fields.Inline('self'):
        item =, source=Location.ARCHIVE_ONLY)

    @Route.POST('/archive/<int:id>/restore', rel="restoreFromArchive")
    def restore_from_archive(self, id) -> fields.Inline('self'):
        item =, source=Location.ARCHIVE_ONLY)
        return self.manager.update(item, {"is_archived": False})