1
1
mirror of https://github.com/chubin/cheat.sh.git synced 2025-01-07 13:47:44 +03:00

bin/srv.py cleanup

This commit is contained in:
Igor Chubin 2018-05-05 12:50:24 +00:00
parent ab54268581
commit 26e102fc16

View File

@ -1,106 +1,108 @@
#!/usr/bin/env python #!/usr/bin/env python
# vim: set encoding=utf-8 # vim: set encoding=utf-8
import gevent """
Main server program.
"""
from gevent.wsgi import WSGIServer from gevent.wsgi import WSGIServer
from gevent.queue import Queue
from gevent.monkey import patch_all from gevent.monkey import patch_all
from gevent.subprocess import Popen, PIPE, STDOUT
patch_all() patch_all()
# pylint: disable=wrong-import-position,wrong-import-order
import sys import sys
import logging import logging
import os import os
import re
import requests
import socket
import subprocess
import time
import traceback
import dateutil.parser
import json
import jinja2 import jinja2
from flask import Flask, request, render_template, send_from_directory, send_file, make_response, redirect from flask import Flask, request, send_from_directory, redirect
app = Flask(__name__)
MYDIR = os.path.abspath(os.path.dirname( os.path.dirname('__file__') )) MYDIR = os.path.abspath(os.path.dirname(os.path.dirname('__file__')))
sys.path.append("%s/lib/" % MYDIR) sys.path.append("%s/lib/" % MYDIR)
from globals import FILE_QUERIES_LOG, LOG_FILE, TEMPLATES, STATIC, log, error from globals import FILE_QUERIES_LOG, LOG_FILE, TEMPLATES, STATIC
from limits import Limits
from cheat_wrapper import cheat_wrapper, save_cheatsheet from cheat_wrapper import cheat_wrapper
from post import process_post_request
from options import parse_args
# pylint: disable=wrong-import-position,wrong-import-order
if not os.path.exists(os.path.dirname(LOG_FILE)): if not os.path.exists(os.path.dirname(LOG_FILE)):
os.makedirs(os.path.dirname(LOG_FILE)) os.makedirs(os.path.dirname(LOG_FILE))
logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s %(message)s') logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s %(message)s')
my_loader = jinja2.ChoiceLoader([ app = Flask(__name__) # pylint: disable=invalid-name
app.jinja_loader = jinja2.ChoiceLoader([
app.jinja_loader, app.jinja_loader,
jinja2.FileSystemLoader(TEMPLATES), jinja2.FileSystemLoader(TEMPLATES),
]) ])
app.jinja_loader = my_loader
LIMITS = Limits()
def is_html_needed(user_agent): def is_html_needed(user_agent):
plaintext_clients = [ 'curl', 'wget', 'fetch', 'httpie', 'lwp-request', 'python-requests'] """
Basing on `user_agent`, return whether it needs HTML or ANSI
"""
plaintext_clients = ['curl', 'wget', 'fetch', 'httpie', 'lwp-request', 'python-requests']
if any([x in user_agent for x in plaintext_clients]): if any([x in user_agent for x in plaintext_clients]):
return False return False
return True return True
def parse_args(args):
result = {}
q = ""
for key, val in args.items():
if len(val) == 0:
q += key
continue
if q is None:
return result
if 'T' in q:
result['no-terminal'] = True
if 'q' in q:
result['quiet'] = True
options_meaning = {
"c": dict(add_comments=True),
"C": dict(add_comments=False),
"Q": dict(remove_text=True),
}
for option, meaning in options_meaning.items():
if option in q:
result.update(meaning)
for key, val in args.items():
if val == 'True':
val = True
if val == 'False':
val = False
result[key] = val
return result
@app.route('/files/<path:path>') @app.route('/files/<path:path>')
def send_static(path): def send_static(path):
"""
Return static file `path`.
Can be served by the HTTP frontend.
"""
return send_from_directory(STATIC, path) return send_from_directory(STATIC, path)
@app.route('/favicon.ico') @app.route('/favicon.ico')
def send_favicon(): def send_favicon():
"""
Return static file `favicon.ico`.
Can be served by the HTTP frontend.
"""
return send_from_directory(STATIC, 'favicon.ico') return send_from_directory(STATIC, 'favicon.ico')
@app.route('/malformed-response.html') @app.route('/malformed-response.html')
def send_malformed(): def send_malformed():
"""
Return static file `malformed-response.html`.
Can be served by the HTTP frontend.
"""
return send_from_directory(STATIC, 'malformed-response.html') return send_from_directory(STATIC, 'malformed-response.html')
def log_query(ip, found, topic, user_agent): def log_query(ip_addr, found, topic, user_agent):
log_entry = "%s %s %s %s" % (ip, found, topic, user_agent) """
Log processed query and some internal data
"""
log_entry = "%s %s %s %s" % (ip_addr, found, topic, user_agent)
with open(FILE_QUERIES_LOG, 'a') as my_file: with open(FILE_QUERIES_LOG, 'a') as my_file:
my_file.write(log_entry.encode('utf-8')+"\n") my_file.write(log_entry.encode('utf-8')+"\n")
def get_request_ip(req):
"""
Extract IP address from `request`
"""
if req.headers.getlist("X-Forwarded-For"):
ip_addr = req.headers.getlist("X-Forwarded-For")[0]
if ip_addr.startswith('::ffff:'):
ip_addr = ip_addr[7:]
else:
ip_addr = req.remote_addr
if req.headers.getlist("X-Forwarded-For"):
ip_addr = req.headers.getlist("X-Forwarded-For")[0]
if ip_addr.startswith('::ffff:'):
ip_addr = ip_addr[7:]
else:
ip_addr = req.remote_addr
return ip_addr
@app.route("/", methods=['GET', 'POST']) @app.route("/", methods=['GET', 'POST'])
@app.route("/<path:topic>", methods=["GET", "POST"]) @app.route("/<path:topic>", methods=["GET", "POST"])
def answer(topic = None): def answer(topic=None):
""" """
Main rendering function, it processes incoming weather queries. Main rendering function, it processes incoming weather queries.
Depending on user agent it returns output in HTML or ANSI format. Depending on user agent it returns output in HTML or ANSI format.
@ -117,45 +119,16 @@ def answer(topic = None):
html_needed = is_html_needed(user_agent) html_needed = is_html_needed(user_agent)
options = parse_args(request.args) options = parse_args(request.args)
if request.headers.getlist("X-Forwarded-For"): ip_address = get_request_ip(request)
ip = request.headers.getlist("X-Forwarded-For")[0] not_allowed = LIMITS.check_ip(ip_address)
if ip.startswith('::ffff:'): if not_allowed:
ip = ip[7:] return "429 %s\n" % not_allowed, 429
else:
ip = request.remote_addr
if request.headers.getlist("X-Forwarded-For"):
ip = request.headers.getlist("X-Forwarded-For")[0]
if ip.startswith('::ffff:'):
ip = ip[7:]
else:
ip = request.remote_addr
if request.method == 'POST': if request.method == 'POST':
for k, v in request.form.items(): process_post_request(request, html_needed)
if k == '':
if topic is None:
topic_name = "UNNAMED"
else:
topic_name = topic
cheatsheet = v
else:
if v == '':
if topic is None:
topic_name = "UNNAMED"
else:
topic_name = topic
cheatsheet = k
else:
topic_name = k
cheatsheet = v
save_cheatsheet(topic_name, cheatsheet)
if html_needed: if html_needed:
return redirect("/") return redirect("/")
else: return "OK\n"
return "OK\n"
if 'topic' in request.args: if 'topic' in request.args:
return redirect("/%s" % request.args.get('topic')) return redirect("/%s" % request.args.get('topic'))
@ -163,11 +136,10 @@ def answer(topic = None):
if topic is None: if topic is None:
topic = ":firstpage" topic = ":firstpage"
answer, found = cheat_wrapper(topic, request_options=options, html=is_html_needed(user_agent)) result, found = cheat_wrapper(topic, request_options=options, html=is_html_needed(user_agent))
log_query(ip, found, topic, user_agent)
return answer
server = WSGIServer(("", 8002), app) # log=None) log_query(ip_address, found, topic, user_agent)
server.serve_forever() return result
SRV = WSGIServer(("", 8002), app) # log=None)
SRV.serve_forever()