# NOTE: this file was recovered from a coverage.py v7.6.10 HTML report
# (src/wiktextract/thesaurus.py, 61% of 123 statements covered,
# generated 2024-12-27 08:07 +0000); report chrome removed.
# Extracting information from thesaurus pages in Wiktionary. The data will be
# merged into word linkages in later stages.
#
# Copyright (c) 2021 Tatu Ylonen. See file LICENSE and https://ylonen.org
5import os
6import sqlite3
7import tempfile
8import time
9import traceback
10from collections.abc import Iterable
11from dataclasses import dataclass, field
12from multiprocessing import Pool, current_process
13from pathlib import Path
14from typing import Optional, TextIO
16from mediawiki_langcodes import code_to_name
17from wikitextprocessor import Page
18from wikitextprocessor.core import CollatedErrorReturnData, NamespaceDataEntry
20from .import_utils import import_extractor_module
21from .wxr_context import WiktextractContext
22from .wxr_logging import logger
@dataclass
class ThesaurusTerm:
    """One linkage extracted from a thesaurus page.

    Maps one row of the ``terms`` table joined to its ``entries`` row:
    the thesaurus page headword (``entry``) links to ``term`` via the
    given ``linkage`` type (e.g. synonyms, hyponyms).
    """

    entry: str  # headword of the thesaurus page
    language_code: str
    pos: str  # part of speech of the entry
    linkage: str  # linkage type, e.g. "Synonyms", "Hyponyms"
    term: str  # the linked word
    tags: list[str] = field(default_factory=list)
    raw_tags: list[str] = field(default_factory=list)
    topics: list[str] = field(default_factory=list)
    roman: str = ""  # romanization of the term, if any
    entry_id: int = 0  # rowid of the matching ``entries`` row
    sense: str = ""  # gloss the linkage applies to, if given
def worker_func(
    page: Page,
) -> tuple[bool, list[ThesaurusTerm], CollatedErrorReturnData, Optional[str]]:
    """Pool worker: extract thesaurus terms from a single page.

    Returns ``(success, terms, collated_errors, error_message)``; on
    failure the terms list is empty and the message holds the formatted
    traceback.  The context is injected onto the function object by
    ``init_worker_process`` before the pool starts.
    """
    wxr: WiktextractContext = worker_func.wxr  # type:ignore[attr-defined]
    with tempfile.TemporaryDirectory(prefix="wiktextract") as tmp_dir:
        # Leave a breadcrumb naming the page being processed, so a crashed
        # worker can be diagnosed from its scratch directory.
        crumb_path = f"{tmp_dir}/wiktextract-{os.getpid()}"
        with open(crumb_path, "w", encoding="utf-8") as crumb:
            crumb.write(page.title + "\n")

        wxr.wtp.start_page(page.title)
        try:
            terms = extract_thesaurus_page(wxr, page)
            return True, terms, wxr.wtp.to_return(), None
        except Exception as e:
            frames = traceback.format_exception(
                type(e), value=e, tb=e.__traceback__
            )
            msg = (
                f'=== EXCEPTION while parsing page "{page.title}":\n '
                f"in process {current_process().name}" + "".join(frames)
            )
            return False, [], {}, msg  # type:ignore[typeddict-item]
def extract_thesaurus_page(
    wxr: WiktextractContext, page: Page
) -> list[ThesaurusTerm]:
    """Delegate extraction of one thesaurus page to the edition-specific
    extractor module selected by the wiki's language code."""
    extractor = import_extractor_module(wxr.wtp.lang_code, "thesaurus")
    return extractor.extract_thesaurus_page(wxr, page)
