Linux boca.hozzt.com 4.18.0-553.8.1.lve.el8.x86_64 #1 SMP Thu Jul 4 16:24:39 UTC 2024 x86_64
LiteSpeed
: 159.253.39.62 | : 3.21.246.168
Cant Read [ /etc/named.conf ]
7.4.33
renovkoron
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
README
+ Create Folder
+ Create File
/
opt /
cloudlinux /
venv /
lib /
python3.11 /
site-packages /
lvestats /
lib /
bursting /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
__init__.py
197
B
-rw-r--r--
history.py
3.74
KB
-rw-r--r--
info.py
6.83
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : history.py
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import datetime import sqlalchemy as sa from lvestats.orm import bursting_events_table class TableDoesNotExistError(Exception): def __init__(self, table_name): self.message = f'Table "{table_name}" does not exist in the database' super().__init__(self.message) class HistoryShowBursting: def __init__(self, dbengine: sa.engine.base.Engine, period_from: datetime.datetime, period_to: datetime.datetime, uid: int | None = None, server_id: str = 'localhost') -> None: self.dbengine = dbengine self.period_from = period_from self.period_to = period_to self.uid = uid self.server_id = server_id def get(self) -> list[sa.engine.RowProxy]: """ Get history from the 'bursting_events' table. Retrieving records within the required time frame, along with one record preceding this time frame to detect the bursting status at the start of the time frame. """ # Since bursting limits functionality isn't enabled by default, # we need to check if the table exists first inspector = sa.inspect(self.dbengine) if bursting_events_table.name not in inspector.get_table_names(): raise TableDoesNotExistError(bursting_events_table.name) ts_from = self.period_from.timestamp() ts_to = self.period_to.timestamp() # Get the rows with timestamp between ts_from and ts_to stmt1 = sa.select([ bursting_events_table.c.lve_id, bursting_events_table.c.timestamp, bursting_events_table.c.event_type, ]).where( sa.and_( bursting_events_table.c.server_id == self.server_id, bursting_events_table.c.timestamp >= ts_from, bursting_events_table.c.timestamp <= ts_to, # Add lve_id condition if it's specified (bursting_events_table.c.lve_id == self.uid) if self.uid is not None else True, ) ) # Subquery to get the maximum timestamp for each lve_id where timestamp < ts_from subquery = sa.select([ bursting_events_table.c.lve_id, sa.func.max(bursting_events_table.c.timestamp).label('max_timestamp'), ]).where( sa.and_( bursting_events_table.c.server_id == self.server_id, bursting_events_table.c.timestamp < ts_from, # Add lve_id condition if it's specified (bursting_events_table.c.lve_id == self.uid) if self.uid is not None else True, ) ).group_by( bursting_events_table.c.lve_id, ).alias('subquery') # Get the row with the maximum timestamp less than ts_from stmt2 = sa.select([ bursting_events_table.c.lve_id, bursting_events_table.c.timestamp, bursting_events_table.c.event_type, ]).select_from( bursting_events_table.join( subquery, sa.and_( bursting_events_table.c.lve_id == subquery.c.lve_id, bursting_events_table.c.timestamp == subquery.c.max_timestamp, ) ) ) stmt = sa.union( stmt1, stmt2, ) stmt = stmt.order_by( stmt.c.lve_id, stmt.c.timestamp, ) with self.dbengine.connect() as connection: result = connection.execute(stmt).fetchall() return result
Close