Source code for refugee.manager
"""
Manages the registration and execution of migrations
"""
from fnmatch import filter as file_filter
from imp import find_module, load_module
from os import makedirs, walk
from os.path import join as pjoin
from sqlalchemy import create_engine
from migration import (
Direction,
MigrationError,
migration_tmpl,
registry,
UnknowDirectionError,
)
def build_engine(connection_string):
return create_engine(connection_string)
configuration_tmpl = """\
[refugee]
# Adjust this url to your database
bind_url = sqlite:///:memory:
migration_home = {path}
"""
#XXX:dc: think about using entry points for registering migrations as well?
migration_registry = registry()
[docs]def register(migration_cls):
"""
Inserts migration into registry
"""
global migration_registry
migration_registry.update({migration_cls.name: migration_cls})
return migration_cls
class MigrationManager(object):
engine = None
configured = False
def collect(self):
# Walk self.migration_home and return all potential modules
for root, dirname, files in walk(self.migration_home):
for file_name in file_filter(files, "*.py"):
file_name = file_name.replace('.py', '')
file = None
try:
if file_name == '__init__':
continue
file, pathname, description = find_module(
file_name, [root])
load_module(file_name, file, pathname, description)
finally:
if file is not None:
file.close()
def configure(self, config):
self.engine = build_engine(config.get('bind_url'))
self.migration_home = config.get('migration_home')
self.configured = True
def new(self, name):
"""
Build a stub migration with name + auto-id in config['migration_home']
There is no guarantee this id will be unique for all remote migration
configurations. Conflicts will require manual management.
"""
#XXX:dc: assert that the name is somewhat sane and follows python
# naming conventions
next_id = 0
cls_name = '_'.join((name, next_id))
with open(pjoin(self.migration_home, name), "w+") as new_migration:
print >> new_migration, migration_tmpl.format(
cls_name=cls_name, migration_name=name)
def init(self, directory):
path = pjoin(directory, 'migrations')
makedirs(path)
with open(pjoin(directory, 'refugee.ini'), 'w+') as conf:
print >> conf, configuration_tmpl.format(path=path)
def list(self):
self.collect()
for k in migration_registry.keys():
print k
def run_all(self, direction):
for key in sorted(migration_registry.keys):
self.run(key, direction)
def run(self, key, direction):
if not self.engine:
raise AttributeError("No engine configured for MigrationManager")
connection = self.engine.connect()
trans = connection.begin()
try:
migration = migration_registry[key]()
if migration.preflight():
trans = connection.begin()
if direction == Direction.UP:
migration.up(connection)
elif direction == Direction.DOWN:
migration.down(connection)
else:
raise UnknowDirectionError
if migration.check():
trans.commit()
else:
raise MigrationError("Migration failed consistency checks")
except Exception, e:
trans.rollback()
#XXX:dc: do more to introspect why we failed
raise e
migration_manager = MigrationManager()