def extract_thesaurus_data(
    wxr: WiktextractContext, num_processes: Optional[int] = None
) -> None:
    """Extract all pages of the Thesaurus namespace in a process pool and
    store the resulting linkages in the thesaurus database."""
    from .wiktionary import init_worker_process

    begin = time.time()
    logger.info("Extracting thesaurus data")
    ns_data: NamespaceDataEntry = wxr.wtp.NAMESPACE_DATA.get(
        "Thesaurus",
        {},  # type:ignore[typeddict-item]
    )
    ns_id = ns_data.get("id", 0)

    # Database handles cannot be pickled into workers; drop them and
    # reconnect in the parent once the pool has been forked/spawned.
    wxr.remove_unpicklable_objects()
    with Pool(num_processes, init_worker_process, (worker_func, wxr)) as pool:
        wxr.reconnect_databases(False)
        for ok, terms, stats, err in pool.imap_unordered(
            worker_func, wxr.wtp.get_all_pages([ns_id], False)
        ):
            if not ok:
                # Print error in parent process - do not remove
                logger.error(err)
                continue
            for term in terms:
                insert_thesaurus_term(wxr.thesaurus_db_conn, term)  # type:ignore[arg-type]
            wxr.config.merge_return(stats)

    wxr.thesaurus_db_conn.commit()  # type:ignore[union-attr]
    num_pages = wxr.wtp.saved_page_nums([ns_id], False)
    total = thesaurus_linkage_number(wxr.thesaurus_db_conn)  # type:ignore[arg-type]
    logger.info(
        "Extracted {} linkages from {} thesaurus pages (took {:.1f}s)".format(
            total, num_pages, time.time() - begin
        )
    )
def init_thesaurus_db(db_path: Path) -> sqlite3.Connection:
    """Open (creating if necessary) the thesaurus SQLite database.

    Defines the ``entries`` table (one row per thesaurus headword), a
    uniqueness index over (entry, pos, language_code), and the ``terms``
    table of linkages referencing entries; enables WAL journaling and
    foreign-key enforcement.  Returns the open connection.
    """
    conn = sqlite3.connect(db_path)
    schema = """
    CREATE TABLE IF NOT EXISTS entries (
    id INTEGER PRIMARY KEY,
    entry TEXT,
    pos TEXT,
    language_code TEXT,
    sense TEXT
    );
    CREATE UNIQUE INDEX IF NOT EXISTS entries_index
    ON entries(entry, pos, language_code);

    CREATE TABLE IF NOT EXISTS terms (
    term TEXT,
    entry_id INTEGER,
    linkage TEXT,  -- Synonyms, Hyponyms
    tags TEXT,
    raw_tags TEXT,
    topics TEXT,
    roman TEXT,  -- Romanization
    PRIMARY KEY(term, entry_id),
    FOREIGN KEY(entry_id) REFERENCES entries(id)
    );

    PRAGMA journal_mode = WAL;
    PRAGMA foreign_keys = ON;
    """
    conn.executescript(schema)
    return conn
def thesaurus_linkage_number(db_conn: sqlite3.Connection) -> int:
    """Return the total number of linkage rows in the ``terms`` table."""
    row = db_conn.execute("SELECT count(*) FROM terms").fetchone()
    # count(*) always yields one row; the fallback is purely defensive.
    return row[0] if row is not None else 0
def search_thesaurus(
    db_conn: sqlite3.Connection,
    entry: str,
    lang_code: str,
    pos: str,
    linkage_type: Optional[str] = None,
) -> Iterable[ThesaurusTerm]:
    """Yield ThesaurusTerm objects for a headword/language/POS triple,
    optionally restricted to a single linkage type."""
    sql = """
    SELECT term, entries.id, linkage, tags, topics, roman, sense, raw_tags
    FROM terms JOIN entries ON terms.entry_id = entries.id
    WHERE entry = ? AND language_code = ? AND pos = ?
    """
    params: tuple[str, ...] = (entry, lang_code, pos)
    if linkage_type is not None:
        sql += " AND linkage = ?"
        params += (linkage_type,)

    def split_cell(cell: str) -> list[str]:
        # List-valued columns are stored pipe-separated; "" means empty.
        return cell.split("|") if len(cell) > 0 else []

    for term, entry_id, linkage, tags, topics, roman, sense, raw_tags in (
        db_conn.execute(sql, params)
    ):
        yield ThesaurusTerm(
            term=term,
            entry_id=entry_id,
            linkage=linkage,
            tags=split_cell(tags),
            topics=split_cell(topics),
            roman=roman,
            sense=sense,
            entry=entry,
            pos=pos,
            language_code=lang_code,
            raw_tags=split_cell(raw_tags),
        )
