K쉴드 주니어 수업 정리

동적 권한 관리(1) - 사용자 활동 모니터링 - ssh 로그인 탐지 편

ilsancityboy 2024. 10. 16. 02:50

프로젝트 전체 단계

  1. 사용자 활동 모니터링 설정
    • SSH 로그인 시도 및 인증 실패를 모니터링합니다. 
    • 출근 시간 체크
    • 이상한 네트워크 트래픽 감지
    • 특정 IP 대역 모니터링
    • 사용자 행동 패턴 분석 등 다양한 사용자 활동을 모니터링합니다.
  2. 디스코드 알림 시스템 구축
    • 특정 이벤트 발생 시 관리자에게 실시간으로 알림을 보냅니다.
  3. 동적 권한 관리 로직 구현
    • 사용자 활동에 기반하여 권한을 동적으로 할당하거나 회수하는 로직을 개발합니다.
  4. 이상 행위 탐지 및 대응
    • 비정상적인 사용자 행동을 감지하고, 이를 통해 관리자가 신속히 대응할 수 있도록 합니다.

 

1. 사용자 활동 모니터링 설정 - ssh 로그인 탐지 파트

사용자 활동 모니터링 설정 - ssh 로그인 시도 및 인증실패 모니터링 부분 구현

    • SSH 로그인 실패 감지: Failed password for 로그를 감지하여 SSH 로그인 실패 시도를 추적합니다.
    • PAM 추가 인증 실패 처리: PAM 로그에서 추가된 인증 실패를 감지하고 필요시 기록합니다.
    • 알림 전송: 3회 실패 시마다 디스코드로 실시간 알림을 전송합니다.
    • 총 실패 횟수 기록: 하루 동안 총 몇 번의 로그인 실패가 발생했는지 기록하여 알림에 함께 표시합니다.
    • 하루가 지나면 카운트 초기화: 매일 자정이 지나면 실패 횟수가 초기화됩니다.

 

웹후크의 URL을 따로 복사해둔다.

 

 

 

Python 코드 작성

import time
import requests
from collections import defaultdict
from datetime import datetime, timedelta

# 디스코드 웹훅 URL 설정
DISCORD_WEBHOOK_URL = 'https://discord.com/api/webhooks/YOUR_WEBHOOK_URL'

# 로그 파일 경로
LOG_FILE = '/var/log/secure'  # CentOS의 경우

# 실패 시도 기록을 위한 딕셔너리 {IP: [시도시간들]}
failed_attempts = defaultdict(list)

# 최대 허용 실패 횟수 및 시간 간격 (예: 5분 이내 3번 실패)
MAX_ATTEMPTS = 3
TIME_WINDOW = timedelta(minutes=5)

def send_discord_alert(message):
    data = {
        "content": message
    }
    response = requests.post(DISCORD_WEBHOOK_URL, json=data)
    if response.status_code == 204:
        print("디스코드 알림 전송 성공")
    else:
        print(f"디스코드 알림 전송 실패: {response.status_code}")

