from collections.abc import MutableMapping
from abc import ABCMeta
from copy import deepcopy, copy
import jsonschema
import rethinkdb as r
import logging
import logging.config
from sondra import help, utils
from sondra.document import Document, signals as doc_signals
from sondra.expose import method_schema
from . import signals
from sondra.utils import mapjson, resolve_class
_validator = jsonschema.Draft4Validator
class CollectionException(Exception):
[docs] """Represents a misconfiguration in a :class:`Collection` class definition"""
class CollectionMetaclass(ABCMeta):
[docs] """The collection is the workhorse of Sondra.
Collections are mutable mapping types, like dicts, whose keys are the keys in the database collection. The database
table, or collection, has the same name as the collection's slug with hyphens replaced by underscores for
compatibility.
Collections expand on a document schema, specifying:
* properties that should be treated specially.
* the primary key
* indexes that should be built
* relationships to other collections
Like applications, collections have webservice endpoints::
http://localhost:5000/application/collection;(schema|help|json|geojson)
http://localhost:5000/application/collection.method;(schema|help|json)
These endpoints allow the user to create, update, filter, list, and delete objects in the collection, which are
individual documents. Also, any methods that are exposed by the ``sondra.decorators.expose`` decorator are exposed
as method endpoints.
To use Python to retrieve individual Document instances, starting with the suite::
> suite['app-name']['collection-name']['primary-key']
...
<sondra.document.Document object at 0x....>
Special properties
------------------
The ``specials`` attribute is a dictionary of property names to ``sondra.document.ValueHandler`` instances, which
tell the collection how to handle properties containing objects that aren't standard JSON. This includes date-time
objects and geometry, which is handled via the `Shapely`_ library. Shapely is not supported by readthedocs, so you
must install it separately. See the individual ValueHandler subclasses in sondra.document for more information.
Attributes:
name (str): read-only. The name of the collection, based on the classname.
slug (str): read-only. The hyphen separated name of the collection, based on the classname
schema (str): read-only. The collection's schema, based on the ``document_class``
suite (sondra.suite.Suite): read-only. The suite this collection's application is a part of. None for abstract
classes.
application (sondra.application.Application). The application this collection is a part of. None for abstract
classes.
document_class (sondra.document.Document): The document class this collection contains. The schema is derived
from this.
primary_key (str): The field (if different from id) to use as the primary key. Individual documents are
referenced by primary key, both in the Python interface and the webservice interface.
private (bool=False). If the collection is private, then it is not exposed by the webservice interface. This
can be very useful for collections whose data should never be available over the 'net.
specials (dict): A dictionary of properties to be treated specially.
indexes ([str])
relations (dict)
anonymous_reads (bool=True)
abstract (bool)
table (ReQL)
url (str)
schema_url (str)
.. _Shapely: https://pypi.python.org/pypi/Shapely
"""
title = None
name = None
slug = None
schema = None
application = None
exposed_methods = None
file_storage = None
document_class = Document
primary_key = "id"
private = False
indexes = []
relations = []
anonymous_reads = True
abstract = False
@property
def suite(self):
[docs] if self.application:
return self.application.suite
else:
return None
@property
def table(self):
[docs] return r.db(self.application.db).table(self.name)
@property
def url(self):
[docs] if self._url:
return self._url
elif self.application:
return self.application.url + "/" + self.slug
else:
return self.slug
@property
def schema_url(self):
[docs] return self.url + ";schema"''
def __init__(self, application):
if self.abstract:
raise CollectionException("Tried to instantiate an abstract collection")
signals.pre_init.send(self.__class__, instance=self)
self.title = self.document_class.title
self.application = application
self._url = '/'.join((self.application.url, self.slug))
self.schema['id'] = self.url + ";schema"
self.schema = mapjson(lambda x: x(context=self.application.suite) if callable(x) else x, self.schema)
self.log = logging.getLogger(self.application.name + "." + self.name)
if self.file_storage:
self.file_storage = self.file_storage(self)
signals.post_init.send(self.__class__, instance=self)
def __str__(self):
return self.url
def help(self, out=None, initial_heading_level=0):
[docs] """Return full reStructuredText help for this class"""
builder = help.SchemaHelpBuilder(self.schema, self.url, out=out, initial_heading_level=initial_heading_level)
builder.begin_subheading(self.title)
builder.begin_list()
builder.define("Application", self.application.url + ';help')
builder.define("Schema URL", self.schema_url)
builder.define("JSON URL", self.url)
builder.define("Primary Key", self.primary_key)
builder.end_list()
builder.end_subheading()
builder.build()
if self.exposed_methods:
builder.begin_subheading("Methods")
for name, method in self.exposed_methods.items():
new_builder = help.SchemaHelpBuilder(method_schema(self, method), initial_heading_level=builder._heading_level)
new_builder.build()
builder.line(new_builder.rst)
builder.end_subheading()
if self.document_class.exposed_methods:
builder.begin_subheading("Document Instance Methods")
for name, method in self.document_class.exposed_methods.items():
new_builder = help.SchemaHelpBuilder(method_schema(None, method), initial_heading_level=builder._heading_level)
new_builder.build()
builder.line(new_builder.rst)
builder.end_subheading()
return builder.rst
def create_table(self, *args, **kwargs):
[docs] """Create the database table for this collection. Args and keyword args are sent along to the rethinkdb
table_create function. Sends pre_table_creation and post_table_creation signals.
"""
signals.pre_table_creation.send(
self.__class__, instance=self, table_name=self.name, db_name=self.application.db)
try:
r.db(self.application.db)\
.table_create(self.name, primary_key=self.primary_key, *args, **kwargs)\
.run(self.application.connection)
except r.ReqlError as e:
self.log.info('Table {0}.{1} already exists.'.format(self.application.db, self.name))
for index in self.indexes:
if isinstance(index, tuple):
index, index_function = index
else:
index_function = None
if self.schema['properties'][index].get('type', None) == 'array':
multi = True
else:
multi = False
if index in self.document_class.specials and self.document_class.specials[index].is_geometry:
geo = True
else:
geo = False
try:
if index_function:
self.table.index_create(index, index_function, multi=multi, geo=geo).run(self.application.connection)
else:
self.table.index_create(index, multi=multi, geo=geo).run(self.application.connection)
except r.ReqlError as e:
self.log.info('Index on table {0}.{1} already exists.'.format(self.application.db, self.name))
signals.post_table_creation.send(
self.__class__, instance=self, table_name=self.name, db_name=self.application.db)
def drop_table(self):
[docs] """Delete the database table for this collection. Sends pre_table_deletion and post_table_deletion signals.
"""
signals.pre_table_deletion.send(
self.__class__, instance=self, table_name=self.name, db_name=self.application.db)
ret = r.db(self.application.db).table_drop(self.name).run(self.application.connection)
self.log.info('Dropped table {0}.{1}'.format(self.application.db, self.name))
signals.post_table_deletion.send(
self.__class__, instance=self, table_name=self.name, db_name=self.application.db)
return ret
def _to_python_repr(self, doc):
for property, special in self.document_class.specials.items():
if property in doc:
doc[property] = special.to_python_repr(doc[property])
def _to_json_repr(self, doc):
for property, special in self.document_class.specials.items():
if property in doc:
doc[property] = special.to_json_repr(doc[property])
def _to_rql_repr(self, doc):
for property, special in self.document_class.specials.items():
if property in doc:
doc[property] = special.to_rql_repr(doc[property])
def __getitem__(self, key):
[docs] """Get an object from the database and populate an instance of self.document_class with its contents.
Args:
key (str or int): Primary key for the document.
Returns:
Document: An instance of self.document_class with data from the database.
Raises:
KeyError if the object is not found in the database.
"""
doc = self.table.get(key).run(self.application.connection)
if doc:
self._to_python_repr(doc)
return self.document_class(doc, collection=self, from_db=True)
else:
raise KeyError('{0} not found in {1}'.format(key, self.url))
def __setitem__(self, key, value):
[docs] """Add or replace a document object to the database.
Sends pre- and post- save signals. See signals documentation for more details.
Args:
key (str or int): The primary key for the document.
value: (dict or Document): If a Document, it should be this collection's document_class.
"""
value[self.primary_key] = key
return self.save(value, conflict='replace').run(self.application.connection)
def __delitem__(self, key):
[docs] """Delete an object from the database.
Sends pre- and post- delete signals. See signals documentation for more details.
Args:
key (str or int): The primary key for the document.
"""
doc_signals.pre_delete.send(self.document_class, key=key)
results = self.table.get(key).delete().run(self.application.connection)
doc_signals.post_delete.send(self.document_class, results=results)
def __iter__(self):
for doc in self.table.run(self.application.connection):
self._to_python_repr(doc)
yield self.document_class(doc, collection=self, from_db=True)
def __contains__(self, item):
[docs] """Checks to see if the primary key is in the database.
Args:
item (dict, Document, str, or int): If a dict or a document, then the primary key will be checked. Str or
ints are assumed to be the primary key.
Returns:
True or False.
"""
if isinstance(item, dict) or isinstance(item, Document):
key = item[self.primary_key]
else:
key = item
doc = self.table.get(key).run(self.application.connection)
return doc is not None
def __len__(self):
return self.table.count().run(self.application.connection)
def q(self, query):
[docs] """Perform a query on this collection's database connection.
Args:
query (ReQL): Should be a RethinkDB query that returns documents for this collection.
Yields:
Document instances.
"""
for doc in query.run(self.application.connection):
if 'doc' in doc:
doc = doc['doc'] # some queries return results that encapsulate the document with metadata
self._to_python_repr(doc)
yield self.document_class(doc, collection=self, from_db=True)
def doc(self, value):
[docs] """Return a document instance populated from a dict. Does **not** save document before returning.
Args:
value (dict): The value to use for the document. Should conform to document_class's schema.
Returns:
Document instance.
"""
return self.document_class(value, collection=self)
def create(self, value):
[docs] """Create a document from a dict. Saves document before returning, and thus also sends pre- and post- save
signals.
Args:
value (dict): The value to use for the new document.
Returns:
Document instance, guaranteed to have been saved.
"""
if isinstance(value, list):
docs = [(self.document_class(v, collection=self) if not isinstance(v, self.document_class) else v) for v in value]
else:
docs = [self.document_class(value, collection=self)]
ret = self.save(docs, conflict="error")
if 'generated_keys' in ret:
for i, k in enumerate(ret['generated_keys']):
docs[i].id = k
if isinstance(value, list):
return docs
else:
return docs[0]
def validator(self, value):
[docs] """Override this method to do extra validation above and beyond a simple schema check.
Args:
value (Document): The value to validate.
Returns:
bool
Raises:
ValidationError if the document fails to validate.
"""
return True
def delete(self, docs=None, **kwargs):
[docs] """Delete a document or list of documents from the database.
Args:
docs (Document or [Document] or [primary_key]): List of documents to delete.
**kwargs: Passed to rethinkdb.delete
Returns:
The result of RethinkDB delete.
"""
if not docs:
return self.table.delete(**kwargs).run(self.application.connection)
if not isinstance(docs, list):
docs = [docs]
values = [v.id if isinstance(v, Document) else v for v in docs]
return self.table.get_all(*values).delete(**kwargs).run(self.application.connection)
def save(self, docs, **kwargs):
[docs] """Save a document or list of documents to the database.
Args:
docs (Document or [Document] or [dict]): List of documents to save.
**kwargs: Passed to rethinkdb.save
Returns:
The result of the RethinkDB save.
"""
if not isinstance(docs, list):
docs = [docs]
values = []
doc_signals.pre_save.send(self.document_class, docs=docs)
for value in docs:
if isinstance(value, Document):
value._saved = True
if self.file_storage:
self.file_storage.persist_document_files(value)
value = copy(value.obj)
self._to_json_repr(value)
jsonschema.validate(value, self.schema)
self.validator(value)
self._to_rql_repr(value)
values.append(value)
ret = self.table.insert(values, **kwargs).run(self.application.connection)
doc_signals.post_save.send(self.document_class, results=ret)
return ret
def json(self, docs):
[docs] pop = False
if not isinstance(docs, list):
docs = [docs]
pop = True
values = []
for value in docs:
if isinstance(value, Document):
v = copy(value.obj)
v['_url'] = value.url
value = v
self._to_json_repr(value)
values.append(value)
if pop:
return values[0]
else:
return values