Project/mini-project

폰 분실 대비 2. (GPS. 이동 경로 보내기)

fullfish 2025. 12. 1. 10:10

요약

핸드폰 분실 했을 시
정해진 트리거가 작동 했을때

GPS를 1분 단위로 계속 저장,

이동 경로를 kml형태로 메일로 전송

 

트리거 . 정해진 패턴이 들어간 문자 수신시

GPS 저장 시작, GPS 저장 중지, 모든 데이터 전송, 정해진 기간 데이터 전송

(@start, @stop, @send all, @send 2025-11-26_19:00 2025-11-26_23:59)

헤맨점. 위치정보의 provider이 GPS가 정확도가 높지만 실내에서는 안됨.

먼저 GPS 탐색 실패시 network 탐색을 했는데 GPS 실패시 1분간의 유휴시간이 발생

(성공후 바로 같은 provider 탐색은 1초 미만으로 걸림)

 유휴시간을 없애고자 GPS 실패시 해당 프로세서 kill하고 바로 실행 시켜 보려고 하였으나 안드로이드 단계에서 막히는 듯

->처음에 GPS 탐색. GPS 탐색 성공시 계속 GPS 탐색,

실패시 그 후로는 계속 network 탐색과 동시에 30분 마다 GPS 다시 한번 탐색하는 방법으로 구현

 

결과

 

git

 

GitHub - full-fish/tracking_GPS

Contribute to full-fish/tracking_GPS development by creating an account on GitHub.

github.com

 

코드

# gps_logger.py

import subprocess
import time
import json
import os
import csv
from datetime import datetime

# =========================
# 설정 (알고리즘 파라미터)
# =========================
# Tasker 오류 방지를 위한 절대 경로
BASE_DIR = "/data/data/com.termux/files/home/dev/tracking_GPS"
LOG_FILE = os.path.join(BASE_DIR, "gps_log.csv")

# 시간 설정 (초 단위)
GPS_TIMEOUT = 20  # GPS 탐색 제한 시간
LONG_NET_TIMEOUT = 120  # GPS 실패 직후 넉넉한 네트워크 탐색 시간 (2분)
SHORT_NET_TIMEOUT = 20  # 평상시 네트워크 탐색 시간
GPS_RETRY_INTERVAL = 1800  # 네트워크 모드일 때 GPS 재시도 간격 (30분)
LOOP_INTERVAL = 60  # 기본 반복 간격 (1분)


def log(msg):
    """터미널에 시간과 함께 로그 출력"""
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")


