Это старая версия документа!
Параметры с префиксом тире необязательные, без префикса обязательные они видимо и являются обязательными
# file.py --a='string' import argparse parser= argparse.ArgumentParser(description='fvfv') parser.add_argument('--a', default='def-a', type=str, help='var - a') parser.add_argument('--b', default='def-b', type=str, help='var - b') args= parser.parse_args() print(args.a) print(args.b) print(args) ###################### import argparse argParser= argparse.ArgumentParser() argParser.add_argument('-f', type= str, help= 'Файл для создания хеш-строки') args= argParser.parse_args() if not args.f: print("Укажите файл для создания хеш-строки") exit()
import requests import json import os str1= """{ "jsonrpc": "2.0", "method": "template.get", "params": { "selectItems": "extend", "filter": { "host": [ "ServerApp" ] } }, "auth": "///////40a3593////", "id": 1 }""" query= str1.replace(' ', '').replace('\n', '') result= requests.post('http://10.10.10.30/zabbix/api_jsonrpc.php', json=json.loads(query)) items= result.json() outfile= open('ServApp-items.txt', 'w') outfile.write(str(items['result'][0]['items'])) print(str(items['result'][0]['items']))
import sqlite3 con= sqlite3.connect('zbxtg-local.db'); # con.text_factory = bytes # Чтобы данные сохранялись как есть, без изменения кодировки и т.д. # cur= con.cursor(); cur.execute('create table if not exists `IdMessages` (`Id` INTEGER PRIMARY KEY AUTOINCREMENT, `MessageId` VARCHAR(45), `AlertId` VARCHAR(45));'); con.commit(); #cur.execute('insert into `IdMessages` (`MessageId`, `AlertId`) values ("1", "11"); '); #cur.execute('insert into `IdMessages` (`MessageId`, `AlertId`) values ("2", "22"); '); #cur.execute('insert into `IdMessages` (`MessageId`, `AlertId`) values ("3", "33"); '); #cur.execute('insert into `IdMessages` (`MessageId`, `AlertId`) values ("4", "44"); '); #cur.execute('insert into `IdMessages` (`MessageId`, `AlertId`) values ("5", "55"); '); #con.commit(); cur.execute('select `MessageId`, `AlertId` from `IdMessages`; '); #con.commit; cur.fetchall(); print("Count rows = ", cur.rowcount); print(cur.fetchall()) #for row in cur.fetchall(): # print(row)
# # Доступ к данным # con.row_factory= sqlite3.Row; r= cur.fetchall();
print(r[0][0]+ ' -==- '+ r[0][1]);
#for member in r: # print(member[0]+ ' -==- '+ member[1])
import requests import sqlite3 import json import sys class LocalDB: def __init__(self): self.con= None; self.cur= None; def Connect(self): self.con= sqlite3.connect('zbxtg-local.db'); self.cur= self.con.cursor(); # Создаем таблицу если нету self.cur.execute('create table if not exists `IdMessages` (`Id` INTEGER PRIMARY KEY AUTOINCREMENT, `MessageId` VARCHAR(45), `AlertId` VARCHAR(45));'); self.con.commit(); def Dissconnect(self): self.con.close(); def SaveMessageId(self, MessageId, AlertId): self.cur.execute('insert into `IdMessages` (`MessageId`, `AlertId`) values("'+str(MessageId)+'", "'+AlertId+'"); '); self.con.commit(); print("-= write is done =-"); def GetMessageId(self, AlertId): self.cur.execute('select `MessageId` from `IdMessages` where `AlertId`= "'+AlertId+'"; '); return self.cur.fetchall(); def main(): if(len(sys.argv) < 3): print("You must input number alert"); quit(); elif(sys.argv[1]== '-edit' and len(sys.argv) < 4): print("For edit message, input text"); quit(); if(sys.argv[1]== '-add'): textMessage= 'Alert #'+ sys.argv[2]+ '\nBody message \nMessage boddy'; mydata = {'chat_id': '-751016983', 'text': textMessage}; answer = requests.post('https://api.telegram.org/bot1074186011:AAHu8aVvPXU0GinMYkra6HnR5l53sqAZEcM/sendMessage', params=mydata); MessageId= answer.json()["result"]["message_id"]; if(MessageId): # Если получили в ответ ID сообщения, записываем его в БД db= LocalDB(); db.Connect(); db.SaveMessageId(MessageId, sys.argv[2]); db.Dissconnect(); print('Message sended! - '+ str(MessageId)); elif(sys.argv[1]== '-edit'): textMessage= 'Старое содержание полученное из алерта\n--------------------------------\n--==Новое содержание==--\nАлерт решен!!!'; db= LocalDB(); db.Connect(); MessId= db.GetMessageId(sys.argv[2]); db.Dissconnect(); for row in MessId: mydata = {'chat_id': '-751016983', 'message_id': row, 'text': textMessage}; answer = requests.post('https://api.telegram.org/bot1074186011:AAHu8aVvPXU0GinMYkra6HnR5l53sqAZEcM/editMessageText', params=mydata); print('Message is edited'); print(answer.json()); print('***************************************'); #print(answer.json()); if __name__ == "__main__": main()
"""./py.py > file.csv""" import re import os def main(): infile= open("auth.log", 'r') data= infile.readlines() # checkLogin(data) checkHosts(data) def checkHosts(inData): i= 1 lstry= [] lstry2= [] for line in inData: if line.find('Failed')== -1: continue line= line[line.find(' for '):].replace(' for ', '') if line.split()[0]== 'invalid': line= line.replace('invalid user ', '') lstry.append(line.replace('\n','')) lstry.sort() for line in lstry: lsline= line.split() trying= {'number': i, 'user': lsline[0], 'host': lsline[2], 'protocol': lsline[5]} lstry2.append(trying) i += 1 for item in lstry2: print(item['user']+ ','+ item['host']) def checkLogin(inData): isFailed= re.compile(r"Failed") logCorr= '\n' logIncorr= '\n' for i in inData: if isFailed.search(i): #i= i.replace('\n','') isLogin= re.compile(r"for invalid") if isLogin.search(i): logIncorr += i else: logCorr += i print("Login correct:", logCorr) #print(type(logCorr)) print("\n\nLogin Incorrect:", logIncorr) if __name__== '__main__': main()
В обоих случаях, в аккаунте нужно дополнительно разрешить аутентификацию такого скрипта (приложения)
В гугле- https://myaccount.google.com/lesssecureapps
В яндексе «портальный пароль» нужно включить
#!/usr/bin/python3 import smtplib srv= 'smtp.yandex.ru' / 'smtp.gmail.com' port= 587 login= '*******@yandex.ru' passwd= '********' mailto= '******@gmail.com' message = 'Subject: {}\n\n{}'.format('Warning Notification', 'This is warning notification and sending from my-client') s= smtplib.SMTP(srv, port) s.starttls() s.login(login, passwd) s.sendmail(login, mailto, message) s.quit()
import smtplib from os.path import basename from email.mime.application import MIMEApplication from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.utils import COMMASPACE, formatdate def send_mail(send_from, send_to, subject, text, files=None, server="127.0.0.1"): assert isinstance(send_to, list) msg = MIMEMultipart() msg['From'] = send_from msg['To'] = COMMASPACE.join(send_to) msg['Date'] = formatdate(localtime=True) msg['Subject'] = subject msg.attach(MIMEText(text)) for f in files or []: with open(f, "rb") as fil: part = MIMEApplication( fil.read(), Name=basename(f) ) # After the file is closed part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f) msg.attach(part) smtp = smtplib.SMTP(server) smtp.sendmail(send_from, send_to, msg.as_string()) smtp.close()
Весь файл не будет загружаться в память
import os file= open('bigdict.txt', 'r') outfile= open('outfile.txt', 'w') for line in file: outfile.write(line)
Сервер может обрабатывать веб запросы, передавать их приложениям
Переменная environ содержит служебную информацию о клиенте
В start_response собсна помещаем ответ
#!/usr/bin/python3 import wsgiserver from wsgiref.simple_server import make_server, demo_app def my_app(environ, start_response): status= '200 OK' headers= [('Content-type', 'text/plain')] start_response(status, headers) return ['HelloW'.encode()] with make_server('', 8000, my_app) as httpd: print('Serving HTTP on port 8000..') httpd.serve_forever() httpd.handle_request()
{'SHELL': '/bin/bash', 'PWD': '/home/dn/pywh', 'LOGNAME': 'dn', 'XDG_SESSION_TYPE': 'tty', 'MOTD_SHOWN': 'pam', 'HOME': '/home/dn', 'LANG': 'ru_RU.UTF-8', 'LS_COLORS': 'rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=00:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32:*.tar=01;31:*.tgz=01;31:*.arc=01;31:*.arj=01;31:*.taz=01;31:*.lha=01;31:*.lz4=01;31:*.lzh=01;31:*.lzma=01;31:*.tlz=01;31:*.txz=01;31:*.tzo=01;31:*.t7z=01;31:*.zip=01;31:*.z=01;31:*.dz=01;31:*.gz=01;31:*.lrz=01;31:*.lz=01;31:*.lzo=01;31:*.xz=01;31:*.zst=01;31:*.tzst=01;31:*.bz2=01;31:*.bz=01;31:*.tbz=01;31:*.tbz2=01;31:*.tz=01;31:*.deb=01;31:*.rpm=01;31:*.jar=01;31:*.war=01;31:*.ear=01;31:*.sar=01;31:*.rar=01;31:*.alz=01;31:*.ace=01;31:*.zoo=01;31:*.cpio=01;31:*.7z=01;31:*.rz=01;31:*.cab=01;31:*.wim=01;31:*.swm=01;31:*.dwm=01;31:*.esd=01;31:*.jpg=01;35:*.jpeg=01;35:*.mjpg=01;35:*.mjpeg=01;35:*.gif=01;35:*.bmp=01;35:*.pbm=01;35:*.pgm=01;35:*.ppm=01;35:*.tga=01;35:*.xbm=01;35:*.xpm=01;35:*.tif=01;35:*.tiff=01;35:*.png=01;35:*.svg=01;35:*.svgz=01;35:*.mng=01;35:*.pcx=01;35:*.mov=01;35:*.mpg=01;35:*.mpeg=01;35:*.m2v=01;35:*.mkv=01;35:*.webm=01;35:*.webp=01;35:*.ogm=01;35:*.mp4=01;35:*.m4v=01;35:*.mp4v=01;35:*.vob=01;35:*.qt=01;35:*.nuv=01;35:*.wmv=01;35:*.asf=01;35:*.rm=01;35:*.rmvb=01;35:*.flc=01;35:*.avi=01;35:*.fli=01;35:*.flv=01;35:*.gl=01;35:*.dl=01;35:*.xcf=01;35:*.xwd=01;35:*.yuv=01;35:*.cgm=01;35:*.emf=01;35:*.ogv=01;35:*.ogx=01;35:*.aac=00;36:*.au=00;36:*.flac=00;36:*.m4a=00;36:*.mid=00;36:*.midi=00;36:*.mka=00;36:*.mp3=00;36:*.mpc=00;36:*.ogg=00;36:*.ra=00;36:*.wav=00;36:*.oga=00;36:*.opus=00;36:*.spx=00;36:*.xspf=00;36:', 'SSH_CONNECTION': '192.168.0.10 59664 192.168.0.64 22', 'XDG_SESSION_CLASS': 'user', 'TERM': 'xterm', 'USER': 'dn', 'SHLVL': '1', 'XDG_SESSION_ID': '1', 'XDG_RUNTIME_DIR': '/run/user/1000', 'SSH_CLIENT': '192.168.0.10 59664 22', 'PATH': '/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games', 'SSH_TTY': '/dev/pts/0', '_': './whsrv.py', 'OLDPWD': '/home/dn', 'SERVER_NAME': 'deb64', 'GATEWAY_INTERFACE': 'CGI/1.1', 'SERVER_PORT': '8000', 'REMOTE_HOST': '', 'CONTENT_LENGTH': '26', 'SCRIPT_NAME': '', 'SERVER_PROTOCOL': 'HTTP/1.1', 'SERVER_SOFTWARE': 'WSGIServer/0.2', 'REQUEST_METHOD': 'POST', 'PATH_INFO': '/', 'QUERY_STRING': '', 'REMOTE_ADDR': '192.168.0.10', 'CONTENT_TYPE': 'application/json', 'HTTP_HOST': '192.168.0.64:8000', 'HTTP_USER_AGENT': 'curl/7.79.1', 'HTTP_ACCEPT': '*/*', 'wsgi.input': <_io.BufferedReader name=4>, 'wsgi.errors': <_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>, 'wsgi.version': (1, 0), 'wsgi.run_once': False, 'wsgi.url_scheme': 'http', 'wsgi.multithread': False, 'wsgi.multiprocess': False, 'wsgi.file_wrapper': <class 'wsgiref.util.FileWrapper'>}
raw получаем вручную
pip install beautifulsoup4
from bs4 import BeautifulSoup import re def main(): ParseFromRaw() def ParseFromRaw(): infile= open('vk_raw.txt', 'r', encoding= 'utf-8') rawdata= infile.read() # Парсим содержимое как html parsdata= BeautifulSoup(rawdata, 'html.parser') # Извлекаем только то что "BeautifulSoup" определяет как текст textls= parsdata.get_text().split('\n') # К каждому треку относится >= 3 строки инфы, здесь она идет сплошником, отдельными строками # последней строкой каждого трека идет его длительность, разбиваем треки по ней # для результата reslistInlist= [] # тут накапливается инфа по одному треку buffer= [] # регулярка для определения строки с длительностью timeline= re.compile(r'^\d{1,2}:\d{1,2}.*') for line in textls: # есть пустые строки, пропускаем if line == '': continue buffer.append(line.strip()) # Делаем обрезку после строки с длительностью if timeline.search(line): reslistInlist.append(buffer.copy()) buffer.clear() # каждый трек записан в строку, в типе list, список всех треков ListInList reslistInlist.sort() count= 0 outfile= open('vk_res_sort.txt', 'w', encoding= 'utf-8') for line in reslistInlist: # Одна строка один трек, записи одного трека разделяем- `;` + tab outfile.write(';\t'.join(line)+ '\n') count += 1 print("Всего записей - "+ str(count)) if __name__ == "__main__": main()
Скрипт для работы с диском, создание каталога, загрузка файлов
Неплохие примеры обхода дерева каталогов
Неплохая дока
Собсна источник примера
#!/bin/python3 import os import requests from datetime import datetime from progress.bar import Bar URL = 'https://cloud-api.yandex.net/v1/disk/resources' TOKEN = '<Your-Token>' headers = {'Content-Type': 'application/json', 'Accept': 'application/json', 'Authorization': f'OAuth {TOKEN}'} def create_folder(path): """Создание папки. \n path: Путь к создаваемой папке.""" requests.put(f'{URL}?path={path}', headers=headers) def upload_file(loadfile, savefile, replace=False): """Загрузка файла. savefile: Путь к файлу на Диске loadfile: Путь к загружаемому файлу replace: true or false Замена файла на Диске""" res = requests.get(f'{URL}/upload?path={savefile}&overwrite={replace}', headers=headers, verify=False).json() with open(loadfile, 'rb') as f: try: requests.put(res['href'], files={'file':f}, verify=False) except KeyError: print(res) def backup(savepath, loadpath): """Загрузка папки на Диск. \n savepath: Путь к папке на Диске для сохранения \n loadpath: Путь к загружаемой папке""" # Формируем название папки на диске, название целевой папки (из переданного пути) + дата-время # `loadpath.split('/')[-1]` разбиваем путь на list, и возвращаем последний элемент т.к. папку date_folder = 'currDate'#'{0}_{1}'.format(loadpath.split('/')[-1], datetime.now().strftime("%Y-%m-%d_%H:%M")) create_folder(savepath) # ф-я `os.walk` возвращает массив файлов из дерева каталогов (в параметрах можно указать направление обхода) # возвращает тройной кортеж - "dirpath (тек путь), dirnames (папки), filenames (файлы)" # по дефолту вложенность вся, идет по порядку от указанного пути и далее for address, _, files in os.walk(loadpath): # тут создаем подпапки, из путей, что приходят в цикле обхода # `address.replace(loadpath, "")[1:]` - здесь вычитаем исходный путь из вложенного create_folder('{0}/{1}/{2}'.format(savepath, date_folder, address.replace(loadpath, "")[1:])) bar = Bar('Loading', fill='X', max=len(files)) # Циклом проходим по всем файлам что есть в данном каталоге for file in files: bar.next() # в первой части путь в ФС, во второй путь на диске, исходный снова вырезан upload_file('{0}/{1}'.format(address, file),\ '{0}/{1}{2}/{3}'.format(savepath, date_folder, address.replace(loadpath, ""), file)) bar.finish() if __name__ == '__main__': backup('Backup', r'/home/ddn/bcps') #backup('Backup', os.getcwd())
Читать файл лучше всего по блокам, функцией update() данные помещаются в объект, можно делать это поблочно
Собсна каждый алгоритм имеет свою функцию, с соответствующим названием
digest() - хеш в виде байтов, hexdigest() в виде строки
import hashlib infile= open('my-file', 'rb') hsh= hashlib.md5() while True: fileData= infile.read(2048) if not fileData: break hsh.update(fileData) result= hsh.hexdigest() print(result)
#!/usr/bin/python3 import hashlib, argparse, os argParser= argparse.ArgumentParser() argParser.add_argument('filename', type= str, help= 'Проверяемый файл') argParser.add_argument('folder', type= str, help= 'Путь для проверки') args= argParser.parse_args() def main(): # Хеш из проверяемого файла #checkFileHash= GetHash(args.filename) # Перебираем целевую папку for file in os.listdir(args.folder): currentHash= GetHash(args.folder +'/'+ file) print(file +' - '+ currentHash) def GetHash(fullFileName): """ Сгенерировать хеш строку из указанного файла """ openedFile= open(fullFileName, 'rb') md5= hashlib.md5() while True: fileData= openedFile.read(2048) if not fileData: break md5.update(fileData) return md5.hexdigest() if __name__ == "__main__": main()
Вроде более рекомендованный способ, как то заморочено вроде
import subprocess var= subprocess.run("ls /home/admin", shell=True, stdout=subprocess.PIPE, encoding='utf-8') print(var.stdout)
Вывод сразу на консоль, возвращает код возврата
import os os.system("ls /home/admin >/dev/null")