백엔드 개발 파이썬 튜토리얼 400多行Python代码实现了一个FTP服务器

400多行Python代码实现了一个FTP服务器

Jun 06, 2016 am 11:27 AM
FTP 서버

Python版本
实现了比之前的xxftp更多更完善的功能
1、继续支持多用户
2、继续支持虚拟目录
3、增加支持用户根目录以及映射虚拟目录的权限设置
4、增加支持限制用户根目录或者虚拟目录的空间大小

xxftp的特点
1、开源、跨平台
2、简单、易用
3、不需要数据库
4、可扩展性超强
5、你可以免费使用xxftp假设自己的私人FTP服务器

测试地址
ftp://xiaoxia.org
匿名帐号可以使用!
匿名根目录只读,映射了一个虚拟目录,可以上传文件但不允许更改!

使用方法
跟之前用C语言写的xxftp使用方法一样:

1. Create a root directory to hold the user directories.
Configure it in config.xml.
2. Create user directories under the root directory.
If you want to specify a password, create a directory named ".xxftp",
under which create a text file named "password" containing the MD5
code of the password.
3. If you want to specify the welcome and goodbye message, write it in
xxftp.welcome and xxftp.goodbye under the root directory.
4. Configure config.xml.

The structure of your FTP server root may be like:

-/root
-xxftp.welcome
-xxftp.goodbye

-user1
-.xxftp
-password
-...
-user2
-.xxftp
-password
-...
-anonymous源代码

代码如下:



import socket, threading, os, sys, time
import hashlib, platform, stat

listen_ip = "localhost"
listen_port = 21
conn_list = []
root_dir = "./home"
max_connections = 500
conn_timeout = 120

class FtpConnection(threading.Thread):
def __init__(self, fd):
threading.Thread.__init__(self)
self.fd = fd
self.running = True
self.setDaemon(True)
self.alive_time = time.time()
self.option_utf8 = False
self.identified = False
self.option_pasv = True
self.username = ""
def process(self, cmd, arg):
cmd = cmd.upper();
if self.option_utf8:
arg = unicode(arg, "utf8").encode(sys.getfilesystemencoding())
print "# Ftp Command
if cmd == "BYE" or cmd == "QUIT":
if os.path.exists(root_dir + "/xxftp.goodbye"):
self.message(221, open(root_dir + "/xxftp.goodbye").read())
else:
self.message(221, "Bye!")
self.running = False
return
elif cmd == "USER":
# Set Anonymous User
if arg == "": arg = "anonymous"
for c in arg:
if not c.isalpha() and not c.isdigit() and c!="_":
self.message(530, "Incorrect username.")
return
self.username = arg
self.home_dir = root_dir + "/" + self.username
self.curr_dir = "/"
self.curr_dir, self.full_path, permission, self.vdir_list, \
limit_size, is_virtual = self.parse_path("/")
if not os.path.isdir(self.home_dir):
self.message(530, "User " + self.username + " not exists.")
return
self.pass_path = self.home_dir + "/.xxftp/password"
if os.path.isfile(self.pass_path):
self.message(331, "Password required for " + self.username)
else:
self.message(230, "Identified!")
self.identified = True
return
elif cmd == "PASS":
if open(self.pass_path).read() == hashlib.md5(arg).hexdigest():
self.message(230, "Identified!")
self.identified = True
else:
self.message(530, "Not identified!")
self.identified = False
return
elif not self.identified:
self.message(530, "Please login with USER and PASS.")
return

