-
Notifications
You must be signed in to change notification settings - Fork 3
/
sqlitedict.py
252 lines (202 loc) · 7.6 KB
/
sqlitedict.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# -*- coding: utf-8 -*-
#
# Modified Radim Řehůřek's sqlitedict to use generic supported types.
import logging
import os
import sqlite3
import sys
from sys import version_info
from threading import Thread

_major_version = version_info[0]
if _major_version < 3:
    if version_info[1] < 5:
        raise ImportError("sqlitedict requires python 2.5 or higher (python 3.3 or higher supported)")

try:
    from collections import UserDict as DictClass
except ImportError:
    from UserDict import DictMixin as DictClass
try:
    from queue import Queue
except ImportError:
    from Queue import Queue
# Module-level logger: SqliteDict reports opening, closing and file deletion here.
logger = logging.getLogger(__name__)
def open(*args, **kwargs):
    """
    Construct a :class:`SqliteDict`.

    Module-level factory (it shadows the built-in ``open`` inside this
    module). All positional and keyword arguments are forwarded unchanged
    to the ``SqliteDict`` constructor, and the new instance is returned.
    """
    sqlite_dict = SqliteDict(*args, **kwargs)
    return sqlite_dict
def encode(obj, obj_type):
    """
    Convert *obj* to *obj_type*, accepting the result only if it is unchanged.

    ``obj_type(obj)`` must compare equal to *obj*. This rejects lossy or
    meaningless conversions such as ``1.5`` -> ``int`` or ``"x"`` -> ``int``,
    while accepting e.g. ``1.0`` -> ``int`` (giving ``1``).

    :param obj: value to convert.
    :param obj_type: target type (any callable), e.g. ``int``, ``float``, ``str``.
    :returns: ``obj_type(obj)``.
    :raises TypeError: if the conversion raises or changes the value.
    """
    try:
        adapted = obj_type(obj)
        if adapted == obj:
            return adapted
    except Exception:
        # Any conversion failure becomes the TypeError below. Catching
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # propagate instead of being turned into a TypeError.
        pass
    raise TypeError('%r can not be adapted to %s' % (obj, obj_type))
try:
    _text_type = unicode  # Python 2: text keys are ``unicode`` by default
except NameError:
    _text_type = str  # Python 3 has no ``unicode`` builtin


