"""Create web2py model (python code) to represent MS SQL Server tables. Features: * Uses ANSI Standard INFORMATION_SCHEMA (might work with other RDBMS) * Detects legacy "keyed" tables (not having an "id" PK) * Handles 'funny' column names. web2py requires all column names be valid python identifiers. This script uses rname * for column names that have spaces or are otherwise invalid python identifiers. * Connects directly to running databases, no need to do a SQL dump * Handles notnull, unique and referential constraints * Detects most common datatypes and default values * Supports running from the command line as well as from an IDE's debug menu. See the COMMAND_LINE_MODE constant below * for more info. Requirements: * Needs pyodbc python connector Created by Kyle Flanagan. Based on a script by Mariano Reingart which was based on a script to "generate schemas from dbs" (mysql) by Alexandre Andrade """ _author__ = "Kyle Flanagan " HELP = """ USAGE: extract_mssql_models db host port user passwd Call with SQL Server database connection parameters, web2py model will be printed on standard output. EXAMPLE: python extract_mssql_models.py mydb localhost 3306 kflanaga pass or python extract_mssql_models.py mydb localhost 3306 kflanaga pass > db_model.py """ # Config options DEBUG = False # print debug messages to STDERR SCHEMA = 'dbo' COMMAND_LINE_MODE = True # running from command prompt. Disable to specify variables and use in IDE # Only specify values below if not running from command line DB = None HOST = None USER = None PASSWD = None PORT = None # Constant for Field keyword parameter order (and filter): KWARGS = ('type', 'length', 'default', 'required', 'ondelete', 'notnull', 'unique', 'label', 'comment', 'rname') import sys import re # This is from pydal/helpers/regex.py as of 2016-06-16 # Use this to recognize if a field name need to have an rname representation REGEX_VALID_TB_FLD = re.compile(r'^[^\d_][_0-9a-zA-Z]*\Z') # For replacing invalid characters in field names INVALID_CHARS = re.compile(r'[^a-zA-Z0-9_]') def get_valid_column_name(field): """Return a valid column name that follows Python's rules for identifiers, which is what web2py requires for column names. Replaces invalid characters with underscores and leading digits with their associated English word.""" if not REGEX_VALID_TB_FLD.match(field): # If the first character is a digit, replace it with its word counterpart if re.match(r'^[0-9]', field): numbers = ['Zero', 'One', 'Two', 'Three', 'Four', 'Five', 'Six', 'Seven', 'Eight', 'Nine'] field = numbers[int(field[0])] + field[1:] field = INVALID_CHARS.sub('_', field) return field def query(conn, sql, *args): "Execute a SQL query and return rows as a list of dicts" cur = conn.cursor() ret = [] try: if DEBUG: print >> sys.stderr, "QUERY: ", sql % args cur.execute(sql % args) for row in cur: dic = {} for i, value in enumerate(row): field = cur.description[i][0] dic[field] = value if DEBUG: print >> sys.stderr, "RET: ", dic ret.append(dic) return ret finally: cur.close() def get_tables(conn, schema=SCHEMA): "List table names in a given schema" rows = query(conn, """SELECT table_name FROM information_schema.tables WHERE table_schema = '%s' ORDER BY table_name""", schema) return [row['table_name'] for row in rows] def get_fields(conn, table): "Retrieve field list for a given table" if DEBUG: print >> sys.stderr, "Processing TABLE", table rows = query(conn, """ SELECT column_name, data_type, is_nullable, character_maximum_length, numeric_precision, numeric_precision_radix, numeric_scale, column_default FROM information_schema.columns WHERE table_name='%s' ORDER BY ordinal_position""", table) return rows def define_field(conn, table, field, pks): "Determine field type, default value, references, etc." f = {} ref = references(conn, table, field['column_name']) if ref: f.update(ref) elif field['column_default'] and \ field['column_default'].startswith("nextval") and \ field['column_name'] in pks: f['type'] = "'id'" elif field['data_type'].startswith('character'): f['type'] = "'string'" if field['character_maximum_length']: f['length'] = field['character_maximum_length'] elif field['data_type'] in ('text', 'ntext'): f['type'] = "'text'" elif field['data_type'] in ('boolean', 'bit'): f['type'] = "'boolean'" elif field['data_type'] in ('tinyint', 'smallint', 'bigint', 'int'): f['type'] = "'integer'" elif field['data_type'] in ('real', 'float'): f['type'] = "'double'" elif field['data_type'] in ('datetime', 'datetime2', 'smalldatetime'): f['type'] = "'datetime'" elif field['data_type'] in ('timestamp',): f['type'] = "'datetime'" f['default'] = "request.now" f['update'] = "request.now" elif field['data_type'] in ('date',): f['type'] = "'date'" elif field['data_type'] in ('time',): f['type'] = "'time'" elif field['data_type'] in ('numeric', 'money', 'smallmoney', 'decimal'): f['type'] = "'decimal'" f['precision'] = field['numeric_precision'] f['scale'] = field['numeric_scale'] or 0 elif field['data_type'] in ('binary', 'varbinary', 'image'): f['type'] = "'blob'" elif field['data_type'] in ('point', 'lseg', 'polygon', 'unknown', 'USER-DEFINED', 'sql_variant'): f['type'] = "" # unsupported? elif field['data_type'] in ('varchar', 'char', 'nchar', 'nvarchar', 'uniqueidentifer'): f['type'] = "'string'" else: raise RuntimeError("Data Type not supported: %s " % str(field)) try: if field['column_default']: if field['column_default'] == "now()": d = "request.now" elif field['column_default'] == "true": d = "True" elif field['column_default'] == "false": d = "False" else: d = repr(eval(field['column_default'])) f['default'] = str(d) except (ValueError, SyntaxError): pass except Exception, e: raise RuntimeError("Default unsupported '%s'" % field['column_default']) if not field['is_nullable']: f['notnull'] = "True" # For field names that are not valid python identifiers, we need to add a reference to their actual name # in the back end database if not REGEX_VALID_TB_FLD.match(field['column_name']): f['rname'] = "'[%s]'" % field['column_name'] return f def is_unique(conn, table, field): "Find unique columns (incomplete support)" rows = query(conn, """ SELECT c.column_name FROM information_schema.table_constraints t INNER JOIN information_schema.constraint_column_usage c ON (t.CONSTRAINT_CATALOG = c.CONSTRAINT_CATALOG AND t.CONSTRAINT_NAME = c.CONSTRAINT_NAME AND t.CONSTRAINT_SCHEMA = c.CONSTRAINT_SCHEMA AND t.TABLE_CATALOG = c.TABLE_CATALOG AND t.TABLE_NAME = c.TABLE_NAME AND t.TABLE_SCHEMA = c.TABLE_SCHEMA) WHERE t.table_name='%s' AND c.column_name='%s' AND t.constraint_type='UNIQUE' ;""", table, field['column_name']) return rows and True or False def primarykeys(conn, table): "Find primary keys" rows = query(conn, """ SELECT c.column_name FROM information_schema.table_constraints t INNER JOIN information_schema.constraint_column_usage c ON (t.CONSTRAINT_CATALOG = c.CONSTRAINT_CATALOG AND t.CONSTRAINT_NAME = c.CONSTRAINT_NAME AND t.CONSTRAINT_SCHEMA = c.CONSTRAINT_SCHEMA AND t.TABLE_CATALOG = c.TABLE_CATALOG AND t.TABLE_NAME = c.TABLE_NAME AND t.TABLE_SCHEMA = c.TABLE_SCHEMA) WHERE t.table_name='%s' AND t.constraint_type='PRIMARY KEY' ;""", table) return [row['column_name'] for row in rows] def references(conn, table, field): "Find a FK (fails if multiple)" rows1 = query(conn, """ SELECT k.table_name, k.column_name, k.constraint_name, r.update_rule, r.delete_rule, k.ordinal_position FROM information_schema.key_column_usage k INNER JOIN information_schema.referential_constraints r ON (k.CONSTRAINT_CATALOG = r.CONSTRAINT_CATALOG AND k.CONSTRAINT_NAME = r.CONSTRAINT_NAME AND k.CONSTRAINT_SCHEMA = r.CONSTRAINT_SCHEMA) INNER JOIN information_schema.table_constraints t ON (r.CONSTRAINT_CATALOG = t.CONSTRAINT_CATALOG AND r.CONSTRAINT_NAME = t.CONSTRAINT_NAME AND r.CONSTRAINT_SCHEMA = t.CONSTRAINT_SCHEMA) WHERE k.table_name='%s' AND k.column_name='%s' AND t.constraint_type='FOREIGN KEY' ;""", table, field) if len(rows1) == 1: rows2 = query(conn, """ SELECT table_name, column_name, * FROM information_schema.constraint_column_usage WHERE constraint_name='%s' """, rows1[0]['constraint_name']) row = None if len(rows2) > 1: row = rows2[int(rows1[0]['ordinal_position']) - 1] keyed = True if len(rows2) == 1: row = rows2[0] keyed = False if row: if keyed: # THIS IS BAD, DON'T MIX "id" and primarykey!!! ref = {'type': "'reference %s.%s'" % (row['table_name'], row['column_name'])} else: ref = {'type': "'reference %s'" % (row['table_name'],)} if rows1[0]['delete_rule'] != "NO ACTION": ref['ondelete'] = repr(rows1[0]['delete_rule']) return ref elif rows2: raise RuntimeError("Unsupported foreign key reference: %s" % str(rows2)) elif rows1: raise RuntimeError("Unsupported referential constraint: %s" % str(rows1)) def define_table(conn, table): "Output single table definition" fields = get_fields(conn, table) pks = primarykeys(conn, table) print "db.define_table('%s'," % (table,) for field in fields: fname = field['column_name'] fdef = define_field(conn, table, field, pks) if fname not in pks and is_unique(conn, table, field): fdef['unique'] = "True" if fdef['type'] == "'id'" and fname in pks: pks.pop(pks.index(fname)) print " Field('%s', %s)," % (get_valid_column_name(fname), ', '.join(["%s=%s" % (k, fdef[k]) for k in KWARGS if k in fdef and fdef[k]])) if pks: print " primarykey=[%s]," % ", ".join(["'%s'" % pk for pk in pks]) print " migrate=migrate)" print def define_db(conn, db, host, port, user, passwd): "Output database definition (model)" dal = 'db = DAL("mssql4://%s:%s@%s:%s/%s", pool_size=10, decode_credentials=True)' print dal % ( user.replace('@', '%40').replace(':', '%3A'), passwd.replace('@', '%40').replace(':', '%3A'), host, port, db) print print "migrate = False" print for table in get_tables(conn): define_table(conn, table) if __name__ == "__main__": # Parse arguments from command line: if len(sys.argv) < 6 and COMMAND_LINE_MODE: print HELP else: # Parse arguments from command line: if COMMAND_LINE_MODE: db, host, port, user, passwd = sys.argv[1:6] else: db = DB host = HOST user = USER passwd = PASSWD port = PORT # Make the database connection (change driver if required) import pyodbc # cnn = pyodbc.connect(database=db, host=host, port=port, # user=user, password=passwd, # ) cnn = pyodbc.connect( r'DRIVER={{SQL Server Native Client 11.0}};SERVER={server};PORT={port};DATABASE={db};UID={user};PWD={passwd}'.format( server=host, port=port, db=db, user=user, passwd=passwd) ) # Start model code generation: define_db(cnn, db, host, port, user, passwd)