Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add python DBAPI 2.0 compatibility #22

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pyhs2/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ def _get_krb_settings(self, default_host, config):
def cursor(self):
return Cursor(self.client, self.session)

def commit( self ):
"""HIve doesn't support transactions; does nothing."""
# PEP 249
pass

def rollback( self ):
"""HIve doesn't support transactions; raises NotSupportedError"""
# PEP 249
pass

def close(self):
req = TCloseSessionReq(sessionHandle=self.session)
self.client.CloseSession(req)
self.client.CloseSession(req)
216 changes: 121 additions & 95 deletions pyhs2/cursor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from TCLIService.ttypes import TOpenSessionReq, TGetTablesReq, TFetchResultsReq,\
from TCLIService.ttypes import TOpenSessionReq, TGetTablesReq, TFetchResultsReq, \
TStatusCode, TGetResultSetMetadataReq, TGetColumnsReq, TType, TTypeId, \
TExecuteStatementReq, TGetOperationStatusReq, TFetchOrientation, TCloseOperationReq, \
TCloseSessionReq, TGetSchemasReq, TGetLogReq, TCancelOperationReq, TGetCatalogsReq, TGetInfoReq

from error import Pyhs2Exception
import threading

def get_type(typeDesc):
def get_type( typeDesc ):
for ttype in typeDesc.types:
if ttype.primitiveEntry is not None:
return TTypeId._VALUES_TO_NAMES[ttype.primitiveEntry.type]
Expand All @@ -21,23 +21,23 @@ def get_type(typeDesc):
elif ttype.userDefinedTypeEntry is not None:
return ttype.userDefinedTypeEntry

def get_value(colValue):
def get_value( colValue ):
if colValue.boolVal is not None:
return colValue.boolVal.value
return colValue.boolVal.value
elif colValue.byteVal is not None:
return colValue.byteVal.value
return colValue.byteVal.value
elif colValue.i16Val is not None:
return colValue.i16Val.value
return colValue.i16Val.value
elif colValue.i32Val is not None:
return colValue.i32Val.value
return colValue.i32Val.value
elif colValue.i64Val is not None:
return colValue.i64Val.value
return colValue.i64Val.value
elif colValue.doubleVal is not None:
return colValue.doubleVal.value
return colValue.doubleVal.value
elif colValue.stringVal is not None:
return colValue.stringVal.value
return colValue.stringVal.value

class Cursor(object):
class Cursor( object ):
session = None
client = None
operationHandle = None
Expand All @@ -50,184 +50,210 @@ class Cursor(object):
_blockRequestInProgress = False
_cursorLock = None

def __init__(self, _client, sessionHandle):
def __init__( self, _client, sessionHandle ):
self.session = sessionHandle
self.client = _client
self._cursorLock = threading.RLock()

def execute(self, hql):
query = TExecuteStatementReq(self.session, statement=hql, confOverlay={})
res = self.client.ExecuteStatement(query)
"""
This read-only attribute is a list of 7-item tuples, each containing (name, type_code, display_size, internal_size, precision, scale, null_ok).
This attribute will be None for operations that do not return rows or if one of the execute methods has not been called.
"""
self._description = list()


@property
def description( self ):
# PEP 249
return self._description


def execute( self, hql ):
query = TExecuteStatementReq( self.session, statement = hql, confOverlay = {} )
res = self.client.ExecuteStatement( query )
self.operationHandle = res.operationHandle
if res.status.errorCode is not None:
raise Pyhs2Exception(res.status.errorCode, res.status.errorMessage)
raise Pyhs2Exception( res.status.errorCode, res.status.errorMessage )

def fetch(self):
cols = self.getSchema()
if cols is not None:
self._description = [( col['columnName'], col['type'], None, None, None, None, None ) for col in cols]

def fetch( self ):
rows = []
while self.hasMoreRows:
rows = rows + self.fetchSet()
return rows

