Coverage for src/wiktextract/thesaurus.py: 61%

123 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-10-25 10:11 +0000

1# Extracting information from thesaurus pages in Wiktionary. The data will be 

2# merged into word linkages in later stages. 

3# 

4# Copyright (c) 2021 Tatu Ylonen. See file LICENSE and https://ylonen.org 

5import os 

6import sqlite3 

7import tempfile 

8import time 

9import traceback 

10from collections.abc import Iterable 

11from dataclasses import dataclass, field 

12from multiprocessing import Pool, current_process 

13from pathlib import Path 

14from typing import Optional, TextIO 

15 

16from mediawiki_langcodes import code_to_name 

17from wikitextprocessor import Page 

18from wikitextprocessor.core import CollatedErrorReturnData, NamespaceDataEntry 

19 

20from .import_utils import import_extractor_module 

21from .wxr_context import WiktextractContext 

22from .wxr_logging import logger 

23 

24 

@dataclass
class ThesaurusTerm:
    """One linkage extracted from a Wiktionary ``Thesaurus:`` page.

    Each instance pairs a thesaurus headword (``entry``) with a single
    related ``term`` under a given ``linkage`` relation; rows are later
    merged into word entries by the main extraction pipeline.
    """

    # Thesaurus page headword (without the "Thesaurus:" namespace prefix).
    entry: str
    # Wikimedia language code of the entry (e.g. "en").
    language_code: str
    # Part of speech of the entry.
    pos: str
    # Linkage relation name, e.g. "synonyms" or "hyponyms".
    linkage: str
    # The related word itself.
    term: str
    # Canonicalized tags attached to the term.
    tags: list[str] = field(default_factory=list)
    # Tags that could not be canonicalized.
    raw_tags: list[str] = field(default_factory=list)
    # Topic/category labels for the term.
    topics: list[str] = field(default_factory=list)
    # Romanization of the term, when the script is non-Latin.
    roman: str = ""
    # Rowid of the owning row in the "entries" SQLite table (0 = unset).
    entry_id: int = 0
    # Sense/gloss text the linkage applies to, if any.
    sense: str = ""

38 

39 

def worker_func(
    page: Page,
) -> tuple[bool, list[ThesaurusTerm], CollatedErrorReturnData, Optional[str]]:
    """Process a single thesaurus page inside a worker process.

    Returns a 4-tuple ``(success, terms, stats, error_message)``:
    on success the extracted terms and the collected error/stat data are
    returned; on failure an empty term list and a formatted traceback
    message are returned for the parent process to log.
    """
    # The WiktextractContext is attached to this function object by
    # init_worker_process (see extract_thesaurus_data) because it cannot
    # be passed through the Pool pickling machinery directly.
    wxr: WiktextractContext = worker_func.wxr  # type:ignore[attr-defined]
    with tempfile.TemporaryDirectory(prefix="wiktextract") as tmpdirname:
        # Record the page title per PID so a crashed worker can be traced
        # back to the page it was handling (file lives only while parsing).
        debug_path = "{}/wiktextract-{}".format(tmpdirname, os.getpid())
        with open(debug_path, "w", encoding="utf-8") as f:
            f.write(page.title + "\n")

        wxr.wtp.start_page(page.title)
        try:
            terms = extract_thesaurus_page(wxr, page)
            return True, terms, wxr.wtp.to_return(), None
        except Exception as e:
            # Format the full traceback here; the exception object itself
            # may not survive the trip back to the parent process.
            lst = traceback.format_exception(
                type(e), value=e, tb=e.__traceback__
            )
            msg = (
                '=== EXCEPTION while parsing page "{}":\n '
                "in process {}".format(
                    page.title,
                    current_process().name,
                )
                + "".join(lst)
            )
            # Empty dict stands in for CollatedErrorReturnData on failure.
            return False, [], {}, msg  # type:ignore[typeddict-item]

66 

67 

def extract_thesaurus_page(
    wxr: WiktextractContext, page: Page
) -> list[ThesaurusTerm]:
    """Dispatch *page* to the language-edition-specific thesaurus extractor.

    The extractor module is resolved from the wiki's language code; its
    ``extract_thesaurus_page`` does the actual parsing.
    """
    extractor = import_extractor_module(wxr.wtp.lang_code, "thesaurus")
    return extractor.extract_thesaurus_page(wxr, page)

75 

76 

