# Copyright (c) 2018, Vanessa Sochat. All rights reserved.
# See the LICENSE in the main repository at:
# https://www.github.com/openschemas/openschemas-python
from rdflib import ConjunctiveGraph
import csv
import os
import re
import requests
import sys
# Loading Functions
def load_tsv(filename):
    '''load a tsv file using the default reader from the csv module.

    Parameters
    ==========
    filename: the file name to load; returns a list (rows) of
              lists (columns)
    '''
    rows = []
    with open(filename, 'r') as tsv:
        content = csv.reader(tsv, delimiter='\t')
        for row in content:
            if row:
                rows.append(row)
    return rows
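
# Example (illustrative sketch; the filename is hypothetical):
#
#     rows = load_tsv('Tool - Bioschemas.tsv')
#     headers, data = rows[0], rows[1:]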
# RDF Functions
def __get_class_name(temp_uri):
    return temp_uri.replace("http://schema.org/", "")


def __add_property(props_dic, prop_desc):
    sdo_uri = "http://schema.org/"
    if prop_desc['prop_name'] in props_dic:
        t_prop_name = prop_desc['prop_name']
        props_dic[t_prop_name]['exp_type'].append(prop_desc['exp_type'].replace(sdo_uri, ""))
    else:
        props_dic[prop_desc['prop_name']] = prop_desc
        props_dic[prop_desc['prop_name']]['exp_type'] = [prop_desc['exp_type'].replace(sdo_uri, "")]
    return props_dic

def __get_class_props(class_name, graph):
    print("Querying properties of %s in Schema.org" % class_name)
    qres = graph.query("""prefix schema: <http://schema.org/>
        select distinct * where {
            ?property schema:domainIncludes schema:%s .
            ?property schema:rangeIncludes ?exp_type .
            ?property rdfs:label ?prop_name .
            ?property rdfs:comment ?description
        }""" % class_name)
    temp_dic = {}
    for row in qres:
        labels = row.labels.keys()
        labels_dic = {}
        for label in labels:
            labels_dic[label] = str(row[label]).replace('<a href="/docs/', '<a href="http://schema.org/docs/')
        temp_dic = __add_property(temp_dic, labels_dic)
    return temp_dic

def __get_parent_type(class_name, graph):
    print("Finding parent type of %s in Schema.org" % class_name)
    qres = graph.query("""prefix schema: <http://schema.org/>
        select ?supclass where {
            ?class rdfs:label ?label .
            ?class rdfs:subClassOf ?supclass .
            filter (?label = '%s')
        }""" % class_name)
    resp_arr = []
    for row in qres:
        resp_arr.append(str(row['supclass']))
    return resp_arr[0].replace('http://schema.org/', '')

def _get_properties(class_name, graph, properties):
    if class_name == 'Thing':
        properties[class_name] = __get_class_props(class_name, graph)
        return properties
    else:
        temp_props = __get_class_props(class_name, graph)
        properties[class_name] = temp_props
        # Recurse up the hierarchy until we reach Thing
        parent_type = __get_parent_type(class_name, graph)
        return _get_properties(parent_type, graph, properties)

def get_properties_in_hierarchy(type_name):
    '''query the latest schema.org jsonld to collect properties for
       type_name and each of its parent types, up to Thing.
    '''
    query_type = type_name
    g = ConjunctiveGraph()
    g.parse('http://schema.org/version/latest/schema.jsonld', format='json-ld')
    props_dic = {}
    _get_properties(query_type, g, props_dic)
    return props_dic
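
# Example (illustrative; requires network access to schema.org):
#
#     props = get_properties_in_hierarchy('CreativeWork')
#     # props is a dict keyed by entity name, e.g. props['Thing']['name']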

def get_hierarchy(props_dic):
    type_hierarchy = []
    for h_type in props_dic:
        type_hierarchy.append(h_type)
    return type_hierarchy
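
# Example (illustrative, continuing the props sketch above):
#
#     get_hierarchy(props)
#     # e.g. -> ['CreativeWork', 'Thing'] (most specific type first)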

def get_expected_types(expected_types):
    '''Function that receives a string with expected types
       and generates a list with each expected type
    '''
    # Get rid of newlines anywhere
    expected_types = expected_types.strip().replace('\n', ' ')
    # Get rid of OR in any casing, anywhere with space either side
    expected_types = re.sub(' (o|O)(r|R) ', ' ', expected_types)
    # Split based on space OR comma
    expected_types = re.split(' |,', expected_types)
    # Keep a separate final list of cleaned types
    list_of_types = []
    for expected_type in expected_types:
        if expected_type not in ['', None]:
            list_of_types.append(expected_type.strip())
    return list_of_types
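
# Example (illustrative):
#
#     get_expected_types('Text or URL')
#     # -> ['Text', 'URL']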

def get_row_value(field, row, headers, default='', clean=True):
    '''get a value from a list based on a field name that is expected to
       appear in headers. This allows for change in the ordering of fields
       as long as the header is correctly labeled. If the field isn't found,
       the default (an empty string) is returned, and with clean=True we
       strip spaces and newlines from the value.

    Parameters
    ==========
    field: the field to look up
    row: the row (list) of values
    headers: the list of field names (header of the tsv file) to match field
    default: the default value to return, if not found
    clean: boolean to indicate wanting to strip newlines and spaces
    '''
    value = default
    for i in range(0, len(row)):
        if headers[i] == field:
            value = row[i]
            break
    if clean is True:
        value = value.strip().replace('\n', '')
    return value
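
# Example (illustrative; headers/row mimic one line of a bioschemas tsv):
#
#     headers = ['Property', 'Expected Type', 'Description']
#     row = ['name', 'Text', 'The name of the tool.\n']
#     get_row_value('Description', row, headers)
#     # -> 'The name of the tool.'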

def get_dict_from_row(row, headers):
    '''a row is typically a list of values, assigned to another list of headers.
       this function parses a known set of headers and enters them into
       the expected values for bioschemas attributes. We return a dict.

    Parameters
    ==========
    row: the row (list) of values from the bioschemas tsv file
    headers: the headers that are expected (already validated)
    '''
    props = {}

    # Set Bioschemas attributes
    props['bsc_description'] = get_row_value('BSC Description', row, headers)
    props['cardinality'] = get_row_value('Cardinality', row, headers)
    props['controlled_vocab'] = get_row_value('Controlled Vocabulary', row, headers)
    props['description'] = get_row_value('Description', row, headers, ' ')
    props['example'] = get_row_value('Example', row, headers)
    props['marginality'] = get_row_value('Marginality', row, headers)
    props['property'] = get_row_value('Property', row, headers)
    props['type'] = get_row_value('Type', row, headers)
    props['type_url'] = get_row_value('Type URL', row, headers)

    # Expected types list, cleaned up
    expected_types = get_row_value('Expected Type', row, headers)
    props['expected_types'] = get_expected_types(expected_types)
    return props
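
# Example (illustrative; continuing the row/headers sketch above, where
# only a few of the expected bioschemas headers are present):
#
#     props = get_dict_from_row(row, headers)
#     props['property'], props['expected_types']
#     # -> ('name', ['Text'])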

def get_property_in_hierarchy(sdo_props, mapping_property, prop_type="new_sdo"):
    '''if a mapping property (indexed by its name, which is key "property")
       is found as a key in sdo_props under a particular entity (e.g., Thing)
       then we've found it in the sdo, and we grab its description for use
       from the sdo. If we don't find it, we return the type as "new_sdo".
    '''
    for entity in sdo_props:
        if mapping_property['property'] in sdo_props[entity]:
            prop_type = entity
            name = mapping_property['property']
            desc = sdo_props[entity][name]['description']
            mapping_property['description'] = desc
            break
    return {'type': prop_type,
            'property': mapping_property}
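
# Example (illustrative, using the props dict sketched earlier):
#
#     result = get_property_in_hierarchy(props, {'property': 'name'})
#     result['type']
#     # -> 'Thing' if 'name' is found there, otherwise 'new_sdo'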

def get_mapping_properties(bioschemas_file):
    '''get_mapping_properties
       use the bioschemas field file and the specification type to
       return a list of type properties. The bioschemas file
       should already be validated for correct headers.

    Parameters
    ==========
    bioschemas_file: the <Template> - Bioschemas.tsv file
    '''
    rows = load_tsv(bioschemas_file)
    headers = rows[0]
    type_properties = []
    for r in range(1, len(rows)):
        row = rows[r]
        # If we want to do checks for empty cells, do it here
        # Keep the row only if Expected Type, Marginality, and Cardinality aren't empty
        if row[1] != "" and row[6] != "" and row[7] != "":
            property_dict = get_dict_from_row(row, headers)
            type_properties.append(property_dict)
    return type_properties
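
# Example (illustrative; the filename is hypothetical):
#
#     mapping_props = get_mapping_properties('Tool - Bioschemas.tsv')
#     # each entry is a dict from get_dict_from_row, e.g. with keys
#     # 'property', 'expected_types', 'marginality', ...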

class MappingParser:

    metadata = {}

    def __init__(self, metadata=None):
        if metadata is not None:
            self.metadata = metadata

    def check_url(self, spec_url):
        '''check_url doesn't exit if the address isn't found, etc.
           it just returns the string "err_404" in these cases.
        '''
        if spec_url is None:
            return "err_404"
        response = requests.get(spec_url)
        if response.status_code == 404:
            return "err_404"
        else:
            return spec_url

    def get_description(self, spec_file=None):
        if not spec_file:
            spec_file = self.metadata['specification_file']

        # Read in the specification sheet, already validated
        spec_sheet = load_tsv(spec_file)

        # Generate values in advance
        name = self.metadata['name']
        gh_base = 'https://github.com/openschemas/specifications/tree/master'
        use_cases_url = self.metadata['use_cases_url']

        description = {}
        description['name'] = name
        description['status'] = self.metadata['status']
        description['spec_type'] = self.metadata['spec_type']

        # GitHub future links
        description['gh_folder'] = '%s/%s' % (gh_base, name)
        description['gh_tasks'] = 'https://github.com/openschemas/specifications/labels/type%3A%20' + name
        description['edit_url'] = '%s/%s/specification.html' % (gh_base, name)
        description['use_cases_url'] = self.check_url(use_cases_url)
        description['version'] = self.metadata['version']
        description['parent_type'] = self.metadata.get('parent_type', 'Thing')

        # Parse specification file
        description['subtitle'] = spec_sheet[1][1]
        description['description'] = spec_sheet[1][2]
        return description

    def get_mapping(self, spec_sheet=None, bioschemas_sheet=None):
        '''get a mapping, meaning the full properties given a specification sheet
           and a bioschemas sheet. If files aren't provided, the defaults defined
           in self.metadata are used.

        Parameters
        ==========
        spec_sheet: the sheet with basic information (description, name, etc.)
        bioschemas_sheet: sheet (tsv) with bioschemas fields
        '''
        print("\nParsing %s =========" % self.metadata['name'])

        # Pull out description, name, type
        description = self.get_description(spec_sheet)
        name = description['name']
        spec_type = description['spec_type']

        if bioschemas_sheet is None:
            bioschemas_sheet = self.metadata['bioschemas_file']

        try:
            ptype = description['parent_type']
            sdo_props = get_properties_in_hierarchy(ptype)
        except IndexError:
            print('Error finding parent %s! Is %s a valid entity?' % (ptype, ptype))
            sys.exit(1)

        description['hierarchy'] = get_hierarchy(sdo_props)
        description['hierarchy'].reverse()
        print_hierarchy = ' > '.join(description['hierarchy'])
        print("Prepared schema.org properties for hierarchy %s" % print_hierarchy)
        print("Classifying %s properties" % description['name'])
        mapping_props = get_mapping_properties(bioschemas_sheet)

        # Combine new specification with standard ontology to get final mapping
        description['mapping'] = get_formatted_props(sdo_props,
                                                     mapping_props,
                                                     spec_name=name,
                                                     spec_type=spec_type)
        return description
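
# Example (illustrative sketch; file names and metadata values are hypothetical,
# and get_mapping requires network access to schema.org):
#
#     metadata = {'name': 'Tool',
#                 'status': 'revision',
#                 'spec_type': 'Profile',
#                 'version': '0.4.0',
#                 'parent_type': 'CreativeWork',
#                 'use_cases_url': None,
#                 'specification_file': 'Tool - Specification.tsv',
#                 'bioschemas_file': 'Tool - Bioschemas.tsv'}
#
#     parser = MappingParser(metadata)
#     mapping = parser.get_mapping()
#     mapping['hierarchy']
#     # e.g. -> ['Thing', 'CreativeWork']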