DatDB is now SQLite-based.
DBDir = "db"
DatQueryFile = os.path.join(DBDir, "datq.pkl")
KeyQueryFile = os.path.join(DBDir, "keyq.pkl")
# DatDB moved from a cPickle dump to an SQLite database file.
DatDBFile = os.path.join(DBDir, "datdb.db")
KeyDBFile = os.path.join(DBDir, "keydb.pkl")
NodeDBFile = os.path.join(DBDir, "nodedb.pkl")
ProfileFile = os.path.join(DBDir, "profile.pkl")
@@ -9,6 +9,8 @@ | ||
9 | 9 | import gzip |
10 | 10 | import random |
11 | 11 | import time |
12 | +import sqlite3 | |
13 | +from binascii import unhexlify, hexlify | |
12 | 14 | |
13 | 15 | from o2on_const import regHosts, DatQueryFile, DatDBFile |
14 | 16 | import o2on_config |
@@ -159,16 +161,15 @@ | ||
159 | 161 | self.glob = g |
160 | 162 | self.lock = threading.Lock() |
161 | 163 | with self.lock: |
162 | - self.hashmap = {} | |
163 | - self.boardmap = {} | |
164 | - self.publishmap = {} | |
165 | 164 | self.need_rebuild = False |
166 | - self.load() | |
167 | - if len(self.hashmap) == 0: | |
165 | + if len(self)==0: | |
168 | 166 | self.need_rebuild = True |
169 | 167 | def __len__(self): |
168 | + if(not os.path.isfile(DatDBFile)): return 0 | |
170 | 169 | with self.lock: |
171 | - return len(self.hashmap) | |
170 | + sqlite_conn = sqlite3.connect(DatDBFile) | |
171 | + c = sqlite_conn.execute('SELECT COUNT(*) FROM dattbl') | |
172 | + return c.fetchone()[0] | |
172 | 173 | def checkrebuild(self): |
173 | 174 | with self.lock: |
174 | 175 | tmp = self.need_rebuild |
@@ -179,90 +180,108 @@ | ||
179 | 180 | self.glob.logger.log("DATDB","Generated DatDB") |
180 | 181 | with self.lock: |
181 | 182 | self.need_rebuild = False |
183 | + def makeDat(self, col): | |
184 | + if col: | |
185 | + dat = Dat(col[0]) | |
186 | + dat.published = col[1] | |
187 | + return dat | |
188 | + return None | |
182 | 189 | def getRandomInBoard(self,board): |
183 | - if board in self.boardmap: | |
184 | - h = random.choice(self.boardmap[board]) | |
185 | - return self.hashmap[h] | |
186 | - return None | |
190 | + with self.lock: | |
191 | + sqlite_conn = sqlite3.connect(DatDBFile) | |
192 | + c = sqlite_conn.execute('SELECT datpath, published FROM dattbl WHERE board = ? ORDER BY RANDOM() LIMIT 1', (board,)) | |
193 | + return self.makeDat(c.fetchone()) | |
187 | 194 | def choice(self): |
188 | - return self.hashmap[random.choice(self.hashmap.keys())] | |
195 | + with self.lock: | |
196 | + sqlite_conn = sqlite3.connect(DatDBFile) | |
197 | + c = sqlite_conn.execute('SELECT datpath,published FROM dattbl ORDER BY RANDOM() LIMIT 1') | |
198 | + return self.makeDat(c.fetchone()) | |
189 | 199 | def get(self,x): |
190 | 200 | with self.lock: |
191 | - return self.hashmap.get(x) | |
201 | + sqlite_conn = sqlite3.connect(DatDBFile) | |
202 | + c = sqlite_conn.execute('SELECT datpath, published FROM dattbl WHERE hash = ?', (hexlify(x),)) | |
203 | + return self.makeDat(c.fetchone()) | |
192 | 204 | def has_keyhash(self,key): |
193 | 205 | with self.lock: |
194 | - return key in self.hashmap | |
206 | + sqlite_conn = sqlite3.connect(DatDBFile) | |
207 | + c = sqlite_conn.execute('SELECT COUNT(*) FROM dattbl WHERE hash = ?', (key,)) | |
208 | + return c.fetchone()[0]==1 | |
195 | 209 | def add_dat(self, dat): |
196 | 210 | with self.lock: |
197 | - befdat = self.hashmap.get(dat.hash()) | |
198 | - self.hashmap[dat.hash()] = dat | |
199 | - if not dat.fullboard() in self.boardmap: | |
200 | - self.boardmap[dat.fullboard()] = [] | |
201 | - self.boardmap[dat.fullboard()].append(dat.hash()) | |
202 | - if not befdat: | |
203 | - dat.published = int(time.time()) | |
204 | - if dat.published not in self.publishmap: | |
205 | - self.publishmap[dat.published]=[] | |
206 | - self.publishmap[dat.published].append(dat.hash()) | |
207 | - else: | |
208 | - dat.published = befdat.published | |
209 | - def add(self, path, data, start=0): | |
211 | + sqlite_conn = sqlite3.connect(DatDBFile) | |
212 | + c = sqlite_conn.cursor() | |
213 | + c.execute('SELECT datpath, published FROM dattbl WHERE hash = ?', | |
214 | + (hexlify(dat.hash()),)) | |
215 | + befdat = self.makeDat(c.fetchone()) | |
216 | + if not befdat: dat.published = int(time.time()) | |
217 | + else: dat.published = befdat.published | |
218 | + c.execute('REPLACE INTO dattbl VALUES(?, ?, ?, ?)', | |
219 | + (dat.path(), hexlify(dat.hash()), dat.fullboard(), dat.published)) | |
220 | + try: c.execute('COMMIT') | |
221 | + except sqlite3.OperationalError: pass | |
222 | + def add(self, path, data, start=0): | |
210 | 223 | dat = Dat(path) |
211 | 224 | if dat.save(data, start): self.add_dat(dat) |
212 | 225 | def published(self, datid, publish_time): |
213 | 226 | if len(datid) != 20: raise Exception |
214 | 227 | with self.lock: |
215 | - if datid not in self.hashmap: raise Exception | |
216 | - dat = self.hashmap[datid] | |
217 | - self.publishmap[dat.published].remove(datid) | |
228 | + sqlite_conn = sqlite3.connect(DatDBFile) | |
229 | + c = sqlite_conn.cursor() | |
230 | + c.execute('SELECT datpath, published FROM dattbl WHERE hash = ?', (hexlify(datid),)) | |
231 | + dat = self.makeDat(c.fetchone()) | |
232 | + if not dat: raise Exception | |
218 | 233 | dat.published = publish_time |
219 | - if publish_time not in self.publishmap: self.publishmap[publish_time]=[] | |
220 | - self.publishmap[publish_time].append(datid) | |
234 | + c.execute('UPDATE dattbl SET published = ? WHERE hash = ?', (publish_time,hexlify(datid),)) | |
235 | + try: c.execute('COMMIT') | |
236 | + except sqlite3.OperationalError: pass | |
221 | 237 | def dat_to_publish(self, last_published_before, limit): |
222 | 238 | res = [] |
223 | 239 | if limit == 0: return res |
224 | - for x in sorted(self.publishmap.keys()): | |
225 | - for y in self.publishmap[x]: | |
226 | - res.append(self.hashmap[y]) | |
227 | - limit -= 1 | |
228 | - if limit == 0: return res | |
229 | - return res | |
240 | + with self.lock: | |
241 | + sqlite_conn = sqlite3.connect(DatDBFile) | |
242 | + c=sqlite_conn.execute('SELECT datpath, published FROM dattbl '+ | |
243 | + 'WHERE published < ? ORDER BY published DESC LIMIT ?', | |
244 | + (last_published_before,limit)) | |
245 | + while True: | |
246 | + dat = self.makeDat(c.fetchone()) | |
247 | + if not dat: return res | |
248 | + res.append(dat) | |
249 | + return res | |
230 | 250 | def generate(self): |
251 | + reghost = re.compile(regHosts+'$') | |
252 | + regnumdir = re.compile('^\d{4}$') | |
231 | 253 | regdat = re.compile('^(\d+)\.dat(?:\.gz)?$') |
232 | - sep = re.escape(os.sep) | |
233 | - regdatdir = re.compile(regHosts+sep+'(.+)'+sep+'\d{4}$') | |
234 | 254 | with self.lock: |
235 | - self.hashmap = {} | |
236 | - self.boardmap = {} | |
237 | - self.publishmap = {0:[]} | |
238 | - for root, dirs, files in os.walk(o2on_config.DatDir): | |
239 | - for f in files: | |
240 | - m1 = regdat.match(f) | |
241 | - if not m1: continue | |
242 | - m2 = regdatdir.search(root) | |
243 | - if not m2: continue | |
244 | - path = m2.group(1)+"/"+m2.group(2)+"/"+m1.group(1) | |
245 | - d = Dat(path) | |
255 | + sqlite_conn = sqlite3.connect(DatDBFile) | |
256 | + c = sqlite_conn.cursor() | |
257 | + try: c.execute('DROP TABLE dattbl') | |
258 | + except sqlite3.OperationalError: pass | |
259 | + c.execute('CREATE TABLE dattbl(datpath, hash PRIMARY KEY, board, published)') | |
260 | + for h in os.listdir(o2on_config.DatDir): | |
261 | + if not reghost.match(h): continue | |
262 | + for b in os.listdir(os.path.join(o2on_config.DatDir, h)): | |
246 | 263 | with self.lock: |
247 | - self.hashmap[d.hash()] = d | |
248 | - if not d.fullboard() in self.boardmap: | |
249 | - self.boardmap[d.fullboard()] = [] | |
250 | - self.boardmap[d.fullboard()].append(d.hash()) | |
251 | - self.publishmap[0].append(d.hash()) | |
252 | - self.glob.logger.log("DATDB", "added %s" % path) | |
264 | + for d in os.listdir(os.path.join(o2on_config.DatDir, h, b)): | |
265 | + if not regnumdir.match(d): continue | |
266 | + for f in os.listdir(os.path.join(o2on_config.DatDir, h, b, d)): | |
267 | + m = regdat.match(f) | |
268 | + if not m: continue | |
269 | + path = h+"/"+b+"/"+m.group(1) | |
270 | + dat = Dat(path) | |
271 | + try: | |
272 | + c.execute('INSERT OR IGNORE INTO dattbl VALUES(?, ?, ?, ?)', | |
273 | + (path, hexlify(dat.hash()), dat.fullboard(), 0)) | |
274 | + except sqlite3.IntegrityError: | |
275 | + raise Exception("dup hash %s %s" % (hexlify(dat.hash()), path)) | |
276 | + self.glob.logger.log("DATDB", "added %s" % path) | |
277 | + try: c.execute('COMMIT') | |
278 | + except sqlite3.OperationalError: pass | |
    def load(self):
        """No-op: state now lives in the SQLite file and is read per query
        (replaces the old cPickle hashmap/boardmap/publishmap load)."""
        pass
261 | 281 | def save(self): |
262 | - pkl_file = open(DatDBFile,"wb") | |
263 | 282 | with self.lock: |
264 | - cPickle.dump(self.hashmap, pkl_file,-1) | |
265 | - cPickle.dump(self.boardmap, pkl_file,-1) | |
266 | - cPickle.dump(self.publishmap, pkl_file,-1) | |
267 | - pkl_file.close() | |
283 | + sqlite_conn = sqlite3.connect(DatDBFile) | |
284 | + try: sqlite_conn.execute('COMMIT') | |
285 | + except sqlite3.OperationalError: pass | |
286 | + | |
268 | 287 |
@@ -25,11 +25,12 @@ | ||
25 | 25 | import o2on_im |
26 | 26 | |
def showstat(args):
    """Log the current sizes of the node, dat-query, dat, and key databases."""
    # take all counts before opening the log section, as the original did
    counts = (len(glob.nodedb), len(glob.datquery), len(glob.datdb), len(glob.keydb))
    glob.logger.begin()
    for label, count in zip(("nodes", "datquery", "dat", "key"), counts):
        glob.logger.log("GLOBAL", "%s %d" % (label, count))
    glob.logger.end()
34 | 35 | |
35 | 36 | def showmynode(args): |