프로젝트 전체 단계
- 사용자 활동 모니터링 설정
- SSH 로그인 시도 및 인증 실패를 모니터링합니다.
- 출근 시간 체크
- 이상한 네트워크 트래픽 감지
- 특정 IP 대역 모니터링
- 사용자 행동 패턴 분석 등 다양한 사용자 활동을 모니터링합니다.
- 디스코드 알림 시스템 구축
- 특정 이벤트 발생 시 관리자에게 실시간으로 알림을 보냅니다.
- 동적 권한 관리 로직 구현
- 사용자 활동에 기반하여 권한을 동적으로 할당하거나 회수하는 로직을 개발합니다.
- 이상 행위 탐지 및 대응
- 비정상적인 사용자 행동을 감지하고, 이를 통해 관리자가 신속히 대응할 수 있도록 합니다.
1. 사용자 활동 모니터링 설정 - ssh 로그인 탐지 파트
사용자 활동 모니터링 설정 - ssh 로그인 시도 및 인증실패 모니터링 부분 구현
-
- SSH 로그인 실패 감지: Failed password for 로그를 감지하여 SSH 로그인 실패 시도를 추적합니다.
- PAM 추가 인증 실패 처리: PAM 로그에서 추가된 인증 실패를 감지하고 필요시 기록합니다.
- 알림 전송: 3회 실패 시마다 디스코드로 실시간 알림을 전송합니다.
- 총 실패 횟수 기록: 하루 동안 총 몇 번의 로그인 실패가 발생했는지 기록하여 알림에 함께 표시합니다.
- 하루가 지나면 카운트 초기화: 매일 자정이 지나면 실패 횟수가 초기화됩니다.
웹후크의 URL을 따로 복사해둔다.
Python 코드 작성
import time
import requests
from collections import defaultdict
from datetime import datetime, timedelta
# 디스코드 웹훅 URL 설정
DISCORD_WEBHOOK_URL = 'https://discord.com/api/webhooks/YOUR_WEBHOOK_URL'
# 로그 파일 경로
LOG_FILE = '/var/log/secure' # CentOS의 경우
# 실패 시도 기록을 위한 딕셔너리 {IP: [시도시간들]}
failed_attempts = defaultdict(list)
# 최대 허용 실패 횟수 및 시간 간격 (예: 5분 이내 3번 실패)
MAX_ATTEMPTS = 3
TIME_WINDOW = timedelta(minutes=5)
def send_discord_alert(message):
data = {
"content": message
}
response = requests.post(DISCORD_WEBHOOK_URL, json=data)
if response.status_code == 204:
print("디스코드 알림 전송 성공")
else:
print(f"디스코드 알림 전송 실패: {response.status_code}")
def monitor_log():
with open(LOG_FILE, 'r') as file:
# 파일 끝으로 이동
file.seek(0, 2)
while True:
line = file.readline()
if not line:
time.sleep(0.1) # 파일이 업데이트될 때까지 대기
continue
if "Failed password for" in line:
print(f"로그 감지: {line.strip()}")
# 로그에서 IP 주소 추출
parts = line.split()
ip_index = parts.index("from") + 1 if "from" in parts else -1
if ip_index != -1:
ip = parts[ip_index]
# 현재 시간 가져오기
current_time = datetime.now()
failed_attempts[ip].append(current_time)
# 지정된 시간 간격 내의 실패 시도만 유지
failed_attempts[ip] = [time for time in failed_attempts[ip] if current_time - time <= TIME_WINDOW]
# 실패 시도가 기준을 초과했을 때 알림
if len(failed_attempts[ip]) >= MAX_ATTEMPTS:
send_discord_alert(f"의심스러운 SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
# 알림 후 해당 IP의 실패 기록 초기화
failed_attempts[ip].clear()
if __name__ == "__main__":
print("SSH 로그 감시 시작...")
monitor_log()
스크립트 설명
1. 라이브러리 임포트
- time, requests, defaultdict, datetime, timedelta 등 필요한 라이브러리들을 불러옵니다.
- requests는 디스코드 웹훅에 HTTP 요청을 보내기 위한 라이브러리입니다.
2. 디스코드 웹훅 설정
- DISCORD_WEBHOOK_URL에 디스코드 웹훅 URL을 설정하여 로그인 실패 시 알림을 받을 수 있습니다.
3. 로그 파일 경로
- LOG_FILE = '/var/log/secure': CentOS 또는 Amazon Linux에서 SSH 관련 인증 로그를 기록하는 /var/log/secure 파일을 모니터링합니다.
4. 실패 시도 추적
- failed_attempts = defaultdict(list)는 IP 주소별로 실패 시도를 기록합니다. 각 IP에 대해 실패한 시도 시간을 리스트에 저장하고, 해당 IP에서 여러 번 실패하면 이를 추적합니다.
- MAX_ATTEMPTS = 3: 설정된 시간 동안 3번 이상의 실패가 발생하면 알림을 보냅니다.
- TIME_WINDOW = timedelta(minutes=5): 5분 이내에 발생한 실패만 추적합니다.
5. 디스코드 알림 전송
- send_discord_alert() 함수는 디스코드 웹훅을 통해 경고 메시지를 보냅니다. 실패한 로그 정보와 함께 메시지를 전송하여 실시간으로 의심스러운 시도를 알립니다.
6. 로그 파일 모니터링
- monitor_log() 함수는 /var/log/secure 파일을 실시간으로 감시합니다.
- file.seek(0, 2)를 사용하여 로그 파일의 끝에서부터 읽기 시작하며, 새로 추가되는 로그를 계속 모니터링합니다.
- "Failed password for" 메시지가 감지되면, 해당 로그에서 IP 주소를 추출하고, 설정된 시간 내 실패 횟수를 계산하여 알림을 보냅니다.
7. 메인 실행 부분
- 스크립트의 메인 실행부에서 monitor_log() 함수를 호출하여 SSH 인증 실패를 실시간으로 감시합니다.
- 사용자 활동 모니터링 설정
- SSH 로그인 시도 및 인증 실패를 모니터링합니다.
- 출근 시간 체크
- 이상한 네트워크 트래픽 감지
- 특정 IP 대역 모니터링
- 사용자 행동 패턴 분석 등 다양한 사용자 활동을 모니터링합니다.
내부 네트워크 대역 설정 (기존 코드에 추가하여 기능 구현하는 것!)
내부 네트워크에서 사용 중인 IP 대역을 INTERNAL_NETWORKS에 정의하고, 이 대역 이외의 IP를 감시할 수 있습니다.
import ipaddress
# 내부 네트워크 대역 설정 (예: 사설 네트워크 대역)
INTERNAL_NETWORKS = [
"192.168.0.0/24", # 예: 내부 네트워크 대역 (수정 가능)
]
# IP가 내부 네트워크 대역에 속하는지 확인
def is_internal_ip(ip):
try:
ip_obj = ipaddress.ip_address(ip)
for network in INTERNAL_NETWORKS:
if ip_obj in ipaddress.ip_network(network):
return True
return False
except ValueError:
# 잘못된 IP 형식이면 무시
return False
비정상적인 시간대 감시
def is_suspicious_time():
current_hour = datetime.now().hour
# 새벽 12시에서 6시 사이를 의심스러운 시간대로 설정
if 0 <= current_hour <= 6:
return True
return False
로그 감시 함수 수정
def monitor_log():
with open(LOG_FILE, 'r') as file:
file.seek(0, 2)
while True:
line = file.readline()
if not line:
time.sleep(0.1)
continue
if "Failed password for" in line:
print(f"로그 감지: {line.strip()}")
parts = line.split()
ip_index = parts.index("from") + 1 if "from" in parts else -1
if ip_index != -1:
ip = parts[ip_index]
current_time = datetime.now()
failed_attempts[ip].append(current_time)
failed_attempts[ip] = [time for time in failed_attempts[ip] if current_time - time <= TIME_WINDOW]
if len(failed_attempts[ip]) >= MAX_ATTEMPTS:
if not is_internal_ip(ip):
send_discord_alert(f"🚨 외부 IP에서 SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
elif is_suspicious_time():
send_discord_alert(f"⚠️ 의심스러운 시간대에 SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
else:
send_discord_alert(f"SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
failed_attempts[ip].clear()
수정된 전체코드 (로그인 실패뿐 아니라 의심스러운 성공시에도 알람이 가게 구현)
1 import time
2 import requests
3 from collections import defaultdict
4 from datetime import datetime, timedelta
5 import ipaddress
6
7 # 디스코드 웹훅 URL 설정
8 DISCORD_WEBHOOK_URL = 'https://discord.com/api/webhooks/1295804148511543356/bV-nIzyT7_y0p7e5c1rGGU0CJO0klTUkKt9391X9HEXzCo-ofBKyWeqtugLLxfOX9QQh'
9
10 # 로그 파일 경로 (CentOS의 경우)
11 LOG_FILE = '/var/log/secure'
12
13 # 실패 및 성공 시도 기록을 위한 딕셔너리 {IP: [시도 시간들]}
14 failed_attempts = defaultdict(list)
15 success_attempts = defaultdict(list)
16
17 # 최대 허용 실패 횟수 및 시간 간격 (예: 5분 ��내 3번 실패)
18 MAX_ATTEMPTS = 3
19 TIME_WINDOW = timedelta(minutes=5)
20
21 # 내부 네트워크 대역 설정 (예: 사설 네트워크 대역)
22 INTERNAL_NETWORKS = [
23 "192.168.0.0/24", # 예시: 내부 네트워크 대역 (수정 가능)
24 "10.0.0.0/8"
25 ]
26
27 # 의심스러운 시간대 (예: 자정부터 6시까지)
28 def is_suspicious_time():
29 current_hour = datetime.now().hour
30 if 0 <= current_hour <= 6:
31 return True
32 return False
33
34 # IP가 내부 네트워크 대역에 속하는지 확인
35 def is_internal_ip(ip):
36 try:
37 ip_obj = ipaddress.ip_address(ip)
38 for network in INTERNAL_NETWORKS:
39 if ip_obj in ipaddress.ip_network(network):
40 return True
41 return False
42 except ValueError:
43 # 잘못된 IP 형식이면 무시
44 return False
45
46 # 디스코드 알림 전송 함수
47 def send_discord_alert(message):
48 data = {
49 "content": message
50 }
51 response = requests.post(DISCORD_WEBHOOK_URL, json=data)
52 if response.status_code == 204:
53 print("디스코드 알림 전송 성공")
54 else:
55 print(f"디스코드 알림 전송 실패: {response.status_code}")
56
57 # SSH 로그를 실시간으로 감시하는 함수
58 def monitor_log():
59 with open(LOG_FILE, 'r') as file:
60 file.seek(0, 2)
61
62 while True:
63 line = file.readline()
64 if not line:
65 time.sleep(0.1)
66 continue
67
68 # 로그인 실패 감지
69 if "Failed password for" in line:
70 print(f"로그 감지: {line.strip()}")
71
72 parts = line.split()
73 ip_index = parts.index("from") + 1 if "from" in parts else -1
74 if ip_index != -1:
75 ip = parts[ip_index]
76
77 current_time = datetime.now()
78 failed_attempts[ip].append(current_time)
79
80 # 지정된 시간 간격 내의 실패 시도만 유지
81 failed_attempts[ip] = [time for time in failed_attempts[ip] if current_time - time <= TIME_WINDOW]
82
83 # 실패 시도가 기준을 초과했을 때 알림
84 if len(failed_attempts[ip]) >= MAX_ATTEMPTS:
85 if not is_internal_ip(ip):
86 send_discord_alert(f"🚨 외부 IP에서 SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
87 elif is_suspicious_time():
88 send_discord_alert(f"⚠️ 의심스러운 시간대에 SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
89 else:
90 send_discord_alert(f"SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
91 failed_attempts[ip].clear()
92
93 # 로그인 성공 감지
94 elif "Accepted password for" in line:
95 print(f"로그인 성공 감지: {line.strip()}")
96
97 parts = line.split()
98 ip_index = parts.index("from") + 1 if "from" in parts else -1
99 if ip_index != -1:
100 ip = parts[ip_index]
101
102 current_time = datetime.now()
103 success_attempts[ip].append(current_time)
104
105 # 외부 IP가 로그인 성공했을 때 알림
106 if not is_internal_ip(ip):
107 send_discord_alert(f"🚨 외부 IP에서 SSH 로그인 성공 감지 (IP: {ip}): {line.strip()}")
108 elif is_suspicious_time():
109 send_discord_alert(f"⚠️의심스러운 시간대에 SSH 로그인 성공 감지 (IP: {ip}): {line.strip()}")
110 else:
111 send_discord_alert(f"SSH 로그인 성공 감지 (IP: {ip}): {line.strip()}")
112 success_attempts[ip].clear()
113
114 # 메인 함수: SSH 로그 감시 시작
115 if __name__ == "__main__":
116 print("SSH 로그 감시 시작...")
117 monitor_log()
118
주요 기능 설명:
1. 내부 네트워크 대역 설정:
- INTERNAL_NETWORKS에 내부 네트워크 대역(예: 192.168.0.0/24)을 설정합니다.
- 이 대역에 속하는 IP 주소는 신뢰할 수 있는 내부 네트워크로 간주되며, 외부 네트워크에서 발생한 로그인 시도는 감시 대상으로 간주합니다.
2. 외부 IP 감시:
- is_internal_ip() 함수는 IP가 내부 네트워크 대역에 속하지 않는지 확인합니다.
- 내부 네트워크에 속하지 않는 외부 네트워크에서 발생한 로그인 시도를 의심스러운 것으로 처리하고 감시합니다.
3. 비정상적인 시간대 감시:
- is_suspicious_time() 함수는 자정부터 새벽 6시까지의 로그인 시도를 감시하여, 해당 시간대에 발생한 로그인 시도를 의심스러운 것으로 처리합니다.
4. 디스코드 알림 전송:
- 로그인 성공 또는 실패가 감지되면, 디스코드로 알림을 전송합니다.
- 외부 네트워크에서 발생한 실패 시도나 의심스러운 시간대에서의 로그인 성공/실패 시도에 대해 별도의 경고 메시지를 보냅니다.
5. 실시간 로그 모니터링:
- /var/log/secure 파일을 실시간으로 감시하여 "Failed password for" 또는 "Accepted password for" 로그를 감지합니다.
- IP 주소와 로그인 성공/실패 시도 시간을 기록하며, 설정한 시간 내에 일정 횟수를 초과할 경우 알림을 전송합니다.
사용자 활동 모니터링 설정
- SSH 로그인 시도 및 인증 실패를 모니터링합니다.
- 출근 시간 체크
- 이상한 네트워크 트래픽 감지
- 특정 IP 대역 모니터링
- 사용자 행동 패턴 분석 등 다양한 사용자 활동을 모니터링합니다.
+++ 나머지 기능 추가예정
'K쉴드 주니어 수업 정리' 카테고리의 다른 글
프로젝트 최종 (전동 킥보드 무면허 방지 인증 시스템) (0) | 2024.10.14 |
---|---|
진단 실습 - 웹 취약점 (1) | 2024.10.04 |
진단 실습 (2) | 2024.09.28 |
유닉스 보안 진단 가이드: 항목별 설정 체크 (1) | 2024.09.16 |
프로젝트 주제 선정과정 (0) | 2024.09.01 |