Linux boca.hozzt.com 4.18.0-553.8.1.lve.el8.x86_64 #1 SMP Thu Jul 4 16:24:39 UTC 2024 x86_64
LiteSpeed
: 159.253.39.62 | : 18.227.79.253
Cant Read [ /etc/named.conf ]
7.4.33
renovkoron
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
README
+ Create Folder
+ Create File
/
opt /
cloudlinux /
venv /
lib /
python3.11 /
site-packages /
lvestats /
plugins /
generic /
burster /
storage /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
__init__.py
384
B
-rw-r--r--
base.py
1.27
KB
-rw-r--r--
cleanup.py
2.05
KB
-rw-r--r--
load.py
4.11
KB
-rw-r--r--
save.py
2.67
KB
-rw-r--r--
utils.py
1.7
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : load.py
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from typing import Callable, TypedDict import sqlalchemy as sa from lvestats.orm import BurstingEventType, bursting_events_table from ..common import LveId, Timestamp from ..history import LveHistory, IntervalType, get_interval_type_after from ..utils import bootstrap_gen from .._logs import logger class _OutBurstingEventRow(TypedDict): lve_id: LveId timestamp: Timestamp event_type: BurstingEventType def load_bursting_enabled_intervals_from_db( engine: sa.engine.Engine, cutoff: Timestamp, server_id: str, ) -> dict[LveId, LveHistory]: lve_to_asm = dict() with engine.begin() as conn: first_events_stmt = ( sa.select([ bursting_events_table.c.lve_id, sa.func.max(bursting_events_table.c.timestamp).label("timestamp"), ]) .where(sa.and_( bursting_events_table.c.timestamp <= cutoff, bursting_events_table.c.server_id == server_id, )) .group_by( bursting_events_table.c.lve_id ) .alias('first_events') ) all_events_stmt = ( sa.select([ bursting_events_table.c.server_id, bursting_events_table.c.lve_id, bursting_events_table.c.timestamp, bursting_events_table.c.event_type, ]) .select_from( bursting_events_table.outerjoin( first_events_stmt, bursting_events_table.c.lve_id == first_events_stmt.c.lve_id, ) ) .where(sa.and_( bursting_events_table.c.server_id == server_id, bursting_events_table.c.timestamp >= sa.func.coalesce( first_events_stmt.c.timestamp, cutoff, ) )) .order_by( bursting_events_table.c.timestamp.asc(), ) ) result = conn.execute(all_events_stmt) assert isinstance(result, sa.engine.ResultProxy) rows = result.fetchall() assert rows is not None for row in rows: row: _OutBurstingEventRow lve_id = row['lve_id'] try: asm = lve_to_asm[lve_id] except KeyError: asm = lve_to_asm[lve_id] = _LveHistoryAssembler(lve_id, row) asm.process_row(row) return {lve_id: asm.create() for lve_id, asm in lve_to_asm.items()} class _LveHistoryAssembler: def __init__(self, lve_id: LveId, first_event: _OutBurstingEventRow) -> None: self._lve_id = lve_id self._first_interval_type = ( IntervalType.OVERUSING if first_event['event_type'] == BurstingEventType.STARTED else IntervalType.NORMAL ) @bootstrap_gen def assembler_gen(): prev_event = first_event while True: event: _OutBurstingEventRow = yield if event['event_type'] == prev_event['event_type']: logger.debug( 'Skipping consecutive "%s" events for lve_id=%s', event['event_type'], lve_id, ) continue self._timestamps.append(event['timestamp']) prev_event = event self.process_row: Callable[[_OutBurstingEventRow], None] = assembler_gen().send self._timestamps: list[Timestamp] = [first_event['timestamp']] def create(self) -> LveHistory: timestamps = self._timestamps last_interval_type = get_interval_type_after(self._first_interval_type, len(timestamps)) if last_interval_type == IntervalType.OVERUSING: timestamps = timestamps[:-1] logger.warning("Last interval for lve_id=%s is not closed!", self._lve_id) if len(timestamps) == 0: return LveHistory() return LveHistory(self._first_interval_type, tuple(timestamps))
Close