Linux boca.hozzt.com 4.18.0-553.8.1.lve.el8.x86_64 #1 SMP Thu Jul 4 16:24:39 UTC 2024 x86_64
LiteSpeed
: 159.253.39.62 | : 3.147.86.6
Cant Read [ /etc/named.conf ]
7.4.33
renovkoron
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
README
+ Create Folder
+ Create File
/
opt /
cloudlinux /
venv /
lib /
python3.11 /
site-packages /
ssa /
modules /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
__init__.py
0
B
-rw-r--r--
autotracer.py
20.75
KB
-rw-r--r--
common.py
1.59
KB
-rw-r--r--
decision_maker.py
10.38
KB
-rw-r--r--
processor.py
7.77
KB
-rw-r--r--
stat_sender.py
7.9
KB
-rw-r--r--
storage.py
4.84
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : storage.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
Represents storage where ssa data is collected, stored and extracted
"""
import itertools
from dataclasses import dataclass
from typing import List, Iterator, Tuple, Dict

import sqlalchemy
from sqlalchemy import func, cast, case, literal_column, distinct, text

from ssa.db import session_scope, RequestResult


@dataclass
class DomainData:
    """
    Aggregated request statistics of a single domain,
    as produced by iter_domains_data.
    """
    # domain the statistics belong to
    domain_name: str
    # number of requests per hour: 24 items, index is the hour (0-23)
    domain_total_reqs: List[int]
    # True if the wordpress flag was set in at least one hourly bucket
    is_a_wordpress_domain: bool
    # the largest number of distinct paths seen within a single hour
    urls_number: int


def iter_domains_data(engine) -> Iterator[DomainData]:
    """
    Iterates data from database domain-by-domain.

    Rows are fetched grouped by (domain, hour) and folded into one
    DomainData per domain.

    :param engine: database engine passed to session_scope
    :return: iterator over DomainData, ordered by domain name
    """
    with session_scope(engine) as db:
        results_by_hour = db.query(
            RequestResult.domain,
            func.strftime('%H', RequestResult.created_at),
            func.Count(RequestResult.id),
            func.max(RequestResult.wordpress),
            func.count(distinct(RequestResult.path))
        ).group_by(
            RequestResult.domain,
            func.strftime('%H', RequestResult.created_at)
        ).order_by(
            RequestResult.domain,
            func.strftime('%H', RequestResult.created_at)
        )

        # the query is ordered by domain, so groupby sees each domain once
        results_by_hour_grouped = itertools.groupby(
            results_by_hour, key=lambda item: item[0])

        for domain_name, group in results_by_hour_grouped:
            urls_number = 0
            is_wordpress = False
            # at some hours there may be no requests
            # so we must normalize data to match 24h data format
            requests_number_by_hour = [0] * 24
            for _, hour, requests_num, hour_is_wordpress, urls in group:
                requests_number_by_hour[int(hour)] = requests_num
                urls_number = max(urls_number, urls)
                # aggregate over all hours instead of trusting
                # only the last hourly row of the domain
                is_wordpress = is_wordpress or bool(hour_is_wordpress)

            yield DomainData(
                domain_name=domain_name,
                domain_total_reqs=requests_number_by_hour,
                is_a_wordpress_domain=is_wordpress,
                urls_number=urls_number
            )


def iter_urls_data(engine, domain_name, all_paths):
    """
    Iterates urls data from database url-by-url.

    Yields nothing when no stored request matches the given
    domain and paths.

    :param engine: database engine passed to session_scope
    :param domain_name: only requests of this domain are considered
    :param all_paths: iterable of paths (strings) to collect data for
    :return: iterator over (path, data) pairs ordered by path, where data
        is a dict with keys 'path', 'url_throttled_reqs', 'url_total_reqs'
        and 'url_slow_reqs'; each of the last three is a 24-item list
        indexed by hour
    """
    with session_scope(engine) as db:
        # the IN clause below is rendered to a plain SQL string and wrapped
        # into text(); colons are backslash-escaped first, presumably so
        # that text() does not treat ':name' fragments of a path as
        # bind parameters
        all_paths_escaped = [path.replace(":", "\\:") for path in all_paths]
        urls_data = db.query(
            RequestResult.path,
            func.strftime('%H', RequestResult.created_at),
            func.Sum(cast(
                RequestResult.hitting_limits, sqlalchemy.Integer
            )).label('url_throttled_reqs'),
            func.Count(
                RequestResult.id
            ).label('url_total_reqs'),
            func.Sum(cast(
                RequestResult.is_slow_request, sqlalchemy.Integer)
            ).label('url_slow_reqs')
        ).filter(
            RequestResult.domain == domain_name
        ).filter(
            text(RequestResult.path.in_(all_paths_escaped).expression.compile(
                compile_kwargs={"literal_binds": True}).string)
        ).group_by(
            RequestResult.path,
            func.strftime('%H', RequestResult.created_at)
        ).order_by(
            RequestResult.path,
            func.strftime('%H', RequestResult.created_at)
        )

        previous_path = None
        url_throttled_reqs, url_total_reqs, url_slow_reqs = \
            [0] * 24, [0] * 24, [0] * 24
        for path, hour, url_throttled_req, url_total_req, url_slow_req \
                in urls_data:
            # rows are ordered by path: a changed path means that
            # the previous url is complete and can be emitted
            if previous_path is not None and previous_path != path:
                yield previous_path, dict(
                    path=previous_path,
                    url_throttled_reqs=url_throttled_reqs,
                    url_total_reqs=url_total_reqs,
                    url_slow_reqs=url_slow_reqs
                )
                url_throttled_reqs, url_total_reqs, url_slow_reqs = \
                    [0] * 24, [0] * 24, [0] * 24
            url_throttled_reqs[int(hour)] = url_throttled_req
            url_total_reqs[int(hour)] = url_total_req
            url_slow_reqs[int(hour)] = url_slow_req
            previous_path = path

        # emit the last accumulated url; skipped when the query returned
        # no rows at all (there is nothing to report in that case)
        if previous_path is not None:
            yield previous_path, dict(
                path=previous_path,
                url_throttled_reqs=url_throttled_reqs,
                url_total_reqs=url_total_reqs,
                url_slow_reqs=url_slow_reqs
            )


def get_url_durations(engine, domain_name) -> Iterator[Tuple[str, List[int]]]:
    """
    Get information about durations of requests url-by-url.

    :param engine: database engine passed to session_scope
    :param domain_name: only requests of this domain are considered
    :return: iterator over (path, durations) pairs ordered by path, where
        durations is the list of RequestResult.duration values
        stored for that path
    """
    with session_scope(engine) as db:
        urls_data = db.query(
            RequestResult.path, RequestResult.duration
        ).filter(
            RequestResult.domain == domain_name
        ).order_by(
            RequestResult.path
        )
        # all rows are loaded into memory before grouping; the ordering
        # by path guarantees that groupby sees each path exactly once
        durations_by_path = itertools.groupby(
            list(urls_data), lambda item: item[0])
        for key, group in durations_by_path:
            yield key, [duration for _, duration in group]
Close