# -*- coding: utf-8 -*- # PyPyODBC is develped from RealPyODBC 0.1 beta released in 2004 by Michele Petrazzo. Thanks Michele. # The MIT License (MIT) # # Copyright (c) 2014 Henry Zhou and PyPyODBC contributors # Copyright (c) 2004 Michele Petrazzo # Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated # documentation files (the "Software"), to deal in the Software without restriction, including without limitation # the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, # and to permit persons to whom the Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all copies or substantial portions # of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO # THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF # CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. 
# DB-API 2.0 module globals and pypyodbc configuration switches.
pooling = True
apilevel = '2.0'
paramstyle = 'qmark'
threadsafety = 1
version = '1.3.3'
lowercase = True

DEBUG = 0
# Comment out all "if DEBUG:" statements like below for production
#if DEBUG:print 'DEBUGGING'

import sys
import os
import datetime
import ctypes
import threading
from decimal import Decimal

# ``py_ver`` is kept for backward compatibility, but the feature flags are
# derived from sys.version_info: comparing version *strings* is fragile
# (sys.version[:3] is '3.1' on Python 3.10+).
py_ver = sys.version[:3]
py_v3 = sys.version_info[0] >= 3

if py_v3:
    # Give Python 3 the Python 2 names used throughout this module.
    long = int
    unicode = str
    str_8b = bytes
    buffer = memoryview
    BYTE_1 = bytes('1', 'ascii')
    use_unicode = True
else:
    str_8b = str
    BYTE_1 = '1'
    use_unicode = False
if sys.version_info < (2, 6):
    bytearray = str

if not hasattr(ctypes, 'c_ssize_t'):
    # Older ctypes has no c_ssize_t: pick the signed integer type whose
    # unsigned counterpart has the size of a pointer.
    if ctypes.sizeof(ctypes.c_uint) == ctypes.sizeof(ctypes.c_void_p):
        ctypes.c_ssize_t = ctypes.c_int
    elif ctypes.sizeof(ctypes.c_ulong) == ctypes.sizeof(ctypes.c_void_p):
        ctypes.c_ssize_t = ctypes.c_long
    elif ctypes.sizeof(ctypes.c_ulonglong) == ctypes.sizeof(ctypes.c_void_p):
        ctypes.c_ssize_t = ctypes.c_longlong

lock = threading.Lock()
# Shared ODBC environment handle; None until AllocateEnv() creates it.
shared_env_h = None
SQLWCHAR_SIZE = ctypes.sizeof(ctypes.c_wchar)

# Determine the size of Py_UNICODE: 4 bytes on a UCS4 (wide) build,
# 2 bytes on a UCS2 (narrow) build.
UNICODE_SIZE = 4 if sys.maxunicode > 65536 else 2


# Define ODBC constants. They are widely used in ODBC documents and programs.
# They are defined in cpp header files: sql.h sqlext.h sqltypes.h sqlucode.h
# and you can get these files from the mingw32-runtime_3.13-1_all.deb package
SQL_ATTR_ODBC_VERSION, SQL_OV_ODBC2, SQL_OV_ODBC3 = 200, 2, 3
SQL_DRIVER_NOPROMPT = 0
SQL_ATTR_CONNECTION_POOLING = 201
SQL_CP_ONE_PER_HENV = 2

SQL_FETCH_NEXT, SQL_FETCH_FIRST, SQL_FETCH_LAST = 0x01, 0x02, 0x04
SQL_NULL_HANDLE, SQL_HANDLE_ENV, SQL_HANDLE_DBC, SQL_HANDLE_STMT = 0, 1, 2, 3
SQL_SUCCESS, SQL_SUCCESS_WITH_INFO, SQL_ERROR = 0, 1, -1
SQL_NO_DATA = 100
SQL_NO_TOTAL = -4
SQL_ATTR_ACCESS_MODE = SQL_ACCESS_MODE = 101
SQL_ATTR_AUTOCOMMIT = SQL_AUTOCOMMIT = 102

SQL_MODE_DEFAULT = SQL_MODE_READ_WRITE = 0
SQL_MODE_READ_ONLY = 1
SQL_AUTOCOMMIT_OFF, SQL_AUTOCOMMIT_ON = 0, 1
SQL_IS_UINTEGER = -5
SQL_ATTR_LOGIN_TIMEOUT = 103
SQL_ATTR_CONNECTION_TIMEOUT = 113
SQL_ATTR_QUERY_TIMEOUT = 0
SQL_COMMIT, SQL_ROLLBACK = 0, 1

SQL_INDEX_UNIQUE, SQL_INDEX_ALL = 0, 1
SQL_QUICK, SQL_ENSURE = 0, 1
SQL_COLUMN_DISPLAY_SIZE = 6
SQL_INVALID_HANDLE = -2
SQL_NO_DATA_FOUND = 100
SQL_NULL_DATA = -1
SQL_NTS = -3
SQL_HANDLE_DESCR = 4
SQL_TABLE_NAMES = 3
SQL_PARAM_INPUT = 1
SQL_PARAM_INPUT_OUTPUT = 2
SQL_PARAM_TYPE_UNKNOWN = 0
SQL_RESULT_COL = 3
SQL_PARAM_OUTPUT = 4
SQL_RETURN_VALUE = 5
SQL_PARAM_TYPE_DEFAULT = SQL_PARAM_INPUT_OUTPUT

SQL_RESET_PARAMS = 3
SQL_UNBIND = 2
SQL_CLOSE = 0


# Below are the constants for the SQLGetInfo method; their corresponding
# return types are listed in aInfoTypes.
SQL_QUALIFIER_LOCATION = 114
SQL_QUALIFIER_NAME_SEPARATOR = 41
SQL_QUALIFIER_TERM = 42
SQL_QUALIFIER_USAGE = 92
SQL_OWNER_TERM = 39
SQL_OWNER_USAGE = 91
SQL_ACCESSIBLE_PROCEDURES = 20
SQL_ACCESSIBLE_TABLES = 19
SQL_ACTIVE_ENVIRONMENTS = 116
SQL_AGGREGATE_FUNCTIONS = 169
SQL_ALTER_DOMAIN = 117
SQL_ALTER_TABLE = 86
SQL_ASYNC_MODE = 10021
SQL_BATCH_ROW_COUNT = 120
SQL_BATCH_SUPPORT = 121
SQL_BOOKMARK_PERSISTENCE = 82
SQL_CATALOG_LOCATION = SQL_QUALIFIER_LOCATION
SQL_CATALOG_NAME = 10003
SQL_CATALOG_NAME_SEPARATOR = SQL_QUALIFIER_NAME_SEPARATOR
SQL_CATALOG_TERM = SQL_QUALIFIER_TERM
SQL_CATALOG_USAGE = SQL_QUALIFIER_USAGE
SQL_COLLATION_SEQ = 10004
SQL_COLUMN_ALIAS = 87
SQL_CONCAT_NULL_BEHAVIOR = 22
SQL_CONVERT_FUNCTIONS = 48
SQL_CONVERT_VARCHAR = 70
SQL_CORRELATION_NAME = 74
SQL_CREATE_ASSERTION = 127
SQL_CREATE_CHARACTER_SET = 128
SQL_CREATE_COLLATION = 129
SQL_CREATE_DOMAIN = 130
SQL_CREATE_SCHEMA = 131
SQL_CREATE_TABLE = 132
SQL_CREATE_TRANSLATION = 133
SQL_CREATE_VIEW = 134
SQL_CURSOR_COMMIT_BEHAVIOR = 23
SQL_CURSOR_ROLLBACK_BEHAVIOR = 24
SQL_DATABASE_NAME = 16
SQL_DATA_SOURCE_NAME = 2
SQL_DATA_SOURCE_READ_ONLY = 25
SQL_DATETIME_LITERALS = 119
SQL_DBMS_NAME = 17
SQL_DBMS_VER = 18
SQL_DDL_INDEX = 170
SQL_DEFAULT_TXN_ISOLATION = 26
SQL_DESCRIBE_PARAMETER = 10002
SQL_DM_VER = 171
SQL_DRIVER_NAME = 6
SQL_DRIVER_ODBC_VER = 77
SQL_DRIVER_VER = 7
SQL_DROP_ASSERTION = 136
SQL_DROP_CHARACTER_SET = 137
SQL_DROP_COLLATION = 138
SQL_DROP_DOMAIN = 139
SQL_DROP_SCHEMA = 140
SQL_DROP_TABLE = 141
SQL_DROP_TRANSLATION = 142
SQL_DROP_VIEW = 143
SQL_DYNAMIC_CURSOR_ATTRIBUTES1 = 144
SQL_DYNAMIC_CURSOR_ATTRIBUTES2 = 145
SQL_EXPRESSIONS_IN_ORDERBY = 27
SQL_FILE_USAGE = 84
SQL_FORWARD_ONLY_CURSOR_ATTRIBUTES1 = 146
SQL_FORWARD_ONLY_CURSOR_ATTRIBUTES2 = 147
SQL_GETDATA_EXTENSIONS = 81
SQL_GROUP_BY = 88
SQL_IDENTIFIER_CASE = 28
SQL_IDENTIFIER_QUOTE_CHAR = 29
SQL_INDEX_KEYWORDS = 148
SQL_INFO_SCHEMA_VIEWS = 149
SQL_INSERT_STATEMENT = 172
SQL_INTEGRITY = 73
SQL_KEYSET_CURSOR_ATTRIBUTES1 = 150
SQL_KEYSET_CURSOR_ATTRIBUTES2 = 151
SQL_KEYWORDS = 89
SQL_LIKE_ESCAPE_CLAUSE = 113
SQL_MAX_ASYNC_CONCURRENT_STATEMENTS = 10022
SQL_MAX_BINARY_LITERAL_LEN = 112
SQL_MAX_CATALOG_NAME_LEN = 34
SQL_MAX_CHAR_LITERAL_LEN = 108
SQL_MAX_COLUMNS_IN_GROUP_BY = 97
SQL_MAX_COLUMNS_IN_INDEX = 98
SQL_MAX_COLUMNS_IN_ORDER_BY = 99
SQL_MAX_COLUMNS_IN_SELECT = 100
SQL_MAX_COLUMNS_IN_TABLE = 101
SQL_MAX_COLUMN_NAME_LEN = 30
SQL_MAX_CONCURRENT_ACTIVITIES = 1
SQL_MAX_CURSOR_NAME_LEN = 31
SQL_MAX_DRIVER_CONNECTIONS = 0
SQL_MAX_IDENTIFIER_LEN = 10005
SQL_MAX_INDEX_SIZE = 102
SQL_MAX_PROCEDURE_NAME_LEN = 33
SQL_MAX_ROW_SIZE = 104
SQL_MAX_ROW_SIZE_INCLUDES_LONG = 103
SQL_MAX_SCHEMA_NAME_LEN = 32
SQL_MAX_STATEMENT_LEN = 105
SQL_MAX_TABLES_IN_SELECT = 106
SQL_MAX_TABLE_NAME_LEN = 35
SQL_MAX_USER_NAME_LEN = 107
SQL_MULTIPLE_ACTIVE_TXN = 37
SQL_MULT_RESULT_SETS = 36
SQL_NEED_LONG_DATA_LEN = 111
SQL_NON_NULLABLE_COLUMNS = 75
SQL_NULL_COLLATION = 85
SQL_NUMERIC_FUNCTIONS = 49
SQL_ODBC_INTERFACE_CONFORMANCE = 152
SQL_ODBC_VER = 10
SQL_OJ_CAPABILITIES = 65003
SQL_ORDER_BY_COLUMNS_IN_SELECT = 90
SQL_PARAM_ARRAY_ROW_COUNTS = 153
SQL_PARAM_ARRAY_SELECTS = 154
SQL_PROCEDURES = 21
SQL_PROCEDURE_TERM = 40
SQL_QUOTED_IDENTIFIER_CASE = 93
SQL_ROW_UPDATES = 11
SQL_SCHEMA_TERM = SQL_OWNER_TERM
SQL_SCHEMA_USAGE = SQL_OWNER_USAGE
SQL_SCROLL_OPTIONS = 44
SQL_SEARCH_PATTERN_ESCAPE = 14
SQL_SERVER_NAME = 13
SQL_SPECIAL_CHARACTERS = 94
SQL_SQL92_DATETIME_FUNCTIONS = 155
SQL_SQL92_FOREIGN_KEY_DELETE_RULE = 156
SQL_SQL92_FOREIGN_KEY_UPDATE_RULE = 157
SQL_SQL92_GRANT = 158
SQL_SQL92_NUMERIC_VALUE_FUNCTIONS = 159
SQL_SQL92_PREDICATES = 160
SQL_SQL92_RELATIONAL_JOIN_OPERATORS = 161
SQL_SQL92_REVOKE = 162
SQL_SQL92_ROW_VALUE_CONSTRUCTOR = 163
SQL_SQL92_STRING_FUNCTIONS = 164
SQL_SQL92_VALUE_EXPRESSIONS = 165
SQL_SQL_CONFORMANCE = 118
SQL_STANDARD_CLI_CONFORMANCE = 166
SQL_STATIC_CURSOR_ATTRIBUTES1 = 167
SQL_STATIC_CURSOR_ATTRIBUTES2 = 168
SQL_STRING_FUNCTIONS = 50
SQL_SUBQUERIES = 95
SQL_SYSTEM_FUNCTIONS = 51
SQL_TABLE_TERM = 45
SQL_TIMEDATE_ADD_INTERVALS = 109
SQL_TIMEDATE_DIFF_INTERVALS = 110
SQL_TIMEDATE_FUNCTIONS = 52
SQL_TXN_CAPABLE = 46
SQL_TXN_ISOLATION_OPTION = 72
SQL_UNION = 96
SQL_USER_NAME = 47
SQL_XOPEN_CLI_YEAR = 10000