def extract_thesaurus_data(
    wxr: WiktextractContext, num_processes: Optional[int] = None
) -> None:
    """Extract all pages in the Thesaurus namespace into the thesaurus db.

    Pages are parsed in a multiprocessing pool (``num_processes`` workers,
    or the multiprocessing default when None) and the resulting terms are
    inserted into ``wxr.thesaurus_db_conn`` in the parent process.
    """
    # Imported here, presumably to avoid a circular import with
    # .wiktionary — TODO confirm.
    from .wiktionary import init_worker_process

    start_t = time.time()
    logger.info("Extracting thesaurus data")
    # Resolve the numeric id of the "Thesaurus" namespace; falls back to 0
    # if the namespace is not defined for this wiki.
    thesaurus_ns_data: NamespaceDataEntry = wxr.wtp.NAMESPACE_DATA.get(
        "Thesaurus",
        {},  # type:ignore[typeddict-item]
    )
    thesaurus_ns_id = thesaurus_ns_data.get("id", 0)

    # Database connections and similar state cannot be pickled into worker
    # processes; strip them before creating the pool, then reconnect in the
    # parent so inserts below have a live connection.
    wxr.remove_unpicklable_objects()
    with Pool(num_processes, init_worker_process, (worker_func, wxr)) as pool:
        wxr.reconnect_databases(False)
        for success, terms, stats, err in pool.imap_unordered(
            worker_func, wxr.wtp.get_all_pages([thesaurus_ns_id], False)
        ):
            if not success:
                # Print error in parent process - do not remove
                logger.error(err)
                continue
            for term in terms:
                insert_thesaurus_term(wxr.thesaurus_db_conn, term)  # type:ignore[arg-type]
            # Merge per-page error/debug counters back into the config.
            wxr.config.merge_return(stats)

    wxr.thesaurus_db_conn.commit()  # type:ignore[union-attr]
    num_pages = wxr.wtp.saved_page_nums([thesaurus_ns_id], False)
    total = thesaurus_linkage_number(wxr.thesaurus_db_conn)  # type:ignore[arg-type]
    logger.info(
        "Extracted {} linkages from {} thesaurus pages (took {:.1f}s)".format(
            total, num_pages, time.time() - start_t
        )
    )

112 

113 

114def init_thesaurus_db(db_path: Path) -> sqlite3.Connection: 

115 conn = sqlite3.connect(db_path) 

116 conn.executescript( 

117 """ 

118 CREATE TABLE IF NOT EXISTS entries ( 

119 id INTEGER PRIMARY KEY, 

120 entry TEXT, 

121 pos TEXT, 

122 language_code TEXT, 

123 sense TEXT 

124 ); 

125 CREATE UNIQUE INDEX IF NOT EXISTS entries_index 

126 ON entries(entry, pos, language_code); 

127 

128 CREATE TABLE IF NOT EXISTS terms ( 

129 term TEXT, 

130 entry_id INTEGER, 

131 linkage TEXT, -- Synonyms, Hyponyms 

132 tags TEXT, 

133 raw_tags TEXT, 

134 topics TEXT, 

135 roman TEXT, -- Romanization 

136 PRIMARY KEY(term, entry_id), 

137 FOREIGN KEY(entry_id) REFERENCES entries(id) 

138 ); 

139 

140 PRAGMA journal_mode = WAL; 

141 PRAGMA foreign_keys = ON; 

142 """ 

143 ) 

144 return conn 

145 

146 

def thesaurus_linkage_number(db_conn: sqlite3.Connection) -> int:
    """Return the total number of linkage rows in the ``terms`` table."""
    row = db_conn.execute("SELECT count(*) FROM terms").fetchone()
    # count(*) always yields one row; the fallback mirrors the original's
    # defensive behavior if no row comes back.
    return row[0] if row is not None else 0

151 

152 

def search_thesaurus(
    db_conn: sqlite3.Connection,
    entry: str,
    lang_code: str,
    pos: str,
    linkage_type: Optional[str] = None,
) -> Iterable[ThesaurusTerm]:
    """Yield ThesaurusTerm objects for *entry*/*lang_code*/*pos*.

    When *linkage_type* is given, only rows with that linkage relation
    (e.g. "synonyms") are returned.
    """
    sql = """
    SELECT term, entries.id, linkage, tags, topics, roman, sense, raw_tags
    FROM terms JOIN entries ON terms.entry_id = entries.id
    WHERE entry = ? AND language_code = ? AND pos = ?
    """
    params: tuple[str, ...] = (entry, lang_code, pos)
    if linkage_type is not None:
        sql += " AND linkage = ?"
        params = params + (linkage_type,)

    for term, entry_id, linkage, tags, topics, roman, sense, raw_tags in (
        db_conn.execute(sql, params)
    ):
        # Tag/topic columns store "|"-separated lists; empty string → [].
        yield ThesaurusTerm(
            entry=entry,
            language_code=lang_code,
            pos=pos,
            linkage=linkage,
            term=term,
            tags=tags.split("|") if tags else [],
            raw_tags=raw_tags.split("|") if raw_tags else [],
            topics=topics.split("|") if topics else [],
            roman=roman,
            entry_id=entry_id,
            sense=sense,
        )

184 

185 

186def insert_thesaurus_term( 

187 db_conn: sqlite3.Connection, term: ThesaurusTerm 

188) -> None: 

189 entry_id = None 

