機械学習・自然言語処理の勉強メモ

学んだことのメモやまとめ

Snorkelの前処理について(実装編)①

はじめに



前回まででSnorkelの概要、生成モデル、識別モデルに関してまとめた。
Snorkelの概要 - 機械学習・自然言語処理の勉強メモ
Snorkelにおける生成モデル - 機械学習・自然言語処理の勉強メモ
Snorkelにおける識別モデル - 機械学習・自然言語処理の勉強メモ

途中、数式が理解できない箇所もあったが、Snorkelの理論的な部分はだいたい分かった。

今回は前処理に関して実際のTutorialを見ながらまとめる。
尚、Tutorialのタスクはこんな感じで文章から人物名の組み合わせを抽出すること。

f:id:kento1109:20180114145046p:plain
snorkel/Snorkel-Workshop-FINAL.pdf at master · HazyResearch/snorkel · GitHubより抜粋

人物名はIREX標準で定義されている固有表現であり、「Stanford CoreNLP」や「Spcay」を用いて容易に抽出できる。ただし、これらのライブラリは日本語はサポートしておらず、それに依存するSnorkelの関数だけでは日本語は対応できない。なので、日本語文を対象とする場合、前処理を独自に行う必要がある。不安だったので、質問したら素早く教えてくれた。
github.com
まとめると、「candidateの抽出後」は、Snorkelを使って英語と同じように処理すればよいとのこと。

用語の整理
  • Entity・・意味のある(興味のある)概念。(固有表現)
  • Relation・・2つ以上のEntityの意味的な関係

この絵が分かりやすい。
f:id:kento1109:20180112124038p:plain
snorkel/Snorkel-Workshop-FINAL.pdf at master · HazyResearch/snorkel · GitHubより抜粋

candidateとは、sentence中の目的のrelation(ここではSpouse)の候補となるEntity

なので、興味のあるのEntityのペアがcandidateとして抽出できればOK。

コードを読んでいく



今回の前処理に関するTutorialはここにある。
github.com

文書に関して

対象とする「articles.tsv」はこんな形式

id sentence
9b28e780-... NEW YORK -- Theatergoers who check out "Beautiful" on tour won't get to see ...
corpus_parser
from snorkel.parser.spacy_parser import Spacy
from snorkel.parser import CorpusParser

corpus_parser = CorpusParser(parser=Spacy())
corpus_parser.apply(doc_preprocessor, count=n_docs, parallelism=1)

corpus_parser.py

class CorpusParser(UDFRunner):

    def __init__(self, parser=None, fn=None):
        self.parser = parser or StanfordCoreNLPServer()
        super(CorpusParser, self).__init__(CorpusParserUDF,
                                           parser=self.parser,
                                           fn=fn)

class CorpusParserUDF(UDF):

    def __init__(self, parser, fn, **kwargs):
        super(CorpusParserUDF, self).__init__(**kwargs)
        self.parser = parser
        self.req_handler = parser.connect()
        self.fn = fn

    def apply(self, x, **kwargs):
        """Given a Document object and its raw text, parse into Sentences"""
        doc, text = x
        for parts in self.req_handler.parse(doc, text):
            parts = self.fn(parts) if self.fn is not None else parts
            yield Sentence(**parts)

parserは、デフォルトで「Stanford CoreNLP」だが、Tutorialでは、「Spcay」を使用している。
applyメソッドでパース処理を実行する。
このapplyメソッドにたどり着くまでが少しややこしい。まず、CorpusParserの親クラスUDFRunnerのapplyメソッドを実行する。
内部でapply_stを経由してCorpusParserUDF.applyを呼び出す。

