2023-06-06 01:38:15 +03:00
|
|
|
import os
|
2023-05-21 02:20:55 +03:00
|
|
|
import re
|
|
|
|
import tempfile
|
2023-06-06 01:38:15 +03:00
|
|
|
import unicodedata
|
|
|
|
|
2023-08-11 09:20:12 +03:00
|
|
|
from urllib.parse import urljoin
|
2023-06-06 01:38:15 +03:00
|
|
|
import requests
|
|
|
|
from pydantic import BaseModel
|
2023-05-21 02:20:55 +03:00
|
|
|
|
|
|
|
|
|
|
|
class CrawlWebsite(BaseModel):
    """Crawl a website and save every fetched page as an HTML file.

    Starting at ``url``, pages are downloaded with ``requests``, written to
    the system temporary directory under a slugified file name, and the
    ``<a href>`` links they contain are followed recursively up to ``depth``
    levels.
    """

    # URL the crawl starts from.
    url: str
    # Not consulted by any method of this class.
    js: bool = False
    # Maximum link depth followed from ``url``. The default is read from the
    # CRAWL_DEPTH environment variable once, when the class is defined.
    depth: int = int(os.getenv("CRAWL_DEPTH", "1"))
    # Not enforced by the methods of this class.
    max_pages: int = 100
    # Used as the per-request timeout in ``_crawl`` (presumably seconds, which
    # is the unit ``requests`` expects -- confirm against other callers).
    max_time: int = 60

    def _crawl(self, url):
        """Download ``url`` and return its body text.

        Returns ``None`` when the request raises or the response status is
        not 200. Errors are printed rather than propagated so that a single
        bad link does not abort the whole crawl.
        """
        try:
            # Without a timeout a stalled server would block the crawl forever.
            response = requests.get(url, timeout=self.max_time)
            if response.status_code == 200:
                return response.text
            return None
        except Exception as e:
            # Deliberately broad: hrefs such as "mailto:" or malformed URLs
            # must be skipped, not crash the crawl.
            print(e)
            return None

    def process(self) -> list:
        """Crawl from ``self.url`` and return the pages that were saved.

        Returns:
            A list of ``(temp_file_path, file_name)`` tuples, one per page
            written to disk, in the order the pages were fetched.
        """
        visited_list = []
        self._process_level(self.url, 0, visited_list)
        return visited_list

    def _process_level(self, url, level_depth, visited_list):
        """Fetch ``url``, save it, and recurse into its links.

        Args:
            url: Absolute URL of the page to fetch.
            level_depth: Depth of ``url`` relative to the start page (0-based).
            visited_list: List of ``(temp_file_path, file_name)`` tuples that
                is extended in place with every page saved.
        """
        content = self._crawl(url)
        # A failed or empty download produces no file and no visited entry;
        # writing an empty file that is never reported would just leak it.
        if not content:
            return

        # Create a file
        file_name = slugify(url) + ".html"
        temp_file_path = os.path.join(tempfile.gettempdir(), file_name)
        # Explicit encoding: the platform default may be unable to encode
        # characters present in the page.
        with open(temp_file_path, "w", encoding="utf-8") as temp_file:
            temp_file.write(content)  # pyright: ignore reportPrivateUsage=none
        visited_list.append((temp_file_path, file_name))

        # Links are only needed when another level may still be followed.
        if level_depth >= self.depth:
            return

        from bs4 import BeautifulSoup

        soup = BeautifulSoup(content, "html5lib")
        for a in soup.find_all("a"):
            if not a.has_attr("href"):
                continue
            # Resolve the href first: visited entries are named after the
            # absolute URL, so comparing the slug of a raw relative href
            # would never match and the page would be crawled again.
            target_url = urljoin(url, a["href"])
            target_name = slugify(target_url) + ".html"
            if any(fname == target_name for _, fname in visited_list):
                continue
            self._process_level(target_url, level_depth + 1, visited_list)

    def checkGithub(self) -> bool:
        """Return ``True`` when ``"github.com"`` occurs anywhere in ``self.url``."""
        return "github.com" in self.url
|
|
|
|
|
2023-05-21 02:20:55 +03:00
|
|
|
|
|
|
|
def slugify(text):
    """Turn ``text`` into a lowercase ASCII slug.

    Characters are decomposed (NFKD) and anything that does not survive an
    ASCII encoding is dropped. Every character other than word characters,
    whitespace and hyphens is then removed, the result is trimmed and
    lowercased, and each run of hyphens/whitespace collapses to one ``-``.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    ascii_only = decomposed.encode("ascii", "ignore").decode("utf-8")
    stripped = re.sub(r"[^\w\s-]", "", ascii_only)
    lowered = stripped.strip().lower()
    return re.sub(r"[-\s]+", "-", lowered)
|