Coverage for src/wiktextract/thesaurus.py: 69% (127 statements)
# Extracting information from thesaurus pages in Wiktionary. The data will be
# merged into word linkages in later stages.
#
# Copyright (c) 2021 Tatu Ylonen. See file LICENSE and https://ylonen.org
import atexit
import os
import sqlite3
import tempfile
import time
from collections.abc import Iterable
from concurrent.futures import ProcessPoolExecutor
from copy import deepcopy
from dataclasses import dataclass, field
from multiprocessing import current_process, get_context
from pathlib import Path
from traceback import format_exc
from typing import Optional, TextIO

from mediawiki_langcodes import code_to_name
from wikitextprocessor import Page
from wikitextprocessor.core import CollatedErrorReturnData, NamespaceDataEntry

from .import_utils import import_extractor_module
from .wxr_context import WiktextractContext
from .wxr_logging import logger


@dataclass
class ThesaurusTerm:
    entry: str
    language_code: str
    pos: str
    linkage: str
    term: str
    tags: list[str] = field(default_factory=list)
    raw_tags: list[str] = field(default_factory=list)
    topics: list[str] = field(default_factory=list)
    roman: str = ""
    entry_id: int = 0
    sense: str = ""
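
# A minimal usage sketch for ThesaurusTerm (the values are hypothetical, not
# taken from a real dump). List fields such as tags are joined with "|" when
# persisted to SQLite; see insert_thesaurus_term below.
#
#   term = ThesaurusTerm(
#       entry="happy",
#       language_code="en",
#       pos="adj",
#       linkage="synonyms",
#       term="cheerful",
#       tags=["informal"],
#   )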


def init_worker(wxr: WiktextractContext) -> None:
    global worker_wxr
    worker_wxr = wxr
    worker_wxr.reconnect_databases()
    atexit.register(worker_wxr.remove_unpicklable_objects)
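
# Note: the pool in extract_thesaurus_data uses the "spawn" start method, so
# each worker begins with a fresh interpreter; init_worker reattaches the
# per-process SQLite connections before any page is parsed.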


def worker_func(
    page: Page,
) -> tuple[bool, list[ThesaurusTerm], CollatedErrorReturnData, Optional[str]]:
    with tempfile.TemporaryDirectory(prefix="wiktextract") as tmpdirname:
        debug_path = "{}/wiktextract-{}".format(tmpdirname, os.getpid())
        with open(debug_path, "w", encoding="utf-8") as f:
            f.write(page.title + "\n")

        worker_wxr.wtp.start_page(page.title)
        try:
            terms = extract_thesaurus_page(worker_wxr, page)
            return True, terms, worker_wxr.wtp.to_return(), None
        except Exception:
            msg = (
                '=== EXCEPTION while parsing page "{}":\n in process {}'.format(
                    page.title,
                    current_process().name,
                )
                + format_exc()
            )
            return False, [], {}, msg  # type:ignore[typeddict-item]


def extract_thesaurus_page(
    wxr: WiktextractContext, page: Page
) -> list[ThesaurusTerm]:
    thesaurus_extractor_mod = import_extractor_module(
        wxr.wtp.lang_code, "thesaurus"
    )
    return thesaurus_extractor_mod.extract_thesaurus_page(wxr, page)
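
# import_extractor_module resolves a language-specific implementation;
# presumably something along the lines of
# wiktextract.extractor.<lang>.thesaurus, which must itself expose an
# extract_thesaurus_page(wxr, page) function.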


def extract_thesaurus_data(
    wxr: WiktextractContext, num_processes: Optional[int] = None
) -> None:
    start_t = time.time()
    logger.info("Extracting thesaurus data")
    thesaurus_ns_data: NamespaceDataEntry = wxr.wtp.NAMESPACE_DATA.get(
        "Thesaurus",
        {},  # type:ignore[typeddict-item]
    )
    thesaurus_ns_id = thesaurus_ns_data.get("id", 0)

    wxr.remove_unpicklable_objects()
    with ProcessPoolExecutor(
        max_workers=num_processes,
        mp_context=get_context("spawn"),
        initializer=init_worker,
        initargs=(deepcopy(wxr),),
    ) as executor:
        wxr.reconnect_databases()
        for success, terms, stats, err in executor.map(
            worker_func,
            wxr.wtp.get_all_pages([thesaurus_ns_id], False),
            chunksize=100,  # batch pages per task; the default of 1 is too slow
        ):
            if not success:
                # Print error in parent process - do not remove
                logger.error(err)
                continue
            for term in terms:
                insert_thesaurus_term(wxr.thesaurus_db_conn, term)  # type:ignore[arg-type]
            wxr.config.merge_return(stats)

    wxr.thesaurus_db_conn.commit()  # type:ignore[union-attr]
    num_pages = wxr.wtp.saved_page_nums([thesaurus_ns_id], False)
    total = thesaurus_linkage_number(wxr.thesaurus_db_conn)  # type:ignore[arg-type]
    logger.info(
        "Extracted {} linkages from {} thesaurus pages (took {:.1f}s)".format(
            total, num_pages, time.time() - start_t
        )
    )


def init_thesaurus_db(db_path: Path) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS entries (
            id INTEGER PRIMARY KEY,
            entry TEXT,
            pos TEXT,
            language_code TEXT,
            sense TEXT
        );
        CREATE UNIQUE INDEX IF NOT EXISTS entries_index
        ON entries(entry, pos, language_code);

        CREATE TABLE IF NOT EXISTS terms (
            term TEXT,
            entry_id INTEGER,
            linkage TEXT,  -- Synonyms, Hyponyms
            tags TEXT,
            raw_tags TEXT,
            topics TEXT,
            roman TEXT,  -- Romanization
            PRIMARY KEY(term, entry_id),
            FOREIGN KEY(entry_id) REFERENCES entries(id)
        );

        PRAGMA journal_mode = WAL;
        PRAGMA foreign_keys = ON;
        """
    )
    return conn
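
# A usage sketch (the file name below is hypothetical). The connection is
# created once in the parent process; workers reopen it via
# reconnect_databases() in init_worker.
#
#   conn = init_thesaurus_db(Path(tempfile.gettempdir()) / "wikt_thesaurus.db")
#   print(thesaurus_linkage_number(conn))  # 0 on a fresh database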


def thesaurus_linkage_number(db_conn: sqlite3.Connection) -> int:
    for (r,) in db_conn.execute("SELECT count(*) FROM terms"):
        return r
    return 0


def search_thesaurus(
    db_conn: sqlite3.Connection,
    entry: str,
    lang_code: str,
    pos: str,
    linkage_type: Optional[str] = None,
) -> Iterable[ThesaurusTerm]:
    query_sql = """
    SELECT term, entries.id, linkage, tags, topics, roman, sense, raw_tags
    FROM terms JOIN entries ON terms.entry_id = entries.id
    WHERE entry = ? AND language_code = ? AND pos = ?
    """
    query_value: tuple[str, ...] = (entry, lang_code, pos)
    if linkage_type is not None:
        query_sql += " AND linkage = ?"
        query_value += (linkage_type,)

    for r in db_conn.execute(query_sql, query_value):
        yield ThesaurusTerm(
            term=r[0],
            entry_id=r[1],
            linkage=r[2],
            tags=r[3].split("|") if len(r[3]) > 0 else [],
            topics=r[4].split("|") if len(r[4]) > 0 else [],
            roman=r[5],
            sense=r[6],
            entry=entry,
            pos=pos,
            language_code=lang_code,
            raw_tags=r[7].split("|") if len(r[7]) > 0 else [],
        )
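
# Example query (the entry is hypothetical); rows are yielded lazily:
#
#   for t in search_thesaurus(conn, "happy", "en", "adj", "synonyms"):
#       print(t.term, t.tags)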


def insert_thesaurus_term(
    db_conn: sqlite3.Connection, term: ThesaurusTerm
) -> None:
    entry_id = None
    for (new_entry_id,) in db_conn.execute(
        "INSERT OR IGNORE INTO entries (entry, language_code, pos, sense) "
        "VALUES(?, ?, ?, ?) RETURNING id",
        (term.entry, term.language_code, term.pos, term.sense),
    ):
        entry_id = new_entry_id
    if entry_id is None:
        for (old_entry_id,) in db_conn.execute(
            "SELECT id FROM entries WHERE entry = ? AND language_code = ? "
            "AND pos = ?",
            (term.entry, term.language_code, term.pos),
        ):
            entry_id = old_entry_id
    db_conn.execute(
        """
        INSERT OR IGNORE INTO terms
        (term, entry_id, linkage, tags, topics, roman, raw_tags)
        VALUES(?, ?, ?, ?, ?, ?, ?)
        """,
        (
            term.term,
            entry_id,
            term.linkage,
            "|".join(term.tags),
            "|".join(term.topics),
            term.roman,
            "|".join(term.raw_tags),
        ),
    )
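
# Note: "INSERT OR IGNORE ... RETURNING id" produces a row only when a new
# entries row is actually inserted; when the unique index on
# (entry, pos, language_code) suppresses the insert, the fallback SELECT
# recovers the existing id so the term still gets linked to it.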


def close_thesaurus_db(db_path: Path, db_conn: sqlite3.Connection) -> None:
    db_conn.close()
    if db_path.parent.samefile(Path(tempfile.gettempdir())):
        db_path.unlink(missing_ok=True)


def emit_words_in_thesaurus(
    wxr: WiktextractContext,
    emitted: set[tuple[str, str, str]],
    out_f: TextIO,
    human_readable: bool,
) -> None:
    # Emit words that occur in the thesaurus as main words but for which
    # Wiktionary has no word in the main namespace. This seems to happen
    # sometimes.
    from .wiktionary import write_json_data

    logger.info("Emitting words that only occur in thesaurus")
    for entry_id, entry, pos, lang_code, sense in wxr.thesaurus_db_conn.execute(  # type:ignore[union-attr]
        "SELECT id, entry, pos, language_code, sense FROM entries "
        "WHERE pos IS NOT NULL AND language_code IS NOT NULL"
    ):
        if (entry, lang_code, pos) in emitted:
            continue

        if None in (entry, lang_code, pos):
            logger.info(
                f"'None' in entry, lang_code or"
                f" pos: {entry}, {lang_code}, {pos}"
            )
            continue

        logger.info(
            "Emitting thesaurus entry for "
            f"{entry}/{lang_code}/{pos} (not in main)"
        )

        sense_dict = dict()

        if sense:
            sense_dict["glosses"] = [sense]

        for (
            term,
            linkage,
            tags,
            topics,
            roman,
            raw_tags,
        ) in wxr.thesaurus_db_conn.execute(  # type:ignore[union-attr]
            """
            SELECT term, linkage, tags, topics, roman, raw_tags
            FROM terms WHERE entry_id = ?
            """,
            (entry_id,),
        ):
            relation_dict = {"word": term, "source": f"Thesaurus:{entry}"}
            if len(tags) > 0:
                relation_dict["tags"] = tags.split("|")
            if len(topics) > 0:
                relation_dict["topics"] = topics.split("|")
            if len(raw_tags) > 0:
                relation_dict["raw_tags"] = raw_tags.split("|")
            if linkage not in sense_dict:
                sense_dict[linkage] = []
            sense_dict[linkage].append(relation_dict)

        if "glosses" not in sense_dict:
            sense_dict["tags"] = ["no-gloss"]

        entry = {
            "word": entry,
            "lang": code_to_name(lang_code, "en"),
            "lang_code": lang_code,
            "pos": pos,
            "senses": [sense_dict] if sense_dict else [],
            "source": "thesaurus",
        }
        entry = {k: v for k, v in entry.items() if v}
        write_json_data(entry, out_f, human_readable)
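
# Lifecycle sketch: how these pieces appear to fit together (call order
# inferred from this module, not confirmed against the calling code):
#
#   conn = init_thesaurus_db(db_path)       # create tables and indexes
#   extract_thesaurus_data(wxr)             # parallel parse of Thesaurus pages
#   ...                                     # main-namespace extraction, which
#                                           # may consult search_thesaurus()
#   emit_words_in_thesaurus(wxr, emitted, out_f, human_readable)
#   close_thesaurus_db(db_path, conn)       # close; unlink if in the temp dir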