# /granules_index
# collection_url=URL
# collection_name=NAME
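#
# HTTP interface registered below (see the api.add_resource calls): POST/GET /index
# and POST /harvest. Example calls, as a sketch only: host/port, catalog URL, and
# time values are placeholders (the service listens on 8002 when run directly, and
# timestamps are whatever lib.util.dtutil.timestamp_parser accepts):
#
#   # import a THREDDS catalog into the granule index and pycsw
#   curl -X POST -d "catalog_url=https://thredds.ucar.edu/thredds/catalog.xml" http://localhost:8002/index
#
#   # list indexed granules for a collection and time range
#   curl "http://localhost:8002/index?collection_name=NAME&start_time=START&end_time=END"
#
#   # harvest a catalog (optionally a single dataset) straight into pycsw
#   curl -X POST -d "catalog_url=URL" -d "dataset_name=NAME" http://localhost:8002/harvest
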
import sys
assert sys.version_info >= (3, 5)

import os
import time
import datetime
import threading
import traceback
import uuid
import logging

from flask import Flask, request
from flask_restful import Resource, Api

from lib.index import Index
from lib.pycsw_helper import PycswHelper
from lib.scraper_driver import ScraperDriver
from lib.scraper.simple import SimpleScraper
from lib.scraper.collection_import import CollectionImportScraper
from lib.scraper.collection_refresh import CollectionRefreshScraper
from lib.util.dtutil import timestamp_range_generator, timestamp_parser
from lib.util.path import mkdir_p
from lib.siphon.catalog import TDSCatalog, Dataset, CatalogRef

# class PurgeExpiredResource(Resource):
#     def post(self):
#         expired_date = timestamp_range_generator(14).start
#         # expired_date = datetime.datetime.today() - datetime.timedelta(minutes=1)
#         PycswHelper().delete_records({'where': 'TO_TIMESTAMP(records.insert_date, \'YYYY-MM-DDTHH:MI:SS\') < TIMESTAMP \'' + str(expired_date) + '\'', 'values': []})
#         return "purged expired (older than 14 days) records"


class IndexResource(Resource):
    def post(self):
        # start the file-triggered trace-dump watcher (see debug_dump_watch below);
        # daemon so stray watcher threads never block process shutdown
        print("starting trace-dump watcher on pid " + str(os.getpid()), flush=True)
        debug_thread = threading.Thread(target=debug_dump_watch, daemon=True)
        debug_thread.start()

        # get params
        # collection_name = request.form.get('collection_name')
        catalog_url = request.form.get('catalog_url')

        # choose the dataset naming authority based on the catalog host
        if 'thredds.ucar.edu' in catalog_url:
            Dataset.default_authority = 'edu.ucar.unidata'
        if 'rda.ucar.edu' in catalog_url:
            Dataset.default_authority = 'edu.ucar.rda'

        job_id = str(uuid.uuid4())[:8]  # short id for uniqueness
        index = Index(INDEX_DB_URL)
        output_dir = RECORDS_DIR + '/result.' + job_id

        # scrape the catalog into metadata records and sync the granule index
        scraper = CollectionImportScraper(output_dir, index)
        scraper.add_catalog(catalog_url=catalog_url)
        driver = ScraperDriver(scraper, 40, 1)
        driver.harvest()
        scraper.sync_index()

        # complete
        print("indexing harvest complete", flush=True)
        print("importing %s" % output_dir, flush=True)
        print("")

        # load the generated records into pycsw
        PycswHelper().load_records(output_dir)
        return output_dir

    def get(self):
        index = Index(INDEX_DB_URL)
        collection_name = request.args.get('collection_name')
        start_time = request.args.get('start_time')
        end_time = request.args.get('end_time')
        start_time = timestamp_parser.parse_datetime(start_time, default=timestamp_parser.min_datetime)
        end_time = timestamp_parser.parse_datetime(end_time, default=timestamp_parser.max_datetime)

        # refresh if needed (might be time consuming)
        scraper = CollectionRefreshScraper(index)
        scraper.set_refresh_scope(collection_name, start_time, end_time)
        driver = ScraperDriver(scraper, 4, 0.5)
        driver.harvest()
        scraper.sync_index()

        # retrieve from index DB
        result = index.get_granules(collection_name, start_time, end_time)
        print("read index for %s" % collection_name, flush=True)
        return result


class HarvestResource(Resource):
    def post(self):
        job_id = str(uuid.uuid4())[:8]  # short id for uniqueness
        catalog_url = request.form['catalog_url']
        dataset_name = request.form.get('dataset_name', None)
        output_dir = RECORDS_DIR + '/result.' + job_id

        scraper = SimpleScraper(output_dir)
        scraper.add_catalog(catalog_url=catalog_url, dataset_name=dataset_name)

        # begin harvest
        harvester = ScraperDriver(scraper, 20, 1)
        harvester.harvest()

        # complete
        print("harvest complete", flush=True)
        print("importing %s" % output_dir, flush=True)
        print("")

        # load records
        PycswHelper().load_records(output_dir)
        print("records loaded", flush=True)
        return "harvested and loaded " + output_dir


###############################
# Module-level configuration and application setup.
if __name__ == '__main__':
    RECORDS_DIR = '../records'
else:
    RECORDS_DIR = '/records'

# the index DB defaults to a SQLite file next to the records;
# override with the INDEX_DB_URL environment variable
sqlite_index_url = "sqlite:///" + RECORDS_DIR + '/index.sqlite.db'
INDEX_DB_URL = os.getenv('INDEX_DB_URL', sqlite_index_url)

# granules_index = GranulesIndex(RECORDS_DIR)
application = Flask(__name__)
api = Api(application)
api.add_resource(IndexResource, '/index')
api.add_resource(HarvestResource, '/harvest')

# harvest.delete_lock('granules')
# harvest.delete_lock('collections')

## DEBUG HANDLER
# Poll for a trigger file; when it appears, remove it and write a stack trace of
# every live thread to a timestamped file under /var/tmp.
mutex = threading.Lock()


def debug_dump_watch():
    while True:
        print("Watching for /var/tmp/td", flush=True)
        time.sleep(1)
        try:
            if os.path.isfile('/var/tmp/td'):
                os.remove('/var/tmp/td')
                with mutex:
                    fname = '/var/tmp/trace-dump-' + datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + '--' + str(os.getpid())
                    print("DEBUG SIGNAL " + fname)
                    with open(fname, 'a+') as file:
                        for thread_id, frame in sys._current_frames().items():
                            file.write('Stack for thread {}\n'.format(thread_id))
                            traceback.print_stack(frame, file=file)
                            file.write('\n\n')
        except Exception as e:
            print(e)

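# To request a dump from a running process once debug_dump_watch is active,
# create the trigger file (e.g. `touch /var/tmp/td`, inside the container if
# deployed that way); the watcher removes it and writes the traces to
# /var/tmp/trace-dump-<timestamp>--<pid>.
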
if __name__ == '__main__':
    # development: run the built-in Flask server
    print("starting application on port 8002")
    application.run(port=8002, debug=True)
else:
    # production (e.g. under gunicorn): reuse gunicorn's log handlers and level
    gunicorn_logger = logging.getLogger('gunicorn.error')
    application.logger.handlers = gunicorn_logger.handlers
    application.logger.setLevel(gunicorn_logger.level)
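
# When not run directly, this module is expected to be served by a WSGI server
# such as gunicorn (the else-branch above wires Flask's logger to gunicorn's).
# A typical invocation, illustrative only and not taken from this repository:
#
#   gunicorn --bind 0.0.0.0:8002 server:application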