def fetchSet(self):
def fetchSet( self ):
rows = []
fetchReq = TFetchResultsReq(operationHandle=self.operationHandle,
orientation=TFetchOrientation.FETCH_NEXT,
maxRows=10000)
self._fetch(rows, fetchReq)
fetchReq = TFetchResultsReq( operationHandle = self.operationHandle,
orientation = TFetchOrientation.FETCH_NEXT,
maxRows = 10000 )
self._fetch( rows, fetchReq )
return rows

def _fetchBlock(self):
def _fetchBlock( self ):
""" internal use only.
get a block of rows from the server and put in standby block.
get a block of rows from the server and put in standby block.
future enhancements:
(1) locks for multithreaded access (protect from multiple calls)
(2) allow for prefetch by use of separate thread
"""
# make sure that another block request is not standing
if self._blockRequestInProgress :
# need to wait here before returning... (TODO)
return
# need to wait here before returning... (TODO)
return

# make sure another block request has not completed meanwhile
if self._standbyBlock is not None:
return
if self._standbyBlock is not None:
return

self._blockRequestInProgress = True
fetchReq = TFetchResultsReq(operationHandle=self.operationHandle,
orientation=TFetchOrientation.FETCH_NEXT,
maxRows=self.arraysize)
self._standbyBlock = self._fetch([],fetchReq)
fetchReq = TFetchResultsReq( operationHandle = self.operationHandle,
orientation = TFetchOrientation.FETCH_NEXT,
maxRows = self.arraysize )
self._standbyBlock = self._fetch( [], fetchReq )
self._blockRequestInProgress = False
return

def fetchone(self):
""" fetch a single row. a lock object is used to assure that a single
record will be fetched and all housekeeping done properly in a
multithreaded environment.
as getting a block is currently synchronous, this also protects
against multiple block requests (but does not protect against
explicit calls to to _fetchBlock())
def fetchone( self ):
""" fetch a single row. a lock object is used to assure that a single
record will be fetched and all housekeeping done properly in a
multithreaded environment.
as getting a block is currently synchronous, this also protects
against multiple block requests (but does not protect against
explicit calls to to _fetchBlock())
"""
self._cursorLock.acquire()

# if there are available records in current block,
# return one and advance counter
if self._currentBlock is not None and self._currentRecordNum < len(self._currentBlock):
x = self._currentRecordNum
self._currentRecordNum += 1
self._cursorLock.release()
return self._currentBlock[x]
# if there are available records in current block,
# return one and advance counter
if self._currentBlock is not None and self._currentRecordNum < len( self._currentBlock ):
x = self._currentRecordNum
self._currentRecordNum += 1
self._cursorLock.release()
return self._currentBlock[x]

# if no standby block is waiting, fetch a block
if self._standbyBlock is None:
# TODO - make sure exceptions due to problems in getting the block
# of records from the server are handled properly
self._fetchBlock()
# TODO - make sure exceptions due to problems in getting the block
# of records from the server are handled properly
self._fetchBlock()

# if we still do not have a standby block (or it is empty),
# return None - no more data is available
if self._standbyBlock is None or len(self._standbyBlock)==0:
self._cursorLock.release()
return None
# if we still do not have a standby block (or it is empty),
# return None - no more data is available
if self._standbyBlock is None or len( self._standbyBlock ) == 0:
self._cursorLock.release()
return None

# move the standby to current
self._currentBlock = self._standbyBlock
self._currentBlock = self._standbyBlock
self._standbyBlock = None
self._currentRecordNum = 1

# return the first record
self._cursorLock.release()
return self._currentBlock[0]

def fetchmany(self,size=-1):
""" return a sequential set of records. This is guaranteed by locking,
so that no other thread can grab a few records while a set is fetched.
this has the side effect that other threads may have to wait for
def fetchmany( self, size = -1 ):
""" return a sequential set of records. This is guaranteed by locking,
so that no other thread can grab a few records while a set is fetched.
this has the side effect that other threads may have to wait for
an arbitrary long time for the completion of the current request.
"""
self._cursorLock.acquire()