class SqliteDict(DictClass):
    """
    Dict-like interface to a single SQLite table with typed keys and values.

    Supported types: int, long, float, str, unicode

    Keys and values pass through :func:`encode` before they are written, so
    only values that convert to ``key_type`` / ``value_type`` without
    changing are stored. All SQL runs on a :class:`SqliteMultithread`
    worker. Call :meth:`commit` to persist writes; :meth:`close` commits
    only when ``autocommit`` was requested.
    """

    def __init__(self, key_type=_text_type, value_type=int, filename=':memory:',
                 tablename='Dict', autocommit=False, journal_mode='DELETE'):
        """
        Open (creating it if missing) table *tablename* in database *filename*.

        :param key_type: type keys are converted to. Defaults to the text type
            (``unicode`` on Python 2, ``str`` on Python 3); referencing
            ``unicode`` directly made the module unimportable on Python 3.
        :param value_type: type values are converted to.
        :param filename: database path passed to ``sqlite3.connect``;
            ``':memory:'`` is never deleted by :meth:`terminate`.
        :param tablename: table holding the items. It is interpolated into
            SQL text, so it must be a trusted identifier.
        :param autocommit: connect with ``isolation_level=None`` and commit
            on :meth:`close`.
        :param journal_mode: value for ``PRAGMA journal_mode``.
        """
        self._key_type, self._value_type = key_type, value_type
        self.filename, self.tablename = filename, tablename
        # Nothing in this class creates a temporary database file, but
        # close() and __del__ consult this flag; it must exist or close()
        # fails with AttributeError.
        self.in_temp = False
        logger.info("opening Sqlite table %r in %s", tablename, filename)
        self.conn = SqliteMultithread(filename, autocommit=autocommit, journal_mode=journal_mode)
        MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS %s (key TEXT PRIMARY KEY, value BLOB)' % self.tablename
        self.conn.execute(MAKE_TABLE)
        self.conn.commit()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def __str__(self):
        # Use self.filename, not self.conn.filename: conn is None after
        # close(), and close()/terminate() log str(self) even when called
        # again on an already-closed instance.
        return "SqliteDict(%s)" % (self.filename)

    def __repr__(self):
        return str(self)

    def _encode_lookup_key(self, key):
        """Encode *key* exactly as __setitem__ stores it; KeyError if impossible."""
        try:
            return encode(key, self._key_type)
        except TypeError:
            # A key that cannot be encoded cannot have been written through
            # __setitem__/update, so report it as missing.
            raise KeyError(key)

    def _decode_key(self, raw):
        """
        Convert a value read from the ``key`` column back to ``key_type``.

        The column is declared TEXT, so SQLite's type affinity stores numeric
        keys as text (5 is read back as '5'). encode() rejects that because
        int('5') != '5'; in that case the text (the worker sets
        ``text_factory = str``) is converted directly. Anything that still
        fails raises encode()'s TypeError.
        """
        try:
            return encode(raw, self._key_type)
        except TypeError as err:
            error = err
        if isinstance(raw, str):
            try:
                return self._key_type(raw)
            except (TypeError, ValueError):
                pass
        raise error

    def __len__(self):
        GET_LEN = 'SELECT COUNT(*) FROM %s' % self.tablename
        rows = self.conn.select_one(GET_LEN)[0]
        return rows if rows is not None else 0

    def __bool__(self):
        # MAX(ROWID) is NULL exactly when the table has no rows.
        GET_MAX = 'SELECT MAX(ROWID) FROM %s' % self.tablename
        m = self.conn.select_one(GET_MAX)[0]
        return True if m is not None else False

    def keys(self):
        """Return all keys as a list, ordered by rowid."""
        GET_KEYS = 'SELECT key FROM %s ORDER BY rowid' % self.tablename
        return [self._decode_key(row[0]) for row in self.conn.select(GET_KEYS)]

    def values(self):
        """Return all values as a list, ordered by rowid."""
        GET_VALUES = 'SELECT value FROM %s ORDER BY rowid' % self.tablename
        return [encode(row[0], self._value_type) for row in self.conn.select(GET_VALUES)]

    def items(self):
        """Return all ``(key, value)`` pairs as a list, ordered by rowid."""
        GET_ITEMS = 'SELECT key, value FROM %s ORDER BY rowid' % self.tablename
        return [(self._decode_key(key), encode(value, self._value_type))
                for key, value in self.conn.select(GET_ITEMS)]

    def __contains__(self, key):
        try:
            key = self._encode_lookup_key(key)
        except KeyError:
            return False
        HAS_ITEM = 'SELECT 1 FROM %s WHERE key = ?' % self.tablename
        return self.conn.select_one(HAS_ITEM, (key,)) is not None

    def __getitem__(self, key):
        GET_ITEM = 'SELECT value FROM %s WHERE key = ?' % self.tablename
        item = self.conn.select_one(GET_ITEM, (self._encode_lookup_key(key),))
        if item is None:
            raise KeyError(key)
        return encode(item[0], self._value_type)

    def __setitem__(self, key, value):
        ADD_ITEM = 'REPLACE INTO %s (key, value) VALUES (?,?)' % self.tablename
        self.conn.execute(ADD_ITEM, (encode(key, self._key_type), encode(value, self._value_type)))

    def __delitem__(self, key):
        encoded = self._encode_lookup_key(key)
        if encoded not in self:
            raise KeyError(key)
        DEL_ITEM = 'DELETE FROM %s WHERE key = ?' % self.tablename
        self.conn.execute(DEL_ITEM, (encoded,))

    def update(self, items=(), **kwds):
        """
        Insert or replace items from a mapping or an iterable of pairs, then *kwds*.

        Every key and value is encoded, whichever form *items* takes;
        previously only mappings were encoded. All pairs are encoded before
        anything is written, so a TypeError leaves the table untouched.
        """
        try:
            pairs = items.items()
        except AttributeError:
            pairs = items  # already an iterable of (key, value) pairs
        rows = [(encode(k, self._key_type), encode(v, self._value_type)) for k, v in pairs]
        UPDATE_ITEMS = 'REPLACE INTO %s (key, value) VALUES (?, ?)' % self.tablename
        self.conn.executemany(UPDATE_ITEMS, rows)
        if kwds:
            self.update(kwds)

    def __iter__(self):
        return iter(self.keys())

    def clear(self):
        CLEAR_ALL = 'DELETE FROM %s;' % self.tablename
        self.conn.commit()
        self.conn.execute(CLEAR_ALL)
        self.conn.commit()

    def commit(self):
        if self.conn is not None:
            self.conn.commit()
    sync = commit

    def _shutdown(self):
        """Commit if autocommit, stop the worker, and remove a temp file if flagged."""
        conn = getattr(self, 'conn', None)
        if conn is not None:
            if conn.autocommit:
                conn.commit()
            conn.close()
            self.conn = None
        if getattr(self, 'in_temp', False):
            try:
                os.remove(self.filename)
            except (IOError, OSError):
                pass

    def close(self):
        """Close the connection; safe to call more than once."""
        logger.debug("closing %s", self)
        self._shutdown()

    def terminate(self):
        """Close the connection and delete the database file (unless ``:memory:``)."""
        self.close()
        if self.filename == ':memory:':
            return
        logger.info("deleting %s", self.filename)
        try:
            os.remove(self.filename)
        except (IOError, OSError) as e:
            # Python 2's os.remove raises OSError, which IOError alone missed.
            logger.warning("failed to delete %s: %s", self.filename, str(e))

    def __del__(self):
        # Best effort only: a finalizer must never raise. It may also run after a
        # failed __init__ or during interpreter shutdown.
        try:
            self._shutdown()
        except Exception:
            pass