def monitor_log():
    with open(LOG_FILE, 'r') as file:
        # 파일 끝으로 이동
        file.seek(0, 2)

        while True:
            line = file.readline()
            if not line:
                time.sleep(0.1)  # 파일이 업데이트될 때까지 대기
                continue

            if "Failed password for" in line:
                print(f"로그 감지: {line.strip()}")

                # 로그에서 IP 주소 추출
                parts = line.split()
                ip_index = parts.index("from") + 1 if "from" in parts else -1
                if ip_index != -1:
                    ip = parts[ip_index]

                    # 현재 시간 가져오기
                    current_time = datetime.now()
                    failed_attempts[ip].append(current_time)

                    # 지정된 시간 간격 내의 실패 시도만 유지
                    failed_attempts[ip] = [time for time in failed_attempts[ip] if current_time - time <= TIME_WINDOW]

                    # 실패 시도가 기준을 초과했을 때 알림
                    if len(failed_attempts[ip]) >= MAX_ATTEMPTS:
                        send_discord_alert(f"의심스러운 SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
                        # 알림 후 해당 IP의 실패 기록 초기화
                        failed_attempts[ip].clear()

if __name__ == "__main__":
    print("SSH 로그 감시 시작...")
    monitor_log()

 

 

스크립트 설명

1. 라이브러리 임포트

  • time, requests, defaultdict, datetime, timedelta 등 필요한 라이브러리들을 불러옵니다.
  • requests는 디스코드 웹훅에 HTTP 요청을 보내기 위한 라이브러리입니다.

2. 디스코드 웹훅 설정

  • DISCORD_WEBHOOK_URL에 디스코드 웹훅 URL을 설정하여 로그인 실패 시 알림을 받을 수 있습니다.

3. 로그 파일 경로

  • LOG_FILE = '/var/log/secure': CentOS 또는 Amazon Linux에서 SSH 관련 인증 로그를 기록하는 /var/log/secure 파일을 모니터링합니다.

4. 실패 시도 추적

  • failed_attempts = defaultdict(list)는 IP 주소별로 실패 시도를 기록합니다. 각 IP에 대해 실패한 시도 시간을 리스트에 저장하고, 해당 IP에서 여러 번 실패하면 이를 추적합니다.
  • MAX_ATTEMPTS = 3: 설정된 시간 동안 3번 이상의 실패가 발생하면 알림을 보냅니다.
  • TIME_WINDOW = timedelta(minutes=5): 5분 이내에 발생한 실패만 추적합니다.

5. 디스코드 알림 전송

  • send_discord_alert() 함수는 디스코드 웹훅을 통해 경고 메시지를 보냅니다. 실패한 로그 정보와 함께 메시지를 전송하여 실시간으로 의심스러운 시도를 알립니다.

6. 로그 파일 모니터링

  • monitor_log() 함수는 /var/log/secure 파일을 실시간으로 감시합니다.
  • file.seek(0, 2)를 사용하여 로그 파일의 끝에서부터 읽기 시작하며, 새로 추가되는 로그를 계속 모니터링합니다.
  • "Failed password for" 메시지가 감지되면, 해당 로그에서 IP 주소를 추출하고, 설정된 시간 내 실패 횟수를 계산하여 알림을 보냅니다.

7. 메인 실행 부분

  • 스크립트의 메인 실행부에서 monitor_log() 함수를 호출하여 SSH 인증 실패를 실시간으로 감시합니다.

 

 

 


  1. 사용자 활동 모니터링 설정
    • SSH 로그인 시도 및 인증 실패를 모니터링합니다
    • 출근 시간 체크
    • 이상한 네트워크 트래픽 감지
    • 특정 IP 대역 모니터링
    • 사용자 행동 패턴 분석 등 다양한 사용자 활동을 모니터링합니다.

 

내부 네트워크 대역 설정 (기존 코드에 추가하여 기능 구현하는 것!)

내부 네트워크에서 사용 중인 IP 대역을 INTERNAL_NETWORKS에 정의하고, 이 대역 이외의 IP를 감시할 수 있습니다.

import ipaddress

# 내부 네트워크 대역 설정 (예: 사설 네트워크 대역)
INTERNAL_NETWORKS = [
    "192.168.0.0/24",  # 예: 내부 네트워크 대역 (수정 가능)
]

# IP가 내부 네트워크 대역에 속하는지 확인
def is_internal_ip(ip):
    try:
        ip_obj = ipaddress.ip_address(ip)
        for network in INTERNAL_NETWORKS:
            if ip_obj in ipaddress.ip_network(network):
                return True
        return False
    except ValueError:
        # 잘못된 IP 형식이면 무시
        return False

 

 

 

비정상적인 시간대 감시 

def is_suspicious_time():
    current_hour = datetime.now().hour
    # 새벽 12시에서 6시 사이를 의심스러운 시간대로 설정
    if 0 <= current_hour <= 6:
        return True
    return False

 

 

 

 

 

로그 감시 함수 수정

 

def monitor_log():
    with open(LOG_FILE, 'r') as file:
        file.seek(0, 2)

        while True:
            line = file.readline()
            if not line:
                time.sleep(0.1)
                continue

            if "Failed password for" in line:
                print(f"로그 감지: {line.strip()}")

                parts = line.split()
                ip_index = parts.index("from") + 1 if "from" in parts else -1
                if ip_index != -1:
                    ip = parts[ip_index]

                    current_time = datetime.now()
                    failed_attempts[ip].append(current_time)

                    failed_attempts[ip] = [time for time in failed_attempts[ip] if current_time - time <= TIME_WINDOW]

                    if len(failed_attempts[ip]) >= MAX_ATTEMPTS:
                        if not is_internal_ip(ip):
                            send_discord_alert(f"🚨 외부 IP에서 SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
                        elif is_suspicious_time():
                            send_discord_alert(f"⚠️ 의심스러운 시간대에 SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
                        else:
                            send_discord_alert(f"SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
                        failed_attempts[ip].clear()

 

 

 

 

수정된 전체코드 (로그인 실패뿐 아니라 의심스러운 성공시에도 알람이 가게 구현)

  1 import time
  2 import requests
  3 from collections import defaultdict
  4 from datetime import datetime, timedelta
  5 import ipaddress
  6 
  7 # 디스코드 웹훅 URL 설정
  8 DISCORD_WEBHOOK_URL = 'https://discord.com/api/webhooks/1295804148511543356/bV-nIzyT7_y0p7e5c1rGGU0CJO0klTUkKt9391X9HEXzCo-ofBKyWeqtugLLxfOX9QQh'
  9 
 10 # 로그 파일 경로 (CentOS의 경우)
 11 LOG_FILE = '/var/log/secure'
 12 
 13 # 실패 및 성공 시도 기록을 위한 딕셔너리 {IP: [시도 시간들]}
 14 failed_attempts = defaultdict(list)
 15 success_attempts = defaultdict(list)
 16 
 17 # 최대 허용 실패 횟수 및 시간 간격 (예: 5분 ��내 3번 실패)
 18 MAX_ATTEMPTS = 3
 19 TIME_WINDOW = timedelta(minutes=5)
 20 
 21 # 내부 네트워크 대역 설정 (예: 사설 네트워크 대역)
 22 INTERNAL_NETWORKS = [
 23     "192.168.0.0/24",  # 예시: 내부 네트워크 대역 (수정 가능)
 24     "10.0.0.0/8"
 25 ]
 26 
 27 # 의심스러운 시간대 (예: 자정부터 6시까지)
 28 def is_suspicious_time():
 29     current_hour = datetime.now().hour
 30     if 0 <= current_hour <= 6:
 31         return True
 32     return False
 33 
 34 # IP가 내부 네트워크 대역에 속하는지 확인
 35 def is_internal_ip(ip):
 36     try:
 37         ip_obj = ipaddress.ip_address(ip)
 38         for network in INTERNAL_NETWORKS:
 39             if ip_obj in ipaddress.ip_network(network):
 40                 return True
 41         return False
 42     except ValueError:
 43         # 잘못된 IP 형식이면 무시
 44         return False
 45 
 46 # 디스코드 알림 전송 함수
 47 def send_discord_alert(message):
 48     data = {
 49         "content": message
 50     }
 51     response = requests.post(DISCORD_WEBHOOK_URL, json=data)
 52     if response.status_code == 204:
 53         print("디스코드 알림 전송 성공")
 54     else:
 55         print(f"디스코드 알림 전송 실패: {response.status_code}")
 56 
 57 # SSH 로그를 실시간으로 감시하는 함수
 58 def monitor_log():
 59     with open(LOG_FILE, 'r') as file:
 60         file.seek(0, 2)
 61 
 62         while True:
 63             line = file.readline()
 64             if not line:
 65                 time.sleep(0.1)
 66                 continue
 67 
 68             # 로그인 실패 감지
 69             if "Failed password for" in line:
 70                 print(f"로그 감지: {line.strip()}")
 71 
 72                 parts = line.split()
 73                 ip_index = parts.index("from") + 1 if "from" in parts else -1
 74                 if ip_index != -1:
 75                     ip = parts[ip_index]
 76 
 77                     current_time = datetime.now()
 78                     failed_attempts[ip].append(current_time)
 79 
 80                     # 지정된 시간 간격 내의 실패 시도만 유지
 81                     failed_attempts[ip] = [time for time in failed_attempts[ip] if current_time - time <= TIME_WINDOW]
 82 
 83                     # 실패 시도가 기준을 초과했을 때 알림
 84                     if len(failed_attempts[ip]) >= MAX_ATTEMPTS:
 85                         if not is_internal_ip(ip):
 86                             send_discord_alert(f"🚨  외부 IP에서 SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
 87                         elif is_suspicious_time():
 88                             send_discord_alert(f"⚠️ 의심스러운 시간대에 SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
 89                         else:
 90                             send_discord_alert(f"SSH 인증 실패 감지 (IP: {ip}): {line.strip()}")
 91                         failed_attempts[ip].clear()
 92 
 93             # 로그인 성공 감지
 94             elif "Accepted password for" in line:
 95                 print(f"로그인 성공 감지: {line.strip()}")
 96 
 97                 parts = line.split()
 98                 ip_index = parts.index("from") + 1 if "from" in parts else -1
 99                 if ip_index != -1:
100                     ip = parts[ip_index]
101 
102                     current_time = datetime.now()
103                     success_attempts[ip].append(current_time)
104 
105                     # 외부 IP가 로그인 성공했을 때 알림
106                     if not is_internal_ip(ip):
107                         send_discord_alert(f"🚨 외부 IP에서 SSH 로그인 성공 감지 (IP: {ip}): {line.strip()}")
108                     elif is_suspicious_time():
109                         send_discord_alert(f"⚠️의심스러운 시간대에 SSH 로그인 성공 감지 (IP: {ip}): {line.strip()}")
110                     else:
111                         send_discord_alert(f"SSH 로그인 성공 감지 (IP: {ip}): {line.strip()}")
112                     success_attempts[ip].clear()
113 
114 # 메인 함수: SSH 로그 감시 시작
115 if __name__ == "__main__":
116     print("SSH 로그 감시 시작...")
117     monitor_log()
118

 

 

주요 기능 설명:

1. 내부 네트워크 대역 설정:

  • INTERNAL_NETWORKS에 내부 네트워크 대역(예: 192.168.0.0/24)을 설정합니다.
  • 이 대역에 속하는 IP 주소는 신뢰할 수 있는 내부 네트워크로 간주되며, 외부 네트워크에서 발생한 로그인 시도는 감시 대상으로 간주합니다.

2. 외부 IP 감시:

  • is_internal_ip() 함수는 IP가 내부 네트워크 대역에 속하지 않는지 확인합니다.
  • 내부 네트워크에 속하지 않는 외부 네트워크에서 발생한 로그인 시도를 의심스러운 것으로 처리하고 감시합니다.

3. 비정상적인 시간대 감시:

  • is_suspicious_time() 함수는 자정부터 새벽 6시까지의 로그인 시도를 감시하여, 해당 시간대에 발생한 로그인 시도를 의심스러운 것으로 처리합니다.

4. 디스코드 알림 전송:

  • 로그인 성공 또는 실패가 감지되면, 디스코드로 알림을 전송합니다.
  • 외부 네트워크에서 발생한 실패 시도나 의심스러운 시간대에서의 로그인 성공/실패 시도에 대해 별도의 경고 메시지를 보냅니다.

5. 실시간 로그 모니터링:

  • /var/log/secure 파일을 실시간으로 감시하여 "Failed password for" 또는 "Accepted password for" 로그를 감지합니다.
  • IP 주소와 로그인 성공/실패 시도 시간을 기록하며, 설정한 시간 내에 일정 횟수를 초과할 경우 알림을 전송합니다.

 

 

 

 

 

 


 

사용자 활동 모니터링 설정

  • SSH 로그인 시도 및 인증 실패를 모니터링합니다. 
  • 출근 시간 체크
  • 이상한 네트워크 트래픽 감지
  • 특정 IP 대역 모니터링
  • 사용자 행동 패턴 분석 등 다양한 사용자 활동을 모니터링합니다.

 

+++ 나머지 기능 추가예정