class UDFRunner(object):
    """Class to run UDFs in parallel using simple queue-based multiprocessing setup"""
    def __init__(self, udf_class, **udf_init_kwargs):
        self.udf_class       = udf_class
        self.udf_init_kwargs = udf_init_kwargs
        self.udfs            = []

        if hasattr(self.udf_class, 'reduce'):
            self.reducer = self.udf_class(**self.udf_init_kwargs)
        else:
            self.reducer = None

    def apply(self, xs, clear=True, parallelism=None, progress_bar=True, count=None, **kwargs):
        """
        Apply the given UDF to the set of objects xs, either single or multi-threaded, 
        and optionally calling clear() first.
        """
        # Clear everything downstream of this UDF if requested
        if clear:
            print("Clearing existing...")
            SnorkelSession = new_sessionmaker()
            session = SnorkelSession()
            self.clear(session, **kwargs)
            session.commit()
            session.close()

        # Execute the UDF
        print("Running UDF...")
        if parallelism is None or parallelism < 2:
            self.apply_st(xs, progress_bar, clear=clear, count=count, **kwargs)
        else:
            self.apply_mt(xs, parallelism, clear=clear, **kwargs)

    def apply_st(self, xs, progress_bar, count, **kwargs):
        """Run the UDF single-threaded, optionally with progress bar"""
        udf = self.udf_class(**self.udf_init_kwargs)

        # Set up ProgressBar if possible
        pb = None
        if progress_bar and hasattr(xs, '__len__') or count is not None:
            n = count if count is not None else len(xs)
            pb = ProgressBar(n)
        
        # Run single-thread
        for i, x in enumerate(xs):
            if pb:
                pb.bar(i)

            # Apply UDF and add results to the session
            for y in udf.apply(x, **kwargs):
                
                # Uf UDF has a reduce step, this will take care of the insert; else add to session
                if hasattr(self.udf_class, 'reduce'):
                    udf.reduce(y, **kwargs)
                else:
                    udf.session.add(y)

        # Commit session and close progress bar if applicable
        udf.session.commit()
        if pb:
            pb.close()

後、apply_st関数でxsをイテレートしている。xsはdoc_preprocessorインスタンスであるので、この時点でdoc_preprocessorインスタンスの__iter__()メソッドが呼ばれることが分かる。

class DocPreprocessor(object):
    """
    Processes a file or directory of files into a set of Document objects.
    :param encoding: file encoding to use, default='utf-8'
    :param path: filesystem path to file or directory to parse
    :param max_docs: the maximum number of Documents to produce,
        default=float('inf')
    """

    def __init__(self, path, encoding="utf-8", max_docs=float('inf')):
        self.path = path
        self.encoding = encoding
        self.max_docs = max_docs

    def generate(self):
        """
        Parses a file or directory of files into a set of Document objects.
        """
        doc_count = 0
        for fp in self._get_files(self.path):
            file_name = os.path.basename(fp)
            if self._can_read(file_name):
                for doc, text in self.parse_file(fp, file_name):
                    yield doc, text
                    doc_count += 1
                    if doc_count >= self.max_docs:
                        return

    def __iter__(self):
        return self.generate()

class TSVDocPreprocessor(DocPreprocessor):
    """Simple parsing of TSV file with one (doc_name <tab> doc_text) per line"""

    def parse_file(self, fp, file_name):
        with codecs.open(fp, encoding=self.encoding) as tsv:
            for line in tsv:
                (doc_name, doc_text) = line.split('\t')
                stable_id = self.get_stable_id(doc_name)
                doc = Document(
                    name=doc_name, stable_id=stable_id,
                    meta={'file_name': file_name}
                )
                yield doc, doc_text

__iter__→generate()→parse_fileメソッドが呼ばれ、パース処理が実行される。xsには、doc, textが格納されており、それらはタブ毎にCorpusParserUDF.applyに渡される。
corpus_parser.py

class CorpusParserUDF(UDF):

    def __init__(self, parser, fn, **kwargs):
        super(CorpusParserUDF, self).__init__(**kwargs)
        self.parser = parser
        self.req_handler = parser.connect()
        self.fn = fn

    def apply(self, x, **kwargs):
        """Given a Document object and its raw text, parse into Sentences"""
        doc, text = x
        for parts in self.req_handler.parse(doc, text):
            parts = self.fn(parts) if self.fn is not None else parts
            yield Sentence(**parts)

self.req_handler.parse(doc, text)の実体は、Spacy.parse関数。この関数がSpacyによるパース処理を行う。

