Pages

Friday, April 19, 2024

EMA Eudravigilance - Dashboard and collector tool


In the recent interview with Jim Ferguson, we discussed a Tableau dashboard I had generated from pharmacovigilance data from the European Medicines Agency (https://www.adrreports.eu/).



For those who wish to download the data, below is a Python script I developed with Twan van der Poel (https://vanderpoelsoftware.com/).

The tool downloads everything, both serious and non-serious line listings and both products and substances.

Later I will post the shell scripts and SQL statements to process the CSV files into PostgreSQL

If you have questions, pls feel free to leave a comment.

Download python script here: https://drive.google.com/file/d/1Zs8HiB3W-bd77OspdTrnhb8WiXsskfTM/view?usp=sharing

The Tableau dashboard is work in progress but can be downloaded here: https://drive.google.com/file/d/16RRvQJZ6VEyAF_Gm4DDPtE200esXTlSM/view?usp=share_link

(download a 14d trial of Tableau to open the dashboard)

import errno, json, hashlib, logging, pprint, urllib.request, re, requests, os, ssl, sys, time, threading, queue

from alive_progress import alive_bar;
from concurrent.futures import ThreadPoolExecutor
from itertools import product
from pathlib import Path
from multiprocessing.pool import ThreadPool
from urllib.parse import urlparse, unquote, urlencode
from requests.utils import requote_uri
from termcolor import colored
"""
# EMA Crawler
Crawls data from adrreports.eu and converts it into
data CSV and TXT files. The script operates from
the working directory and will create a "temp/", "output/" and "downloads/" subdirectory.
The "temp/" subdirectory will contain remote cache files and
an "output/" directory, the latter will contain the output files.
If downloads are performed the downloads are stored in a "downloads/" directory, this
directory holds hash references to keep track of what it's origins.
* Authors: In alphabetical order:
*          Twan van der Poel <twan@vanderpoelsoftware.com>
*          Wouter Aukema <waukema@gmail.com>
* Usage:   python3 ema-crawler.py
* Version: 1.98
# Changelog
## 1.98
* Used new session for each executor.map iteration
* Further integration of multithreading
* Temporary removal of hash-subdirectory
* Added stopwatch
## 1.97
* Dynamic non-configurable download directory
* Recycled session for each .get call
* Removed last bits of BIN_CHROMEDRIVER
* Dedicated function for remote calls
* Further integration of remote HTTP errors
* Resolved several encoding issues
## 1.96
* Simplified target directory based on files' md5 hash
* Wrapped executor in progress-bar
* Added parameter VERBOSE_DEBUGGING
## 1.95
* Added setting for verbose chromedriver logging
* Removed Selenium approach
* Integrated requests.Session()
## 1.94
* Integrated dynamic download storage
* Feature which moves downloads to their final location
* Added setting USE_DOWNLOADS_CACHE
## 1.93
* Integrated generic fatal_shutdown handler
## 1.92
* Headless support for Chromedriver
* Integrated storage filenames in list of downloads
* Integrated filters to capture downloads of more than 250K of lines properly
* Added parameter "MAX_PRODUCT_COUNT" for testing small chunks
* Dynamic construction of URL (incl. filters)
* Cache integration in download directory
* Dynamic construction of filter combinations for Covid related HLC's
## Todo
* Replace hard-coded ID's with detection of 250k limitation
"""
# Settings
MAX_PRODUCT_COUNT    = None
MAX_THREADS          = 20
USE_DOWNLOADS_CACHE  = False
VERBOSE_DEBUGGING    = False
connect_timeout: int = 40
download_timeout: int = 100

# Variables used to split up large downloads
variables = {
    'Seriousness':                                   'Serious',
    'Regulatory Primary Source Country EEA/Non EEA': 'European Economic Area',
    'Primary Source Qualification':                  'Healthcare Professional',
    'Sex':                                           'Female',
}
# Years used to split up large downloads
years = {
    '2017': '2017',
    '2018': '2018',
    '2019': '2019',
    '2020': '2020',
    '2021': '2021',
    '2022': '2022',
    '2023': '2023',
    '2024': '2024',
}
# Timing
executionStart = time.time()
print("Version 1.98")
# Function which handles fatal shutdowns
def fatal_shutdown(errorMessage, identifier):
    print(colored('+--------------------------------------------------+', 'red'))
    print(colored('| EMA crawler crashed, please read the error below |', 'red'))
    print(colored('+--------------------------------------------------+', 'red'))
    print(colored(errorMessage, 'yellow')+" "+colored("("+identifier+")", "white"))
    sys.exit()
# Function which debugs verbose logging (if enabled)
def verbose_debug(logMessage):
    if not VERBOSE_DEBUGGING:
        return
    print(logMessage)
# Helper to exit execution
def exit():
    sys.exit()
# Determine downloads directory
downloadsPath = ("%s%sdl" % (os.getcwd(), os.sep))
# Default EMA DPA base URL
emaBaseUrl = 'https://dap.ema.europa.eu/analytics/saw.dll?Go'
# Prepare thread storage
threadStorage = threading.local()
# Pre-perform validation
if MAX_PRODUCT_COUNT is not None and not isinstance(MAX_PRODUCT_COUNT, int):
    fatal_shutdown("MAX_PRODUCT_COUNT must be one of; numeric, None", "V01")
if not isinstance(MAX_THREADS, int) or MAX_THREADS < 1:
    fatal_shutdown("MAX_THREADS must be numeric, at least 1", "V02")
if not os.path.exists(downloadsPath):
    os.mkdir(downloadsPath)
if not os.path.exists(downloadsPath):
    fatal_shutdown("Downloads directory does not exist (and could not be created)", "V03")
# Ensure working directories
basepath = os.path.dirname(os.path.abspath(sys.argv[0]))+os.sep
workdirs = ['temp'+os.sep, 'output'+os.sep, 'downloads'+os.sep]
for workdir in workdirs:
    fullpath = basepath+workdir
    if not os.path.exists(fullpath):
        os.mkdir(fullpath)
    if not os.access(fullpath, os.W_OK):
        fatal_shutdown(("Working directory %s is not writable" % fullpath), "P01")
# Define important directories
CACHEDIR = basepath+workdirs[0]
OUTPUTDIR = basepath+workdirs[1]
# Function which creates a directory
def create_directory(directory):
    try:
        os.mkdir(directory)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
        pass
# Function which initiates the thread executor
def initiate_thread_executor(resources):
    # Determine index
    threadStorage.threadIndex = resources.get(False)
    # Open initial session and authenticate
    threadStorage.localSession = requests.Session()
    # Prepare params
    params = {
        'Path':      '/shared/PHV DAP/DAP/Run Line Listing Report',
        'Format':    'txt',
        'Extension': '.csv',
        'Action':    'Extract',
        'P0':         0
    }
    # Initial request
    ema_get_remote(threadStorage.localSession, params)
# Function which fetches an URL and caches it by a hash
def fetch_remote_url(url):
    hash = hashlib.md5(url.encode()).hexdigest()
    cacheFile = CACHEDIR+hash+".html"
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    if os.path.exists(cacheFile):
        verbose_debug("Fetching local from %s" % url)
        contents = Path(cacheFile).read_text()
        return contents.encode('utf-8')
    else:
        verbose_debug("Fetching remote from %s" % url)
        try:
            with urllib.request.urlopen(url, context=ctx) as response:
                content = response.read()
            if not content:
                fatal_shutdown("Unable to fetch remote data from '%s'" % url, "R01")
        except:
            verbose_debug("Remote data is empty '%s'" % url)
            content = "".encode('utf-8')
        f = open(cacheFile, "wb")
        f.write(content)
        f.close()
        return fetch_remote_url(url)
# Function which creates BI filtered URL's
def create_filtered_url(combination):
    filters = combination['filters']
    params = {}
    params['Path']      = '/shared/PHV DAP/DAP/Run Line Listing Report'
    params['Format']    = 'txt'
    params['Extension'] = '.csv'
    params['Action']    = 'Extract'
    paramCounter = 0
    if len(filters) > 0:
        keyName = ('P%d' % paramCounter)
        params[keyName] = len(filters)
        paramCounter = paramCounter + 1
        for filter in filters:
            keyName = ('P%d' % paramCounter)
            params[keyName] = filter[0]
            paramCounter = paramCounter + 1
            keyName = ('P%d' % paramCounter)
            familyName = ('"Line Listing Objects"."%s"' % filter[1])
            params[keyName] = familyName
            paramCounter = paramCounter + 1
            keyName = ('P%d' % paramCounter)
            params[keyName] = (("1 %s" % filter[2]))
            paramCounter = paramCounter + 1
    baseUrl = emaBaseUrl
    url = baseUrl
    for property in params:
        value = params[property]
        params[property] = value
        url += ('&%s=%s' % (property, params[property]))
    return {
        'baseUrl': baseUrl,
        'params': params,
        'url': url
    }
# Function which calls remote with the right session and headers
def ema_get_remote(localSession, localParams):
    # Prepare statics
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }
    errorPattern = r'<div class="ErrorMessage">(.*?)</div>'
    # Perform request
    response = localSession.get(emaBaseUrl, params=localParams, headers=headers, timeout=(connect_timeout, download_timeout))
    # Non 200 code?
    if response.status_code != 200:
        print(localParams)
        fatal_shutdown(f"Non remote 200 HTTP code, got {response.status_code}", "EGR01")
    # Explicitly specify the encoding if needed
    responseText = response.content.decode('utf-16', errors='replace')
    # Extract remote error?
    match = re.search(errorPattern, responseText, re.DOTALL)
    if match:
        print(localParams)
        fatal_shutdown(("Remote URL got error-response: %s" % match.group(1)), "EGR02")
    # Here you go, sir.
    return responseText
# Function which process compiled (downloadable) items
def process_compiled_item(item):
    # Prepare target
    hash = hashlib.md5(item['DownloadUrl'].encode()).hexdigest()
    #targetDirectory = "%s%s%s" % (downloadsPath, os.sep, hash)
    targetDirectory = "%s%s" % (downloadsPath, os.sep)
    targetFilename = "%s%s%s" % (targetDirectory, os.sep, item['OutputFile'])
    # Apply cache?
    if USE_DOWNLOADS_CACHE and os.path.exists(targetFilename):
        if not VERBOSE_DEBUGGING:
            bar()
        return
    # Create target directory
    create_directory(targetDirectory)
    # Debug
    verbose_debug("\nProcessing download")
    verbose_debug("-------------------")
    verbose_debug("-> HLC:  "+item['HighLevelCode'])
    verbose_debug("-> URL:  "+item['DownloadUrl'])
    verbose_debug("-> Into: "+targetDirectory)
    verbose_debug("-> File: "+targetFilename)
    # Perform the request
    responseText = ema_get_remote(threadStorage.localSession, item['Params'])
    # Write response body to file
    with open(targetFilename, 'w', encoding='utf-8') as file:
        file.write(responseText)
    verbose_debug("-> READY")
    if not VERBOSE_DEBUGGING:
        bar()
# Function which generates a matrix
def generate_matrix(variables):
    options = [[]]
    for name, value in variables.items():
        newOptions = []
        for option in options:
            newOptions.append(option + [f"{name}={value}"])
            newOptions.append(option + [f"{name}!={value}"])
        options = newOptions
    return options
# Fetch product- and substances list
collection = []
productCounter = 0
breakIteration = False
baseUrls = {}
baseUrls['products']   = 'https://www.adrreports.eu/tables/product/'
baseUrls['substances'] = 'https://www.adrreports.eu/tables/substance/'
chars = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0-9']
for type in baseUrls:
    baseUrl = baseUrls[type]
    baseType = type[:-1].capitalize()
    for char in chars:
        indexUrl = "%s%s.html" % (baseUrl, char)
        content = fetch_remote_url(indexUrl)
        pattern = r"<a\s+.*?</a>"
        matches = re.findall(pattern, str(content))
        for match in matches:
            # Extract CA
            centrallyAuthorized = "1"
            if type == 'substances':
                centrallyAuthorized = "0"
                if re.search(r"\*", match):
                    centrallyAuthorized = "1"
            # Extract URL
            words = match.strip().split('"')
            url = words[1]
            # Extract code
            parsed = urlparse(unquote(url))
            words = parsed.query.split('+')
            code = words[-1]
            # Extract name
            parsed = match.split(">")
            parsed = parsed[1].split("<")
            name = parsed[0]
            # Construct filters:
            #   First parameter is the total count of parameters
            #   Then, groups of 3 parameters are added
            #   Parameters above are respectively: operator, var1, var2 or value
            combinations = []
            # Default definition
            defaultFilters = []
            defaultFilters.append(['eq', baseType+' High Level Code', code])
            defaultFilters.append(['eq', 'Seriousness', 'Serious'])
            # Prepare outputfile
            filterString = ""
            # For Covid related HLC's we chop up the result because of a 250K line listing limit)
            if type == 'substances' and code in ['40983312', '40995439', '42325700', '21755']:
                # Generate matrix
                matrix = generate_matrix(variables)
                # Construct combo
                comboCounter = 0
                for year in years:
                    for square in matrix:
                        filterString = ""
                        filters = defaultFilters.copy()
                        filters.append(['eq', 'Gateway Year', year])
                        for point in square:
                            if re.search('!=', point):
                                variable, value = point.split('!=')
                                operator = 'neq'
                                filterString += "_Not"
                            else:
                                variable, value = point.split('=')
                                operator = 'eq'
                            filters.append([operator, variable, value])
                            filterString += ("_%s" % value)
                        comboCounter = comboCounter + 1
                        outputFile = ("%s_%s_%s%s.csv" % (type, code, year, filterString))
                        targetfile = downloadsPath + '/' + outputFile
                        if not os.path.exists(targetfile):
                            #print(f"{outputFile} exists")
                            #print(f"Working on {outputFile}")
                            combinations.append({'filters': filters, 'outputFile': outputFile, 'code': code, 'type': type})
            else:
                # Default ID match filter
                filters = defaultFilters.copy()
                outputFile = ("%s_%s.csv" % (type, code))
                targetfile = downloadsPath + '/' + outputFile
                if not os.path.exists(targetfile):
                    #print(f"Working on {outputFile}")
                    combinations.append({'filters': filters, 'outputFile': outputFile, 'code': code, 'type': type})
            # Raise product-counter
            productCounter += 1
            # Run every combination
            for combination in combinations:
                # Create URL
                result = create_filtered_url(combination)
                # Replace placeholders
                downloadUrl = result['url']
                downloadUrl = downloadUrl.replace('$hlc', code)
                downloadUrl = downloadUrl.replace('$type', type)
                # Collection storage
                item = {}
                item['HighLevelCode'] = code
                item['ProductName'] = name
                item['CentrallyAuthorized'] = centrallyAuthorized
                item['Type'] = type
                item['BaseUrl'] = result['baseUrl']
                item['IndexUrl'] = indexUrl
                item['DownloadUrl'] = downloadUrl
                item['OutputFile'] = combination['outputFile']
                item['Filters'] = combination['filters']
                item['Params'] = result['params']
                collection.append(item)
                #print(f"HLC: {item['HighLevelCode']}")
                #hier wordt dus de lijst met HLCs opgetuigd
            # Max product counter
            if MAX_PRODUCT_COUNT and productCounter == MAX_PRODUCT_COUNT:
                verbose_debug(("--> Warning: MAX_PRODUCT_COUNT reached = %d" % MAX_PRODUCT_COUNT))
                breakIteration = True
                break
        if breakIteration:
            break
    if breakIteration:
        break
# Preformat output
output = {}
for item in collection:
    type = item['Type']
    if not type in output.keys():
        output[type] = []
    output[type].append([str(item['HighLevelCode']), str(item['ProductName']), str(item['CentrallyAuthorized'])])
# Generate indexed output files
for type in output:
    resultFile = "%s%s.txt" % (OUTPUTDIR, type)
    verbose_debug("Generating output file %s" % resultFile)
    if os.path.exists(resultFile):
        os.remove(resultFile)
    lines = output[type]
    content = ""
    for line in lines:
        content += "\t".join(line)+"\n"
    f = open(resultFile, "wb")
    f.write(content.encode('utf-8'))
    f.close()
# Debug
resultFile = "%surls.txt" % (OUTPUTDIR)
if os.path.exists(resultFile):
    os.remove(resultFile)
verbose_debug("Generating URL output file %s" % resultFile)
content = ""
for item in collection:
    content += "\n"+item['OutputFile']+"\n"+item['DownloadUrl']+"\n"
f = open(resultFile, "wb")
f.write(content.encode('utf-8'))
f.close()
# Run executor
if VERBOSE_DEBUGGING:
    resources = queue.Queue(MAX_THREADS)
    for threadIndex in range(MAX_THREADS):
        resources.put(threadIndex, False)
    with ThreadPool(MAX_THREADS, initiate_thread_executor, [resources]) as pool:
        pool.map(process_compiled_item, collection)
else:
    collectionLength = len(collection)
    with alive_bar(collectionLength) as bar:
        resources = queue.Queue(MAX_THREADS)
        for threadIndex in range(MAX_THREADS):
            resources.put(threadIndex, False)
        with ThreadPool(MAX_THREADS, initiate_thread_executor, [resources]) as pool:
            pool.map(process_compiled_item, collection)
# Fin
if MAX_PRODUCT_COUNT != None:
    print(f"Requested {MAX_PRODUCT_COUNT} files")
runtime = round(time.time() - executionStart)
print(f"Execution finished in ~ {runtime} seconds")

No comments:

Post a Comment

I like interaction, thank you!

Note: Only a member of this blog may post a comment.