self.alive_time = time.time()
finish = True
if cmd == "NOOP":
self.message(200, "ok")
elif cmd == "TYPE":
self.message(200, "ok")
elif cmd == "SYST":
self.message(200, "UNIX")
elif cmd == "EPSV" or cmd == "PASV":
self.option_pasv = True
try:
self.data_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.data_fd.bind((listen_ip, 0))
self.data_fd.listen(1)
ip, port = self.data_fd.getsockname()
if cmd == "EPSV":
self.message(229, "Entering Extended Passive Mode (|||" + str(port) + "|)")
else:
ipnum = socket.inet_aton(ip)
self.message(227, "Entering Passive Mode (%s,%u,%u)." %
(",".join(ip.split(".")), (port>>8&0xff), (port&0xff)))
except:
self.message(500, "failed to create data socket.")
elif cmd == "EPRT":
self.message(500, "implement EPRT later...")
elif cmd == "PORT":
self.option_pasv = False
self.data_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s = arg.split(",")
self.data_ip = ".".join(s[:4])
self.data_port = int(s[4])*256 + int(s[5])
self.message(200, "ok")
elif cmd == "PWD" or cmd == "XPWD":
if self.curr_dir == "": self.curr_dir = "/"
self.message(257, '"' + self.curr_dir + '"')
elif cmd == "LIST" or cmd == "NLST":
if arg != "" and arg[0] == "-": arg = "" # omit parameters
remote, local, perm, vdir_list, limit_size, is_virtual = self.parse_path(arg)
if not os.path.exists(local):
self.message(550, "failed.")
return
if not self.establish(): return
self.message(150, "ok")
for v in vdir_list:
f = v[0]
if self.option_utf8:
f = unicode(f, sys.getfilesystemencoding()).encode("utf8")
if cmd == "NLST":
info = f + "\r\n"
else:
info = "d%s%s------- %04u %8s %8s %8lu %s %s\r\n" % (
"r" if "read" in perm else "-",
"w" if "write" in perm else "-",
1, "0", "0", 0,
time.strftime("%b %d %Y", time.localtime(time.time())),
f)
self.data_fd.send(info)
for f in os.listdir(local):
if f[0] == ".": continue
path = local + "/" + f
if self.option_utf8:
f = unicode(f, sys.getfilesystemencoding()).encode("utf8")
if cmd == "NLST":
info = f + "\r\n"
else:
st = os.stat(path)
info = "%s%s%s------- %04u %8s %8s %8lu %s %s\r\n" % (
"-" if os.path.isfile(path) else "d",
"r" if "read" in perm else "-",
"w" if "write" in perm else "-",
1, "0", "0", st[stat.ST_SIZE],
time.strftime("%b %d %Y", time.localtime(st[stat.ST_MTIME])),
f)
self.data_fd.send(info)
self.message(226, "Limit size: " + str(limit_size))
self.data_fd.close()
self.data_fd = 0
elif cmd == "REST":
self.file_pos = int(arg)
self.message(250, "ok")
elif cmd == "FEAT":
features = "211-Features:\r\nSITES\r\nEPRT\r\nEPSV\r\nMDTM\r\nPASV\r\n"\
"REST STREAM\r\nSIZE\r\nUTF8\r\n211 End\r\n"
self.fd.send(features)
elif cmd == "OPTS":
arg = arg.upper()
if arg == "UTF8 ON":
self.option_utf8 = True
self.message(200, "ok")
elif arg == "UTF8 OFF":
self.option_utf8 = False
self.message(200, "ok")
else:
self.message(500, "unrecognized option")
elif cmd == "CDUP":
finish = False
arg = ".."
else:
finish = False
if finish: return
# Parse argument ( It's a path )
if arg == "":
self.message(500, "where's my argument?")
return
remote, local, permission, vdir_list, limit_size, is_virtual = \
self.parse_path(arg)
# can not do anything to virtual directory
if is_virtual: permission = "none"
can_read, can_write, can_modify = "read" in permission, "write" in permission, "modify" in permission
newpath = local
try:
if cmd == "CWD":
if(os.path.isdir(newpath)):
self.curr_dir = remote
self.full_path = newpath
self.message(250, '"' + remote + '"')
else:
self.message(550, "failed")
elif cmd == "MDTM":
if os.path.exists(newpath):
self.message(213, time.strftime("%Y%m%d%I%M%S", time.localtime(
os.path.getmtime(newpath))))
else:
self.message(550, "failed")
elif cmd == "SIZE":
self.message(231, os.path.getsize(newpath))
elif cmd == "XMKD" or cmd == "MKD":
if not can_modify:
self.message(550, "permission denied.")
return
os.mkdir(newpath)
self.message(250, "ok")
elif cmd == "RNFR":
if not can_modify:
self.message(550, "permission denied.")
return
self.temp_path = newpath
self.message(350, "rename from " + remote)
elif cmd == "RNTO":
os.rename(self.temp_path, newpath)
self.message(250, "RNTO to " + remote)
elif cmd == "XRMD" or cmd == "RMD":
if not can_modify:
self.message(550, "permission denied.")
return
os.rmdir(newpath)
self.message(250, "ok")
elif cmd == "DELE":
if not can_modify:
self.message(550, "permission denied.")
return
os.remove(newpath)
self.message(250, "ok")
elif cmd == "RETR":
if not os.path.isfile(newpath):
self.message(550, "failed")
return
if not can_read:
self.message(550, "permission denied.")
return
if not self.establish(): return
self.message(150, "ok")
f = open(newpath, "rb")
while self.running:
self.alive_time = time.time()
data = f.read(8192)
if len(data) == 0: break
self.data_fd.send(data)
f.close()
self.data_fd.close()
self.data_fd = 0
self.message(226, "ok")
elif cmd == "STOR" or cmd == "APPE":
if not can_write:
self.message(550, "permission denied.")
return
if os.path.exists(newpath) and not can_modify:
self.message(550, "permission denied.")
return
# Check space size remained!
used_size = 0
if limit_size > 0:
used_size = self.get_dir_size(os.path.dirname(newpath))
if not self.establish(): return
self.message(150, "ok")
f = open(newpath, ("ab" if cmd == "APPE" else "wb") )
while self.running:
self.alive_time = time.time()
data = self.data_fd.recv(8192)
if len(data) == 0: break
if limit_size > 0:
used_size = used_size + len(data)
if used_size > limit_size: break
f.write(data)
f.close()
self.data_fd.close()
self.data_fd = 0
if limit_size > 0 and used_size > limit_size:
self.message(550, "Exceeding user space limit: " + str(limit_size) + " bytes")
else:
self.message(226, "ok")
else:
self.message(500, cmd + " not implemented")
except:
self.message(550, "failed.")