190 for (new_entry_id,) in db_conn.execute( 

191 "INSERT OR IGNORE INTO entries (entry, language_code, pos, sense) " 

192 "VALUES(?, ?, ?, ?) RETURNING id", 

193 (term.entry, term.language_code, term.pos, term.sense), 

194 ): 

195 entry_id = new_entry_id 

196 if entry_id is None: 

197 for (old_entry_id,) in db_conn.execute( 

198 "SELECT id FROM entries WHERE entry = ? AND language_code = ? " 

199 "AND pos = ?", 

200 (term.entry, term.language_code, term.pos), 

201 ): 

202 entry_id = old_entry_id 

203 db_conn.execute( 

204 """ 

205 INSERT OR IGNORE INTO terms 

206 (term, entry_id, linkage, tags, topics, roman, raw_tags) 

207 VALUES(?, ?, ?, ?, ?, ?, ?) 

208 """, 

209 ( 

210 term.term, 

211 entry_id, 

212 term.linkage, 

213 "|".join(term.tags), 

214 "|".join(term.topics), 

215 term.roman, 

216 "|".join(term.raw_tags), 

217 ), 

218 ) 

219 

220 

def close_thesaurus_db(db_path: Path, db_conn: sqlite3.Connection) -> None:
    """Close the thesaurus db connection; delete the file if it is temporary.

    The database file is removed only when it lives directly in the system
    temp directory (i.e. it was created as a scratch file).
    """
    db_conn.close()
    system_tmp = Path(tempfile.gettempdir())
    if db_path.parent.samefile(system_tmp):
        db_path.unlink(missing_ok=True)

225 

226 

def emit_words_in_thesaurus(
    wxr: WiktextractContext,
    emitted: set[tuple[str, str, str]],
    out_f: TextIO,
    human_readable: bool,
) -> None:
    """Write synthetic word entries for thesaurus-only headwords.

    ``emitted`` holds (word, lang_code, pos) triples already written from
    the main namespace; any thesaurus entry not in that set gets a minimal
    JSON word entry so its linkages are not lost.
    """
    # Emit words that occur in thesaurus as main words but for which
    # Wiktionary has no word in the main namespace. This seems to happen
    # sometimes.
    from .wiktionary import write_json_data

    logger.info("Emitting words that only occur in thesaurus")
    for entry_id, entry, pos, lang_code, sense in wxr.thesaurus_db_conn.execute(  # type:ignore[union-attr]
        "SELECT id, entry, pos, language_code, sense FROM entries "
        "WHERE pos IS NOT NULL AND language_code IS NOT NULL"
    ):
        # Skip anything the main-namespace pass already emitted.
        if (entry, lang_code, pos) in emitted:
            continue

        # Defensive: the WHERE clause above filters NULL pos/lang_code,
        # but entry could still be NULL; log and skip rather than crash.
        if None in (entry, lang_code, pos):
            logger.info(
                f"'None' in entry, lang_code or"
                f" pos: {entry}, {lang_code}, {pos}"
            )
            continue

        logger.info(
            "Emitting thesaurus entry for "
            f"{entry}/{lang_code}/{pos} (not in main)"
        )

        # Single sense dict collecting the gloss (if any) plus one list of
        # relation dicts per linkage type (keys like "synonyms").
        sense_dict = dict()

        if sense:
            sense_dict["glosses"] = [sense]

        for (
            term,
            linkage,
            tags,
            topics,
            roman,
            raw_tags,
        ) in wxr.thesaurus_db_conn.execute(  # type:ignore[union-attr]
            """
            SELECT term, linkage, tags, topics, roman, raw_tags
            FROM terms WHERE entry_id = ?
            """,
            (entry_id,),
        ):
            relation_dict = {"word": term, "source": f"Thesaurus:{entry}"}
            # Stored as "|"-separated strings; only split when non-empty.
            if len(tags) > 0:
                relation_dict["tags"] = tags.split("|")
            if len(topics) > 0:
                relation_dict["topics"] = topics.split("|")
            if len(raw_tags) > 0:
                relation_dict["raw_tags"] = raw_tags.split("|")
            if linkage not in sense_dict:
                sense_dict[linkage] = []
            sense_dict[linkage].append(relation_dict)

        if "glosses" not in sense_dict:
            sense_dict["tags"] = ["no-gloss"]

        # NOTE: this deliberately rebinds the loop variable `entry` to the
        # output dict; the headword string is not needed again before the
        # next iteration re-assigns it from the query row.
        entry = {
            "word": entry,
            "lang": code_to_name(lang_code, "en"),
            "lang_code": lang_code,
            "pos": pos,
            "senses": [sense_dict] if sense_dict else [],
            "source": "thesaurus",
        }
        # Drop falsy values (empty senses list, unresolved language name).
        entry = {k: v for k, v in entry.items() if v}
        write_json_data(entry, out_f, human_readable)