# default value (or just checking that someone did not put a ridiculous size)
if size < 0 or size > MAX_BLOCK_SIZE:
size = self.arraysize
size = self.arraysize
recs = []
for i in range(0,size):
recs.append(self.fetchone())
for i in range( 0, size ):
recs.append( self.fetchone() )

self._cursorLock.release()
return recs

def fetchall(self):
""" returns the remainder of records from the query. This is
guaranteed by locking, so that no other thread can grab a few records
while the set is fetched. This has the side effect that other threads
may have to wait for an arbitrary long time until this query is done
before they can return (obviously with None).
def fetchall( self ):
""" returns the remainder of records from the query. This is
guaranteed by locking, so that no other thread can grab a few records
while the set is fetched. This has the side effect that other threads
may have to wait for an arbitrary long time until this query is done
before they can return (obviously with None).
"""
self._cursorLock.acquire()

recs = []
while True:
rec = self.fetchone()
if rec is None:
break
recs.append(rec)
break
recs.append( rec )

self._cursorLock.release()
return recs

def __iter__(self):
def __iter__( self ):
""" returns an iterator object. no special code needed here. """
return self

def next(self):
def next( self ):
""" iterator-protocol for fetch next row. """
row = self.fetchone()
if row is None:
raise StopIteration
raise StopIteration
return row

def getSchema(self):
def getSchema( self ):
if self.operationHandle:
req = TGetResultSetMetadataReq(self.operationHandle)
res = self.client.GetResultSetMetadata(req)
req = TGetResultSetMetadataReq( self.operationHandle )
res = self.client.GetResultSetMetadata( req )
if res.schema is not None:
cols = []
for c in self.client.GetResultSetMetadata(req).schema.columns:
for c in self.client.GetResultSetMetadata( req ).schema.columns:
col = {}
col['type'] = get_type(c.typeDesc)
col['type'] = get_type( c.typeDesc )
col['columnName'] = c.columnName
col['comment'] = c.comment
cols.append(col)
cols.append( col )
return cols
return None

def getDatabases(self):
req = TGetSchemasReq(self.session)
res = self.client.GetSchemas(req)
def getDatabases( self ):
req = TGetSchemasReq( self.session )
res = self.client.GetSchemas( req )
self.operationHandle = res.operationHandle
if res.status.errorCode is not None:
raise Pyhs2Exception(res.status.errorCode, res.status.errorMessage)
raise Pyhs2Exception( res.status.errorCode, res.status.errorMessage )
return self.fetch()

def __enter__(self):
def __enter__( self ):
return self

def __exit__(self, _exc_type, _exc_value, _traceback):
def __exit__( self, _exc_type, _exc_value, _traceback ):
self.close()

def _fetch(self, rows, fetchReq):
resultsRes = self.client.FetchResults(fetchReq)
def _fetch( self, rows, fetchReq ):
resultsRes = self.client.FetchResults( fetchReq )
for row in resultsRes.results.rows:
rowData= []
for i, col in enumerate(row.colVals):
rowData.append(get_value(col))
rows.append(rowData)
if len(resultsRes.results.rows) == 0:
rowData = []
for i, col in enumerate( row.colVals ):
rowData.append( get_value( col ) )
rows.append( rowData )
if len( resultsRes.results.rows ) == 0:
self.hasMoreRows = False
return rows

def close(self):
def close( self ):
if self.operationHandle is not None:
req = TCloseOperationReq(operationHandle=self.operationHandle)
self.client.CloseOperation(req)
req = TCloseOperationReq( operationHandle = self.operationHandle )
self.client.CloseOperation( req )


def setinputsizes( self, sizes ):
# PEP 249
pass

def setoutputsize( self, size, column = None ):
# PEP 249
pass