class Spacy(Parser):

    def parse(self, document, text):
        '''
        Transform spaCy output to match CoreNLP's default format
        :param document:
        :param text:
        :return:
        '''
        text = self.to_unicode(text)

        doc = self.model.tokenizer(text)
        for proc in self.pipeline:
            proc(doc)
        assert doc.is_parsed

        position = 0
        for sent in doc.sents:
            parts = defaultdict(list)
            text = sent.text

            for i,token in enumerate(sent):
                parts['words'].append(str(token))
                parts['lemmas'].append(token.lemma_)
                parts['pos_tags'].append(token.tag_)
                parts['ner_tags'].append(token.ent_type_ if token.ent_type_ else 'O')
                parts['char_offsets'].append(token.idx)
                parts['abs_char_offsets'].append(token.idx)
                head_idx = 0 if token.head is token else token.head.i - sent[0].i + 1
                parts['dep_parents'].append(head_idx)
                parts['dep_labels'].append(token.dep_)

            # Add null entity array (matching null for CoreNLP)
            parts['entity_cids'] = ['O' for _ in parts['words']]
            parts['entity_types'] = ['O' for _ in parts['words']]

            # make char_offsets relative to start of sentence
            parts['char_offsets'] = [
                p - parts['char_offsets'][0] for p in parts['char_offsets']
            ]
            parts['position'] = position

            # Link the sentence to its parent document object
            parts['document'] = document
            parts['text'] = text

            # Add null entity array (matching null for CoreNLP)
            parts['entity_cids'] = ['O' for _ in parts['words']]
            parts['entity_types'] = ['O' for _ in parts['words']]

            # Assign the stable id as document's stable id plus absolute
            # character offset
            abs_sent_offset = parts['abs_char_offsets'][0]
            abs_sent_offset_end = abs_sent_offset + parts['char_offsets'][-1] + len(parts['words'][-1])
            if document:
                parts['stable_id'] = construct_stable_id(document, 'sentence', abs_sent_offset, abs_sent_offset_end)

            position += 1

            yield parts

パースした内容は、Document・Sentenceモデルで確認できる。(この辺の操作はSQLAlchemyで行う。)
例えば、Sentenceモデルのテーブル構造はこんな感じ。

列名
id Integer
words String
char_offsets Integer
abs_char_offsets Integer
lemmas String
pos_tags String
ner_tags String
dep_parents Integer
dep_labels String
entity_cids String
entity_types String

*ここで定義されている。
snorkel/context.py at master · HazyResearch/snorkel · GitHub
以下は先頭の文章をからいくつかの列の値抽出した結果。