def establish(self):
if self.data_fd == 0:
self.message(500, "no data connection")
return False
if self.option_pasv:
fd = self.data_fd.accept()[0]
self.data_fd.close()
self.data_fd = fd
else:
try:
self.data_fd.connect((self.data_ip, self.data_port))
except:
self.message(500, "failed to establish data connection")
return False
return True

def read_virtual(self, path):
vdir_list = []
path = path + "/.xxftp/virtual"
if os.path.isfile(path):
for v in open(path, "r").readlines():
items = v.split()
items[1] = items[1].replace("$root", root_dir)
vdir_list.append(items)
return vdir_list

def get_dir_size(self, folder):
size = 0
for path, dirs, files in os.walk(folder):
for f in files:
size += os.path.getsize(os.path.join(path, f))
return size

def read_size(self, path):
size = 0
path = path + "/.xxftp/size"
if os.path.isfile(path):
size = int(open(path, "r").readline())
return size

def read_permission(self, path):
permission = "read,write,modify"
path = path + "/.xxftp/permission"
if os.path.isfile(path):
permission = open(path, "r").readline()
return permission

def parse_path(self, path):
if path == "": path = "."
if path[0] != "/":
path = self.curr_dir + "/" + path
s = os.path.normpath(path).replace("\\", "/").split("/")
local = self.home_dir
# reset directory permission
vdir_list = self.read_virtual(local)
limit_size = self.read_size(local)
permission = self.read_permission(local)
remote = ""
is_virtual = False
for name in s:
name = name.lstrip(".")
if name == "": continue
remote = remote + "/" + name
is_virtual = False
for v in vdir_list:
if v[0] == name:
permission = v[2]
local = v[1]
limit_size = self.read_size(local)
is_virtual = True
if not is_virtual: local = local + "/" + name
vdir_list = self.read_virtual(local)
return (remote, local, permission, vdir_list, limit_size, is_virtual)

def run(self):
''' Connection Process '''
try:
if len(conn_list) > max_connections:
self.message(500, "too many connections!")
self.fd.close()
self.running = False
return
# Welcome Message
if os.path.exists(root_dir + "/xxftp.welcome"):
self.message(220, open(root_dir + "/xxftp.welcome").read())
else:
self.message(220, "xxftp(Python) www.xiaoxia.org")
# Command Loop
line = ""
while self.running:
data = self.fd.recv(4096)
if len(data) == 0: break
line += data
if line[-2:] != "\r\n": continue
line = line[:-2]
space = line.find(" ")
if space == -1:
self.process(line, "")
else:
self.process(line[:space], line[space+1:])
line = ""
except:
print "error", sys.exc_info()
self.running = False
self.fd.close()
print "connection end", self.fd, "user", self.username