# Maps each SQLGetInfo information type to a tag naming the kind of value
# it yields ('GI_YESNO', 'GI_STRING', 'GI_UINTEGER' or 'GI_USMALLINT').
aInfoTypes = {
    SQL_ACCESSIBLE_PROCEDURES: 'GI_YESNO',
    SQL_ACCESSIBLE_TABLES: 'GI_YESNO',
    SQL_ACTIVE_ENVIRONMENTS: 'GI_USMALLINT',
    SQL_AGGREGATE_FUNCTIONS: 'GI_UINTEGER',
    SQL_ALTER_DOMAIN: 'GI_UINTEGER',
    SQL_ALTER_TABLE: 'GI_UINTEGER',
    SQL_ASYNC_MODE: 'GI_UINTEGER',
    SQL_BATCH_ROW_COUNT: 'GI_UINTEGER',
    SQL_BATCH_SUPPORT: 'GI_UINTEGER',
    SQL_BOOKMARK_PERSISTENCE: 'GI_UINTEGER',
    SQL_CATALOG_LOCATION: 'GI_USMALLINT',
    SQL_CATALOG_NAME: 'GI_YESNO',
    SQL_CATALOG_NAME_SEPARATOR: 'GI_STRING',
    SQL_CATALOG_TERM: 'GI_STRING',
    SQL_CATALOG_USAGE: 'GI_UINTEGER',
    SQL_COLLATION_SEQ: 'GI_STRING',
    SQL_COLUMN_ALIAS: 'GI_YESNO',
    SQL_CONCAT_NULL_BEHAVIOR: 'GI_USMALLINT',
    SQL_CONVERT_FUNCTIONS: 'GI_UINTEGER',
    SQL_CONVERT_VARCHAR: 'GI_UINTEGER',
    SQL_CORRELATION_NAME: 'GI_USMALLINT',
    SQL_CREATE_ASSERTION: 'GI_UINTEGER',
    SQL_CREATE_CHARACTER_SET: 'GI_UINTEGER',
    SQL_CREATE_COLLATION: 'GI_UINTEGER',
    SQL_CREATE_DOMAIN: 'GI_UINTEGER',
    SQL_CREATE_SCHEMA: 'GI_UINTEGER',
    SQL_CREATE_TABLE: 'GI_UINTEGER',
    SQL_CREATE_TRANSLATION: 'GI_UINTEGER',
    SQL_CREATE_VIEW: 'GI_UINTEGER',
    SQL_CURSOR_COMMIT_BEHAVIOR: 'GI_USMALLINT',
    SQL_CURSOR_ROLLBACK_BEHAVIOR: 'GI_USMALLINT',
    SQL_DATABASE_NAME: 'GI_STRING',
    SQL_DATA_SOURCE_NAME: 'GI_STRING',
    SQL_DATA_SOURCE_READ_ONLY: 'GI_YESNO',
    SQL_DATETIME_LITERALS: 'GI_UINTEGER',
    SQL_DBMS_NAME: 'GI_STRING',
    SQL_DBMS_VER: 'GI_STRING',
    SQL_DDL_INDEX: 'GI_UINTEGER',
    SQL_DEFAULT_TXN_ISOLATION: 'GI_UINTEGER',
    SQL_DESCRIBE_PARAMETER: 'GI_YESNO',
    SQL_DM_VER: 'GI_STRING',
    SQL_DRIVER_NAME: 'GI_STRING',
    SQL_DRIVER_ODBC_VER: 'GI_STRING',
    SQL_DRIVER_VER: 'GI_STRING',
    SQL_DROP_ASSERTION: 'GI_UINTEGER',
    SQL_DROP_CHARACTER_SET: 'GI_UINTEGER',
    SQL_DROP_COLLATION: 'GI_UINTEGER',
    SQL_DROP_DOMAIN: 'GI_UINTEGER',
    SQL_DROP_SCHEMA: 'GI_UINTEGER',
    SQL_DROP_TABLE: 'GI_UINTEGER',
    SQL_DROP_TRANSLATION: 'GI_UINTEGER',
    SQL_DROP_VIEW: 'GI_UINTEGER',
    SQL_DYNAMIC_CURSOR_ATTRIBUTES1: 'GI_UINTEGER',
    SQL_DYNAMIC_CURSOR_ATTRIBUTES2: 'GI_UINTEGER',
    SQL_EXPRESSIONS_IN_ORDERBY: 'GI_YESNO',
    SQL_FILE_USAGE: 'GI_USMALLINT',
    SQL_FORWARD_ONLY_CURSOR_ATTRIBUTES1: 'GI_UINTEGER',
    SQL_FORWARD_ONLY_CURSOR_ATTRIBUTES2: 'GI_UINTEGER',
    SQL_GETDATA_EXTENSIONS: 'GI_UINTEGER',
    SQL_GROUP_BY: 'GI_USMALLINT',
    SQL_IDENTIFIER_CASE: 'GI_USMALLINT',
    SQL_IDENTIFIER_QUOTE_CHAR: 'GI_STRING',
    SQL_INDEX_KEYWORDS: 'GI_UINTEGER',
    SQL_INFO_SCHEMA_VIEWS: 'GI_UINTEGER',
    SQL_INSERT_STATEMENT: 'GI_UINTEGER',
    SQL_INTEGRITY: 'GI_YESNO',
    SQL_KEYSET_CURSOR_ATTRIBUTES1: 'GI_UINTEGER',
    SQL_KEYSET_CURSOR_ATTRIBUTES2: 'GI_UINTEGER',
    SQL_KEYWORDS: 'GI_STRING',
    SQL_LIKE_ESCAPE_CLAUSE: 'GI_YESNO',
    SQL_MAX_ASYNC_CONCURRENT_STATEMENTS: 'GI_UINTEGER',
    SQL_MAX_BINARY_LITERAL_LEN: 'GI_UINTEGER',
    SQL_MAX_CATALOG_NAME_LEN: 'GI_USMALLINT',
    SQL_MAX_CHAR_LITERAL_LEN: 'GI_UINTEGER',
    SQL_MAX_COLUMNS_IN_GROUP_BY: 'GI_USMALLINT',
    SQL_MAX_COLUMNS_IN_INDEX: 'GI_USMALLINT',
    SQL_MAX_COLUMNS_IN_ORDER_BY: 'GI_USMALLINT',
    SQL_MAX_COLUMNS_IN_SELECT: 'GI_USMALLINT',
    SQL_MAX_COLUMNS_IN_TABLE: 'GI_USMALLINT',
    SQL_MAX_COLUMN_NAME_LEN: 'GI_USMALLINT',
    SQL_MAX_CONCURRENT_ACTIVITIES: 'GI_USMALLINT',
    SQL_MAX_CURSOR_NAME_LEN: 'GI_USMALLINT',
    SQL_MAX_DRIVER_CONNECTIONS: 'GI_USMALLINT',
    SQL_MAX_IDENTIFIER_LEN: 'GI_USMALLINT',
    SQL_MAX_INDEX_SIZE: 'GI_UINTEGER',
    SQL_MAX_PROCEDURE_NAME_LEN: 'GI_USMALLINT',
    SQL_MAX_ROW_SIZE: 'GI_UINTEGER',
    SQL_MAX_ROW_SIZE_INCLUDES_LONG: 'GI_YESNO',
    SQL_MAX_SCHEMA_NAME_LEN: 'GI_USMALLINT',
    SQL_MAX_STATEMENT_LEN: 'GI_UINTEGER',
    SQL_MAX_TABLES_IN_SELECT: 'GI_USMALLINT',
    SQL_MAX_TABLE_NAME_LEN: 'GI_USMALLINT',
    SQL_MAX_USER_NAME_LEN: 'GI_USMALLINT',
    SQL_MULTIPLE_ACTIVE_TXN: 'GI_YESNO',
    SQL_MULT_RESULT_SETS: 'GI_YESNO',
    SQL_NEED_LONG_DATA_LEN: 'GI_YESNO',
    SQL_NON_NULLABLE_COLUMNS: 'GI_USMALLINT',
    SQL_NULL_COLLATION: 'GI_USMALLINT',
    SQL_NUMERIC_FUNCTIONS: 'GI_UINTEGER',
    SQL_ODBC_INTERFACE_CONFORMANCE: 'GI_UINTEGER',
    SQL_ODBC_VER: 'GI_STRING',
    SQL_OJ_CAPABILITIES: 'GI_UINTEGER',
    SQL_ORDER_BY_COLUMNS_IN_SELECT: 'GI_YESNO',
    SQL_PARAM_ARRAY_ROW_COUNTS: 'GI_UINTEGER',
    SQL_PARAM_ARRAY_SELECTS: 'GI_UINTEGER',
    SQL_PROCEDURES: 'GI_YESNO',
    SQL_PROCEDURE_TERM: 'GI_STRING',
    SQL_QUOTED_IDENTIFIER_CASE: 'GI_USMALLINT',
    SQL_ROW_UPDATES: 'GI_YESNO',
    SQL_SCHEMA_TERM: 'GI_STRING',
    SQL_SCHEMA_USAGE: 'GI_UINTEGER',
    SQL_SCROLL_OPTIONS: 'GI_UINTEGER',
    SQL_SEARCH_PATTERN_ESCAPE: 'GI_STRING',
    SQL_SERVER_NAME: 'GI_STRING',
    SQL_SPECIAL_CHARACTERS: 'GI_STRING',
    SQL_SQL92_DATETIME_FUNCTIONS: 'GI_UINTEGER',
    SQL_SQL92_FOREIGN_KEY_DELETE_RULE: 'GI_UINTEGER',
    SQL_SQL92_FOREIGN_KEY_UPDATE_RULE: 'GI_UINTEGER',
    SQL_SQL92_GRANT: 'GI_UINTEGER',
    SQL_SQL92_NUMERIC_VALUE_FUNCTIONS: 'GI_UINTEGER',
    SQL_SQL92_PREDICATES: 'GI_UINTEGER',
    SQL_SQL92_RELATIONAL_JOIN_OPERATORS: 'GI_UINTEGER',
    SQL_SQL92_REVOKE: 'GI_UINTEGER',
    SQL_SQL92_ROW_VALUE_CONSTRUCTOR: 'GI_UINTEGER',
    SQL_SQL92_STRING_FUNCTIONS: 'GI_UINTEGER',
    SQL_SQL92_VALUE_EXPRESSIONS: 'GI_UINTEGER',
    SQL_SQL_CONFORMANCE: 'GI_UINTEGER',
    SQL_STANDARD_CLI_CONFORMANCE: 'GI_UINTEGER',
    SQL_STATIC_CURSOR_ATTRIBUTES1: 'GI_UINTEGER',
    SQL_STATIC_CURSOR_ATTRIBUTES2: 'GI_UINTEGER',
    SQL_STRING_FUNCTIONS: 'GI_UINTEGER',
    SQL_SUBQUERIES: 'GI_UINTEGER',
    SQL_SYSTEM_FUNCTIONS: 'GI_UINTEGER',
    SQL_TABLE_TERM: 'GI_STRING',
    SQL_TIMEDATE_ADD_INTERVALS: 'GI_UINTEGER',
    SQL_TIMEDATE_DIFF_INTERVALS: 'GI_UINTEGER',
    SQL_TIMEDATE_FUNCTIONS: 'GI_UINTEGER',
    SQL_TXN_CAPABLE: 'GI_USMALLINT',
    SQL_TXN_ISOLATION_OPTION: 'GI_UINTEGER',
    SQL_UNION: 'GI_UINTEGER',
    SQL_USER_NAME: 'GI_STRING',
    SQL_XOPEN_CLI_YEAR: 'GI_STRING',
}


# Definitions for the DB-API type objects and constructors
BINARY = bytearray
Binary = bytearray
DATETIME = datetime.datetime
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
STRING = str
NUMBER = float
ROWID = int
DateFromTicks = datetime.date.fromtimestamp
TimestampFromTicks = datetime.datetime.fromtimestamp


def TimeFromTicks(x):
    """Return the local time of day for the POSIX timestamp ``x``."""
    return datetime.datetime.fromtimestamp(x).time()


