From bf2857bf229cc01e2b5c9203638b9f3a6bdcac06 Mon Sep 17 00:00:00 2001
From: mart-r
Date: Tue, 10 Oct 2023 10:56:32 +0100
Subject: [PATCH 1/2] CU-8692wgmkm: Add Neo4j connector that was removed from
 MedCAT

---
 helper/neo/README.md           |   4 +
 helper/neo/data_preparation.py | 231 +++++++++++++++++++++++++++++++++
 helper/neo/neo_connector.py    | 161 +++++++++++++++++++++++
 3 files changed, 396 insertions(+)
 create mode 100644 helper/neo/README.md
 create mode 100644 helper/neo/data_preparation.py
 create mode 100644 helper/neo/neo_connector.py

diff --git a/helper/neo/README.md b/helper/neo/README.md
new file mode 100644
index 0000000..4e7209f
--- /dev/null
+++ b/helper/neo/README.md
@@ -0,0 +1,4 @@
+This folder contains the code removed from MedCAT in the following PR:
+https://github.com/CogStack/MedCAT/pull/356
+
+It may be worth rewriting this on top of some other dependency, but we'll keep it here for now.
diff --git a/helper/neo/data_preparation.py b/helper/neo/data_preparation.py
new file mode 100644
index 0000000..674fca7
--- /dev/null
+++ b/helper/neo/data_preparation.py
@@ -0,0 +1,231 @@
+import os
+import pandas as pd
+
+
+def get_index_queries():
+    """Run these before everything else to speed up the imports."""
+    return ['CREATE INDEX patientId FOR (p:Patient) ON (p.patientId);',
+            'CREATE INDEX conceptId FOR (c:Concept) ON (c.conceptId);',
+            'CREATE INDEX documentId FOR (d:Document) ON (d.documentId);']
+
+
+def create_neo_csv(data, columns, output_dir='/etc/lib/neo4j/import/',
+                   base_name='patients'):
+    """Creates a CSV for the neo4j LOAD CSV function.
+
+    Args:
+        data:
+            A dataframe, or path to a dataframe, with the required data.
+        columns:
+            Which columns to take from the dataframe.
+        output_dir:
+            Where to save the CSV; should be the neo4j import path if possible.
+        base_name:
+            Name of the CSV file (without the extension).
+    """
+    if isinstance(data, pd.DataFrame):
+        df = data
+    else:
+        df = pd.read_csv(data)
+
+    # Remove duplicates
+    df = df.drop_duplicates(subset=columns)
+
+    out_df = df[columns]
+    data_path = os.path.join(output_dir, f"{base_name}.csv")
+    out_df.to_csv(data_path, index=False)
+
+
+def create_patients_csv(data, output_dir='/etc/lib/neo4j/import/',
+                        base_name='patients'):
+    """Creates a patients CSV for the neo4j LOAD CSV function.
+
+    Args:
+        data:
+            A dataframe, or path to a dataframe, with the required data:
+            patientId, sex, ethnicity, dob.
+        output_dir:
+            Where to save the CSV; should be the neo4j import path if
+            possible, but writing there may require admin privileges.
+        base_name:
+            Name of the CSV file (without the extension).
+
+    Returns:
+        str: The query that loads the CSV into neo4j.
+    """
+    query = (
+        'USING PERIODIC COMMIT 100000 \n'
+        f'LOAD CSV WITH HEADERS FROM "file:///{base_name}.csv" AS row \n'
+        'CREATE (:Patient {patientId: toString(row.patientId), \n'
+        '                  sex: toString(row.sex), \n'
+        '                  ethnicity: toString(row.ethnicity), \n'
+        '                  dob: datetime(row.dob)}) \n'
+    )
+
+    create_neo_csv(data=data, columns=['patientId', 'sex', 'ethnicity', 'dob'],
+                   output_dir=output_dir, base_name=base_name)
+
+    return query
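+
+
+# A minimal usage sketch (the input path is hypothetical; the returned Cypher
+# still has to be run against Neo4j, e.g. via NeoConnector.execute from
+# neo_connector.py):
+#
+#     df = pd.read_csv('patients_raw.csv')
+#     load_query = create_patients_csv(df, output_dir='/etc/lib/neo4j/import/')
+#     # once the CSV is in the import folder, run `load_query` on the server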
+ """ + query = ( + 'USING PERIODIC COMMIT 100000 \n' + f'LOAD CSV WITH HEADERS FROM "file:///{base_name}.csv" AS row \n' + 'CREATE (:Document {documentId: toString(row.documentId)}) \n' + ) + + create_neo_csv(data=data, columns=['documentId'], + output_dir=output_dir, base_name=base_name) + + return query + + +def create_concepts_csv(data, output_dir='/etc/lib/neo4j/import/', + base_name='concepts'): + """Creates a patients CSV for neo4j load csv function + + Args: + data: + A dataframe or path to a dataframe with the required data: conceptId, + name and type. + output_dir: + Where to save the CSVs, should be the neo4j imports path if possible. + """ + query = ( + 'USING PERIODIC COMMIT 100000 \n' + f'LOAD CSV WITH HEADERS FROM "file:///{base_name}.csv" AS row \n' + 'CREATE (:Concept {conceptId: toString(row.conceptId), \n' + ' type: toString(row.type), \n' + ' name: toString(row.name)}) \n' + ) + + create_neo_csv(data=data, columns=['conceptId', 'name', 'type'], + output_dir=output_dir, base_name=base_name) + + return query + + +def create_document2patient_csv(data, output_dir='/etc/lib/neo4j/import/', + base_name='document2patient'): + + """Creates a patients CSV for neo4j load csv function + + Args: + data: + A dataframe or path to a dataframe with the required data: patientId and + documentId. + output_dir: + Where to save the CSVs, should be the neo4j imports path if possible. + """ + query = ( + 'USING PERIODIC COMMIT 100000 \n' + f'LOAD CSV WITH HEADERS FROM "file:///{base_name}.csv" AS row \n' + 'MATCH (pt:Patient {patientId: toString(row.patientId)}) \n' + 'MATCH (doc:Document {documentId: toString(row.documentId)}) \n' + 'CREATE (pt)-[:HAS]->(doc); \n' + ) + + create_neo_csv(data=data, columns=['patientId', 'documentId'], + output_dir=output_dir, base_name=base_name) + + return query + + +def create_concept_ontology_csv(data, output_dir='/etc/lib/neo4j/import/', + base_name='concept_ontology'): + + """Creates a patients CSV for neo4j load csv function + + Args: + data: + A dataframe or path to a dataframe with the required data: child, parent. + output_dir: + Where to save the CSVs, should be the neo4j imports path if possible. + """ + query = ( + 'USING PERIODIC COMMIT 100000 \n' + f'LOAD CSV WITH HEADERS FROM "file:///{base_name}.csv" AS row \n' + 'MATCH (child:Concept {conceptId: toString(row.child)}) \n' + 'MATCH (parent:Concept {conceptId: toString(row.parent)}) \n' + 'CREATE (child)-[:IS_A]->(parent); \n' + ) + + create_neo_csv(data=data, columns=['child', 'parent'], + output_dir=output_dir, base_name=base_name) + + return query + + +def create_document2concept_csv(data, output_dir='/etc/lib/neo4j/import/', + base_name='document2concepts'): + """Creates a patients CSV for neo4j load csv function + + Args: + data: + A dataframe or path to a dataframe with the required data: 'conceptId', + 'documentId', 'contextSimilarity', 'start', 'end', 'timestamp', + 'metaSubject', 'metaPresence', 'metaTime'. + output_dir: + Where to save the CSVs, should be the neo4j imports path if possible. 
+ """ + query = ( + 'USING PERIODIC COMMIT 100000 \n' + f'LOAD CSV WITH HEADERS FROM "file:///{base_name}.csv" AS row \n' + 'MATCH (doc:Document{documentId: toString(row.documentId)}) \n' + 'MATCH (concept:Concept {conceptId: toString(row.conceptId)}) \n' + 'CREATE (doc)-[:HAS {start: toInteger(row.start), \n' + ' end: toInteger(row.end), \n' + ' timestamp: toInteger(row.timestamp), \n' + ' contextSimilarity: toFloat(row.contextSimilarity), \n' + ' metaSubject: toString(row.metaSubject), \n' + ' metaPresence: toString(row.metaPresence), \n' + ' metaTime: toString(row.metaTime) \n' + ' }]->(concept); \n' + ) + + columns = ['conceptId', 'documentId', 'contextSimilarity', 'start', + 'end', 'timestamp', 'metaSubject', 'metaPresence', 'metaTime'] + + create_neo_csv(data=data, columns=columns, + output_dir=output_dir, base_name=base_name) + + return query + + +def get_data_from_docs(docs, doc2pt, doc2time=None): + data = [['conceptId', 'documentId', 'contextSimilarity', + 'start', 'end', 'timestamp', 'metaSubject', + 'metaPresence', 'metaTime']] + + for doc_id, doc in docs.items(): + row = [] + for ent in doc['entities'].values(): + #if ent['meta_anns']['Subject']['value'] == 'Patient' and \ + # ent['meta_anns']['Presence']['value'] == 'True': + if doc2time is not None: + t = doc2time[doc_id] + else: + t = ent['document_timestamp'] + + row = [ent['cui'], doc_id, + ent['context_similarity'], + ent['start'], ent['end'], + t, + ent['meta_anns'].get('Subject', {}).get('value', None), + ent['meta_anns'].get('Presence', {}).get('value', None), + ent['meta_anns'].get('Time', {}).get('value', None)] + data.append(row) + row = [] + + return data \ No newline at end of file diff --git a/helper/neo/neo_connector.py b/helper/neo/neo_connector.py new file mode 100644 index 0000000..2da9c31 --- /dev/null +++ b/helper/neo/neo_connector.py @@ -0,0 +1,161 @@ +from py2neo import Graph +import getpass +from collections import defaultdict + + +class NeoConnector: + def __init__(self, uri, user, password=None): + if password is None: + password = getpass.getpass("Password:") + self.graph = Graph(uri, auth=(user, password)) + + def execute(self, query): + r = self.graph.run(query) + return r + + def bucket_concepts(self, data, bucket_size_seconds): + entities = data['entities'] + + _bucket = [] + _concepts = set() + start_time = -1 + new_stream = [] + # Sort entities + entities.sort(key=lambda ent: ent['timestamp']) + for ent in entities: + if start_time == -1: + start_time = ent['timestamp'] + + if ent['timestamp'] - start_time >= bucket_size_seconds: + # Add to stream + new_stream.extend(_bucket) + _bucket = [] + _concepts = set() + start_time = ent['timestamp'] + + t_ent = dict(new_stream[-1]) + t_ent['timestamp'] += 1 + t_ent['name'] = '' + t_ent['conceptId'] = '' + new_stream.append(t_ent) + + if ent['conceptId'] not in _concepts: + _bucket.append(ent) + _concepts.add(ent['conceptId']) + + if _bucket: + new_stream.extend(_bucket) + + data['entities'] = new_stream + + def get_all_patients(self, concepts, limit=1000, require_time=False, ignore_meta=False): + """Return all patients having all concepts + + Args: + concepts: The concepts + limit: The maximum number of results. Defaults to 1000. + require_time: If set only concepts that have the timestamp property will be used. 
+ """ + + q = "WITH [{}] AS cs ".format(",".join(["'{}'".format(c) for c in concepts])) + if not require_time: + q += '''MATCH (c:Concept)<-[:HAS ''' + if not ignore_meta: + q += '''{metaPresence: 'True', metaSubject: 'Patient'}''' + q += ''']-(:Document)<-[:HAS]-(pt:Patient) + WHERE c.conceptId in cs + WITH pt, size(cs) as inputCnt, count(DISTINCT c) as cnt + WHERE cnt = inputCnt + ''' + else: + q += '''MATCH (c:Concept)<-[r:HAS {metaPresence: 'True', metaSubject: + 'Patient'}]-(:Document)<-[:HAS]-(pt:Patient) \n + WHERE c.conceptId in cs AND exists(r.timestamp) \n + WITH pt, size(cs) as inputCnt, count(DISTINCT c) as cnt \n + WHERE cnt = inputCnt \n + ''' + + q += ' RETURN pt LIMIT {}'.format(limit) + data = self.execute(q).data() # Do not like this too much + + return [n['pt']['patientId'] for n in data], q + + def get_all_concepts_from(self, patient_id=None, document_id=None, + limit=1000, bucket_size_seconds=None, min_count=0, meta_requirements=None, require_time=True): + """Returns all concepts belonging to a document or patient + given the concept type (if none all are retruned). + """ + + if patient_id is not None: + q = 'MATCH (patient:Patient {patientId: "%s"})-[:HAS]->' % patient_id \ + + '(document:Document)-[has:HAS]->(concept:Concept) \n' + elif document_id is not None: + q = 'MATCH (patient:Patient)-[:HAS]->(document:Document {documentId: "%s"})' % document_id \ + + '-[has:HAS]->(concept:Concept) \n' + else: + raise Exception("patient_id or document_id are required") + q += 'RETURN patient, document, concept, has LIMIT %s \n' % limit + + data = self.execute(q).data() # Do not like this too much + out = None + if len(data) > 0: + out = {'patient': dict(data[0]['patient']), + 'entities': []} + + cnt = defaultdict(int) + for row in data: + if meta_requirements is None or \ + all([row['has'][meta] == value for meta,value in meta_requirements.items()]): + if not require_time or 'timestamp' in row['has']: + ent = dict(row['concept']) # Take everything from concept + ent['documentId'] = row['document']['documentId'] + ent.update(row['has']) # add all the stuff from the meta ann + + out['entities'].append(ent) + cnt[ent['conceptId']] += 1 + + # Cleanup based on min_count + new_ents = [] + for ent in out['entities']: + if cnt[ent['conceptId']] >= min_count: + ent['count'] = cnt[ent['conceptId']] + new_ents.append(ent) + out['entities'] = new_ents + + if bucket_size_seconds is not None: + self.bucket_concepts(data=out, bucket_size_seconds=bucket_size_seconds) + + return out, q + + def get_all_patients_descend(self, concepts, limit=1000, require_time=False): + """Return all patients having all descendant concepts under the ancestor concept + + Args: + concepts: Ancestor top-level concepts + limit: The maximum number of results. Defaults to 1000. + require_time: If set only concepts that have the timestamp property will be used. 
+
+    def get_all_patients_descend(self, concepts, limit=1000, require_time=False):
+        """Returns all patients having any descendant concept of the given
+        ancestor concepts.
+
+        Args:
+            concepts: Ancestor (top-level) concepts.
+            limit: The maximum number of results. Defaults to 1000.
+            require_time: If set, only concepts that have the timestamp
+                property will be used. Defaults to False.
+
+        Returns:
+            Tuple[List, str]: The patient IDs (one per matched concept row)
+                and the query used.
+        """
+        q = "WITH [{}] AS ancestor ".format(",".join(["'{}'".format(c) for c in concepts]))
+        if not require_time:
+            q += '''MATCH (n:Concept)-[:IS_A*0..5]->(m:Concept)
+                 WHERE m.conceptId IN ancestor // get the ancestor and its children
+                 WITH [n.conceptId] AS lineage // pass the lineage to the patient match
+                 MATCH (c:Concept)<-[r:HAS {metaPresence: 'True', metaSubject: 'Patient'}]-(d:Document)<-[q:HAS]-(pt:Patient)
+                 WHERE c.conceptId IN lineage
+                 '''
+        else:
+            q += '''MATCH (n:Concept)-[:IS_A*0..5]->(m:Concept)
+                 WHERE m.conceptId IN ancestor // get the ancestor and its children
+                 WITH [n.conceptId] AS lineage // pass the lineage to the patient match
+                 MATCH (c:Concept)<-[r:HAS {metaPresence: 'True', metaSubject: 'Patient'}]-(d:Document)<-[q:HAS]-(pt:Patient)
+                 WHERE c.conceptId IN lineage AND exists(r.timestamp)
+                 '''
+
+        q += ' RETURN pt.patientId, pt.sex, c.conceptId, c.name, r.timestamp LIMIT {}'.format(limit)
+        data = self.execute(q).data()  # Do not like this too much
+
+        return [n['pt.patientId'] for n in data], q
\ No newline at end of file

From 67fcbd2f9c6f40a1ca1fc0d8bde9730be4643107 Mon Sep 17 00:00:00 2001
From: mart-r
Date: Tue, 10 Oct 2023 10:58:33 +0100
Subject: [PATCH 2/2] CU-8692wgmkm: Add py2neo dependency to requirements

---
 requirements-dev.txt | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/requirements-dev.txt b/requirements-dev.txt
index 2a9e8d8..2a4020b 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -4,4 +4,5 @@ seaborn~=0.11.2
 pytest-xdist~=2.5.0
 nbmake<1.4
 nbconvert<6
-jinja2<=3.0
\ No newline at end of file
+jinja2<=3.0
+py2neo~=2021.2.3
\ No newline at end of file