Snorkelの前処理について(実装編)①
はじめに
前回まででSnorkelの概要、生成モデル、識別モデルに関してまとめた。
Snorkelの概要 - 機械学習・自然言語処理の勉強メモ
Snorkelにおける生成モデル - 機械学習・自然言語処理の勉強メモ
Snorkelにおける識別モデル - 機械学習・自然言語処理の勉強メモ
途中、数式が理解できない箇所もあったが、Snorkelの理論的な部分はだいたい分かった。
今回は前処理に関して実際のTutorialを見ながらまとめる。
尚、Tutorialのタスクはこんな感じで文章から人物名の組み合わせを抽出すること。
snorkel/Snorkel-Workshop-FINAL.pdf at master · HazyResearch/snorkel · GitHubより抜粋
人物名はIREX標準で定義されている固有表現であり、「Stanford CoreNLP」や「Spcay」を用いて容易に抽出できる。ただし、これらのライブラリは日本語はサポートしておらず、それに依存するSnorkelの関数だけでは日本語は対応できない。なので、日本語文を対象とする場合、前処理を独自に行う必要がある。不安だったので、質問したら素早く教えてくれた。
github.com
まとめると、「candidateの抽出後」は、Snorkelを使って英語と同じように処理すればよいとのこと。
用語の整理
- Entity・・意味のある(興味のある)概念。(固有表現)
- Relation・・2つ以上のEntityの意味的な関係
この絵が分かりやすい。
snorkel/Snorkel-Workshop-FINAL.pdf at master · HazyResearch/snorkel · GitHubより抜粋
candidateとは、sentence中の目的のrelation(ここではSpouse)の候補となるEntity
なので、興味のあるのEntityのペアがcandidateとして抽出できればOK。
コードを読んでいく
今回の前処理に関するTutorialはここにある。
github.com
文書に関して
対象とする「articles.tsv」はこんな形式
id | sentence |
9b28e780-... | NEW YORK -- Theatergoers who check out "Beautiful" on tour won't get to see ... |
corpus_parser
from snorkel.parser.spacy_parser import Spacy from snorkel.parser import CorpusParser corpus_parser = CorpusParser(parser=Spacy()) corpus_parser.apply(doc_preprocessor, count=n_docs, parallelism=1)
corpus_parser.py
class CorpusParser(UDFRunner): def __init__(self, parser=None, fn=None): self.parser = parser or StanfordCoreNLPServer() super(CorpusParser, self).__init__(CorpusParserUDF, parser=self.parser, fn=fn) class CorpusParserUDF(UDF): def __init__(self, parser, fn, **kwargs): super(CorpusParserUDF, self).__init__(**kwargs) self.parser = parser self.req_handler = parser.connect() self.fn = fn def apply(self, x, **kwargs): """Given a Document object and its raw text, parse into Sentences""" doc, text = x for parts in self.req_handler.parse(doc, text): parts = self.fn(parts) if self.fn is not None else parts yield Sentence(**parts)
parserは、デフォルトで「Stanford CoreNLP」だが、Tutorialでは、「Spcay」を使用している。
applyメソッドでパース処理を実行する。
このapplyメソッドにたどり着くまでが少しややこしい。まず、CorpusParserの親クラスUDFRunnerのapplyメソッドを実行する。
内部でapply_stを経由してCorpusParserUDF.applyを呼び出す。
class UDFRunner(object): """Class to run UDFs in parallel using simple queue-based multiprocessing setup""" def __init__(self, udf_class, **udf_init_kwargs): self.udf_class = udf_class self.udf_init_kwargs = udf_init_kwargs self.udfs = [] if hasattr(self.udf_class, 'reduce'): self.reducer = self.udf_class(**self.udf_init_kwargs) else: self.reducer = None def apply(self, xs, clear=True, parallelism=None, progress_bar=True, count=None, **kwargs): """ Apply the given UDF to the set of objects xs, either single or multi-threaded, and optionally calling clear() first. """ # Clear everything downstream of this UDF if requested if clear: print("Clearing existing...") SnorkelSession = new_sessionmaker() session = SnorkelSession() self.clear(session, **kwargs) session.commit() session.close() # Execute the UDF print("Running UDF...") if parallelism is None or parallelism < 2: self.apply_st(xs, progress_bar, clear=clear, count=count, **kwargs) else: self.apply_mt(xs, parallelism, clear=clear, **kwargs) def apply_st(self, xs, progress_bar, count, **kwargs): """Run the UDF single-threaded, optionally with progress bar""" udf = self.udf_class(**self.udf_init_kwargs) # Set up ProgressBar if possible pb = None if progress_bar and hasattr(xs, '__len__') or count is not None: n = count if count is not None else len(xs) pb = ProgressBar(n) # Run single-thread for i, x in enumerate(xs): if pb: pb.bar(i) # Apply UDF and add results to the session for y in udf.apply(x, **kwargs): # Uf UDF has a reduce step, this will take care of the insert; else add to session if hasattr(self.udf_class, 'reduce'): udf.reduce(y, **kwargs) else: udf.session.add(y) # Commit session and close progress bar if applicable udf.session.commit() if pb: pb.close()
後、apply_st関数でxs
をイテレートしている。xs
はdoc_preprocessorインスタンスであるので、この時点でdoc_preprocessorインスタンスの__iter__()メソッドが呼ばれることが分かる。
class DocPreprocessor(object): """ Processes a file or directory of files into a set of Document objects. :param encoding: file encoding to use, default='utf-8' :param path: filesystem path to file or directory to parse :param max_docs: the maximum number of Documents to produce, default=float('inf') """ def __init__(self, path, encoding="utf-8", max_docs=float('inf')): self.path = path self.encoding = encoding self.max_docs = max_docs def generate(self): """ Parses a file or directory of files into a set of Document objects. """ doc_count = 0 for fp in self._get_files(self.path): file_name = os.path.basename(fp) if self._can_read(file_name): for doc, text in self.parse_file(fp, file_name): yield doc, text doc_count += 1 if doc_count >= self.max_docs: return def __iter__(self): return self.generate() class TSVDocPreprocessor(DocPreprocessor): """Simple parsing of TSV file with one (doc_name <tab> doc_text) per line""" def parse_file(self, fp, file_name): with codecs.open(fp, encoding=self.encoding) as tsv: for line in tsv: (doc_name, doc_text) = line.split('\t') stable_id = self.get_stable_id(doc_name) doc = Document( name=doc_name, stable_id=stable_id, meta={'file_name': file_name} ) yield doc, doc_text
__iter__→generate()→parse_fileメソッドが呼ばれ、パース処理が実行される。xs
には、doc
, text
が格納されており、それらはタブ毎にCorpusParserUDF.applyに渡される。
corpus_parser.py
class CorpusParserUDF(UDF): def __init__(self, parser, fn, **kwargs): super(CorpusParserUDF, self).__init__(**kwargs) self.parser = parser self.req_handler = parser.connect() self.fn = fn def apply(self, x, **kwargs): """Given a Document object and its raw text, parse into Sentences""" doc, text = x for parts in self.req_handler.parse(doc, text): parts = self.fn(parts) if self.fn is not None else parts yield Sentence(**parts)
self.req_handler.parse(doc, text)の実体は、Spacy.parse関数。この関数がSpacyによるパース処理を行う。
class Spacy(Parser): def parse(self, document, text): ''' Transform spaCy output to match CoreNLP's default format :param document: :param text: :return: ''' text = self.to_unicode(text) doc = self.model.tokenizer(text) for proc in self.pipeline: proc(doc) assert doc.is_parsed position = 0 for sent in doc.sents: parts = defaultdict(list) text = sent.text for i,token in enumerate(sent): parts['words'].append(str(token)) parts['lemmas'].append(token.lemma_) parts['pos_tags'].append(token.tag_) parts['ner_tags'].append(token.ent_type_ if token.ent_type_ else 'O') parts['char_offsets'].append(token.idx) parts['abs_char_offsets'].append(token.idx) head_idx = 0 if token.head is token else token.head.i - sent[0].i + 1 parts['dep_parents'].append(head_idx) parts['dep_labels'].append(token.dep_) # Add null entity array (matching null for CoreNLP) parts['entity_cids'] = ['O' for _ in parts['words']] parts['entity_types'] = ['O' for _ in parts['words']] # make char_offsets relative to start of sentence parts['char_offsets'] = [ p - parts['char_offsets'][0] for p in parts['char_offsets'] ] parts['position'] = position # Link the sentence to its parent document object parts['document'] = document parts['text'] = text # Add null entity array (matching null for CoreNLP) parts['entity_cids'] = ['O' for _ in parts['words']] parts['entity_types'] = ['O' for _ in parts['words']] # Assign the stable id as document's stable id plus absolute # character offset abs_sent_offset = parts['abs_char_offsets'][0] abs_sent_offset_end = abs_sent_offset + parts['char_offsets'][-1] + len(parts['words'][-1]) if document: parts['stable_id'] = construct_stable_id(document, 'sentence', abs_sent_offset, abs_sent_offset_end) position += 1 yield parts
パースした内容は、Document・Sentenceモデルで確認できる。(この辺の操作はSQLAlchemyで行う。)
例えば、Sentenceモデルのテーブル構造はこんな感じ。
列名 | 型 |
id | Integer |
words | String |
char_offsets | Integer |
abs_char_offsets | Integer |
lemmas | String |
pos_tags | String |
ner_tags | String |
dep_parents | Integer |
dep_labels | String |
entity_cids | String |
entity_types | String |
*ここで定義されている。
snorkel/context.py at master · HazyResearch/snorkel · GitHub
以下は先頭の文章をからいくつかの列の値抽出した結果。
sent1 = session.query(Sentence).first() print sent1.words ['NEW', 'YORK', '--', 'Theatergoers', 'who', 'check', 'out', '"', 'Beautiful', '"', 'on', 'tour', 'wo', "n't", 'get', 'to', 'see', 'Tony', 'winner', 'Jessie', 'Mueller', 'but', 'they', 'may', 'get', 'the', 'next', 'best', 'thing', '--', 'someone', 'with', 'her', 'DNA', '.', ' '] print sent1.char_offsets [0, 4, 9, 12, 25, 29, 35, 39, 40, 49, 51, 54, 59, 61, 65, 69, 72, 76, 81, 88, 95, 103, 107, 112, 116, 120, 124, 129, 134, 140, 143, 151, 156, 160, 163, 165] print sent1.pos_tags [u'JJ', u'NN', u':', u'NNS', u'WP', u'VBP', u'RP', u'``', u'JJ', u"''", u'IN', u'NN', u'MD', u'RB', u'VB', u'TO', u'VB', u'NNP', u'NN', u'NNP', u'NNP', u'CC', u'PRP', u'MD', u'VB', u'DT', u'JJ', u'JJS', u'NN', u':', u'NN', u'IN', u'PRP$', u'NNP', u'.', u'SP'] print sent1.ner_tags ['O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', u'PERSON', 'O', u'PERSON', u'PERSON', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', u'ORG', 'O', 'O']
一連のパース処理は、英文だからできる。(Spcayが対応している言語ならできる。)
残念ながら日本語は対応していない。(janome/spaCyで一部パースは可能)
ここらへんをどう処理するかを考える必要がある。
candidate_subclass
タスクのcandidateを定義する。
以下では、person1・person2のエンティティを関係Spouse(配偶者)として定義している。
from snorkel.models import candidate_subclass Spouse = candidate_subclass('Spouse', ['person1', 'person2'])
こうしたら固有表現抽出でも使えるらしい。
from snorkel.models import candidate_subclass Person = candidate_subclass('Person', ['person1'])
Training data for training a NER model · Issue #599 · HazyResearch/snorkel · GitHub
後で、詳しく見てみたい。
CandidateExtractor
Candidateの抽出。ここが前処理において一番大事と思われる。
ここでSpcayによりパースしたNERタグが使われる。
from snorkel.candidates import Ngrams, CandidateExtractor from snorkel.matchers import PersonMatcher ngrams = Ngrams(n_max=7) person_matcher = PersonMatcher(longest_match_only=True) cand_extractor = CandidateExtractor(Spouse, [ngrams, ngrams], [person_matcher, person_matcher], symmetric_relations=False)
Ngramsのインスタンスメソッドを確認する。
class Ngrams(CandidateSpace): """ Defines the space of candidates as all n-grams (n <= n_max) in a Sentence _x_, indexing by **character offset**. """ def __init__(self, n_max=5, split_tokens=('-', '/')): CandidateSpace.__init__(self) self.n_max = n_max self.split_rgx = r'('+r'|'.join(split_tokens)+r')' if split_tokens and len(split_tokens) > 0 else None
ngramsの属性を確認する。
print ngrams.__dict__ {'n_max': 7, 'split_rgx': '(-|/)'}
次にPersonMatcherについて
person_matcher = PersonMatcher(longest_match_only=True)
PersonMatcherのインスタンスメソッドを確認する。
class PersonMatcher(RegexMatchEach): """ Matches Spans that are the names of people, as identified by CoreNLP. A convenience class for setting up a RegexMatchEach to match spans for which each token was tagged as a person. """ def __init__(self, *children, **kwargs): kwargs['attrib'] = 'ner_tags' kwargs['rgx'] = 'PERSON' super(PersonMatcher, self).__init__(*children, **kwargs)
person_matcherの属性を確認する。
print person_matcher.__dict__ {'ignore_case': True, 'longest_match_only': True, 'sep': ' ', 'rgx': 'PERSON$', 'r': <_sre.SRE_Pattern object at 0x7f4ff55e6ea0>, 'attrib': 'ner_tags', 'children': (), 'opts': {'rgx': 'PERSON', 'attrib': 'ner_tags', 'longest_match_only': True}}
person_matcherはNERタグが「PERSON」の単語をターゲットとするための定義を行う。
snorkelでは、PERSON, LOCATION, ORGANIZATION などの標準的なNERタグをcandidateのターゲットとするためのクラスを用意してくれている。
※独自のタグをターゲットとする場合、これらのクラスに倣って専用のクラスを作ればよい。
最後にCandidateExtractorの引数に定義したものをセットする。
※symmetric_relationsは、集合「A, B」を候補「A, B」・「B, A」として扱うか。一つの候補「A, B」としてのみ扱うか。
そして、applyメソッドでcandidateの抽出処理を行う。
for i, sents in enumerate([train_sents, dev_sents, test_sents]): cand_extractor.apply(sents, split=i, parallelism=1) print("Number of candidates:", session.query(Spouse).filter(Spouse.split == i).count())
applyメソッドで重要と思われる箇所を見ていく。
大事なところは「candidate_spacesインスタンスのapplyメソッドの戻り値をmatchersインスタンスのapplyメソッドの引数」としていること。
def apply(self, context, clear, split, **kwargs): # Generate TemporaryContexts that are children of the context using the candidate_space and filtered # by the Matcher for i in range(self.arity): self.child_context_sets[i].clear() for tc in self.matchers[i].apply(self.candidate_spaces[i].apply(context)): tc.load_id_or_insert(self.session) self.child_context_sets[i].add(tc)
Ngramsクラスのapplyメソッドを確認する。
class Ngrams(CandidateSpace): def apply(self, context): # These are the character offset--**relative to the sentence start**--for each _token_ offsets = context.char_offsets # Loop over all n-grams in **reverse** order (to facilitate longest-match semantics) L = len(offsets) seen = set() for l in range(1, self.n_max+1)[::-1]: for i in range(L-l+1): w = context.words[i+l-1] start = offsets[i] end = offsets[i+l-1] + len(w) - 1 ts = TemporarySpan(char_start=start, char_end=end, sentence=context) if ts not in seen: seen.add(ts) yield ts
ts
の内容を下記sentenceを例に数件確認する。
i | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 |
words | NEW | YORK | -- | Theatergoers | who | check | out | " | Beautiful | " | on | tour | wo | n't | get | to | see |
offset | 0 | 4 | 9 | 12 | 25 | 29 | 35 | 39 | 40 | 49 | 51 | 54 | 59 | 61 | 65 | 69 | 72 |
外側のループ1回目:
内側のループ1回目:
まず、L=16、外側のループでl=7(self.n_max=7)、内側のループでi=0がセットされる。
各変数は、
w = "out" start = 0 end = 37 # 35 + 3 - 1 ts = TemporarySpan(char_start=0, char_end=37, sentence=context) # ts.char_start=0 # ts.char_end=37 # ts.sentence=context
内側のループ2回目:
i=1に更新
各変数は、
w = "" start = 4 end = 38 # 39 + 0 - 1 ts = TemporarySpan(char_start=4, char_end=38, sentence=context) # ts.char_start=4 # ts.char_end=38 # ts.sentence=context
となる。
外側のループ1回目では、7-gramsの集合を作る。
外側のループ2回目:
内側のループ1回目:
l=6、i=0がセットされる。
各変数は、
w = "check" start = 0 end = 33 # 29 + 5 - 1 ts = TemporarySpan(char_start=0, char_end=33, sentence=context) # ts.char_start=0 # ts.char_end=33 # ts.sentence=context
となる。
外側のループ1回目では、6-gramsの集合を作る。
これを最終的にunigramの集合まで作成する。
実際はts
を生成するごとに、それをmatchersインスタンスのapplyメソッドに渡す。
PersonMatcherクラスのapplyメソッドを確認する。
※正確には、継承元Matcherから引き継いだもの
n-gramsのcandidateを一つずつチェックする。
def apply(self, candidates): """ Apply the Matcher to a **generator** of candidates Optionally only takes the longest match (NOTE: assumes this is the *first* match) """ seen_spans = set() for c in candidates: if self.f(c) and (not self.longest_match_only or not any([self._is_subspan(c, s) for s in seen_spans])): if self.longest_match_only: seen_spans.add(self._get_span(c)) yield c
class RegexMatchEach(RegexMatch): """Matches regex pattern on **each token**""" def _f(self, c): tokens = c.get_attrib_tokens(self.attrib) return True if tokens and all([self.r.match(t) is not None for t in tokens]) else False
具体的には、_f
関数でn-grams全てがマッチングするか(今回の場合、tokenのner_tags属性が全てPERSONか)判定。
例えば、下記の文章で考える。
Barack Obama and first lady Michelle Obama ...
最大3gramsで考えた場合、
Barack Obama and |
Obama and first |
and first lady |
first lady Michelle |
lady Michelle Obama |
Barack Obama and |
Obama and first |
and first lady |
first lady Michelle |
lady Michelle Obama |
Barack Obama |
Obama and |
and first |
first lady |
lady Michelle |
Michelle Obama |
Barack |
Obama |
and |
first |
lady |
Michelle |
Obama |
Obama and |
and first |
first lady |
lady Michelle |
Michelle Obama |
Barack |
Obama |
and |
first |
lady |
Michelle |
Obama |
が候補になる。
n-gramが大きい方から候補を探すので、bi-gramの「Barack Obama」が候補となる。
また、既に候補が存在する場合、そのspanはseen_spans
に格納済みなので、より小さいn-gramで再度候補となることはない(この例の場合、「BarackやObama」が候補となることは無い)。
また、実際は単語ではなく、単語の開始位置で候補を判定するので、同じ単語が存在しても不都合が生じることは無い。
DB登録まで追えてはいないが、候補単位で登録を行うことで、同じ候補での重複(候補がBarack Obama同士になる)を無くしているのだと考えられる。
最後に抽出後のCandidateを確認する。
cands = session.query(Candidate).filter(Candidate.split == 0).all() print cands[0] # Spouse(Span("Sarko", sentence=44253, chars=[55,59], words=[10,10]), Span("Carla Bruni", sentence=44253, chars=[79,89], words=[15,16])) print cands[0].person1 # Span("Sarko", sentence=44253, chars=[55,59], words=[10,10]) print cands[0].person2 # Span("Carla Bruni", sentence=44253, chars=[79,89], words=[15,16]) # the raw word tokens for the person1 Span print cands[0].person1.get_attrib_tokens("words") # ['Sarko'] print cands[0].person1.get_attrib_tokens("pos_tags") # [u'NNP'] print cands[0].person1.get_attrib_tokens("ner_tags") # [u'PERSON']
これで、前処理が完了した。
次は生成モデルの構築について見ていきたい。