# Define exceptions
class _OdbcValueError(Exception):
    """Shared implementation for the ODBC setup errors below.

    Stores the single constructor argument as ``value`` and renders it
    with ``repr`` when the exception is converted to a string.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


class OdbcNoLibrary(_OdbcValueError):
    """Raised when no ODBC shared library can be located."""


class OdbcLibraryError(_OdbcValueError):
    """Raised when the ODBC library cannot be loaded or is unsupported."""


class OdbcInvalidHandle(_OdbcValueError):
    """ODBC invalid-handle error carrying a single ``value``."""


class OdbcGenericError(_OdbcValueError):
    """Generic ODBC error carrying a single ``value``."""


class Warning(Exception):
    """DB-API warning; intentionally shadows the builtin of the same name.

    Both ``value`` and ``args`` are set to ``(error_code, error_desc)``.
    """

    def __init__(self, error_code, error_desc):
        self.value = (error_code, error_desc)
        self.args = (error_code, error_desc)


class Error(Exception):
    """Base class of the DB-API error hierarchy.

    Both ``value`` and ``args`` are set to ``(error_code, error_desc)``;
    every subclass below inherits this constructor unchanged.
    """

    def __init__(self, error_code, error_desc):
        self.value = (error_code, error_desc)
        self.args = (error_code, error_desc)


class InterfaceError(Error):
    pass


class DatabaseError(Error):
    pass


class InternalError(DatabaseError):
    pass


class ProgrammingError(DatabaseError):
    pass


class DataError(DatabaseError):
    pass


class IntegrityError(DatabaseError):
    pass


class NotSupportedError(Error):
    pass


class OperationalError(DatabaseError):
    pass


############################################################################
#
#       Find the ODBC library on the platform and connect to it using ctypes
#
############################################################################
# Get the References of the platform's ODBC functions via ctypes

odbc_decoding = 'utf_16'
odbc_encoding = 'utf_16_le'
ucs_length = 2

if sys.platform in ('win32', 'cli'):
    ODBC_API = ctypes.windll.odbc32
    # On Windows, the size of SQLWCHAR is hardcoded to 2-bytes.
    SQLWCHAR_SIZE = ctypes.sizeof(ctypes.c_ushort)
else:
    # Load the library on Linux and other non-Windows platforms
    try:
        # First try direct loading libodbc.so
        ODBC_API = ctypes.cdll.LoadLibrary('libodbc.so')
    except OSError:
        # If direct loading libodbc.so failed
        # We try finding the libodbc.so by using find_library
        from ctypes.util import find_library
        library = find_library('odbc')
        if library is None:
            # If find_library still can not find the library
            # we try finding it manually from where libodbc.so usually appears
            lib_paths = ("/usr/lib/libodbc.so",
                         "/usr/lib/i386-linux-gnu/libodbc.so",
                         "/usr/lib/x86_64-linux-gnu/libodbc.so",
                         "/usr/lib/libiodbc.dylib")
            lib_paths = [path for path in lib_paths if os.path.exists(path)]
            if not lib_paths:
                raise OdbcNoLibrary('ODBC Library is not found. Is LD_LIBRARY_PATH set?')
            library = lib_paths[0]

        # Then we try loading the found libodbc.so again
        try:
            ODBC_API = ctypes.cdll.LoadLibrary(library)
        except OSError:
            # If still fail loading, abort.
            raise OdbcLibraryError('Error while loading ' + library)

        # only iODBC uses utf-32 / UCS4 encoding data, others normally use utf-16 / UCS2
        # So we set those for handling.
        if 'libiodbc.dylib' in library:
            odbc_decoding = 'utf_32'
            odbc_encoding = 'utf_32_le'
            ucs_length = 4

    # unixODBC defaults to 2-bytes SQLWCHAR, unless "-DSQL_WCHART_CONVERT" was
    # added to CFLAGS, in which case it will be the size of wchar_t.
    # Note that using 4-bytes SQLWCHAR will break most ODBC drivers, as driver
    # development mostly targets the Windows platform.
    if py_v3:
        from subprocess import getstatusoutput
    else:
        from commands import getstatusoutput

    status, output = getstatusoutput('odbc_config --cflags')
    if status == 0 and 'SQL_WCHART_CONVERT' in output:
        SQLWCHAR_SIZE = ctypes.sizeof(ctypes.c_wchar)
    else:
        SQLWCHAR_SIZE = ctypes.sizeof(ctypes.c_ushort)


create_buffer_u = ctypes.create_unicode_buffer
create_buffer = ctypes.create_string_buffer
wchar_pointer = ctypes.c_wchar_p
UCS_buf = lambda s: s


def UCS_dec(buffer):
    """Decode a NUL-terminated wide string held in a ctypes byte buffer.

    ``buffer.raw`` is scanned in units of ``ucs_length`` bytes up to the
    first all-zero unit (or the end of the buffer when there is none), and
    the bytes before it are decoded with ``odbc_decoding``.

    Decoding the text in one call keeps surrogate pairs intact, and the
    scan is bounded by the buffer size, so an unterminated buffer can no
    longer loop forever.
    """
    raw = buffer.raw  # .raw copies the whole buffer, so read it only once
    terminator = unicode('\x00').encode(odbc_encoding)
    end = len(raw) - len(raw) % ucs_length  # ignore a trailing partial unit
    for i in range(0, end, ucs_length):
        if raw[i:i + ucs_length] == terminator:
            end = i
            break
    return raw[:end].decode(odbc_decoding)


from_buffer_u = lambda buffer: buffer.value

# This is the common case on Linux, which uses wide Python build together with
# the default unixODBC without the "-DSQL_WCHART_CONVERT" CFLAGS.
if sys.platform not in ('win32', 'cli'):
    if UNICODE_SIZE >= SQLWCHAR_SIZE:
        # We can only use unicode buffer if the size of wchar_t (UNICODE_SIZE) is
        # the same as the size expected by the driver manager (SQLWCHAR_SIZE).
        # Otherwise wide strings travel as explicitly encoded byte buffers.
        create_buffer_u = create_buffer
        wchar_pointer = ctypes.c_char_p

        def UCS_buf(s):
            return s.encode(odbc_encoding)

        from_buffer_u = UCS_dec

    # Esoteric case, don't really care.
    elif UNICODE_SIZE < SQLWCHAR_SIZE:
        raise OdbcLibraryError('Using narrow Python build with ODBC library '
            'expecting wide unicode is not supported.')


############################################################
# Database value to Python data type mappings

SQL_TYPE_NULL = 0
SQL_DECIMAL = 3
SQL_FLOAT = 6
SQL_DATE = 9
SQL_TIME = 10
SQL_TIMESTAMP = 11
SQL_VARCHAR = 12
SQL_LONGVARCHAR = -1
SQL_VARBINARY = -3
SQL_LONGVARBINARY = -4
SQL_BIGINT = -5
SQL_WVARCHAR = -9
SQL_WLONGVARCHAR = -10
SQL_ALL_TYPES = 0
SQL_SIGNED_OFFSET = -20
SQL_SS_VARIANT = -150
SQL_SS_UDT = -151
SQL_SS_XML = -152
SQL_SS_TIME2 = -154

SQL_C_CHAR = SQL_CHAR = 1
SQL_C_NUMERIC = SQL_NUMERIC = 2
SQL_C_LONG = SQL_INTEGER = 4
SQL_C_SLONG = SQL_C_LONG + SQL_SIGNED_OFFSET
SQL_C_SHORT = SQL_SMALLINT = 5
SQL_C_FLOAT = SQL_REAL = 7
SQL_C_DOUBLE = SQL_DOUBLE = 8
SQL_C_TYPE_DATE = SQL_TYPE_DATE = 91
SQL_C_TYPE_TIME = SQL_TYPE_TIME = 92
SQL_C_BINARY = SQL_BINARY = -2
SQL_C_SBIGINT = SQL_BIGINT + SQL_SIGNED_OFFSET
SQL_C_TINYINT = SQL_TINYINT = -6
SQL_C_BIT = SQL_BIT = -7
SQL_C_WCHAR = SQL_WCHAR = -8
SQL_C_GUID = SQL_GUID = -11
SQL_C_TYPE_TIMESTAMP = SQL_TYPE_TIMESTAMP = 93
SQL_C_DEFAULT = 99

SQL_DESC_DISPLAY_SIZE = SQL_COLUMN_DISPLAY_SIZE


def dttm_cvt(x):
    """Convert 'YYYY-MM-DD hh:mm:ss[.ffffff]' text to datetime.datetime.

    Under Python 3 the input is decoded from ASCII bytes first. An empty
    value gives None. Missing fractional digits are padded with zeros.
    """
    if py_v3:
        x = x.decode('ascii')
    if x == '':
        return None
    x = x.ljust(26, '0')
    # The hour is x[11:13]; x[10] is the date/time separator and is skipped.
    return datetime.datetime(int(x[0:4]), int(x[5:7]), int(x[8:10]),
                             int(x[11:13]), int(x[14:16]), int(x[17:19]),
                             int(x[20:26]))


def tm_cvt(x):
    """Convert 'hh:mm:ss[.ffffff]' text to datetime.time (None if empty)."""
    if py_v3:
        x = x.decode('ascii')
    if x == '':
        return None
    x = x.ljust(15, '0')
    return datetime.time(int(x[0:2]), int(x[3:5]), int(x[6:8]), int(x[9:15]))


def dt_cvt(x):
    """Convert 'YYYY-MM-DD' text to datetime.date (None if empty)."""
    if py_v3:
        x = x.decode('ascii')
    if x == '':
        return None
    return datetime.date(int(x[0:4]), int(x[5:7]), int(x[8:10]))


def Decimal_cvt(x):
    """Convert numeric text (ASCII bytes under Python 3) to Decimal."""
    if py_v3:
        x = x.decode('ascii')
    return Decimal(x)


bytearray_cvt = bytearray
if sys.platform == 'cli':
    bytearray_cvt = lambda x: bytearray(buffer(x))

# Below Datatype mappings referenced the document at
# http://infocenter.sybase.com/help/index.jsp?topic=/com.sybase.help.sdk_12.5.1.aseodbc/html/aseodbc/CACFDIGH.htm

# One entry per SQL data type. Each value is a 6-tuple:
#   0. Python data type
#   1. default output converter
#   2. buffer (C) type
#   3. buffer allocator
#   4. default buffer size
#   5. variable length flag
SQL_data_type_dict = {
    SQL_TYPE_NULL:      (None,              lambda x: None,         SQL_C_CHAR,   create_buffer,   2,     False),
    SQL_CHAR:           (str,               lambda x: x,            SQL_C_CHAR,   create_buffer,   2048,  False),
    SQL_NUMERIC:        (Decimal,           Decimal_cvt,            SQL_C_CHAR,   create_buffer,   150,   False),
    SQL_DECIMAL:        (Decimal,           Decimal_cvt,            SQL_C_CHAR,   create_buffer,   150,   False),
    SQL_INTEGER:        (int,               int,                    SQL_C_CHAR,   create_buffer,   150,   False),
    SQL_SMALLINT:       (int,               int,                    SQL_C_CHAR,   create_buffer,   150,   False),
    SQL_FLOAT:          (float,             float,                  SQL_C_CHAR,   create_buffer,   150,   False),
    SQL_REAL:           (float,             float,                  SQL_C_CHAR,   create_buffer,   150,   False),
    SQL_DOUBLE:         (float,             float,                  SQL_C_CHAR,   create_buffer,   200,   False),
    SQL_DATE:           (datetime.date,     dt_cvt,                 SQL_C_CHAR,   create_buffer,   30,    False),
    SQL_TIME:           (datetime.time,     tm_cvt,                 SQL_C_CHAR,   create_buffer,   20,    False),
    SQL_SS_TIME2:       (datetime.time,     tm_cvt,                 SQL_C_CHAR,   create_buffer,   20,    False),
    SQL_TIMESTAMP:      (datetime.datetime, dttm_cvt,               SQL_C_CHAR,   create_buffer,   30,    False),
    SQL_VARCHAR:        (str,               lambda x: x,            SQL_C_CHAR,   create_buffer,   2048,  False),
    SQL_LONGVARCHAR:    (str,               lambda x: x,            SQL_C_CHAR,   create_buffer,   20500, True),
    SQL_BINARY:         (bytearray,         bytearray_cvt,          SQL_C_BINARY, create_buffer,   5120,  True),
    SQL_VARBINARY:      (bytearray,         bytearray_cvt,          SQL_C_BINARY, create_buffer,   5120,  True),
    SQL_LONGVARBINARY:  (bytearray,         bytearray_cvt,          SQL_C_BINARY, create_buffer,   20500, True),
    SQL_BIGINT:         (long,              long,                   SQL_C_CHAR,   create_buffer,   150,   False),
    SQL_TINYINT:        (int,               int,                    SQL_C_CHAR,   create_buffer,   150,   False),
    SQL_BIT:            (bool,              lambda x: x == BYTE_1,  SQL_C_CHAR,   create_buffer,   2,     False),
    SQL_WCHAR:          (unicode,           lambda x: x,            SQL_C_WCHAR,  create_buffer_u, 2048,  False),
    SQL_WVARCHAR:       (unicode,           lambda x: x,            SQL_C_WCHAR,  create_buffer_u, 2048,  False),
    SQL_GUID:           (str,               str,                    SQL_C_CHAR,   create_buffer,   2048,  False),
    SQL_WLONGVARCHAR:   (unicode,           lambda x: x,            SQL_C_WCHAR,  create_buffer_u, 20500, True),
    SQL_TYPE_DATE:      (datetime.date,     dt_cvt,                 SQL_C_CHAR,   create_buffer,   30,    False),
    SQL_TYPE_TIME:      (datetime.time,     tm_cvt,                 SQL_C_CHAR,   create_buffer,   20,    False),
    SQL_TYPE_TIMESTAMP: (datetime.datetime, dttm_cvt,               SQL_C_CHAR,   create_buffer,   30,    False),
    SQL_SS_VARIANT:     (str,               lambda x: x,            SQL_C_CHAR,   create_buffer,   2048,  True),
    SQL_SS_XML:         (unicode,           lambda x: x,            SQL_C_WCHAR,  create_buffer_u, 20500, True),
    SQL_SS_UDT:         (bytearray,         bytearray_cvt,          SQL_C_BINARY, create_buffer,   5120,  True),
}


"""
Types mapping, applicable for 32-bit and 64-bit Linux / Windows / Mac OS X.