if _major_version == 2:
    # Python 2 dict API: iterkeys/itervalues/iteritems must return iterators
    # (callers may call next() on them), so wrap the lists that
    # keys()/values()/items() build from a single query each.
    SqliteDict.iterkeys = lambda self: iter(self.keys())
    SqliteDict.itervalues = lambda self: iter(self.values())
    SqliteDict.iteritems = lambda self: iter(self.items())
    # Python 2 truthiness is looked up as __nonzero__, not __bool__.
    SqliteDict.__nonzero__ = SqliteDict.__bool__
    del SqliteDict.__bool__
class SqliteMultithread(Thread):
    """
    Worker thread that owns one sqlite3 connection and runs queued requests.

    Callers never touch the connection. Instead, they put ``(request, args,
    result_queue)`` tuples on ``self.reqs``, and the worker executes them one
    at a time in FIFO order. ``request`` is either SQL text or one of the
    control strings ``'--commit--'`` / ``'--close--'``.

    For :meth:`select`, each result row is put on the caller's queue,
    followed by the ``'--no more--'`` sentinel. A failing request does not
    kill the worker, because that would leave every later ``select``/``close``
    blocked forever. The exception is handed to the waiting ``select`` and
    re-raised there, or it is logged when no caller is waiting.
    """

    def __init__(self, filename, autocommit, journal_mode):
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()
        # Daemon thread, so an instance that is never closed does not keep
        # the interpreter alive. The ``daemon`` attribute replaces
        # setDaemon(), which is deprecated since Python 3.10.
        self.daemon = True
        self.start()

    def _connect(self):
        """Open the connection, apply the PRAGMAs and return ``(conn, cursor)``."""
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        try:
            conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
            conn.text_factory = str
            cursor = conn.cursor()
            cursor.execute('PRAGMA synchronous=OFF')
            cursor.execute('PRAGMA count_changes=OFF')
            cursor.execute('PRAGMA temp_store=MEMORY')
        except Exception:
            conn.close()  # do not leak the handle if setup fails
            raise
        return conn, cursor

    def _log_error(self, msg, *args):
        # Same logger object as the module-level ``logger``: getLogger()
        # returns one shared instance per name.
        logging.getLogger(__name__).error(msg, *args)

    def _handle(self, conn, cursor, req, arg, res):
        """Run one request other than '--close--'; never lets an error escape."""
        try:
            if req == '--commit--':
                conn.commit()
            else:
                cursor.execute(req, arg)
                if res:
                    for rec in cursor:
                        res.put(rec)
                    res.put('--no more--')
        except Exception as exc:
            if res:
                res.put(exc)  # select() re-raises it in the caller's thread
            else:
                self._log_error("sqlitedict: request %r failed: %s", req, exc)

    def _drain(self, exc):
        """After a failed connect, fail every request until '--close--' arrives."""
        self._log_error("sqlitedict: cannot open %s: %s", self.filename, exc)
        while True:
            req, arg, res = self.reqs.get()
            if req == '--close--':
                break
            if res:
                res.put(exc)

    def run(self):
        try:
            conn, cursor = self._connect()
        except Exception as exc:
            # Keep consuming requests so callers blocked in select() get the
            # error and close() can still join, instead of hanging.
            self._drain(exc)
            return
        try:
            while True:
                req, arg, res = self.reqs.get()
                if req == '--close--':
                    break
                self._handle(conn, cursor, req, arg, res)
            if self.autocommit:
                conn.commit()
        finally:
            conn.close()

    def execute(self, req, arg=None, res=None):
        """Queue *req* with parameters *arg*; result rows go to *res* if given."""
        self.reqs.put((req, arg or tuple(), res))

    def executemany(self, req, items):
        """Queue *req* once for each parameter tuple in *items*."""
        for item in items:
            self.execute(req, item)

    def select(self, req, arg=None):
        """
        Yield the rows of query *req*. The request is queued on first iteration.

        :raises Exception: whatever the worker raised while executing *req*.
        """
        res = Queue()
        self.execute(req, arg, res)
        while True:
            rec = res.get()
            if isinstance(rec, Exception):
                raise rec
            if rec == '--no more--':
                break
            yield rec

    def select_one(self, req, arg=None):
        """Return the first row of *req*, or None if it produced no rows."""
        return next(self.select(req, arg), None)

    def commit(self):
        self.execute('--commit--')

    def close(self):
        """Ask the worker to stop (committing first if autocommit) and wait for it."""
        self.execute('--close--')
        self.join()