Coverage for src/wiktextract/thesaurus.py: 69%

127 statements  

coverage.py v7.10.7, created at 2025-10-13 10:14 +0000

# Extracting information from thesaurus pages in Wiktionary. The data will be
# merged into word linkages in later stages.
#
# Copyright (c) 2021 Tatu Ylonen. See file LICENSE and https://ylonen.org
import atexit
import os
import sqlite3
import tempfile
import time
from collections.abc import Iterable
from concurrent.futures import ProcessPoolExecutor
from copy import deepcopy
from dataclasses import dataclass, field
from multiprocessing import current_process, get_all_start_methods, get_context
from pathlib import Path
from traceback import format_exc
from typing import Optional, TextIO

from mediawiki_langcodes import code_to_name
from wikitextprocessor import Page
from wikitextprocessor.core import CollatedErrorReturnData, NamespaceDataEntry

from .import_utils import import_extractor_module
from .wxr_context import WiktextractContext
from .wxr_logging import logger

@dataclass
class ThesaurusTerm:
    entry: str
    language_code: str
    pos: str
    linkage: str
    term: str
    tags: list[str] = field(default_factory=list)
    raw_tags: list[str] = field(default_factory=list)
    topics: list[str] = field(default_factory=list)
    roman: str = ""
    entry_id: int = 0
    sense: str = ""
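
# Example ThesaurusTerm (illustrative values only, not taken from real data):
#   ThesaurusTerm(entry="good", language_code="en", pos="adj",
#                 linkage="synonyms", term="fine", sense="acceptable")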

def init_worker(wxr: WiktextractContext) -> None:
    global worker_wxr
    worker_wxr = wxr
    worker_wxr.reconnect_databases()
    atexit.register(worker_wxr.remove_unpicklable_objects)
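
# Each pool worker keeps its own WiktextractContext in the module-level
# global worker_wxr set by init_worker() above. sqlite3 connections cannot
# be pickled, so the context is shipped to workers with its unpicklable
# members (presumably the database connections) stripped, and the
# connections are then reopened per process via reconnect_databases().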

def worker_func(
    page: Page,
) -> tuple[bool, list[ThesaurusTerm], CollatedErrorReturnData, Optional[str]]:
    with tempfile.TemporaryDirectory(prefix="wiktextract") as tmpdirname:
        # Record the page title in a per-process scratch file, apparently to
        # aid debugging if the worker dies while parsing this page.
        debug_path = "{}/wiktextract-{}".format(tmpdirname, os.getpid())
        with open(debug_path, "w", encoding="utf-8") as f:
            f.write(page.title + "\n")

        worker_wxr.wtp.start_page(page.title)
        try:
            terms = extract_thesaurus_page(worker_wxr, page)
            return True, terms, worker_wxr.wtp.to_return(), None
        except Exception:
            msg = (
                '=== EXCEPTION while parsing page "{}":\n in process {}'.format(
                    page.title,
                    current_process().name,
                )
                + format_exc()
            )
            return False, [], {}, msg  # type:ignore[typeddict-item]
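
# worker_func() returns (success, terms, collated_errors, error_message):
# on success the message is None; on failure terms is empty and the
# formatted traceback is carried back so the parent process can log it.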

def extract_thesaurus_page(
    wxr: WiktextractContext, page: Page
) -> list[ThesaurusTerm]:
    thesaurus_extractor_mod = import_extractor_module(
        wxr.wtp.lang_code, "thesaurus"
    )
    return thesaurus_extractor_mod.extract_thesaurus_page(wxr, page)
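
# import_extractor_module() dispatches to a language-specific extractor;
# for the English edition this presumably resolves to something like
# wiktextract/extractor/en/thesaurus.py (the exact path is an assumption
# based on the module name, not confirmed by this file).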

def extract_thesaurus_data(
    wxr: WiktextractContext, num_processes: Optional[int] = None
) -> None:
    start_t = time.time()
    logger.info("Extracting thesaurus data")
    thesaurus_ns_data: NamespaceDataEntry = wxr.wtp.NAMESPACE_DATA.get(
        "Thesaurus",
        {},  # type:ignore[typeddict-item]
    )
    thesaurus_ns_id = thesaurus_ns_data.get("id", 0)

    wxr.remove_unpicklable_objects()
    with ProcessPoolExecutor(
        max_workers=num_processes,
        mp_context=get_context(
            "forkserver" if "forkserver" in get_all_start_methods() else "spawn"
        ),
        initializer=init_worker,
        initargs=(deepcopy(wxr),),
    ) as executor:
        wxr.reconnect_databases()
        for success, terms, stats, err in executor.map(
            worker_func,
            wxr.wtp.get_all_pages([thesaurus_ns_id], False),
            chunksize=100,  # the default chunksize of 1 is too slow
        ):
            if not success:
                # Print error in parent process - do not remove
                logger.error(err)
                continue
            for term in terms:
                insert_thesaurus_term(wxr.thesaurus_db_conn, term)  # type:ignore[arg-type]
            wxr.config.merge_return(stats)

    wxr.thesaurus_db_conn.commit()  # type:ignore[union-attr]
    num_pages = wxr.wtp.saved_page_nums([thesaurus_ns_id], False)
    total = thesaurus_linkage_number(wxr.thesaurus_db_conn)  # type:ignore[arg-type]
    logger.info(
        "Extracted {} linkages from {} thesaurus pages (took {:.1f}s)".format(
            total, num_pages, time.time() - start_t
        )
    )
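
# Minimal usage sketch (assumes wxr is a fully initialized
# WiktextractContext; not part of this module):
#   extract_thesaurus_data(wxr, num_processes=4)
# "forkserver"/"spawn" are used instead of "fork", presumably because the
# parent process holds open sqlite3 connections that are unsafe to fork.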

def init_thesaurus_db(db_path: Path) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS entries (
            id INTEGER PRIMARY KEY,
            entry TEXT,
            pos TEXT,
            language_code TEXT,
            sense TEXT
        );
        CREATE UNIQUE INDEX IF NOT EXISTS entries_index
        ON entries(entry, pos, language_code);

        CREATE TABLE IF NOT EXISTS terms (
            term TEXT,
            entry_id INTEGER,
            linkage TEXT,  -- Synonyms, Hyponyms
            tags TEXT,
            raw_tags TEXT,
            topics TEXT,
            roman TEXT,  -- Romanization
            PRIMARY KEY(term, entry_id),
            FOREIGN KEY(entry_id) REFERENCES entries(id)
        );

        PRAGMA journal_mode = WAL;
        PRAGMA foreign_keys = ON;
        """
    )
    return conn
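
# Note on the schema: list-valued ThesaurusTerm fields (tags, raw_tags,
# topics) are flattened into single "|"-joined TEXT columns on insert and
# split back into lists on read; see insert_thesaurus_term() and
# search_thesaurus() below.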

def thesaurus_linkage_number(db_conn: sqlite3.Connection) -> int:
    for (r,) in db_conn.execute("SELECT count(*) FROM terms"):
        return r
    return 0

def search_thesaurus(
    db_conn: sqlite3.Connection,
    entry: str,
    lang_code: str,
    pos: str,
    linkage_type: Optional[str] = None,
) -> Iterable[ThesaurusTerm]:
    query_sql = """
    SELECT term, entries.id, linkage, tags, topics, roman, sense, raw_tags
    FROM terms JOIN entries ON terms.entry_id = entries.id
    WHERE entry = ? AND language_code = ? AND pos = ?
    """
    query_value: tuple[str, ...] = (entry, lang_code, pos)
    if linkage_type is not None:
        query_sql += " AND linkage = ?"
        query_value += (linkage_type,)

    for r in db_conn.execute(query_sql, query_value):
        yield ThesaurusTerm(
            term=r[0],
            entry_id=r[1],
            linkage=r[2],
            tags=r[3].split("|") if len(r[3]) > 0 else [],
            topics=r[4].split("|") if len(r[4]) > 0 else [],
            roman=r[5],
            sense=r[6],
            entry=entry,
            pos=pos,
            language_code=lang_code,
            raw_tags=r[7].split("|") if len(r[7]) > 0 else [],
        )
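
# Example query (hypothetical entry and database path):
#   conn = init_thesaurus_db(Path("thesaurus.db"))
#   for t in search_thesaurus(conn, "good", "en", "adj", "synonyms"):
#       print(t.term, t.tags)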

def insert_thesaurus_term(
    db_conn: sqlite3.Connection, term: ThesaurusTerm
) -> None:
    entry_id = None
    # "INSERT ... RETURNING" yields no row when the insert is suppressed by
    # OR IGNORE (the entry already exists), so entry_id stays None and the
    # SELECT below looks up the existing row. RETURNING requires SQLite 3.35+.
    for (new_entry_id,) in db_conn.execute(
        "INSERT OR IGNORE INTO entries (entry, language_code, pos, sense) "
        "VALUES(?, ?, ?, ?) RETURNING id",
        (term.entry, term.language_code, term.pos, term.sense),
    ):
        entry_id = new_entry_id
    if entry_id is None:
        for (old_entry_id,) in db_conn.execute(
            "SELECT id FROM entries WHERE entry = ? AND language_code = ? "
            "AND pos = ?",
            (term.entry, term.language_code, term.pos),
        ):
            entry_id = old_entry_id
    db_conn.execute(
        """
        INSERT OR IGNORE INTO terms
        (term, entry_id, linkage, tags, topics, roman, raw_tags)
        VALUES(?, ?, ?, ?, ?, ?, ?)
        """,
        (
            term.term,
            entry_id,
            term.linkage,
            "|".join(term.tags),
            "|".join(term.topics),
            term.roman,
            "|".join(term.raw_tags),
        ),
    )

def close_thesaurus_db(db_path: Path, db_conn: sqlite3.Connection) -> None:
    db_conn.close()
    # Remove the database file only if it lives in the system temp directory,
    # i.e. it was presumably created as a temporary file for this run.
    if db_path.parent.samefile(Path(tempfile.gettempdir())):
        db_path.unlink(True)

def emit_words_in_thesaurus(
    wxr: WiktextractContext,
    emitted: set[tuple[str, str, str]],
    out_f: TextIO,
    human_readable: bool,
) -> None:
    # Emit entries that occur in the thesaurus but for which Wiktionary has
    # no page in the main namespace. This seems to happen sometimes.
    from .wiktionary import write_json_data

    logger.info("Emitting words that only occur in thesaurus")
    for entry_id, entry, pos, lang_code, sense in wxr.thesaurus_db_conn.execute(  # type:ignore[union-attr]
        "SELECT id, entry, pos, language_code, sense FROM entries "
        "WHERE pos IS NOT NULL AND language_code IS NOT NULL"
    ):
        if (entry, lang_code, pos) in emitted:
            continue

        if None in (entry, lang_code, pos):
            logger.info(
                f"'None' in entry, lang_code or"
                f" pos: {entry}, {lang_code}, {pos}"
            )
            continue

        logger.info(
            "Emitting thesaurus entry for "
            f"{entry}/{lang_code}/{pos} (not in main)"
        )

        sense_dict = dict()

        if sense:
            sense_dict["glosses"] = [sense]

        for (
            term,
            linkage,
            tags,
            topics,
            roman,
            raw_tags,
        ) in wxr.thesaurus_db_conn.execute(  # type:ignore[union-attr]
            """
            SELECT term, linkage, tags, topics, roman, raw_tags
            FROM terms WHERE entry_id = ?
            """,
            (entry_id,),
        ):
            relation_dict = {"word": term, "source": f"Thesaurus:{entry}"}
            if len(tags) > 0:
                relation_dict["tags"] = tags.split("|")
            if len(topics) > 0:
                relation_dict["topics"] = topics.split("|")
            if len(raw_tags) > 0:
                relation_dict["raw_tags"] = raw_tags.split("|")
            if linkage not in sense_dict:
                sense_dict[linkage] = []
            sense_dict[linkage].append(relation_dict)

        if "glosses" not in sense_dict:
            sense_dict["tags"] = ["no-gloss"]

        # Note: this rebinds the loop variable entry (a string) to the
        # output dict for this word.
        entry = {
            "word": entry,
            "lang": code_to_name(lang_code, "en"),
            "lang_code": lang_code,
            "pos": pos,
            "senses": [sense_dict] if sense_dict else [],
            "source": "thesaurus",
        }
        entry = {k: v for k, v in entry.items() if v}
        write_json_data(entry, out_f, human_readable)
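
# Rough lifecycle of this module, inferred from the functions above (the
# orchestration lives elsewhere in wiktextract):
#   conn = init_thesaurus_db(path)        # create/open the SQLite database
#   extract_thesaurus_data(wxr)           # parse Thesaurus: pages in parallel
#   search_thesaurus(conn, ...)           # merge linkages into word entries
#   emit_words_in_thesaurus(wxr, emitted, out_f, human_readable)
#   close_thesaurus_db(path, conn)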