SQLPointer -> ctypes.c_void_p
SQLCHAR * -> ctypes.c_char_p
SQLWCHAR * -> ctypes.c_wchar_p on Windows, ctypes.c_char_p with unixODBC
SQLINT -> ctypes.c_int
SQLSMALLINT -> ctypes.c_short
SQMUSMALLINT -> ctypes.c_ushort
SQLLEN -> ctypes.c_ssize_t
SQLULEN -> ctypes.c_size_t
SQLRETURN -> ctypes.c_short
"""

# Define the python return type for ODBC functions with ret result.
funcs_with_ret = [
    "SQLAllocHandle",
    "SQLBindParameter",
    "SQLBindCol",
    "SQLCloseCursor",
    "SQLColAttribute",
    "SQLColumns",
    "SQLColumnsW",
    "SQLConnect",
    "SQLConnectW",
    "SQLDataSources",
    "SQLDataSourcesW",
    "SQLDescribeCol",
    "SQLDescribeColW",
    "SQLDescribeParam",
    "SQLDisconnect",
    "SQLDriverConnect",
    "SQLDriverConnectW",
    "SQLDrivers",
    "SQLDriversW",
    "SQLEndTran",
    "SQLExecDirect",
    "SQLExecDirectW",
    "SQLExecute",
    "SQLFetch",
    "SQLFetchScroll",
    "SQLForeignKeys",
    "SQLForeignKeysW",
    "SQLFreeHandle",
    "SQLFreeStmt",
    "SQLGetData",
    "SQLGetDiagRec",
    "SQLGetDiagRecW",
    "SQLGetInfo",
    "SQLGetInfoW",
    "SQLGetTypeInfo",
    "SQLMoreResults",
    "SQLNumParams",
    "SQLNumResultCols",
    "SQLPrepare",
    "SQLPrepareW",
    "SQLPrimaryKeys",
    "SQLPrimaryKeysW",
    "SQLProcedureColumns",
    "SQLProcedureColumnsW",
    "SQLProcedures",
    "SQLProceduresW",
    "SQLRowCount",
    "SQLSetConnectAttr",
    "SQLSetEnvAttr",
    "SQLStatistics",
    "SQLStatisticsW",
    "SQLTables",
    "SQLTablesW",
    "SQLSetStmtAttr"
]

# Every listed function returns SQLRETURN, declared here as c_short.
for func_name in funcs_with_ret:
    getattr(ODBC_API, func_name).restype = ctypes.c_short


def _ansi_argtypes():
    """Return {function name: argtypes list} for the ANSI ODBC entry points.

    Each call builds fresh list objects, so no two functions share one.
    The ctypes.POINTER types are only created when this is called, which
    the module avoids under IronPython.
    """
    void_p = ctypes.c_void_p
    char_p = ctypes.c_char_p
    short = ctypes.c_short
    ushort = ctypes.c_ushort
    c_int = ctypes.c_int
    size_t = ctypes.c_size_t
    ssize_t = ctypes.c_ssize_t
    ptr = ctypes.POINTER

    def names(count):
        # ``count`` consecutive (string pointer, string length) pairs.
        return [char_p, short] * count

    return {
        "SQLAllocHandle": [short, void_p, ptr(void_p)],
        "SQLBindParameter": [void_p, ushort, short, short, short, size_t,
                             short, void_p, ssize_t, ptr(ssize_t)],
        "SQLColAttribute": [void_p, ushort, ushort, void_p, short,
                            ptr(short), ptr(ssize_t)],
        "SQLDataSources": [void_p, ushort, char_p, short, ptr(short),
                           char_p, short, ptr(short)],
        "SQLDescribeCol": [void_p, ushort, char_p, short, ptr(short),
                           ptr(short), ptr(size_t), ptr(short), ptr(short)],
        "SQLDescribeParam": [void_p, ushort, ptr(short), ptr(size_t),
                             ptr(short), ptr(short)],
        "SQLDriverConnect": [void_p, void_p, char_p, short, char_p, short,
                             ptr(short), ushort],
        "SQLDrivers": [void_p, ushort, char_p, short, ptr(short),
                       char_p, short, ptr(short)],
        "SQLGetData": [void_p, ushort, short, void_p, ssize_t, ptr(ssize_t)],
        "SQLGetDiagRec": [short, void_p, short, char_p, ptr(c_int),
                          char_p, short, ptr(short)],
        "SQLGetInfo": [void_p, ushort, void_p, short, ptr(short)],
        "SQLRowCount": [void_p, ptr(ssize_t)],
        "SQLNumParams": [void_p, ptr(short)],
        "SQLNumResultCols": [void_p, ptr(short)],
        "SQLCloseCursor": [void_p],
        "SQLColumns": [void_p] + names(4),
        "SQLConnect": [void_p] + names(3),
        "SQLDisconnect": [void_p],
        "SQLEndTran": [short, void_p, short],
        "SQLExecute": [void_p],
        "SQLExecDirect": [void_p, char_p, c_int],
        "SQLFetch": [void_p],
        "SQLFetchScroll": [void_p, short, ssize_t],
        "SQLForeignKeys": [void_p] + names(6),
        "SQLFreeHandle": [short, void_p],
        "SQLFreeStmt": [void_p, ushort],
        "SQLGetTypeInfo": [void_p, short],
        "SQLMoreResults": [void_p],
        "SQLPrepare": [void_p, char_p, c_int],
        "SQLPrimaryKeys": [void_p] + names(3),
        "SQLProcedureColumns": [void_p] + names(4),
        "SQLProcedures": [void_p] + names(3),
        "SQLSetConnectAttr": [void_p, c_int, void_p, c_int],
        "SQLSetEnvAttr": [void_p, c_int, void_p, c_int],
        "SQLStatistics": [void_p] + names(3) + [ushort, ushort],
        "SQLTables": [void_p] + names(4),
    }


# ``sys.platform not in ('cli')`` was a substring test against the string
# 'cli', not tuple membership; compare explicitly instead.
if sys.platform != 'cli':
    # Seems like the IronPython can not declare ctypes.POINTER type arguments
    for _func_name, _func_argtypes in _ansi_argtypes().items():
        getattr(ODBC_API, _func_name).argtypes = _func_argtypes


def to_wchar(argtypes):
    """Return ``argtypes`` with each ctypes.c_char_p swapped for wchar_pointer.

    A falsy ``argtypes`` is returned unchanged, because under IronPython
    some argtypes are not declared.
    """
    if not argtypes:
        return argtypes
    return [wchar_pointer if x == ctypes.c_char_p else x for x in argtypes]


# Each wide ("W") entry point takes the same arguments as its ANSI twin,
# with the string pointers replaced by wide-string pointers.
for _func_name in (
        "SQLColumns", "SQLConnect", "SQLDataSources", "SQLDescribeCol",
        "SQLDriverConnect", "SQLDrivers", "SQLExecDirect", "SQLForeignKeys",
        "SQLPrepare", "SQLPrimaryKeys", "SQLProcedureColumns", "SQLProcedures",
        "SQLStatistics", "SQLTables", "SQLGetDiagRec", "SQLGetInfo"):
    getattr(ODBC_API, _func_name + "W").argtypes = to_wchar(
        getattr(ODBC_API, _func_name).argtypes)


# Set the alias for the ctypes functions for better code readability or performance.
ADDR = ctypes.byref
c_short = ctypes.c_short
c_ssize_t = ctypes.c_ssize_t
SQLFetch = ODBC_API.SQLFetch
SQLExecute = ODBC_API.SQLExecute
SQLBindParameter = ODBC_API.SQLBindParameter
SQLGetData = ODBC_API.SQLGetData
SQLRowCount = ODBC_API.SQLRowCount
SQLNumResultCols = ODBC_API.SQLNumResultCols
SQLEndTran = ODBC_API.SQLEndTran
# Set alias for better code readability or performance.
NO_FREE_STATEMENT = 0
FREE_STATEMENT = 1
BLANK_BYTE = str_8b()


def ctrl_err(ht, h, val_ret, ansi):
    """Classify type of ODBC error from (type of handle, handle, return value)
    and raise the matching DB-API exception.

    Diagnostic records are read one by one with SQLGetDiagRec (ANSI) or
    SQLGetDiagRecW (wide) until the driver reports no more data; the first
    record's SQLSTATE selects the exception class.

    ht      -- handle type constant passed to SQLGetDiagRec
    h       -- the handle the failing call was made on
    val_ret -- return code of the failing call (only used in the message
               raised when the driver supplies no diagnostic record)
    ansi    -- True to use the ANSI API and byte strings, False for the wide API

    This function always raises.
    """
    if ansi:
        state = create_buffer(22)
        Message = create_buffer(1024*4)
        ODBC_func = ODBC_API.SQLGetDiagRec
        if py_v3:
            raw_s = lambda s: bytes(s,'ascii')
        else:
            raw_s = str_8b
    else:
        state = create_buffer_u(22)
        Message = create_buffer_u(1024*4)
        ODBC_func = ODBC_API.SQLGetDiagRecW
        raw_s = unicode
    NativeError = ctypes.c_int()
    Buffer_len = c_short()
    err_list = []
    number_errors = 1

    while 1:
        ret = ODBC_func(ht, h, number_errors, state, \
            ADDR(NativeError), Message, 1024, ADDR(Buffer_len))
        if ret == SQL_NO_DATA_FOUND:
            # No more data, I can raise
            if not err_list:
                # The driver gave no record at all: raise a DB-API error
                # instead of failing with an IndexError on err_list[0].
                raise DatabaseError('', 'No diagnostic record available (return code %s)' % (val_ret,))
            sql_state = err_list[0][0]
            err_text = raw_s('[')+sql_state+raw_s('] ')+err_list[0][1]
            state_class = sql_state[:2]
            if state_class in (raw_s('24'),raw_s('25'),raw_s('42')):
                raise ProgrammingError(sql_state,err_text)
            # Plain equality: the former "in (raw_s('22'))" had no tuple comma
            # and therefore was a substring test.
            elif state_class == raw_s('22'):
                raise DataError(sql_state,err_text)
            elif state_class == raw_s('23') or sql_state == raw_s('40002'):
                raise IntegrityError(sql_state,err_text)
            elif sql_state == raw_s('0A000'):
                raise NotSupportedError(sql_state,err_text)
            elif sql_state in (raw_s('HYT00'),raw_s('HYT01')):
                raise OperationalError(sql_state,err_text)
            elif state_class in (raw_s('IM'),raw_s('HY')):
                raise Error(sql_state,err_text)
            else:
                raise DatabaseError(sql_state,err_text)
        elif ret == SQL_INVALID_HANDLE:
            # The handle passed is an invalid handle
            raise ProgrammingError('', 'SQL_INVALID_HANDLE')
        elif ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            # SQL_SUCCESS_WITH_INFO is accepted as well: the record is still
            # usable, and leaving it unhandled would repeat the same call forever.
            if ansi:
                err_list.append((state.value, Message.value, NativeError.value))
            else:
                err_list.append((from_buffer_u(state), from_buffer_u(Message), NativeError.value))
            number_errors += 1
        else:
            # SQL_ERROR, or any other unexpected return code: stop looping.
            raise ProgrammingError('', 'SQL_ERROR')


def check_success(ODBC_obj, ret):
    """ Validate return value, if not success, raise exceptions based on the handle

    ODBC_obj may be a Cursor (statement handle), a Connection (connection
    handle) or anything else, which is then treated as an environment handle.
    SQL_SUCCESS, SQL_SUCCESS_WITH_INFO and SQL_NO_DATA are accepted silently.
    """
    if ret not in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO, SQL_NO_DATA):
        if isinstance(ODBC_obj, Cursor):
            ctrl_err(SQL_HANDLE_STMT, ODBC_obj.stmt_h, ret, ODBC_obj.ansi)
        elif isinstance(ODBC_obj, Connection):
            ctrl_err(SQL_HANDLE_DBC, ODBC_obj.dbc_h, ret, ODBC_obj.ansi)
        else:
            ctrl_err(SQL_HANDLE_ENV, ODBC_obj, ret, False)


def AllocateEnv():
    """Allocate an ODBC environment by initializing the handle shared_env_h.

    An ODBC environment needs to be created so connections can be created
    under it; connection pooling can be shared under one environment.
    When the module-level ``pooling`` flag is true, pooling is switched on
    before the environment handle is allocated.
    """
    global shared_env_h

    if pooling:
        ret = ODBC_API.SQLSetEnvAttr(SQL_NULL_HANDLE, SQL_ATTR_CONNECTION_POOLING, SQL_CP_ONE_PER_HENV, SQL_IS_UINTEGER)
        check_success(SQL_NULL_HANDLE, ret)

    shared_env_h = ctypes.c_void_p()
    ret = ODBC_API.SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, ADDR(shared_env_h))
    check_success(shared_env_h, ret)

    # Set the ODBC environment's compatibility level to ODBC 3.0
    ret = ODBC_API.SQLSetEnvAttr(shared_env_h, SQL_ATTR_ODBC_VERSION, SQL_OV_ODBC3, 0)
    check_success(shared_env_h, ret)


# Here, we have a few callables that determine how a result row is returned.
#
# A new one can be added by creating a callable that:
# - accepts a cursor as its parameter.
# - returns a callable that accepts an iterable containing the row values.

def TupleRow(cursor):
    """Normal tuple with added attribute `cursor_description`, as in pyodbc.

    This is the default.  Indexing a row with a string looks the value up by
    column name and yields None when the name is unknown.
    """
    class Row(tuple):
        cursor_description = cursor.description

        def get(self, field):
            # The name -> value mapping is built lazily on first use.
            if not hasattr(self, 'field_dict'):
                self.field_dict = {}
                for i,item in enumerate(self):
                    self.field_dict[self.cursor_description[i][0]] = item
            return self.field_dict.get(field)

        def __getitem__(self, field):
            if isinstance(field, (unicode,str)):
                return self.get(field)
            else:
                return tuple.__getitem__(self,field)

    return Row


def NamedTupleRow(cursor):
    """Named tuple to allow attribute lookup by name.

    Requires py2.6 or above.
    NOTE(review): the ``rename`` keyword of namedtuple appears to need
    Python 2.7 / 3.1 -- confirm the minimum supported version.
    """
    from collections import namedtuple

    # Field names come from cursor.description: Cursor._UpdateDesc() calls the
    # row factory before it rebuilds _ColBufferList, so that list still
    # describes the previous result set (or is empty) at this point.
    attr_names = [x[0] for x in cursor.description]

    class Row(namedtuple('Row', attr_names, rename=True)):
        cursor_description = cursor.description

        def __new__(cls, iterable):
            return super(Row, cls).__new__(cls, *iterable)

    return Row


def MutableNamedTupleRow(cursor):
    """Mutable named tuple to allow attribute to be replaced. This should be
    compatible with pyodbc's Row type.

    Requires 3rd-party library "recordtype".
    """
    from recordtype import recordtype

    # See NamedTupleRow: _ColBufferList is not yet rebuilt when the row
    # factory is invoked, so the names are taken from cursor.description.
    attr_names = [x[0] for x in cursor.description]

    class Row(recordtype('Row', attr_names, rename=True)):
        cursor_description = cursor.description

        def __init__(self, iterable):
            super(Row, self).__init__(*iterable)

        def __iter__(self):
            for field_name in self.__slots__:
                yield getattr(self, field_name)

        def __getitem__(self, index):
            if isinstance(index, slice):
                return tuple(getattr(self, x) for x in self.__slots__[index])
            return getattr(self, self.__slots__[index])

        def __setitem__(self, index, value):
            setattr(self, self.__slots__[index], value)

    return Row


# When NULL is used in a binary parameter, the database usually would not
# accept None for a binary field, so the workaround is to use a
# special None, so that the pypyodbc module knows this NULL is for
# a binary field.
class BinaryNullType():
    pass

BinaryNull = BinaryNullType()

# The get_type function is used to determine if parameters need to be re-bound
# against the changed parameter types.
# 'b' for bool, 'U' for long unicode string, 'u' for short unicode string
# 'S' for long 8 bit string, 's' for short 8 bit string, 'l' for big integer, 'i' for normal integer
# 'f' for float, 'D' for Decimal, 't' for datetime.time, 'd' for datetime.date, 'dt' for datetime.datetime
# 'bi' for binary, 'BN' for the BinaryNull marker, 'N' for None
def get_type(v):
    """Return the type tag tuple describing parameter value *v*.

    The first element is one of the codes listed above.  Long strings and
    binary values carry a second element, their length rounded up to the next
    multiple of 1000; Decimal carries (number of digits, number of decimal
    digits).  A value matching none of the known types yields ``type(v)``
    itself rather than a tuple.
    """
    # bool is tested before the integer types on purpose: bool subclasses int.
    if isinstance(v, bool):
        return ('b',)

    if isinstance(v, unicode):
        if len(v) < 255:
            return ('u',)
        return ('U', (len(v)//1000+1)*1000)

    if isinstance(v, (str_8b, str)):
        if len(v) < 255:
            return ('s',)
        return ('S', (len(v)//1000+1)*1000)

    if isinstance(v, (int, long)):
        # SQL_BIGINT definition: http://msdn.microsoft.com/en-us/library/ms187745.aspx
        fits_32_bits = -2147483648 <= v <= 2147483647
        if fits_32_bits:
            return ('i',)
        return ('l',)

    if isinstance(v, float):
        return ('f',)

    if isinstance(v, BinaryNullType):
        return ('BN',)

    if v is None:
        return ('N',)

    if isinstance(v, Decimal):
        # 1.23 -> (1,2,3),-2 , 1.23*E7 -> (1,2,3),5
        dec_tuple = v.as_tuple()
        digit_count = len(dec_tuple[1])
        decimal_places = 0 - dec_tuple[2]
        return ('D', (digit_count, decimal_places))

    # datetime.datetime is tested before datetime.date, which it subclasses.
    if isinstance(v, datetime.datetime):
        return ('dt',)

    if isinstance(v, datetime.date):
        return ('d',)

    if isinstance(v, datetime.time):
        return ('t',)

    if isinstance(v, (bytearray, buffer)):
        return ('bi', (len(v)//1000+1)*1000)

    return type(v)


# The Cursor Class.
class Cursor: def __init__(self, conx, row_type_callable=None): """ Initialize self.stmt_h, which is the handle of a statement A statement is actually the basis of a python"cursor" object """ self.stmt_h = ctypes.c_void_p() self.connection = conx self.ansi = conx.ansi self.row_type_callable = row_type_callable or TupleRow self.statement = None self._last_param_types = None self._ParamBufferList = [] self._ColBufferList = [] self._row_type = None self._buf_cvt_func = [] self.rowcount = -1 self.description = None self.autocommit = None self._ColTypeCodeList = [] self._outputsize = {} self._inputsizers = [] self.arraysize = 1 ret = ODBC_API.SQLAllocHandle(SQL_HANDLE_STMT, self.connection.dbc_h, ADDR(self.stmt_h)) check_success(self, ret) self.timeout = conx.timeout if self.timeout != 0: self.set_timeout(self.timeout) self._PARAM_SQL_TYPE_LIST = [] self.closed = False def set_timeout(self, timeout): self.timeout = timeout ret = ODBC_API.SQLSetStmtAttr(self.stmt_h, SQL_ATTR_QUERY_TIMEOUT, self.timeout, 0) check_success(self, ret) def prepare(self, query_string): """prepare a query""" #self._free_results(FREE_STATEMENT) if not self.connection: self.close() if type(query_string) == unicode: c_query_string = wchar_pointer(UCS_buf(query_string)) ret = ODBC_API.SQLPrepareW(self.stmt_h, c_query_string, len(query_string)) else: c_query_string = ctypes.c_char_p(query_string) ret = ODBC_API.SQLPrepare(self.stmt_h, c_query_string, len(query_string)) if ret != SQL_SUCCESS: check_success(self, ret) self._PARAM_SQL_TYPE_LIST = [] if self.connection.support_SQLDescribeParam: # SQLServer's SQLDescribeParam only supports DML SQL, so avoid the SELECT statement if True:# 'SELECT' not in query_string.upper(): #self._free_results(NO_FREE_STATEMENT) NumParams = c_short() ret = ODBC_API.SQLNumParams(self.stmt_h, ADDR(NumParams)) if ret != SQL_SUCCESS: check_success(self, ret) for col_num in range(NumParams.value): ParameterNumber = ctypes.c_ushort(col_num + 1) DataType = c_short() 
ParameterSize = ctypes.c_size_t() DecimalDigits = c_short() Nullable = c_short() ret = ODBC_API.SQLDescribeParam( self.stmt_h, ParameterNumber, ADDR(DataType), ADDR(ParameterSize), ADDR(DecimalDigits), ADDR(Nullable), ) if ret != SQL_SUCCESS: try: check_success(self, ret) except DatabaseError: if sys.exc_info()[1].value[0] == '07009': self._PARAM_SQL_TYPE_LIST = [] break else: raise sys.exc_info()[1] except: raise sys.exc_info()[1] self._PARAM_SQL_TYPE_LIST.append((DataType.value,DecimalDigits.value)) self.statement = query_string def _BindParams(self, param_types, pram_io_list = []): """Create parameter buffers based on param types, and bind them to the statement""" # Clear the old Parameters if not self.connection: self.close() #self._free_results(NO_FREE_STATEMENT) # Get the number of query parameters judged by database. NumParams = c_short() ret = ODBC_API.SQLNumParams(self.stmt_h, ADDR(NumParams)) if ret != SQL_SUCCESS: check_success(self, ret) if len(param_types) != NumParams.value: # In case number of parameters provided do not same as number required error_desc = "The SQL contains %d parameter markers, but %d parameters were supplied" \ %(NumParams.value,len(param_types)) raise ProgrammingError('HY000',error_desc) # Every parameter needs to be binded to a buffer ParamBufferList = [] # Temporary holder since we can only call SQLDescribeParam before # calling SQLBindParam. 
temp_holder = [] for col_num in range(NumParams.value): dec_num = 0 buf_size = 512 if param_types[col_num][0] == 'u': sql_c_type = SQL_C_WCHAR sql_type = SQL_WVARCHAR buf_size = 255 ParameterBuffer = create_buffer_u(buf_size) elif param_types[col_num][0] == 's': sql_c_type = SQL_C_CHAR sql_type = SQL_VARCHAR buf_size = 255 ParameterBuffer = create_buffer(buf_size) elif param_types[col_num][0] == 'U': sql_c_type = SQL_C_WCHAR sql_type = SQL_WLONGVARCHAR buf_size = param_types[col_num][1]#len(self._inputsizers)>col_num and self._inputsizers[col_num] or 20500 ParameterBuffer = create_buffer_u(buf_size) elif param_types[col_num][0] == 'S': sql_c_type = SQL_C_CHAR sql_type = SQL_LONGVARCHAR buf_size = param_types[col_num][1]#len(self._inputsizers)>col_num and self._inputsizers[col_num] or 20500 ParameterBuffer = create_buffer(buf_size) # bool subclasses int, thus has to go first elif param_types[col_num][0] == 'b': sql_c_type = SQL_C_CHAR sql_type = SQL_BIT buf_size = SQL_data_type_dict[sql_type][4] ParameterBuffer = create_buffer(buf_size) elif param_types[col_num][0] == 'i': sql_c_type = SQL_C_CHAR sql_type = SQL_INTEGER buf_size = SQL_data_type_dict[sql_type][4] ParameterBuffer = create_buffer(buf_size) elif param_types[col_num][0] == 'l': sql_c_type = SQL_C_CHAR sql_type = SQL_BIGINT buf_size = SQL_data_type_dict[sql_type][4] ParameterBuffer = create_buffer(buf_size) elif param_types[col_num][0] == 'D': #Decimal sql_c_type = SQL_C_CHAR sql_type = SQL_NUMERIC digit_num, dec_num = param_types[col_num][1] if dec_num > 0: # has decimal buf_size = digit_num dec_num = dec_num else: # no decimal buf_size = digit_num - dec_num dec_num = 0 ParameterBuffer = create_buffer(buf_size + 4)# add extra length for sign and dot elif param_types[col_num][0] == 'f': sql_c_type = SQL_C_CHAR sql_type = SQL_DOUBLE buf_size = SQL_data_type_dict[sql_type][4] ParameterBuffer = create_buffer(buf_size) # datetime subclasses date, thus has to go first elif param_types[col_num][0] == 'dt': 
sql_c_type = SQL_C_CHAR sql_type = SQL_TYPE_TIMESTAMP buf_size = self.connection.type_size_dic[SQL_TYPE_TIMESTAMP][0] ParameterBuffer = create_buffer(buf_size) dec_num = self.connection.type_size_dic[SQL_TYPE_TIMESTAMP][1] elif param_types[col_num][0] == 'd': sql_c_type = SQL_C_CHAR if SQL_TYPE_DATE in self.connection.type_size_dic: #if DEBUG:print('conx.type_size_dic.has_key(SQL_TYPE_DATE)') sql_type = SQL_TYPE_DATE buf_size = self.connection.type_size_dic[SQL_TYPE_DATE][0] ParameterBuffer = create_buffer(buf_size) dec_num = self.connection.type_size_dic[SQL_TYPE_DATE][1] else: # SQL Sever <2008 doesn't have a DATE type. sql_type = SQL_TYPE_TIMESTAMP buf_size = 10 ParameterBuffer = create_buffer(buf_size) elif param_types[col_num][0] == 't': sql_c_type = SQL_C_CHAR if SQL_TYPE_TIME in self.connection.type_size_dic: sql_type = SQL_TYPE_TIME buf_size = self.connection.type_size_dic[SQL_TYPE_TIME][0] ParameterBuffer = create_buffer(buf_size) dec_num = self.connection.type_size_dic[SQL_TYPE_TIME][1] elif SQL_SS_TIME2 in self.connection.type_size_dic: # TIME type added in SQL Server 2008 sql_type = SQL_SS_TIME2 buf_size = self.connection.type_size_dic[SQL_SS_TIME2][0] ParameterBuffer = create_buffer(buf_size) dec_num = self.connection.type_size_dic[SQL_SS_TIME2][1] else: # SQL Sever <2008 doesn't have a TIME type. 
sql_type = SQL_TYPE_TIMESTAMP buf_size = self.connection.type_size_dic[SQL_TYPE_TIMESTAMP][0] ParameterBuffer = create_buffer(buf_size) dec_num = 3 elif param_types[col_num][0] == 'BN': sql_c_type = SQL_C_BINARY sql_type = SQL_VARBINARY buf_size = 1 ParameterBuffer = create_buffer(buf_size) elif param_types[col_num][0] == 'N': if len(self._PARAM_SQL_TYPE_LIST) > 0: sql_c_type = SQL_C_DEFAULT sql_type = self._PARAM_SQL_TYPE_LIST[col_num][0] buf_size = 1 ParameterBuffer = create_buffer(buf_size) else: sql_c_type = SQL_C_CHAR sql_type = SQL_CHAR buf_size = 1 ParameterBuffer = create_buffer(buf_size) elif param_types[col_num][0] == 'bi': sql_c_type = SQL_C_BINARY sql_type = SQL_LONGVARBINARY buf_size = param_types[col_num][1]#len(self._inputsizers)>col_num and self._inputsizers[col_num] or 20500 ParameterBuffer = create_buffer(buf_size) else: sql_c_type = SQL_C_CHAR sql_type = SQL_LONGVARCHAR buf_size = len(self._inputsizers)>col_num and self._inputsizers[col_num] or 20500 ParameterBuffer = create_buffer(buf_size) temp_holder.append((sql_c_type, sql_type, buf_size, dec_num, ParameterBuffer)) for col_num, (sql_c_type, sql_type, buf_size, dec_num, ParameterBuffer) in enumerate(temp_holder): BufferLen = c_ssize_t(buf_size) LenOrIndBuf = c_ssize_t() InputOutputType = SQL_PARAM_INPUT if len(pram_io_list) > col_num: InputOutputType = pram_io_list[col_num] ret = SQLBindParameter(self.stmt_h, col_num + 1, InputOutputType, sql_c_type, sql_type, buf_size,\ dec_num, ADDR(ParameterBuffer), BufferLen,ADDR(LenOrIndBuf)) if ret != SQL_SUCCESS: check_success(self, ret) # Append the value buffer and the length buffer to the array ParamBufferList.append((ParameterBuffer,LenOrIndBuf,sql_type)) self._last_param_types = param_types self._ParamBufferList = ParamBufferList def execute(self, query_string, params=None, many_mode=False, call_mode=False): """ Execute the query string, with optional parameters. 
If parameters are provided, the query would first be prepared, then executed with parameters; If parameters are not provided, only th query sting, it would be executed directly """ if not self.connection: self.close() self._free_stmt(SQL_CLOSE) if params: # If parameters exist, first prepare the query then executed with parameters if not isinstance(params, (tuple, list)): raise TypeError("Params must be in a list, tuple, or Row") if query_string != self.statement: # if the query is not same as last query, then it is not prepared self.prepare(query_string) param_types = list(map(get_type, params)) if call_mode: self._free_stmt(SQL_RESET_PARAMS) self._BindParams(param_types, self._pram_io_list) else: if self._last_param_types is None: self._free_stmt(SQL_RESET_PARAMS) self._BindParams(param_types) elif len(param_types) != len(self._last_param_types): self._free_stmt(SQL_RESET_PARAMS) self._BindParams(param_types) elif sum([p_type[0] != 'N' and p_type != self._last_param_types[i] for i,p_type in enumerate(param_types)]) > 0: self._free_stmt(SQL_RESET_PARAMS) self._BindParams(param_types) # With query prepared, now put parameters into buffers col_num = 0 for param_buffer, param_buffer_len, sql_type in self._ParamBufferList: c_char_buf, c_buf_len = '', 0 param_val = params[col_num] if param_types[col_num][0] in ('N','BN'): param_buffer_len.value = SQL_NULL_DATA col_num += 1 continue elif param_types[col_num][0] in ('i','l','f'): if py_v3: c_char_buf = bytes(str(param_val),'ascii') else: c_char_buf = str(param_val) c_buf_len = len(c_char_buf) elif param_types[col_num][0] in ('s','S'): c_char_buf = param_val c_buf_len = len(c_char_buf) elif param_types[col_num][0] in ('u','U'): c_char_buf = UCS_buf(param_val) c_buf_len = len(c_char_buf) elif param_types[col_num][0] == 'dt': max_len = self.connection.type_size_dic[SQL_TYPE_TIMESTAMP][0] datetime_str = param_val.strftime('%Y-%m-%d %H:%M:%S.%f') c_char_buf = datetime_str[:max_len] if py_v3: c_char_buf = 
bytes(c_char_buf,'ascii') c_buf_len = len(c_char_buf) # print c_buf_len, c_char_buf elif param_types[col_num][0] == 'd': if SQL_TYPE_DATE in self.connection.type_size_dic: max_len = self.connection.type_size_dic[SQL_TYPE_DATE][0] else: max_len = 10 c_char_buf = param_val.isoformat()[:max_len] if py_v3: c_char_buf = bytes(c_char_buf,'ascii') c_buf_len = len(c_char_buf) #print c_char_buf elif param_types[col_num][0] == 't': if SQL_TYPE_TIME in self.connection.type_size_dic: max_len = self.connection.type_size_dic[SQL_TYPE_TIME][0] c_char_buf = param_val.isoformat()[:max_len] c_buf_len = len(c_char_buf) elif SQL_SS_TIME2 in self.connection.type_size_dic: max_len = self.connection.type_size_dic[SQL_SS_TIME2][0] c_char_buf = param_val.isoformat()[:max_len] c_buf_len = len(c_char_buf) else: c_buf_len = self.connection.type_size_dic[SQL_TYPE_TIMESTAMP][0] time_str = param_val.isoformat() if len(time_str) == 8: time_str += '.000' c_char_buf = '1900-01-01 '+time_str[0:c_buf_len - 11] if py_v3: c_char_buf = bytes(c_char_buf,'ascii') #print c_buf_len, c_char_buf elif param_types[col_num][0] == 'b': if param_val == True: c_char_buf = '1' else: c_char_buf = '0' if py_v3: c_char_buf = bytes(c_char_buf,'ascii') c_buf_len = 1 elif param_types[col_num][0] == 'D': #Decimal sign = param_val.as_tuple()[0] == 0 and '+' or '-' digit_string = ''.join([str(x) for x in param_val.as_tuple()[1]]) digit_num, dec_num = param_types[col_num][1] if dec_num > 0: # has decimal left_part = digit_string[:digit_num - dec_num] right_part = digit_string[0-dec_num:] else: # no decimal left_part = digit_string + '0'*(0-dec_num) right_part = '' v = ''.join((sign, left_part,'.', right_part)) if py_v3: c_char_buf = bytes(v,'ascii') else: c_char_buf = v c_buf_len = len(c_char_buf) elif param_types[col_num][0] == 'bi': c_char_buf = str_8b(param_val) c_buf_len = len(c_char_buf) else: c_char_buf = param_val if param_types[col_num][0] == 'bi': param_buffer.raw = str_8b(param_val) else: #print 
(type(param_val),param_buffer, param_buffer.value) param_buffer.value = c_char_buf if param_types[col_num][0] in ('U','u','S','s'): #ODBC driver will find NUL in unicode and string to determine their length param_buffer_len.value = SQL_NTS else: param_buffer_len.value = c_buf_len col_num += 1 ret = SQLExecute(self.stmt_h) if ret != SQL_SUCCESS: #print param_valparam_buffer, param_buffer.value check_success(self, ret) if not many_mode: self._NumOfRows() self._UpdateDesc() #self._BindCols() else: self.execdirect(query_string) return self def _SQLExecute(self): if not self.connection: self.close() ret = SQLExecute(self.stmt_h) if ret != SQL_SUCCESS: check_success(self, ret) def execdirect(self, query_string): """Execute a query directly""" if not self.connection: self.close() self._free_stmt() self._last_param_types = None self.statement = None if type(query_string) == unicode: c_query_string = wchar_pointer(UCS_buf(query_string)) ret = ODBC_API.SQLExecDirectW(self.stmt_h, c_query_string, len(query_string)) else: c_query_string = ctypes.c_char_p(query_string) ret = ODBC_API.SQLExecDirect(self.stmt_h, c_query_string, len(query_string)) check_success(self, ret) self._NumOfRows() self._UpdateDesc() #self._BindCols() return self def callproc(self, procname, args): if not self.connection: self.close() raise Warning('', 'Still not fully implemented') self._pram_io_list = [row[4] for row in self.procedurecolumns(procedure = procname).fetchall() if row[4] not in (SQL_RESULT_COL, SQL_RETURN_VALUE)] print('pram_io_list: '+str(self._pram_io_list)) call_escape = '{CALL '+procname if args: call_escape += '(' + ','.join(['?' 
for params in args]) + ')' call_escape += '}' self.execute(call_escape, args, call_mode = True) result = [] for buf, buf_len, sql_type in self._ParamBufferList: if buf_len.value == -1: result.append(None) else: result.append(self.connection.output_converter[sql_type](buf.value)) return result def executemany(self, query_string, params_list = [None]): if not self.connection: self.close() for params in params_list: self.execute(query_string, params, many_mode = True) self._NumOfRows() self.rowcount = -1 self._UpdateDesc() #self._BindCols() def _CreateColBuf(self): if not self.connection: self.close() self._free_stmt(SQL_UNBIND) NOC = self._NumOfCols() self._ColBufferList = [] bind_data = True for col_num in range(NOC): col_name = self.description[col_num][0] col_size = self.description[col_num][2] col_sql_data_type = self._ColTypeCodeList[col_num] target_type = SQL_data_type_dict[col_sql_data_type][2] dynamic_length = SQL_data_type_dict[col_sql_data_type][5] # set default size base on the column's sql data type total_buf_len = SQL_data_type_dict[col_sql_data_type][4] # over-write if there's pre-set size value for "large columns" if total_buf_len > 20500: total_buf_len = self._outputsize.get(None,total_buf_len) # over-write if there's pre-set size value for the "col_num" column total_buf_len = self._outputsize.get(col_num, total_buf_len) # if the size of the buffer is very long, do not bind # because a large buffer decrease performance, and sometimes you only get a NULL value. # in that case use sqlgetdata instead. 
if col_size >= 1024: dynamic_length = True alloc_buffer = SQL_data_type_dict[col_sql_data_type][3](total_buf_len) used_buf_len = c_ssize_t() force_unicode = self.connection.unicode_results if force_unicode and col_sql_data_type in (SQL_CHAR,SQL_VARCHAR,SQL_LONGVARCHAR): target_type = SQL_C_WCHAR alloc_buffer = create_buffer_u(total_buf_len) buf_cvt_func = self.connection.output_converter[self._ColTypeCodeList[col_num]] if bind_data: if dynamic_length: bind_data = False self._ColBufferList.append([col_name, target_type, used_buf_len, ADDR(used_buf_len), alloc_buffer, ADDR(alloc_buffer), total_buf_len, buf_cvt_func, bind_data]) if bind_data: ret = ODBC_API.SQLBindCol(self.stmt_h, col_num + 1, target_type, ADDR(alloc_buffer), total_buf_len, ADDR(used_buf_len)) if ret != SQL_SUCCESS: check_success(self, ret) def _UpdateDesc(self): "Get the information of (name, type_code, display_size, internal_size, col_precision, scale, null_ok)" if not self.connection: self.close() force_unicode = self.connection.unicode_results if force_unicode: Cname = create_buffer_u(1024) else: Cname = create_buffer(1024) Cname_ptr = c_short() Ctype_code = c_short() Csize = ctypes.c_size_t() Cdisp_size = c_ssize_t(0) CDecimalDigits = c_short() Cnull_ok = c_short() ColDescr = [] self._ColTypeCodeList = [] NOC = self._NumOfCols() for col in range(1, NOC+1): ret = ODBC_API.SQLColAttribute(self.stmt_h, col, SQL_DESC_DISPLAY_SIZE, ADDR(create_buffer(10)), 10, ADDR(c_short()),ADDR(Cdisp_size)) if ret != SQL_SUCCESS: check_success(self, ret) if force_unicode: ret = ODBC_API.SQLDescribeColW(self.stmt_h, col, Cname, len(Cname), ADDR(Cname_ptr),\ ADDR(Ctype_code),ADDR(Csize),ADDR(CDecimalDigits), ADDR(Cnull_ok)) if ret != SQL_SUCCESS: check_success(self, ret) else: ret = ODBC_API.SQLDescribeCol(self.stmt_h, col, Cname, len(Cname), ADDR(Cname_ptr),\ ADDR(Ctype_code),ADDR(Csize),ADDR(CDecimalDigits), ADDR(Cnull_ok)) if ret != SQL_SUCCESS: check_success(self, ret) col_name = Cname.value if lowercase: 
col_name = col_name.lower() #(name, type_code, display_size, ColDescr.append((col_name, SQL_data_type_dict.get(Ctype_code.value,(Ctype_code.value,))[0],Cdisp_size.value,\ Csize.value, Csize.value,CDecimalDigits.value,Cnull_ok.value == 1 and True or False)) self._ColTypeCodeList.append(Ctype_code.value) if len(ColDescr) > 0: self.description = ColDescr # Create the row type before fetching. self._row_type = self.row_type_callable(self) else: self.description = None self._CreateColBuf() def _NumOfRows(self): """Get the number of rows""" if not self.connection: self.close() NOR = c_ssize_t() ret = SQLRowCount(self.stmt_h, ADDR(NOR)) if ret != SQL_SUCCESS: check_success(self, ret) self.rowcount = NOR.value return self.rowcount def _NumOfCols(self): """Get the number of cols""" if not self.connection: self.close() NOC = c_short() ret = SQLNumResultCols(self.stmt_h, ADDR(NOC)) if ret != SQL_SUCCESS: check_success(self, ret) return NOC.value def fetchall(self): if not self.connection: self.close() rows = [] while True: row = self.fetchone() if row is None: break rows.append(row) return rows def fetchmany(self, num = None): if not self.connection: self.close() if num is None: num = self.arraysize rows = [] while len(rows) < num: row = self.fetchone() if row is None: break rows.append(row) return rows def fetchone(self): if not self.connection: self.close() ret = SQLFetch(self.stmt_h) if ret in (SQL_SUCCESS,SQL_SUCCESS_WITH_INFO): '''Bind buffers for the record set columns''' value_list = [] col_num = 1 for col_name, target_type, used_buf_len, ADDR_used_buf_len, alloc_buffer, ADDR_alloc_buffer, total_buf_len, buf_cvt_func, bind_data in self._ColBufferList: raw_data_parts = [] while 1: if bind_data: ret = SQL_SUCCESS else: ret = SQLGetData(self.stmt_h, col_num, target_type, ADDR_alloc_buffer, total_buf_len, ADDR_used_buf_len) if ret == SQL_SUCCESS: if used_buf_len.value == SQL_NULL_DATA: value_list.append(None) else: if raw_data_parts == []: # Means no previous data, no need 
to combine if target_type == SQL_C_BINARY: value_list.append(buf_cvt_func(alloc_buffer.raw[:used_buf_len.value])) elif target_type == SQL_C_WCHAR: value_list.append(buf_cvt_func(from_buffer_u(alloc_buffer))) else: value_list.append(buf_cvt_func(alloc_buffer.value)) else: # There are previous fetched raw data to combine if target_type == SQL_C_BINARY: raw_data_parts.append(alloc_buffer.raw[:used_buf_len.value]) elif target_type == SQL_C_WCHAR: raw_data_parts.append(from_buffer_u(alloc_buffer)) else: raw_data_parts.append(alloc_buffer.value) break elif ret == SQL_SUCCESS_WITH_INFO: # Means the data is only partial if target_type == SQL_C_BINARY: raw_data_parts.append(alloc_buffer.raw) else: raw_data_parts.append(alloc_buffer.value) elif ret == SQL_NO_DATA: # Means all data has been transmitted break else: check_success(self, ret) if raw_data_parts != []: if py_v3: if target_type != SQL_C_BINARY: raw_value = ''.join(raw_data_parts) else: raw_value = BLANK_BYTE.join(raw_data_parts) else: raw_value = ''.join(raw_data_parts) value_list.append(buf_cvt_func(raw_value)) col_num += 1 return self._row_type(value_list) else: if ret == SQL_NO_DATA_FOUND: return None else: check_success(self, ret) def __next__(self): return self.next() def next(self): row = self.fetchone() if row is None: raise(StopIteration) return row def __iter__(self): return self def skip(self, count = 0): if not self.connection: self.close() for i in range(count): ret = ODBC_API.SQLFetchScroll(self.stmt_h, SQL_FETCH_NEXT, 0) if ret != SQL_SUCCESS: check_success(self, ret) return None def nextset(self): if not self.connection: self.close() ret = ODBC_API.SQLMoreResults(self.stmt_h) if ret not in (SQL_SUCCESS, SQL_NO_DATA): check_success(self, ret) if ret == SQL_NO_DATA: self._free_stmt() return False else: self._NumOfRows() self._UpdateDesc() #self._BindCols() return True def _free_stmt(self, free_type = None): if not self.connection: self.close() if not self.connection.connected: raise 
ProgrammingError('HY000','Attempt to use a closed connection.') #self.description = None #self.rowcount = -1 if free_type in (SQL_CLOSE, None): ret = ODBC_API.SQLFreeStmt(self.stmt_h, SQL_CLOSE) if ret != SQL_SUCCESS: check_success(self, ret) if free_type in (SQL_UNBIND, None): ret = ODBC_API.SQLFreeStmt(self.stmt_h, SQL_UNBIND) if ret != SQL_SUCCESS: check_success(self, ret) if free_type in (SQL_RESET_PARAMS, None): ret = ODBC_API.SQLFreeStmt(self.stmt_h, SQL_RESET_PARAMS) if ret != SQL_SUCCESS: check_success(self, ret) def getTypeInfo(self, sqlType = None): if not self.connection: self.close() if sqlType is None: type = SQL_ALL_TYPES else: type = sqlType ret = ODBC_API.SQLGetTypeInfo(self.stmt_h, type) if ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO): self._NumOfRows() self._UpdateDesc() #self._BindCols() return self.fetchone() def tables(self, table=None, catalog=None, schema=None, tableType=None): """Return a list with all tables""" if not self.connection: self.close() l_catalog = l_schema = l_table = l_tableType = 0 if unicode in [type(x) for x in (table, catalog, schema,tableType)]: string_p = lambda x:wchar_pointer(UCS_buf(x)) API_f = ODBC_API.SQLTablesW else: string_p = ctypes.c_char_p API_f = ODBC_API.SQLTables if catalog is not None: l_catalog = len(catalog) catalog = string_p(catalog) if schema is not None: l_schema = len(schema) schema = string_p(schema) if table is not None: l_table = len(table) table = string_p(table) if tableType is not None: l_tableType = len(tableType) tableType = string_p(tableType) self._free_stmt() self._last_param_types = None self.statement = None ret = API_f(self.stmt_h, catalog, l_catalog, schema, l_schema, table, l_table, tableType, l_tableType) check_success(self, ret) self._NumOfRows() self._UpdateDesc() #self._BindCols() return self def columns(self, table=None, catalog=None, schema=None, column=None): """Return a list with all columns""" if not self.connection: self.close() l_catalog = l_schema = l_table = l_column = 0 if 
unicode in [type(x) for x in (table, catalog, schema,column)]: string_p = lambda x:wchar_pointer(UCS_buf(x)) API_f = ODBC_API.SQLColumnsW else: string_p = ctypes.c_char_p API_f = ODBC_API.SQLColumns if catalog is not None: l_catalog = len(catalog) catalog = string_p(catalog) if schema is not None: l_schema = len(schema) schema = string_p(schema) if table is not None: l_table = len(table) table = string_p(table) if column is not None: l_column = len(column) column = string_p(column) self._free_stmt() self._last_param_types = None self.statement = None ret = API_f(self.stmt_h, catalog, l_catalog, schema, l_schema, table, l_table, column, l_column) check_success(self, ret) self._NumOfRows() self._UpdateDesc() #self._BindCols() return self def primaryKeys(self, table=None, catalog=None, schema=None): if not self.connection: self.close() l_catalog = l_schema = l_table = 0 if unicode in [type(x) for x in (table, catalog, schema)]: string_p = lambda x:wchar_pointer(UCS_buf(x)) API_f = ODBC_API.SQLPrimaryKeysW else: string_p = ctypes.c_char_p API_f = ODBC_API.SQLPrimaryKeys if catalog is not None: l_catalog = len(catalog) catalog = string_p(catalog) if schema is not None: l_schema = len(schema) schema = string_p(schema) if table is not None: l_table = len(table) table = string_p(table) self._free_stmt() self._last_param_types = None self.statement = None ret = API_f(self.stmt_h, catalog, l_catalog, schema, l_schema, table, l_table) check_success(self, ret) self._NumOfRows() self._UpdateDesc() #self._BindCols() return self def foreignKeys(self, table=None, catalog=None, schema=None, foreignTable=None, foreignCatalog=None, foreignSchema=None): if not self.connection: self.close() l_catalog = l_schema = l_table = l_foreignTable = l_foreignCatalog = l_foreignSchema = 0 if unicode in [type(x) for x in (table, catalog, schema,foreignTable,foreignCatalog,foreignSchema)]: string_p = lambda x:wchar_pointer(UCS_buf(x)) API_f = ODBC_API.SQLForeignKeysW else: string_p = 
ctypes.c_char_p API_f = ODBC_API.SQLForeignKeys if catalog is not None: l_catalog = len(catalog) catalog = string_p(catalog) if schema is not None: l_schema = len(schema) schema = string_p(schema) if table is not None: l_table = len(table) table = string_p(table) if foreignTable is not None: l_foreignTable = len(foreignTable) foreignTable = string_p(foreignTable) if foreignCatalog is not None: l_foreignCatalog = len(foreignCatalog) foreignCatalog = string_p(foreignCatalog) if foreignSchema is not None: l_foreignSchema = len(foreignSchema) foreignSchema = string_p(foreignSchema) self._free_stmt() self._last_param_types = None self.statement = None ret = API_f(self.stmt_h, catalog, l_catalog, schema, l_schema, table, l_table, foreignCatalog, l_foreignCatalog, foreignSchema, l_foreignSchema, foreignTable, l_foreignTable) check_success(self, ret) self._NumOfRows() self._UpdateDesc() #self._BindCols() return self def procedurecolumns(self, procedure=None, catalog=None, schema=None, column=None): if not self.connection: self.close() l_catalog = l_schema = l_procedure = l_column = 0 if unicode in [type(x) for x in (procedure, catalog, schema,column)]: string_p = lambda x:wchar_pointer(UCS_buf(x)) API_f = ODBC_API.SQLProcedureColumnsW else: string_p = ctypes.c_char_p API_f = ODBC_API.SQLProcedureColumns if catalog is not None: l_catalog = len(catalog) catalog = string_p(catalog) if schema is not None: l_schema = len(schema) schema = string_p(schema) if procedure is not None: l_procedure = len(procedure) procedure = string_p(procedure) if column is not None: l_column = len(column) column = string_p(column) self._free_stmt() self._last_param_types = None self.statement = None ret = API_f(self.stmt_h, catalog, l_catalog, schema, l_schema, procedure, l_procedure, column, l_column) check_success(self, ret) self._NumOfRows() self._UpdateDesc() return self def procedures(self, procedure=None, catalog=None, schema=None): if not self.connection: self.close() l_catalog = l_schema = 
l_procedure = 0 if unicode in [type(x) for x in (procedure, catalog, schema)]: string_p = lambda x:wchar_pointer(UCS_buf(x)) API_f = ODBC_API.SQLProceduresW else: string_p = ctypes.c_char_p API_f = ODBC_API.SQLProcedures if catalog is not None: l_catalog = len(catalog) catalog = string_p(catalog) if schema is not None: l_schema = len(schema) schema = string_p(schema) if procedure is not None: l_procedure = len(procedure) procedure = string_p(procedure) self._free_stmt() self._last_param_types = None self.statement = None ret = API_f(self.stmt_h, catalog, l_catalog, schema, l_schema, procedure, l_procedure) check_success(self, ret) self._NumOfRows() self._UpdateDesc() return self def statistics(self, table, catalog=None, schema=None, unique=False, quick=True): if not self.connection: self.close() l_table = l_catalog = l_schema = 0 if unicode in [type(x) for x in (table, catalog, schema)]: string_p = lambda x:wchar_pointer(UCS_buf(x)) API_f = ODBC_API.SQLStatisticsW else: string_p = ctypes.c_char_p API_f = ODBC_API.SQLStatistics if catalog is not None: l_catalog = len(catalog) catalog = string_p(catalog) if schema is not None: l_schema = len(schema) schema = string_p(schema) if table is not None: l_table = len(table) table = string_p(table) if unique: Unique = SQL_INDEX_UNIQUE else: Unique = SQL_INDEX_ALL if quick: Reserved = SQL_QUICK else: Reserved = SQL_ENSURE self._free_stmt() self._last_param_types = None self.statement = None ret = API_f(self.stmt_h, catalog, l_catalog, schema, l_schema, table, l_table, Unique, Reserved) check_success(self, ret) self._NumOfRows() self._UpdateDesc() #self._BindCols() return self def commit(self): if not self.connection: self.close() self.connection.commit() def rollback(self): if not self.connection: self.close() self.connection.rollback() def setoutputsize(self, size, column = None): if not self.connection: self.close() self._outputsize[column] = size def setinputsizes(self, sizes): if not self.connection: self.close() 
self._inputsizers = [size for size in sizes] def close(self): """ Call SQLCloseCursor API to free the statement handle""" # ret = ODBC_API.SQLCloseCursor(self.stmt_h) # check_success(self, ret) # if self.connection.connected: ret = ODBC_API.SQLFreeStmt(self.stmt_h, SQL_CLOSE) check_success(self, ret) ret = ODBC_API.SQLFreeStmt(self.stmt_h, SQL_UNBIND) check_success(self, ret) ret = ODBC_API.SQLFreeStmt(self.stmt_h, SQL_RESET_PARAMS) check_success(self, ret) ret = ODBC_API.SQLFreeHandle(SQL_HANDLE_STMT, self.stmt_h) check_success(self, ret) self.closed = True def __del__(self): if not self.closed: self.close() def __exit__(self, type, value, traceback): if not self.connection: self.close() if value: self.rollback() else: self.commit() self.close() def __enter__(self): return self # This class implement a odbc connection. # # connection_timeout = 0 class Connection: def __init__(self, connectString = '', autocommit = False, ansi = False, timeout = 0, unicode_results = use_unicode, readonly = False, **kargs): """Init variables and connect to the engine""" self.connected = 0 self.type_size_dic = {} self.ansi = False self.unicode_results = False self.dbc_h = ctypes.c_void_p() self.autocommit = autocommit self.readonly = False # the query timeout value self.timeout = 0 # self._cursors = [] for key, value in list(kargs.items()): connectString = connectString + key + '=' + value + ';' self.connectString = connectString self.clear_output_converters() try: lock.acquire() if shared_env_h is None: #Initialize an enviroment if it is not created. 
AllocateEnv() finally: lock.release() # Allocate an DBC handle self.dbc_h under the environment shared_env_h # This DBC handle is actually the basis of a "connection" # The handle of self.dbc_h will be used to connect to a certain source # in the self.connect and self.ConnectByDSN method ret = ODBC_API.SQLAllocHandle(SQL_HANDLE_DBC, shared_env_h, ADDR(self.dbc_h)) check_success(self, ret) self.connection_timeout = connection_timeout if self.connection_timeout != 0: self.set_connection_timeout(connection_timeout) self.connect(connectString, autocommit, ansi, timeout, unicode_results, readonly) def set_connection_timeout(self,connection_timeout): self.connection_timeout = connection_timeout ret = ODBC_API.SQLSetConnectAttr(self.dbc_h, SQL_ATTR_CONNECTION_TIMEOUT, connection_timeout, SQL_IS_UINTEGER); check_success(self, ret) def connect(self, connectString = '', autocommit = False, ansi = False, timeout = 0, unicode_results = use_unicode, readonly = False): """Connect to odbc, using connect strings and set the connection's attributes like autocommit and timeout by calling SQLSetConnectAttr """ # Before we establish the connection by the connection string # Set the connection's attribute of "timeout" (Actully LOGIN_TIMEOUT) if timeout != 0: ret = ODBC_API.SQLSetConnectAttr(self.dbc_h, SQL_ATTR_LOGIN_TIMEOUT, timeout, SQL_IS_UINTEGER); check_success(self, ret) # Create one connection with a connect string by calling SQLDriverConnect # and make self.dbc_h the handle of this connection # Convert the connetsytring to encoded string # so it can be converted to a ctypes c_char array object self.ansi = ansi if not ansi: c_connectString = wchar_pointer(UCS_buf(self.connectString)) odbc_func = ODBC_API.SQLDriverConnectW else: c_connectString = ctypes.c_char_p(self.connectString) odbc_func = ODBC_API.SQLDriverConnect # With unixODBC, SQLDriverConnect will intermittently fail with error: # [01000] [unixODBC][Driver Manager]Can't open lib '/path/to/so' : file not found" # or: # 
[01000] [unixODBC][Driver Manager]Can't open lib '/path/to/so' : (null)" # when called concurrently by more than one threads. So, we have to # use a lock to serialize the calls. By the way, the error is much # less likely to happen if ODBC Tracing is enabled, likely due to the # implicit serialization caused by writing to trace file. if ODBC_API._name != 'odbc32': try: lock.acquire() ret = odbc_func(self.dbc_h, 0, c_connectString, len(self.connectString), None, 0, None, SQL_DRIVER_NOPROMPT) finally: lock.release() else: ret = odbc_func(self.dbc_h, 0, c_connectString, len(self.connectString), None, 0, None, SQL_DRIVER_NOPROMPT) check_success(self, ret) # Set the connection's attribute of "autocommit" # self.autocommit = autocommit if self.autocommit == True: ret = ODBC_API.SQLSetConnectAttr(self.dbc_h, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_ON, SQL_IS_UINTEGER) else: ret = ODBC_API.SQLSetConnectAttr(self.dbc_h, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF, SQL_IS_UINTEGER) check_success(self, ret) # Set the connection's attribute of "readonly" # self.readonly = readonly if self.readonly == True: ret = ODBC_API.SQLSetConnectAttr(self.dbc_h, SQL_ATTR_ACCESS_MODE, SQL_MODE_READ_ONLY, SQL_IS_UINTEGER) check_success(self, ret) self.unicode_results = unicode_results self.connected = 1 self.update_db_special_info() def clear_output_converters(self): self.output_converter = {} for sqltype, profile in SQL_data_type_dict.items(): self.output_converter[sqltype] = profile[1] def add_output_converter(self, sqltype, func): self.output_converter[sqltype] = func def ConnectByDSN(self, dsn, user, passwd = ''): """Connect to odbc, we need dsn, user and optionally password""" self.dsn = dsn self.user = user self.passwd = passwd sn = create_buffer(dsn) un = create_buffer(user) pw = create_buffer(passwd) ret = ODBC_API.SQLConnect(self.dbc_h, sn, len(sn), un, len(un), pw, len(pw)) check_success(self, ret) self.update_db_special_info() self.connected = 1 def cursor(self, row_type_callable=None): 
#self.settimeout(self.timeout) if not self.connected: raise ProgrammingError('HY000','Attempt to use a closed connection.') cur = Cursor(self, row_type_callable=row_type_callable) # self._cursors.append(cur) return cur def update_db_special_info(self): for sql_type in ( SQL_TYPE_TIMESTAMP, SQL_TYPE_DATE, SQL_TYPE_TIME, SQL_SS_TIME2, ): cur = Cursor(self) try: info_tuple = cur.getTypeInfo(sql_type) if info_tuple is not None: self.type_size_dic[sql_type] = info_tuple[2], info_tuple[14] except: pass cur.close() self.support_SQLDescribeParam = False try: driver_name = self.getinfo(SQL_DRIVER_NAME) if any(x in driver_name for x in ('SQLSRV','ncli','libsqlncli')): self.support_SQLDescribeParam = True except: pass def commit(self): if not self.connected: raise ProgrammingError('HY000','Attempt to use a closed connection.') ret = SQLEndTran(SQL_HANDLE_DBC, self.dbc_h, SQL_COMMIT) if ret != SQL_SUCCESS: check_success(self, ret) def rollback(self): if not self.connected: raise ProgrammingError('HY000','Attempt to use a closed connection.') ret = SQLEndTran(SQL_HANDLE_DBC, self.dbc_h, SQL_ROLLBACK) if ret != SQL_SUCCESS: check_success(self, ret) def getinfo(self,infotype): if infotype not in list(aInfoTypes.keys()): raise ProgrammingError('HY000','Invalid getinfo value: '+str(infotype)) if aInfoTypes[infotype] == 'GI_UINTEGER': total_buf_len = 1000 alloc_buffer = ctypes.c_ulong() used_buf_len = c_short() ret = ODBC_API.SQLGetInfo(self.dbc_h,infotype,ADDR(alloc_buffer), total_buf_len,\ ADDR(used_buf_len)) check_success(self, ret) result = alloc_buffer.value elif aInfoTypes[infotype] == 'GI_USMALLINT': total_buf_len = 1000 alloc_buffer = ctypes.c_ushort() used_buf_len = c_short() ret = ODBC_API.SQLGetInfo(self.dbc_h,infotype,ADDR(alloc_buffer), total_buf_len,\ ADDR(used_buf_len)) check_success(self, ret) result = alloc_buffer.value else: total_buf_len = 1000 alloc_buffer = create_buffer(total_buf_len) used_buf_len = c_short() if self.ansi: API_f = ODBC_API.SQLGetInfo else: 
API_f = ODBC_API.SQLGetInfoW ret = API_f(self.dbc_h,infotype,ADDR(alloc_buffer), total_buf_len,\ ADDR(used_buf_len)) check_success(self, ret) if self.ansi: result = alloc_buffer.value else: result = UCS_dec(alloc_buffer) if aInfoTypes[infotype] == 'GI_YESNO': if unicode(result[0]) == unicode('Y'): result = True else: result = False return result def __exit__(self, type, value, traceback): if value: self.rollback() else: self.commit() if self.connected: self.close() def __enter__(self): return self def __del__(self): if self.connected: self.close() def close(self): if not self.connected: raise ProgrammingError('HY000','Attempt to close a closed connection.') # for cur in self._cursors: # if not cur is None: # if not cur.closed: # cur.close() if self.connected: #if DEBUG:print 'disconnect' if not self.autocommit: self.rollback() ret = ODBC_API.SQLDisconnect(self.dbc_h) check_success(self, ret) #if DEBUG:print 'free dbc' ret = ODBC_API.SQLFreeHandle(SQL_HANDLE_DBC, self.dbc_h) check_success(self, ret) # if shared_env_h.value: # #if DEBUG:print 'env' # ret = ODBC_API.SQLFreeHandle(SQL_HANDLE_ENV, shared_env_h) # check_success(shared_env_h, ret) self.connected = 0 odbc = Connection connect = odbc ''' def connect(connectString = '', autocommit = False, ansi = False, timeout = 0, unicode_results = False, readonly = False, **kargs): return odbc(connectString, autocommit, ansi, timeout, unicode_results, readonly, kargs) ''' def drivers(): if sys.platform not in ('win32','cli'): raise Exception('This function is available for use in Windows only.') try: lock.acquire() if shared_env_h is None: AllocateEnv() finally: lock.release() DriverDescription = create_buffer_u(1000) BufferLength1 = c_short(1000) DescriptionLength = c_short() DriverAttributes = create_buffer_u(1000) BufferLength2 = c_short(1000) AttributesLength = c_short() ret = SQL_SUCCESS DriverList = [] Direction = SQL_FETCH_FIRST while ret != SQL_NO_DATA: ret = ODBC_API.SQLDriversW(shared_env_h, Direction , 
DriverDescription , BufferLength1 , ADDR(DescriptionLength), DriverAttributes, BufferLength2, ADDR(AttributesLength)) check_success(shared_env_h, ret) DriverList.append(DriverDescription.value) if Direction == SQL_FETCH_FIRST: Direction = SQL_FETCH_NEXT return DriverList def win_create_mdb(mdb_path, sort_order = "General\0\0"): if sys.platform not in ('win32','cli'): raise Exception('This function is available for use in Windows only.') mdb_driver = [d for d in drivers() if 'Microsoft Access Driver (*.mdb' in d] if mdb_driver == []: raise Exception('Access Driver is not found.') else: driver_name = mdb_driver[0].encode('mbcs') #CREATE_DB= ctypes.windll.ODBCCP32.SQLConfigDataSource.argtypes = [ctypes.c_void_p,ctypes.c_ushort,ctypes.c_char_p,ctypes.c_char_p] if py_v3: c_Path = bytes("CREATE_DB=" + mdb_path + " " + sort_order,'mbcs') else: c_Path = "CREATE_DB=" + mdb_path + " " + sort_order ODBC_ADD_SYS_DSN = 1 ret = ctypes.windll.ODBCCP32.SQLConfigDataSource(None,ODBC_ADD_SYS_DSN,driver_name, c_Path) if not ret: raise Exception('Failed to create Access mdb file - "%s". Please check file path, permission and Access driver readiness.' 
%mdb_path) def win_connect_mdb(mdb_path): if sys.platform not in ('win32','cli'): raise Exception('This function is available for use in Windows only.') mdb_driver = [d for d in drivers() if 'Microsoft Access Driver (*.mdb' in d] if mdb_driver == []: raise Exception('Access Driver is not found.') else: driver_name = mdb_driver[0] return connect('Driver={'+driver_name+"};DBQ="+mdb_path, unicode_results = use_unicode, readonly = False) def win_compact_mdb(mdb_path, compacted_mdb_path, sort_order = "General\0\0"): if sys.platform not in ('win32','cli'): raise Exception('This function is available for use in Windows only.') mdb_driver = [d for d in drivers() if 'Microsoft Access Driver (*.mdb' in d] if mdb_driver == []: raise Exception('Access Driver is not found.') else: driver_name = mdb_driver[0].encode('mbcs') #COMPACT_DB= ctypes.windll.ODBCCP32.SQLConfigDataSource.argtypes = [ctypes.c_void_p,ctypes.c_ushort,ctypes.c_char_p,ctypes.c_char_p] #driver_name = "Microsoft Access Driver (*.mdb)" if py_v3: c_Path = bytes("COMPACT_DB=" + mdb_path + " " + compacted_mdb_path + " " + sort_order,'mbcs') #driver_name = bytes(driver_name,'mbcs') else: c_Path = "COMPACT_DB=" + mdb_path + " " + compacted_mdb_path + " " + sort_order ODBC_ADD_SYS_DSN = 1 ret = ctypes.windll.ODBCCP32.SQLConfigDataSource(None,ODBC_ADD_SYS_DSN,driver_name, c_Path) if not ret: raise Exception('Failed to compact Access mdb file - "%s". Please check file path, permission and Access driver readiness.' 
%compacted_mdb_path) def dataSources(): """Return a list with [name, descrition]""" dsn = create_buffer(1024) desc = create_buffer(1024) dsn_len = c_short() desc_len = c_short() dsn_list = {} try: lock.acquire() if shared_env_h is None: AllocateEnv() finally: lock.release() while 1: ret = ODBC_API.SQLDataSources(shared_env_h, SQL_FETCH_NEXT, \ dsn, len(dsn), ADDR(dsn_len), desc, len(desc), ADDR(desc_len)) if ret == SQL_NO_DATA_FOUND: break elif not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO): ctrl_err(SQL_HANDLE_ENV, shared_env_h, ret) else: dsn_list[dsn.value] = desc.value return dsn_list def monkey_patch_for_gevent(): import functools, gevent apply_e = gevent.get_hub().threadpool.apply_e def monkey_patch(func): @functools.wraps(func) def wrap(*args, **kwargs): #if DEBUG:print('%s called with %s %s' % (func, args, kwargs)) return apply_e(Exception, func, args, kwargs) return wrap for attr in dir(ODBC_API): if attr.startswith('SQL') and hasattr(getattr(ODBC_API, attr), 'argtypes'): setattr(ODBC_API, attr, monkey_patch(getattr(ODBC_API, attr)))