def insert_thesaurus_term(
    db_conn: sqlite3.Connection, term: ThesaurusTerm
) -> None:
    """Insert one linkage into the database, creating its ``entries`` row
    on first sight of the (entry, language_code, pos) triple.

    Does not commit; the caller batches commits.
    """
    entry_id = None
    # INSERT OR IGNORE + RETURNING yields a row only when a new entry was
    # actually created; otherwise fall back to looking up the existing id.
    inserted = db_conn.execute(
        "INSERT OR IGNORE INTO entries (entry, language_code, pos, sense) "
        "VALUES(?, ?, ?, ?) RETURNING id",
        (term.entry, term.language_code, term.pos, term.sense),
    )
    for (eid,) in inserted:
        entry_id = eid
    if entry_id is None:
        existing = db_conn.execute(
            "SELECT id FROM entries WHERE entry = ? AND language_code = ? "
            "AND pos = ?",
            (term.entry, term.language_code, term.pos),
        )
        for (eid,) in existing:
            entry_id = eid
    # List-valued fields are serialized pipe-separated (see search_thesaurus).
    db_conn.execute(
        """
        INSERT OR IGNORE INTO terms
        (term, entry_id, linkage, tags, topics, roman, raw_tags)
        VALUES(?, ?, ?, ?, ?, ?, ?)
        """,
        (
            term.term,
            entry_id,
            term.linkage,
            "|".join(term.tags),
            "|".join(term.topics),
            term.roman,
            "|".join(term.raw_tags),
        ),
    )
def close_thesaurus_db(db_path: Path, db_conn: sqlite3.Connection) -> None:
    """Close the thesaurus database and delete its file when it lives in
    the system temporary directory (i.e. it was a scratch database)."""
    db_conn.close()
    temp_root = Path(tempfile.gettempdir())
    if db_path.parent.samefile(temp_root):
        db_path.unlink(missing_ok=True)
def emit_words_in_thesaurus(
    wxr: WiktextractContext,
    emitted: set[tuple[str, str, str]],
    out_f: TextIO,
    human_readable: bool,
) -> None:
    """Emit synthetic word entries for thesaurus-only headwords.

    Emit words that occur in thesaurus as main words but for which
    Wiktionary has no word in the main namespace.  This seems to happen
    sometimes.  ``emitted`` holds the (word, lang_code, pos) triples
    already written from the main namespace, so they are skipped here.

    Fix: the inner query selects the ``roman`` column but the value was
    previously discarded; it is now emitted on each relation dict, matching
    what search_thesaurus exposes.
    """
    from .wiktionary import write_json_data

    logger.info("Emitting words that only occur in thesaurus")
    for entry_id, entry, pos, lang_code, sense in wxr.thesaurus_db_conn.execute(  # type:ignore[union-attr]
        "SELECT id, entry, pos, language_code, sense FROM entries "
        "WHERE pos IS NOT NULL AND language_code IS NOT NULL"
    ):
        if (entry, lang_code, pos) in emitted:
            continue

        if None in (entry, lang_code, pos):
            logger.info(
                f"'None' in entry, lang_code or"
                f" pos: {entry}, {lang_code}, {pos}"
            )
            continue

        logger.info(
            "Emitting thesaurus entry for "
            f"{entry}/{lang_code}/{pos} (not in main)"
        )

        sense_dict = dict()
        if sense:
            sense_dict["glosses"] = [sense]

        for (
            term,
            linkage,
            tags,
            topics,
            roman,
            raw_tags,
        ) in wxr.thesaurus_db_conn.execute(  # type:ignore[union-attr]
            """
            SELECT term, linkage, tags, topics, roman, raw_tags
            FROM terms WHERE entry_id = ?
            """,
            (entry_id,),
        ):
            relation_dict = {"word": term, "source": f"Thesaurus:{entry}"}
            if len(tags) > 0:
                relation_dict["tags"] = tags.split("|")
            if len(topics) > 0:
                relation_dict["topics"] = topics.split("|")
            if len(raw_tags) > 0:
                relation_dict["raw_tags"] = raw_tags.split("|")
            if len(roman) > 0:
                # Previously fetched but dropped; keep romanization in output.
                relation_dict["roman"] = roman
            # Group relations under their linkage type, e.g. "synonyms".
            sense_dict.setdefault(linkage, []).append(relation_dict)

        if "glosses" not in sense_dict:
            sense_dict["tags"] = ["no-gloss"]

        entry = {
            "word": entry,
            "lang": code_to_name(lang_code, "en"),
            "lang_code": lang_code,
            "pos": pos,
            "senses": [sense_dict] if sense_dict else [],
            "source": "thesaurus",
        }
        # Drop empty fields before writing the JSON line.
        entry = {k: v for k, v in entry.items() if v}
        write_json_data(entry, out_f, human_readable)