def save_to_csv(json_str):
    """JSON 데이터를 CSV에 저장"""
    try:
        data = json.loads(json_str)
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        lat = data.get("latitude")
        lon = data.get("longitude")
        acc = data.get("accuracy")
        prov = data.get("provider")

        if not os.path.exists(LOG_FILE):
            with open(LOG_FILE, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(
                    ["timestamp", "latitude", "longitude", "accuracy", "provider"]
                )

        with open(LOG_FILE, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow([timestamp, lat, lon, acc, prov])

        log(f"기록됨: {lat}, {lon} ({prov})")
        return True
    except Exception as e:
        log(f"저장 실패: {e}")
        return False


def try_gps():
    log("GPS 탐색 (최대 20초)...")
    proc = subprocess.Popen(
        ["termux-location", "-p", "gps"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    try:
        stdout, stderr = proc.communicate(timeout=GPS_TIMEOUT)
        if proc.returncode == 0:
            log("GPS 성공!")
            save_to_csv(stdout)
            return True
        return False
    except subprocess.TimeoutExpired:
        log("GPS 시간 초과. Kill.")
        proc.kill()
        proc.wait()
        return False
    except:
        try:
            proc.kill()
        except:
            pass
        return False


def try_network(duration):
    log(f"네트워크 탐색 ({duration}초)...")
    try:
        result = subprocess.run(
            ["termux-location", "-p", "network"],
            capture_output=True,
            text=True,
            timeout=duration,
        )
        if result.returncode == 0:
            log("네트워크 성공!")
            save_to_csv(result.stdout)
            return True
        return False
    except subprocess.TimeoutExpired:
        log(f"네트워크 시간 초과.")
        return False


def main_logic():
    current_mode = "GPS_MODE"
    last_gps_try_time = time.time()

    log(f"스마트 위치 추적 시작")
    subprocess.run(["termux-wake-lock"])  # 백그라운드 유지

    try:
        while True:
            if current_mode == "GPS_MODE":
                if try_gps():
                    log(f"-> GPS 모드 유지. {LOOP_INTERVAL}초 대기.")
                else:
                    log("GPS 실패. 2분간 네트워크 탐색 (Cooling down)...")
                    try_network(LONG_NET_TIMEOUT)
                    current_mode = "NETWORK_MODE"
                    last_gps_try_time = time.time()
                    log(f"-> 네트워크 모드 전환. (GPS 재시도: 30분 뒤)")

            elif current_mode == "NETWORK_MODE":
                time_since_last_gps = time.time() - last_gps_try_time
                if time_since_last_gps >= GPS_RETRY_INTERVAL:
                    log("30분 경과. GPS 재확인...")
                    if try_gps():
                        current_mode = "GPS_MODE"
                        log("GPS 복구됨! GPS 모드로 복귀.")
                    else:
                        log("GPS 실패. 다시 2분간 네트워크 탐색.")
                        try_network(LONG_NET_TIMEOUT)
                        last_gps_try_time = time.time()
                else:
                    try_network(SHORT_NET_TIMEOUT)
                    log(f"-> 네트워크 모드 유지.")

            time.sleep(LOOP_INTERVAL)

    except KeyboardInterrupt:
        log("중지됨.")
    finally:
        subprocess.run(["termux-wake-unlock"])
        log("종료 (Wake Lock 해제)")


if __name__ == "__main__":
    main_logic()

 

# gps_manager.py

import sys
import subprocess
import os
import csv
import smtplib
import configparser
from datetime import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders

# =========================
# 절대 경로 설정
# =========================
BASE_DIR = "/data/data/com.termux/files/home/dev/tracking_GPS"
LOGGER_SCRIPT = os.path.join(BASE_DIR, "gps_logger.py")
LOG_FILE = os.path.join(BASE_DIR, "gps_log.csv")
CONFIG_FILE = os.path.join(BASE_DIR, "config.ini")

# =========================
# 기능 함수들
# =========================


def start_logging():
    try:
        pid = subprocess.check_output(["pgrep", "-f", "gps_logger.py"]).strip()
        print(f"이미 실행 중입니다! (PID: {pid.decode()})")
    except subprocess.CalledProcessError:
        cmd = f"nohup python {LOGGER_SCRIPT} > /dev/null 2>&1 &"
        os.system(cmd)
        print(f"GPS 수집을 시작했습니다. (백그라운드)")
        print(f"저장 위치: {LOG_FILE}")


def stop_logging():
    try:
        pid = subprocess.check_output(["pgrep", "-f", "gps_logger.py"]).strip()
        os.system(f"kill {pid.decode()}")
        subprocess.run(["termux-wake-unlock"])
        print("GPS 수집을 종료했습니다.")
    except subprocess.CalledProcessError:
        print("실행 중인 GPS 수집기가 없습니다.")


def create_kml(data_rows, output_file):
    kml_header = """<?xml version="1.0" encoding="UTF-8"?>
<kml xmlns="http://www.opengis.net/kml/2.2">
  <Document>
    <name>내 이동 경로</name>
    <Style id="lineStyle">
      <LineStyle>
        <color>ff0000ff</color>
        <width>4</width>
      </LineStyle>
    </Style>
    <Placemark>
      <name>Path</name>
      <styleUrl>#lineStyle</styleUrl>
      <LineString>
        <tessellate>1</tessellate>
        <coordinates>
"""
    kml_footer = """        </coordinates>
      </LineString>
    </Placemark>
  </Document>
</kml>"""

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(kml_header)
        for row in data_rows:
            if len(row) >= 3:
                f.write(f"{row[2]},{row[1]},0 \n")
        f.write(kml_footer)


def send_email_with_files(files, start_t, end_t):
    config = configparser.ConfigParser()

    if os.path.exists(CONFIG_FILE):
        config.read(CONFIG_FILE)
    else:
        print(f"config.ini 파일을 찾을 수 없습니다.\n({CONFIG_FILE})")
        return

    if not config.sections():
        print("설정 파일에 계정 정보가 없습니다.")
        return

    email_sent_flag = False

    for section in config.sections():
        print(f"\n[{section}] 계정으로 전송 시도 중...")
        try:
            settings = config[section]
            SMTP_SERVER = settings.get("smtp_server")
            SMTP_PORT = settings.get("smtp_port")
            SENDER_EMAIL = settings.get("sender_email")
            APP_PASSWORD = settings.get("app_password")
            RECIPIENT_EMAIL = settings.get("recipient_email")

            if not all(
                [SMTP_SERVER, SMTP_PORT, SENDER_EMAIL, APP_PASSWORD, RECIPIENT_EMAIL]
            ):
                print(f"[{section}] 정보 부족. 패스.")
                continue

            msg = MIMEMultipart()
            msg["From"] = SENDER_EMAIL
            msg["To"] = RECIPIENT_EMAIL
            msg["Subject"] = f"이동 동선 데이터 ({start_t} ~ {end_t})"

            body = (
                f"요청하신 기간의 이동 경로 데이터입니다.\n"
                f"- 기간: {start_t} ~ {end_t}\n\n"
                f"첨부파일:\n1. .csv (엑셀)\n2. .kml (지도)"
            )
            msg.attach(MIMEText(body, "plain"))

            for filename in files:
                if os.path.exists(filename):
                    with open(filename, "rb") as f:
                        part = MIMEBase("application", "octet-stream")
                        part.set_payload(f.read())
                    encoders.encode_base64(part)
                    part.add_header(
                        "Content-Disposition",
                        f"attachment; filename={os.path.basename(filename)}",
                    )
                    msg.attach(part)

            server = smtplib.SMTP(SMTP_SERVER, int(SMTP_PORT))
            server.starttls()
            server.login(SENDER_EMAIL, APP_PASSWORD)
            server.sendmail(SENDER_EMAIL, RECIPIENT_EMAIL, msg.as_string())
            server.quit()

            print(f"[{section}] 메일 전송 성공!")
            email_sent_flag = True
            break

        except Exception as e:
            print(f"[{section}] 전송 실패: {e}")
            continue

    for f in files:
        if os.path.exists(f):
            os.remove(f)

    if not email_sent_flag:
        print("\n모든 계정 전송 실패.")


def send_data(arg1, arg2=None):
    """
    데이터 전송 함수
    - arg1: 'all' 또는 시작 시간 (YYYY-MM-DD_HH:MM)
    - arg2: 종료 시간 (YYYY-MM-DD_HH:MM)
    """
    is_all_data = False
    start_dt = None
    end_dt = None

    # 'all' 모드 확인
    if arg1.lower() == "all":
        is_all_data = True
        start_str = "전체 기간"
        end_str = "(ALL)"
        print("전체 기간의 데이터를 조회합니다.")
    else:
        # [수정된 부분]: 언더바를 공백으로 치환하여 datetime 파싱 준비
        raw_start_str = arg1
        raw_end_str = arg2

        start_str = raw_start_str.replace("_", " ")
        end_str = raw_end_str.replace("_", " ")

        try:
            # YYYY-MM-DD HH:MM 형식으로 파싱
            fmt = "%Y-%m-%d %H:%M"
            start_dt = datetime.strptime(start_str, fmt)
            end_dt = datetime.strptime(end_str, fmt)
        except ValueError:
            print("날짜 형식 오류. 'YYYY-MM-DD_HH:MM' 형태로 입력하세요.")
            print(f"입력된 값: 시작='{raw_start_str}', 종료='{raw_end_str}'")
            return

    # 2. CSV 읽기 (이하 동일)
    if not os.path.exists(LOG_FILE):
        print(f"로그 파일({LOG_FILE})이 없습니다.")
        return

    filtered_rows = []
    with open(LOG_FILE, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        header = next(reader, None)

        for row in reader:
            if not row or len(row) < 3:
                continue

            if is_all_data:
                filtered_rows.append(row)
            else:
                try:
                    row_dt = datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S")
                    if start_dt <= row_dt <= end_dt:
                        filtered_rows.append(row)
                except ValueError:
                    continue

    print(f"총 {len(filtered_rows)}개의 데이터 발견.")

    if not filtered_rows:
        print("전송할 데이터가 없습니다.")
        return

    timestamp_str = datetime.now().strftime("%Y%m%d_%H%M")
    export_csv = os.path.join(BASE_DIR, f"path_{timestamp_str}.csv")
    export_kml = os.path.join(BASE_DIR, f"map_{timestamp_str}.kml")

    with open(export_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if header:
            writer.writerow(header)
        writer.writerows(filtered_rows)

    create_kml(filtered_rows, export_kml)
    send_email_with_files([export_csv, export_kml], start_str, end_str)


# =========================
# 메인 실행부 (main_logic)
# =========================
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("사용법: python gps_manager.py [start|stop|send '시작' '종료']")
        sys.exit(1)

    mode = sys.argv[1]

    if mode == "start":
        start_logging()
    elif mode == "stop":
        stop_logging()
    elif mode == "send":
        # 인자 개수에 따라 분기
        if len(sys.argv) == 3 and sys.argv[2].lower() == "all":
            send_data("all")
        elif len(sys.argv) >= 4:
            # 2개의 인수를 send_data 함수에 전달
            send_data(sys.argv[2], sys.argv[3])
        else:
            print(
                "사용법 오류: 'send all' 또는 'send 시작일자_시간 종료일자_시간' 형태로 입력하세요."
            )
    else:
        print(f"알 수 없는 명령어: {mode}")
#config.ini

[ACCOUNT_1_GOOGLE]
smtp_server = smtp.gmail.com
smtp_port = 587
sender_email = 본인_구글_이메일@gmail.com
app_password = 구글_앱_비밀번호
recipient_email = 본인_구글_이메일@gmail.com

[ACCOUNT_2_NAVER]
smtp_server = smtp.naver.com
smtp_port = 587
sender_email = 본인_네이버_아이디@naver.com
app_password = 네이버_앱_비밀번호
recipient_email = 본인_네이버_아이디@naver.com