The tool downloads everything: both serious and non-serious line listings, for both products and substances.
Later I will post the shell scripts and SQL statements to process the CSV files into PostgreSQL.
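In the meantime, here is a minimal sketch (not the final scripts) of how one of the downloaded CSV files could be loaded into PostgreSQL with psycopg2. The connection string, the table name line_listings and the example filename are placeholders, the table has to exist already with columns matching the extract, and you may need to switch the COPY options to tab-delimited depending on what the extracts actually contain:

import psycopg2

# Placeholder connection string and table name; adjust to your own setup.
conn = psycopg2.connect("dbname=ema user=postgres")
with conn, conn.cursor() as cur:
    # Placeholder filename; use DELIMITER E'\t' if the extracts are tab-separated.
    with open("dl/products_123456.csv", encoding="utf-8") as f:
        cur.copy_expert("COPY line_listings FROM STDIN WITH (FORMAT csv, HEADER true)", f)
conn.close()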
If you have questions, please feel free to leave a comment.
Download python script here: https://drive.google.com/file/d/1Zs8HiB3W-bd77OspdTrnhb8WiXsskfTM/view?usp=sharing
The Tableau dashboard is a work in progress but can be downloaded here: https://drive.google.com/file/d/16RRvQJZ6VEyAF_Gm4DDPtE200esXTlSM/view?usp=share_link
(download a 14-day trial of Tableau to open the dashboard)
import errno, json, hashlib, logging, pprint, urllib.request, re, requests, os, ssl, sys, time, threading, queue
from concurrent.futures import ThreadPoolExecutor
from itertools import product
from pathlib import Path
from multiprocessing.pool import ThreadPool
from urllib.parse import urlparse, unquote, urlencode
from requests.utils import requote_uri
from termcolor import colored
from alive_progress import alive_bar
"""
# EMA Crawler
Crawls data from adrreports.eu and converts it into
CSV and TXT data files. The script operates from
the working directory and will create "temp/", "output/" and "downloads/" subdirectories.
The "temp/" subdirectory contains remote cache files and
the "output/" directory contains the generated output files.
If downloads are performed they are stored in the "downloads/" directory, which
holds hash references to keep track of their origins.
* Authors: In alphabetical order:
* Twan van der Poel <twan@vanderpoelsoftware.com>
* Wouter Aukema <waukema@gmail.com>
* Usage: python3 ema-crawler.py
* Version: 1.98
# Changelog
## 1.98
* Used new session for each executor.map iteration
* Further integration of multithreading
* Temporary removal of hash-subdirectory
* Added stopwatch
## 1.97
* Download directory now determined dynamically (no longer configurable)
* Recycled session for each .get call
* Removed last bits of BIN_CHROMEDRIVER
* Dedicated function for remote calls
* Further integration of remote HTTP errors
* Resolved several encoding issues
## 1.96
* Simplified target directory based on files' md5 hash
* Wrapped executor in progress-bar
* Added parameter VERBOSE_DEBUGGING
## 1.95
* Added setting for verbose chromedriver logging
* Removed Selenium approach
* Integrated requests.Session()
## 1.94
* Integrated dynamic download storage
* Feature which moves downloads to their final location
* Added setting USE_DOWNLOADS_CACHE
## 1.93
* Integrated generic fatal_shutdown handler
## 1.92
* Headless support for Chromedriver
* Integrated storage filenames in list of downloads
* Integrated filters to properly capture downloads of more than 250K lines
* Added parameter "MAX_PRODUCT_COUNT" for testing small chunks
* Dynamic construction of URL (incl. filters)
* Cache integration in download directory
* Dynamic construction of filter combinations for Covid-related HLCs
## Todo
* Replace hard-coded IDs with detection of the 250K limitation
"""
# Settings
MAX_PRODUCT_COUNT = None
MAX_THREADS = 20
USE_DOWNLOADS_CACHE = False
VERBOSE_DEBUGGING = False
connect_timeout: int = 40
download_timeout: int = 100
# Variables used to split up large downloads
variables = {
'Seriousness': 'Serious',
'Regulatory Primary Source Country EEA/Non EEA': 'European Economic Area',
'Primary Source Qualification': 'Healthcare Professional',
'Sex': 'Female',
}
# Years used to split up large downloads
years = {
'2017': '2017',
'2018': '2018',
'2019': '2019',
'2020': '2020',
'2021': '2021',
'2022': '2022',
'2023': '2023',
'2024': '2024',
}
# Timing
executionStart = time.time()
print("Version 1.98")
# Function which handles fatal shutdowns
def fatal_shutdown(errorMessage, identifier):
print(colored('+--------------------------------------------------+', 'red'))
print(colored('| EMA crawler crashed, please read the error below |', 'red'))
print(colored('+--------------------------------------------------+', 'red'))
print(colored(errorMessage, 'yellow')+" "+colored("("+identifier+")", "white"))
sys.exit()
# Function which prints verbose debug messages (if enabled)
def verbose_debug(logMessage):
if not VERBOSE_DEBUGGING:
return
print(logMessage)
# Helper to exit execution
def exit():
sys.exit()
# Determine downloads directory
downloadsPath = ("%s%sdl" % (os.getcwd(), os.sep))
# Default EMA DPA base URL
emaBaseUrl = 'https://dap.ema.europa.eu/analytics/saw.dll?Go'
# Prepare thread storage
threadStorage = threading.local()
# Up-front validation of settings
if MAX_PRODUCT_COUNT is not None and not isinstance(MAX_PRODUCT_COUNT, int):
fatal_shutdown("MAX_PRODUCT_COUNT must be one of; numeric, None", "V01")
if not isinstance(MAX_THREADS, int) or MAX_THREADS < 1:
fatal_shutdown("MAX_THREADS must be numeric, at least 1", "V02")
if not os.path.exists(downloadsPath):
os.mkdir(downloadsPath)
if not os.path.exists(downloadsPath):
fatal_shutdown("Downloads directory does not exist (and could not be created)", "V03")
# Ensure working directories
basepath = os.path.dirname(os.path.abspath(sys.argv[0]))+os.sep
workdirs = ['temp'+os.sep, 'output'+os.sep, 'downloads'+os.sep]
for workdir in workdirs:
fullpath = basepath+workdir
if not os.path.exists(fullpath):
os.mkdir(fullpath)
if not os.access(fullpath, os.W_OK):
fatal_shutdown(("Working directory %s is not writable" % fullpath), "P01")
# Define important directories
CACHEDIR = basepath+workdirs[0]
OUTPUTDIR = basepath+workdirs[1]
# Function which creates a directory
def create_directory(directory):
try:
os.mkdir(directory)
except OSError as exc:
if exc.errno != errno.EEXIST:
raise
pass
# Function which initiates the thread executor
def initiate_thread_executor(resources):
# Determine index
threadStorage.threadIndex = resources.get(False)
# Open initial session and authenticate
threadStorage.localSession = requests.Session()
# Prepare params
params = {
'Path': '/shared/PHV DAP/DAP/Run Line Listing Report',
'Format': 'txt',
'Extension': '.csv',
'Action': 'Extract',
'P0': 0
}
# Initial request
ema_get_remote(threadStorage.localSession, params)
# Function which fetches a URL and caches it by a hash
def fetch_remote_url(url):
hash = hashlib.md5(url.encode()).hexdigest()
cacheFile = CACHEDIR+hash+".html"
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
if os.path.exists(cacheFile):
verbose_debug("Fetching local from %s" % url)
contents = Path(cacheFile).read_text()
return contents.encode('utf-8')
else:
verbose_debug("Fetching remote from %s" % url)
try:
with urllib.request.urlopen(url, context=ctx) as response:
content = response.read()
if not content:
fatal_shutdown("Unable to fetch remote data from '%s'" % url, "R01")
except Exception:  # don't let this swallow the SystemExit raised by fatal_shutdown above
verbose_debug("Remote data is empty '%s'" % url)
content = "".encode('utf-8')
f = open(cacheFile, "wb")
f.write(content)
f.close()
return fetch_remote_url(url)
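# Example of the caching above: an index page such as
# https://www.adrreports.eu/tables/product/a.html is stored as
# temp/<md5-of-the-url>.html, so repeated runs reuse the local copy.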
# Function which creates BI-filtered URLs
def create_filtered_url(combination):
filters = combination['filters']
params = {}
params['Path'] = '/shared/PHV DAP/DAP/Run Line Listing Report'
params['Format'] = 'txt'
params['Extension'] = '.csv'
params['Action'] = 'Extract'
paramCounter = 0
if len(filters) > 0:
keyName = ('P%d' % paramCounter)
params[keyName] = len(filters)
paramCounter = paramCounter + 1
for filter in filters:
keyName = ('P%d' % paramCounter)
params[keyName] = filter[0]
paramCounter = paramCounter + 1
keyName = ('P%d' % paramCounter)
familyName = ('"Line Listing Objects"."%s"' % filter[1])
params[keyName] = familyName
paramCounter = paramCounter + 1
keyName = ('P%d' % paramCounter)
params[keyName] = (("1 %s" % filter[2]))
paramCounter = paramCounter + 1
baseUrl = emaBaseUrl
url = baseUrl
for property in params:
url += ('&%s=%s' % (property, params[property]))
return {
'baseUrl': baseUrl,
'params': params,
'url': url
}
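# Worked example of the parameter layout built above: for a substance with
# filters [['eq', 'Substance High Level Code', '40983312'], ['eq', 'Seriousness', 'Serious']]
# the function adds, next to Path/Format/Extension/Action:
#   P0 = 2 (the number of filters)
#   P1 = 'eq', P2 = '"Line Listing Objects"."Substance High Level Code"', P3 = '1 40983312'
#   P4 = 'eq', P5 = '"Line Listing Objects"."Seriousness"',                P6 = '1 Serious'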
# Function which calls remote with the right session and headers
def ema_get_remote(localSession, localParams):
# Prepare statics
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
errorPattern = r'<div class="ErrorMessage">(.*?)</div>'
# Perform request
response = localSession.get(emaBaseUrl, params=localParams, headers=headers, timeout=(connect_timeout, download_timeout))
# Non 200 code?
if response.status_code != 200:
print(localParams)
fatal_shutdown(f"Non remote 200 HTTP code, got {response.status_code}", "EGR01")
# Explicitly specify the encoding if needed
responseText = response.content.decode('utf-16', errors='replace')
# Extract remote error?
match = re.search(errorPattern, responseText, re.DOTALL)
if match:
print(localParams)
fatal_shutdown(("Remote URL got error-response: %s" % match.group(1)), "EGR02")
# Here you go, sir.
return responseText
# Function which processes compiled (downloadable) items
def process_compiled_item(item):
# Prepare target
hash = hashlib.md5(item['DownloadUrl'].encode()).hexdigest()
#targetDirectory = "%s%s%s" % (downloadsPath, os.sep, hash)
targetDirectory = "%s%s" % (downloadsPath, os.sep)
targetFilename = "%s%s%s" % (targetDirectory, os.sep, item['OutputFile'])
# Apply cache?
if USE_DOWNLOADS_CACHE and os.path.exists(targetFilename):
if not VERBOSE_DEBUGGING:
bar()
return
# Create target directory
create_directory(targetDirectory)
# Debug
verbose_debug("\nProcessing download")
verbose_debug("-------------------")
verbose_debug("-> HLC: "+item['HighLevelCode'])
verbose_debug("-> URL: "+item['DownloadUrl'])
verbose_debug("-> Into: "+targetDirectory)
verbose_debug("-> File: "+targetFilename)
# Perform the request
responseText = ema_get_remote(threadStorage.localSession, item['Params'])
# Write response body to file
with open(targetFilename, 'w', encoding='utf-8') as file:
file.write(responseText)
verbose_debug("-> READY")
if not VERBOSE_DEBUGGING:
bar()
# Function which generates a matrix of eq/neq filter combinations
def generate_matrix(variables):
options = [[]]
for name, value in variables.items():
newOptions = []
for option in options:
newOptions.append(option + [f"{name}={value}"])
newOptions.append(option + [f"{name}!={value}"])
options = newOptions
return options
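# Example of what generate_matrix() produces: for the four "variables" defined
# at the top it yields 2^4 = 16 combinations, each a list such as
#   ['Seriousness=Serious',
#    'Regulatory Primary Source Country EEA/Non EEA!=European Economic Area',
#    'Primary Source Qualification=Healthcare Professional',
#    'Sex!=Female']
# Together with the 8 years this splits each Covid-related HLC into
# 8 * 16 = 128 smaller downloads, keeping each below the 250K line listing limit.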
# Fetch the product and substance lists
collection = []
productCounter = 0
breakIteration = False
baseUrls = {}
baseUrls['products'] = 'https://www.adrreports.eu/tables/product/'
baseUrls['substances'] = 'https://www.adrreports.eu/tables/substance/'
chars = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0-9']
for type in baseUrls:
baseUrl = baseUrls[type]
baseType = type[:-1].capitalize()
for char in chars:
indexUrl = "%s%s.html" % (baseUrl, char)
content = fetch_remote_url(indexUrl)
pattern = r"<a\s+.*?</a>"
matches = re.findall(pattern, str(content))
for match in matches:
# Extract CA
centrallyAuthorized = "1"
if type == 'substances':
centrallyAuthorized = "0"
if re.search(r"\*", match):
centrallyAuthorized = "1"
# Extract URL
words = match.strip().split('"')
url = words[1]
# Extract code
parsed = urlparse(unquote(url))
words = parsed.query.split('+')
code = words[-1]
# Extract name
parsed = match.split(">")
parsed = parsed[1].split("<")
name = parsed[0]
# Construct filters:
# The first parameter (P0) holds the total number of filters
# Then a group of 3 parameters is added per filter, in order:
# operator, field name, value
combinations = []
# Default definition
defaultFilters = []
defaultFilters.append(['eq', baseType+' High Level Code', code])
defaultFilters.append(['eq', 'Seriousness', 'Serious'])
# Prepare outputfile
filterString = ""
# For Covid-related HLCs we chop up the result because of the 250K line listing limit
if type == 'substances' and code in ['40983312', '40995439', '42325700', '21755']:
# Generate matrix
matrix = generate_matrix(variables)
# Construct combo
comboCounter = 0
for year in years:
for square in matrix:
filterString = ""
filters = defaultFilters.copy()
filters.append(['eq', 'Gateway Year', year])
for point in square:
if re.search('!=', point):
variable, value = point.split('!=')
operator = 'neq'
filterString += "_Not"
else:
variable, value = point.split('=')
operator = 'eq'
filters.append([operator, variable, value])
filterString += ("_%s" % value)
comboCounter = comboCounter + 1
outputFile = ("%s_%s_%s%s.csv" % (type, code, year, filterString))
targetfile = downloadsPath + '/' + outputFile
if not os.path.exists(targetfile):
#print(f"{outputFile} exists")
#print(f"Working on {outputFile}")
combinations.append({'filters': filters, 'outputFile': outputFile, 'code': code, 'type': type})
else:
# Default ID match filter
filters = defaultFilters.copy()
outputFile = ("%s_%s.csv" % (type, code))
targetfile = downloadsPath + '/' + outputFile
if not os.path.exists(targetfile):
#print(f"Working on {outputFile}")
combinations.append({'filters': filters, 'outputFile': outputFile, 'code': code, 'type': type})
# Raise product-counter
productCounter += 1
# Run every combination
for combination in combinations:
# Create URL
result = create_filtered_url(combination)
# Replace placeholders
downloadUrl = result['url']
downloadUrl = downloadUrl.replace('$hlc', code)
downloadUrl = downloadUrl.replace('$type', type)
# Collection storage
item = {}
item['HighLevelCode'] = code
item['ProductName'] = name
item['CentrallyAuthorized'] = centrallyAuthorized
item['Type'] = type
item['BaseUrl'] = result['baseUrl']
item['IndexUrl'] = indexUrl
item['DownloadUrl'] = downloadUrl
item['OutputFile'] = combination['outputFile']
item['Filters'] = combination['filters']
item['Params'] = result['params']
collection.append(item)
#print(f"HLC: {item['HighLevelCode']}")
# this is where the list of HLCs is assembled
# Max product counter
if MAX_PRODUCT_COUNT and productCounter == MAX_PRODUCT_COUNT:
verbose_debug(("--> Warning: MAX_PRODUCT_COUNT reached = %d" % MAX_PRODUCT_COUNT))
breakIteration = True
break
if breakIteration:
break
if breakIteration:
break
# Preformat output
output = {}
for item in collection:
type = item['Type']
if not type in output.keys():
output[type] = []
output[type].append([str(item['HighLevelCode']), str(item['ProductName']), str(item['CentrallyAuthorized'])])
# Generate indexed output files
for type in output:
resultFile = "%s%s.txt" % (OUTPUTDIR, type)
verbose_debug("Generating output file %s" % resultFile)
if os.path.exists(resultFile):
os.remove(resultFile)
lines = output[type]
content = ""
for line in lines:
content += "\t".join(line)+"\n"
f = open(resultFile, "wb")
f.write(content.encode('utf-8'))
f.close()
# Debug
resultFile = "%surls.txt" % (OUTPUTDIR)
if os.path.exists(resultFile):
os.remove(resultFile)
verbose_debug("Generating URL output file %s" % resultFile)
content = ""
for item in collection:
content += "\n"+item['OutputFile']+"\n"+item['DownloadUrl']+"\n"
f = open(resultFile, "wb")
f.write(content.encode('utf-8'))
f.close()
# Run executor
if VERBOSE_DEBUGGING:
resources = queue.Queue(MAX_THREADS)
for threadIndex in range(MAX_THREADS):
resources.put(threadIndex, False)
with ThreadPool(MAX_THREADS, initiate_thread_executor, [resources]) as pool:
pool.map(process_compiled_item, collection)
else:
collectionLength = len(collection)
with alive_bar(collectionLength) as bar:
resources = queue.Queue(MAX_THREADS)
for threadIndex in range(MAX_THREADS):
resources.put(threadIndex, False)
with ThreadPool(MAX_THREADS, initiate_thread_executor, [resources]) as pool:
pool.map(process_compiled_item, collection)
# Fin
if MAX_PRODUCT_COUNT is not None:
print(f"Requested {MAX_PRODUCT_COUNT} files")
runtime = round(time.time() - executionStart)
print(f"Execution finished in ~ {runtime} seconds")