def message(self, code, s):
''' Send Ftp Message '''
s = str(s).replace("\r", "")
ss = s.split("\n")
if len(ss) > 1:
r = (str(code) + "-") + ("\r\n" + str(code) + "-").join(ss[:-1])
r += "\r\n" + str(code) + " " + ss[-1] + "\r\n"
else:
r = str(code) + " " + ss[0] + "\r\n"
if self.option_utf8:
r = unicode(r, sys.getfilesystemencoding()).encode("utf8")
self.fd.send(r)

def server_listen():
global conn_list
listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_fd.bind((listen_ip, listen_port))
listen_fd.listen(1024)
conn_lock = threading.Lock()
print "ftpd is listening on ", listen_ip + ":" + str(listen_port)

while True:
conn_fd, remote_addr = listen_fd.accept()
print "connection from ", remote_addr, "conn_list", len(conn_list)
conn = FtpConnection(conn_fd)
conn.start()

conn_lock.acquire()
conn_list.append(conn)
# check timeout
try:
curr_time = time.time()
for conn in conn_list:
if int(curr_time - conn.alive_time) > conn_timeout:
if conn.running == True:
conn.fd.shutdown(socket.SHUT_RDWR)
conn.running = False
conn_list = [conn for conn in conn_list if conn.running]
except:
print sys.exc_info()
conn_lock.release()

def main():
server_listen()

if __name__ == "__main__":
main()

본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

Video Face Swap

Video Face Swap

완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

뜨거운 도구

메모장++7.3.1

메모장++7.3.1

사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

드림위버 CS6

드림위버 CS6

시각적 웹 개발 도구

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

FTP 서버 주소를 확인하는 방법 FTP 서버 주소를 확인하는 방법 Jan 29, 2024 pm 03:11 PM

FTP 서버 주소를 확인하는 방법: 1. 브라우저의 주소 표시줄에 FTP 서버의 도메인 이름이나 IP 주소를 입력한 후 Enter 키를 누릅니다. FTP 서버에 성공적으로 연결할 수 있으면 FTP 서버의 주소가 올바른 것입니다. 2. 명령줄 인터페이스에 "ftp" 명령을 입력한 다음 "ftp"의 도메인 이름이나 IP 주소를 입력합니다. 웹사이트. 연결이 성공하면 FTP 서버의 주소가 올바른 것입니다. 3. IP 설정 페이지에서 장치의 FTP 서버 주소 등을 볼 수 있습니다.

PHP를 통해 FTP 서버에서 파일 액세스 및 수정을 모니터링하는 방법 PHP를 통해 FTP 서버에서 파일 액세스 및 수정을 모니터링하는 방법 Jul 28, 2023 pm 08:01 PM

PHP를 통해 FTP 서버에서 파일 액세스 및 수정을 모니터링하는 방법 인터넷의 급속한 발전과 함께 일반적인 파일 전송 도구인 FTP(파일 전송 프로토콜)는 로컬에서 서버로 파일을 업로드하거나 서버에서 서버로 다운로드하는 데 자주 사용됩니다. 현지의. 실제 응용 프로그램에서는 특히 일부 민감한 파일의 경우 FTP 서버의 파일 액세스 및 수정 사항을 모니터링하는 것이 매우 중요합니다. 이 기사에서는 PHP를 사용하여 FTP 서버에서 파일에 대한 액세스 및 수정 모니터링을 구현하는 코드를 작성하는 방법을 소개합니다. 먼저, 서비스가 제대로 작동하는지 확인해야 합니다.

PHP를 통해 FTP 서버에서 파일을 압축 및 압축 해제하는 방법 PHP를 통해 FTP 서버에서 파일을 압축 및 압축 해제하는 방법 Jul 30, 2023 pm 03:15 PM

PHP를 통해 FTP 서버에서 파일을 압축 및 압축 해제하는 방법 소개: 웹사이트를 개발하고 관리하는 과정에서 파일 압축 및 압축 해제 작업을 처리해야 하는 경우가 종종 있습니다. 그리고 웹사이트의 파일 저장소가 FTP 서버를 사용하는 경우, 서버에서 PHP를 통해 어떻게 파일을 압축 및 압축 해제할지가 중요한 문제가 됩니다. 이 기사에서는 PHP를 통해 FTP 서버에서 파일을 압축 및 압축 해제하는 방법을 소개하고 참조할 수 있는 관련 코드 예제를 제공합니다. 파일 압축을 수행하기 위해 FTP 서버에 연결하고

FTP 서버를 설정하는 방법 FTP 서버를 설정하는 방법 Dec 12, 2023 am 10:37 AM