sent1 = session.query(Sentence).first()
print sent1.words
['NEW', 'YORK', '--', 'Theatergoers', 'who', 'check', 'out', '"', 'Beautiful', '"', 'on', 'tour', 'wo', "n't", 'get', 'to', 'see', 'Tony', 'winner', 'Jessie', 'Mueller', 'but', 'they', 'may', 'get', 'the', 'next', 'best', 'thing', '--', 'someone', 'with', 'her', 'DNA', '.', '  ']
print sent1.char_offsets
[0, 4, 9, 12, 25, 29, 35, 39, 40, 49, 51, 54, 59, 61, 65, 69, 72, 76, 81, 88, 95, 103, 107, 112, 116, 120, 124, 129, 134, 140, 143, 151, 156, 160, 163, 165]
print sent1.pos_tags
[u'JJ', u'NN', u':', u'NNS', u'WP', u'VBP', u'RP', u'``', u'JJ', u"''", u'IN', u'NN', u'MD', u'RB', u'VB', u'TO', u'VB', u'NNP', u'NN', u'NNP', u'NNP', u'CC', u'PRP', u'MD', u'VB', u'DT', u'JJ', u'JJS', u'NN', u':', u'NN', u'IN', u'PRP$', u'NNP', u'.', u'SP']
print sent1.ner_tags
['O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', u'PERSON', 'O', u'PERSON', u'PERSON', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', u'ORG', 'O', 'O']

一連のパース処理は、英文だからできる。(Spcayが対応している言語ならできる。)
残念ながら日本語は対応していない。(janome/spaCyで一部パースは可能)
ここらへんをどう処理するかを考える必要がある。

candidate_subclass

タスクのcandidateを定義する。
以下では、person1・person2のエンティティを関係Spouse(配偶者)として定義している。

from snorkel.models import candidate_subclass

Spouse = candidate_subclass('Spouse', ['person1', 'person2'])

こうしたら固有表現抽出でも使えるらしい。

from snorkel.models import candidate_subclass

Person = candidate_subclass('Person', ['person1'])

Training data for training a NER model · Issue #599 · HazyResearch/snorkel · GitHub
後で、詳しく見てみたい。

CandidateExtractor

Candidateの抽出。ここが前処理において一番大事と思われる。
ここでSpcayによりパースしたNERタグが使われる。

from snorkel.candidates import Ngrams, CandidateExtractor
from snorkel.matchers import PersonMatcher

ngrams         = Ngrams(n_max=7)
person_matcher = PersonMatcher(longest_match_only=True)
cand_extractor = CandidateExtractor(Spouse, 
                                    [ngrams, ngrams], [person_matcher, person_matcher],
                                    symmetric_relations=False)

Ngramsのインスタンスメソッドを確認する。

class Ngrams(CandidateSpace):
    """
    Defines the space of candidates as all n-grams (n <= n_max) in a Sentence _x_,
    indexing by **character offset**.
    """
    def __init__(self, n_max=5, split_tokens=('-', '/')):
        CandidateSpace.__init__(self)
        self.n_max     = n_max
        self.split_rgx = r'('+r'|'.join(split_tokens)+r')' if split_tokens and len(split_tokens) > 0 else None

ngramsの属性を確認する。

print ngrams.__dict__
{'n_max': 7, 'split_rgx': '(-|/)'}

次にPersonMatcherについて

person_matcher = PersonMatcher(longest_match_only=True)

PersonMatcherのインスタンスメソッドを確認する。

class PersonMatcher(RegexMatchEach):
    """
    Matches Spans that are the names of people, as identified by CoreNLP.

    A convenience class for setting up a RegexMatchEach to match spans
    for which each token was tagged as a person.
    """
    def __init__(self, *children, **kwargs):
        kwargs['attrib'] = 'ner_tags'
        kwargs['rgx'] = 'PERSON'
        super(PersonMatcher, self).__init__(*children, **kwargs)

person_matcherの属性を確認する。

print person_matcher.__dict__
{'ignore_case': True, 'longest_match_only': True, 'sep': ' ', 'rgx': 'PERSON$', 'r': <_sre.SRE_Pattern object at 0x7f4ff55e6ea0>, 'attrib': 'ner_tags', 'children': (), 'opts': {'rgx': 'PERSON', 'attrib': 'ner_tags', 'longest_match_only': True}}

person_matcherはNERタグが「PERSON」の単語をターゲットとするための定義を行う。
snorkelでは、PERSON, LOCATION, ORGANIZATION などの標準的なNERタグをcandidateのターゲットとするためのクラスを用意してくれている。
※独自のタグをターゲットとする場合、これらのクラスに倣って専用のクラスを作ればよい。
最後にCandidateExtractorの引数に定義したものをセットする。
※symmetric_relationsは、集合「A, B」を候補「A, B」・「B, A」として扱うか。一つの候補「A, B」としてのみ扱うか。
そして、applyメソッドでcandidateの抽出処理を行う。

for i, sents in enumerate([train_sents, dev_sents, test_sents]):
    cand_extractor.apply(sents, split=i, parallelism=1)
    print("Number of candidates:", session.query(Spouse).filter(Spouse.split == i).count())

applyメソッドで重要と思われる箇所を見ていく。
大事なところは「candidate_spacesインスタンスのapplyメソッドの戻り値をmatchersインスタンスのapplyメソッドの引数」としていること。

def apply(self, context, clear, split, **kwargs):
    # Generate TemporaryContexts that are children of the context using the candidate_space and filtered
    # by the Matcher
    for i in range(self.arity):
        self.child_context_sets[i].clear()
        for tc in self.matchers[i].apply(self.candidate_spaces[i].apply(context)):
            tc.load_id_or_insert(self.session)
            self.child_context_sets[i].add(tc)

Ngramsクラスのapplyメソッドを確認する。

class Ngrams(CandidateSpace):   
    def apply(self, context):

        # These are the character offset--**relative to the sentence start**--for each _token_
        offsets = context.char_offsets

        # Loop over all n-grams in **reverse** order (to facilitate longest-match semantics)
        L    = len(offsets)
        seen = set()
        for l in range(1, self.n_max+1)[::-1]:
            for i in range(L-l+1):
                w     = context.words[i+l-1]
                start = offsets[i]
                end   = offsets[i+l-1] + len(w) - 1
                ts    = TemporarySpan(char_start=start, char_end=end, sentence=context)
                if ts not in seen:
                    seen.add(ts)
                    yield ts

tsの内容を下記sentenceを例に数件確認する。

i 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
words NEW YORK -- Theatergoers who check out " Beautiful " on tour wo n't get to see
offset 0 4 9 12 25 29 35 39 40 49 51 54 59 61 65 69 72

外側のループ1回目:
内側のループ1回目:
まず、L=16、外側のループでl=7(self.n_max=7)、内側のループでi=0がセットされる。
各変数は、

w = "out"
start = 0
end = 37 # 35 + 3 - 1
ts = TemporarySpan(char_start=0, char_end=37, sentence=context)
# ts.char_start=0
# ts.char_end=37
# ts.sentence=context

内側のループ2回目:
i=1に更新
各変数は、

w = ""
start = 4
end = 38 # 39 + 0 - 1
ts = TemporarySpan(char_start=4, char_end=38, sentence=context)
# ts.char_start=4
# ts.char_end=38
# ts.sentence=context

となる。
外側のループ1回目では、7-gramsの集合を作る。
外側のループ2回目:
内側のループ1回目:
l=6、i=0がセットされる。
各変数は、

w = "check"
start = 0
end = 33 # 29 + 5 - 1
ts = TemporarySpan(char_start=0, char_end=33, sentence=context)
# ts.char_start=0
# ts.char_end=33
# ts.sentence=context

となる。
外側のループ1回目では、6-gramsの集合を作る。
これを最終的にunigramの集合まで作成する。
実際はtsを生成するごとに、それをmatchersインスタンスのapplyメソッドに渡す。
PersonMatcherクラスのapplyメソッドを確認する。
※正確には、継承元Matcherから引き継いだもの
n-gramsのcandidateを一つずつチェックする。

def apply(self, candidates):
    """
    Apply the Matcher to a **generator** of candidates
    Optionally only takes the longest match (NOTE: assumes this is the *first* match)
    """
    seen_spans = set()
    for c in candidates:
        if self.f(c) and (not self.longest_match_only or not any([self._is_subspan(c, s) for s in seen_spans])):
            if self.longest_match_only:
                seen_spans.add(self._get_span(c))
            yield c
class RegexMatchEach(RegexMatch):
    """Matches regex pattern on **each token**"""
    def _f(self, c):
        tokens = c.get_attrib_tokens(self.attrib)
        return True if tokens and all([self.r.match(t) is not None for t in tokens]) else False

具体的には、_f関数でn-grams全てがマッチングするか(今回の場合、tokenのner_tags属性が全てPERSONか)判定。
例えば、下記の文章で考える。

Barack Obama and first lady Michelle Obama ...

最大3gramsで考えた場合、

Barack Obama and
Obama and first
and first lady
first lady Michelle
lady Michelle Obama
Barack Obama and
Obama and first
and first lady
first lady Michelle
lady Michelle Obama
Barack Obama
Obama and
and first
first lady
lady Michelle
Michelle Obama
Barack
Obama
and
first
lady
Michelle
Obama
Obama and
and first
first lady
lady Michelle
Michelle Obama
Barack
Obama
and
first
lady
Michelle
Obama

が候補になる。
n-gramが大きい方から候補を探すので、bi-gramの「Barack Obama」が候補となる。
また、既に候補が存在する場合、そのspanはseen_spansに格納済みなので、より小さいn-gramで再度候補となることはない(この例の場合、「BarackObama」が候補となることは無い)。
また、実際は単語ではなく、単語の開始位置で候補を判定するので、同じ単語が存在しても不都合が生じることは無い。
DB登録まで追えてはいないが、候補単位で登録を行うことで、同じ候補での重複(候補がBarack Obama同士になる)を無くしているのだと考えられる。
最後に抽出後のCandidateを確認する。

cands = session.query(Candidate).filter(Candidate.split == 0).all()
print cands[0]
# Spouse(Span("Sarko", sentence=44253, chars=[55,59], words=[10,10]), Span("Carla Bruni", sentence=44253, chars=[79,89], words=[15,16]))
print cands[0].person1
# Span("Sarko", sentence=44253, chars=[55,59], words=[10,10])
print cands[0].person2
# Span("Carla Bruni", sentence=44253, chars=[79,89], words=[15,16])
# the raw word tokens for the person1 Span
print cands[0].person1.get_attrib_tokens("words")
# ['Sarko']
print cands[0].person1.get_attrib_tokens("pos_tags")
# [u'NNP']
print cands[0].person1.get_attrib_tokens("ner_tags")
# [u'PERSON']

これで、前処理が完了した。
次は生成モデルの構築について見ていきたい。