import time
import logging

import numpy as np
from sqlalchemy import create_engine
from sqlalchemy.exc import ProgrammingError, OperationalError
from sqlalchemy.sql.expression import text
from sqlalchemy_utils import database_exists, create_database

from .database_model import Model

LOG = logging.getLogger(__name__)

try:
    from influxdb_client import InfluxDBClient
    from elasticsearch import Elasticsearch, BadRequestError
    from influxdb_client.client.exceptions import InfluxDBError
except ModuleNotFoundError as e:
    LOG.debug("Could not load ElasticSearch/Influx client: %s", e)

# Default chunk_time_interval. Might become configurable at some point iff we
# decide to keep TimescaleDB.
TIMESCALEDB_DEFAULT_CHUNK_SIZE_INTERVAL = 512


def _create_timescaledb_extension(engine):
    """Create the timescaledb extension.

    :param engine: The database engine.
    """
    with engine.begin() as conn:
        conn.execute(
            text("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;")
        )
    timescale_tables = {
        "world_states",
        "muscle_actions",
    }
    with engine.begin() as conn:
        for tbl in timescale_tables:
            cmd = (
                f"SELECT * FROM create_hypertable("
                f"'{tbl}', "  # Table name
                f"'id', "  # Primary partitioning column
                f"chunk_time_interval => "
                f"{TIMESCALEDB_DEFAULT_CHUNK_SIZE_INTERVAL})"
            )
            res = conn.execute(text(cmd))
            LOG.debug(
                'Result of executing "%s" during setup: %s',
                cmd,
                res.fetchall(),
            )
            res.close()
    LOG.info(
        "Created TimescaleDB hypertables: %s, set 'chunk_time_interval' "
        "parameter to %d. HINT: The chunk_time_interval should be chosen "
        "such that all active chunks of all your hypertables fit in 25%% of "
        "your RAM. You can change the value with TimescaleDB's "
        "set_chunk_time_interval() function.",
        ", ".join(timescale_tables),
        TIMESCALEDB_DEFAULT_CHUNK_SIZE_INTERVAL,
    )

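# Illustrative sketch (not part of the module): the hint logged above points to
# TimescaleDB's set_chunk_time_interval() for tuning the chunk size later.
# Assuming an already created SQLAlchemy engine and the hypertables from above,
# that could look roughly like this; the value 1024 is only an example.
#
#     with engine.begin() as conn:
#         conn.execute(text(
#             "SELECT set_chunk_time_interval('muscle_actions', 1024)"
#         ))
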
def setup_database(uri):
    """Creates the database from the current model in one go.

    Parameters
    ----------
    uri : str
        The complete database connection URI.
    """
    engine = create_engine(uri)
    i = 1
    while not database_exists(uri):
        if i > 3:  # Hardcoded max tries. No real reason to configure this.
            LOG.critical(
                "Could not create the database. See errors above for more "
                "details. Giving up now."
            )
            raise RuntimeError("Could not create database")
        try:
            create_database(uri)
        except OperationalError as e:
            try:
                import psycopg2.errors

                if isinstance(e.orig, psycopg2.errors.ObjectInUse):
                    LOG.warning(
                        "Could not create database because the template was "
                        "in use. Retrying in %d seconds.",
                        i,
                    )
                    time.sleep(i)
                else:
                    break
            except ImportError:
                pass
        except ProgrammingError as e:
            LOG.error(
                "There was an error creating the database. I will continue "
                "and hope for the best. The error was: %s",
                e,
            )
        i += 1

    with engine.begin() as conn:
        try:
            Model.metadata.create_all(engine)
        except ProgrammingError as e:
            LOG.error("Could not create database: %s" % e)
            raise e
        try:
            from midas.tools.palaestrai.database_view import (  # type: ignore
                create_midas_views,
            )

            if engine.url.drivername == "psycopg2":  # type: ignore
                create_midas_views(conn)
        except ModuleNotFoundError:
            pass  # Ok, don't create specific views if the tools are not present.
    try:
        _create_timescaledb_extension(engine)
    except OperationalError as e:
        LOG.warning(
            "Could not create extension timescaledb and create hypertables: "
            "%s. "
            "Your database setup might lead to noticeable slowdowns with "
            "larger experiment runs. Please upgrade to PostgreSQL with "
            "TimescaleDB for the best performance." % e
        )

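# Usage sketch (illustrative, not part of the module): setup_database() expects
# a SQLAlchemy connection URI; the credentials, host, and database name below
# are placeholders, not values shipped with palaestrAI.
#
#     setup_database("postgresql://postgres:password@localhost:5432/palaestrai")
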
# Create the InfluxDB and Elasticsearch databases via
# setup_influxdb(time_series_uri) and setup_elasticsearch(store_uri).
def setup_database_v2(store_uri, time_series_uri):
    """Creates the store and time-series databases in one go.

    Parameters
    ----------
    store_uri : str
        The complete Elasticsearch connection URI for the store database.
    time_series_uri : str
        The complete InfluxDB connection URI for the time-series database.
    """
    # try to set up the time-series database
    setup_influxdb(time_series_uri)
    # set up the storage database
    setup_elasticsearch(store_uri)

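# Usage sketch (illustrative): setup_database_v2() simply chains the two setup
# helpers below; both URIs are placeholder examples in the formats those
# helpers parse, not preconfigured defaults.
#
#     setup_database_v2(
#         store_uri="elasticsearch+http://localhost:9200",
#         time_series_uri="influxdb+my-org:my-token@http://localhost:8086",
#     )
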
# TODO: Implement SSL verification
def setup_influxdb(time_series_uri):
    """Creates the InfluxDB bucket used for time-series data.

    Parameters
    ----------
    time_series_uri : str
        The complete InfluxDB connection URI in the form
        ``<db_type>+<org>:<token>@<url>``.
    """
    # create the database
    db_type, time_series_uri = time_series_uri.split("+")
    org, token = time_series_uri.split("@")[0].split(":")
    connections = time_series_uri.split("@")[1]
    try:
        client = InfluxDBClient(url=connections, token=token, org=org)
    except Exception as e:
        LOG.error(
            "Could not connect to the influxdb. Please check if the "
            "'time_series_store_uri' is set correctly. "
            "Error %s was raised.",
            e,
        )
        raise RuntimeError("Could not connect to database")
    bucket_api = client.buckets_api()
    try:
        bucket_api.create_bucket(bucket_name="palaestrai")
    except InfluxDBError as e:
        if "already exists" in str(e):
            LOG.info("Bucket already exists. Continuing...")
        else:
            LOG.error("Could not create bucket: %s" % e)
            raise e
    client.close()

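# Parsing sketch (illustrative, placeholder values): with the split() calls
# above, a call such as
#
#     setup_influxdb("influxdb+my-org:my-token@http://localhost:8086")
#
# resolves to org="my-org", token="my-token", and url="http://localhost:8086"
# before the InfluxDBClient is created.
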
# Create a new Elasticsearch client that skips TLS certificate verification
# and create an index named "palaestrai" if it does not exist yet.
# If the index already exists, a message is logged and the function simply
# returns.
def setup_elasticsearch(store_uri):
    """Creates the Elasticsearch index used by the store.

    Parameters
    ----------
    store_uri : str
        The complete Elasticsearch connection URI in the form
        ``elasticsearch+<url>``.
    """
    store_uri = store_uri.replace("elasticsearch+", "")
    try:
        es = Elasticsearch(
            [store_uri],
            verify_certs=False,
            timeout=60,
            max_retries=10,
            retry_on_timeout=True,
        )
    except Exception as e:
        LOG.critical(
            "Could not connect to the elasticsearch. Please check if the "
            "'store_uri' is set correctly. "
            "Error %s was raised.",
            e,
        )
        raise RuntimeError("Could not connect to database")
    try:
        es.indices.create(index="palaestrai", ignore=400)
    except Exception as e:
        if "resource_already_exists_exception" in str(e):
            LOG.info("Index already exists. Continuing...")
        else:
            raise e

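# Usage sketch (illustrative): the "elasticsearch+" prefix is stripped before
# the remainder is handed to the Elasticsearch client, so a placeholder call
# could look like this; host and port are assumptions, not shipped defaults.
#
#     setup_elasticsearch("elasticsearch+http://localhost:9200")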