FTP 서버는 적절한 FTP 서버 소프트웨어 선택, FTP 서버 소프트웨어 설치, FTP 서버 소프트웨어 구성, FTP 서버 소프트웨어 시작 및 FTP 서버 작동 테스트를 통해 구축할 수 있습니다. 자세한 소개: 1. vsftpd, FileZilla Server, ProFTPD 등을 포함한 적절한 FTP 서버 소프트웨어를 선택합니다. 2. FTP 서버 소프트웨어 등을 설치합니다.

FTP 서버를 사용하는 방법 FTP 서버를 사용하는 방법 Oct 13, 2023 pm 02:25 PM

FTP 서버 사용: 1. FileZilla Server, vsftpd, ProFTPD 등과 같은 FTP 서버 소프트웨어를 설치합니다. 2. 설치가 완료된 후 FTP 서버의 수신 포트를 설정하고 해당 서버의 액세스 권한을 설정해야 합니다. 액세스가 허용되는 사용자와 파일 업로드 및 다운로드가 허용되는 FTP 서버 등 3. FTP 사용자 생성 FTP 서버에서는 여러 사용자를 생성하고 각 사용자에게 서로 다른 권한을 할당할 수 있습니다. 사용자의 로그인 권한을 확인하고 해당 사용자에게 FTP 액세스 권한이 있는지 확인하십시오.

Linux 운영 체제에서 FTP 서버를 설정하려면 아래 단계를 따르십시오. Linux 운영 체제에서 FTP 서버를 설정하려면 아래 단계를 따르십시오. Jan 26, 2024 pm 10:33 PM

Linux에서 FTP 서버 linux 명령을 작성하려면 다음 단계를 따라야 합니다. 1. FTP 서버 소프트웨어 설치: 다음 명령을 사용하여 Linux 시스템에 FTP 서버 소프트웨어를 설치할 수 있습니다: ``sudoapt-getinstallvsftpd`` 2. 구성 FTP 서버: FTP 서버 구성 옵션을 설정하려면 FTP 서버 구성 파일을 편집해야 합니다. 기본적으로 Linux는 ftp 서버를 설치하며 vsftpd.conf 파일은 /etc/vsftpd/ 디렉터리에 있습니다. 이 파일에서 ``sudonano/etc/vsftpd.conf``` 명령을 사용하여 구성 파일을 열 수 있습니다.

PHP를 사용하여 FTP 서버에서 파일 변경 사항을 모니터링하는 방법 PHP를 사용하여 FTP 서버에서 파일 변경 사항을 모니터링하는 방법 Aug 01, 2023 pm 09:46 PM

PHP를 사용하여 FTP 서버에서 파일 변경 사항을 모니터링하는 방법 소개: FTP(파일 전송 프로토콜)는 파일 업로드, 다운로드 및 관리에 널리 사용되는 일반적으로 사용되는 파일 전송 프로토콜입니다. FTP 서버에서 파일 변경 사항을 모니터링해야 할 때 PHP를 사용하여 이 기능을 구현할 수 있습니다. 이 기사에서는 PHP를 사용하여 FTP 서버의 파일 변경 사항을 모니터링하는 방법을 소개하고 독자가 참조할 수 있는 코드 예제를 제공합니다. 1부: FTP 서버에 연결 먼저 PHP의 FTP 기능을 사용하여 FTP 서버와의 연결을 설정해야 합니다.

Linux 서버를 보호하는 방법 알아보기 Linux 서버를 보호하는 방법 알아보기 Jan 01, 2024 pm 03:56 PM

어떤 Linux 배포판을 사용하든 iptables 기반 방화벽으로 보호해야 합니다. 아하! 첫 번째 Linux 서버를 설정했으며 사용할 준비가 되었습니다! 그렇습니까? 잠깐만요. 기본적으로 Linux 시스템은 공격자로부터 안전하지 않습니다. 물론 WindowsXP보다 훨씬 안전하지만, 큰 의미는 없습니다. Linux 시스템을 진정으로 안전하게 보호하려면 Linode의 서버 보안 가이드를 따라야 합니다. 일반적으로 먼저 필요하지 않은 서비스를 꺼야 합니다. 물론 이렇게 하려면 먼저 어떤 네트워크 서비스를 사용하고 있는지 알아야 합니다. 쉘 명령을 사용하여 어떤 서비스를 찾을 수 있습니다

See all articles