首頁 後端開發 Python教學 使用python3實作ftp服務功能實例(客戶端)

使用python3實作ftp服務功能實例(客戶端)

Mar 26, 2017 am 09:54 AM

這篇文章主要為大家詳細介紹了python3實現ftp服務功能,客戶端的相應程式碼,具有一定的參考價值,有興趣的小夥伴們可以參考一下

客戶端main程式碼:


#Author by Andy
#_*_ coding:utf-8 _*_
'''
This program is used to create a ftp client

'''
import socket,os,json,time,hashlib,sys
class Ftp_client(object):
 def init(self):
  self.client = socket.socket()
 def help(self):
  msg = '''useage:
  ls
  pwd
  cd dir(example: / .. . /var)
  put filename
  rm filename
  get filename
  mkdir directory name
  '''
  print(msg)
 def connect(self,addr,port):
  self.client.connect((addr,port))
 def auth(self):
  m = hashlib.md5()
  username = input("请输入用户名:").strip()

  m.update(input("请输入密码:").strip().encode())
  password = m.hexdigest()
  user_info = {
   'action':'auth',
   'username':username,
   'password':password}
  self.client.send(json.dumps(user_info).encode('utf-8'))
  server_response = self.client.recv(1024).decode()
  # print(server_response)
  return server_response
 def interactive(self):
  while True:
   msg = input(">>>:").strip()
   if not msg:
    print("不能发送空内容!")
    continue
   cmd = msg.split()[0]
   if hasattr(self,cmd):
    func = getattr(self,cmd)
    func(msg)
   else:
    self.help()
    continue
 def put(self,*args):
  cmd_split = args[0].split()
  if len(cmd_split) > 1:
   filename = cmd_split[1]
   if os.path.isfile(filename):
    filesize = os.stat(filename).st_size
    file_info = {
     "action":"put",
     "filename":filename,
     "size":filesize,
     "overriding":'True'
    }
    self.client.send( json.dumps(file_info).encode('utf-8') )
    #防止粘包,等待服务器确认。
    request_code = {
     '200': 'Ready to recceive data!',
     '210': 'Not ready to received data!'
    }
    server_response = self.client.recv(1024).decode()
    if server_response == '200':
     f = open(filename,"rb")
     send_size = 0
     start_time = time.time()
     for line in f:
      self.client.send(line)
      send_size += len(line)
      send_percentage = int((send_size / filesize) * 100)
      while True:
       progress = ('\r已上传%sMB(%s%%)' % (round(send_size / 102400, 2), send_percentage)).encode(
        'utf-8')
       os.write(1, progress)
       sys.stdout.flush()
       time.sleep(0.0001)
       break
     else:
      end_time = time.time()
      time_use = int(end_time - start_time)
      print("\nFile %s has been sent successfully!" % filename)
      print('\n平均下载速度%s MB/s' % (round(round(send_size / 102400, 2) / time_use, 2)))
      f.close()
    else:
     print("Sever isn't ready to receive data!")
     time.sleep(10)
     start_time = time.time()
     f = open(filename, "rb")
     send_size = 0
     for line in f:
       self.client.send(line)
       send_size += len(line)
       # print(send_size)
       while True:
        send_percentage = int((send_size / filesize) * 100)
        progress = ('\r已上传%sMB(%s%%)' % (round(send_size / 102400, 2), send_percentage)).encode(
         'utf-8')
        os.write(1, progress)
        sys.stdout.flush()
        # time.sleep(0.0001)
        break
     else:
      end_time = time.time()
      time_use = int(end_time - start_time)
      print("File %s has been sent successfully!" % filename)
      print('\n平均下载速度%s MB/s' % (round(round(send_size / 102400, 2) / time_use, 2)))
      f.close()
   else:
    print("File %s is not exit!" %filename)
  else:
   self.help()
 def ls(self,*args):
  cmd_split = args[0].split()
  # print(cmd_split)
  if len(cmd_split) > 1:
   path = cmd_split[1]
  elif len(cmd_split) == 1:
   path = '.'
  request_info = {
   'action': 'ls',
   'path': path
  }
  self.client.send(json.dumps(request_info).encode('utf-8'))
  sever_response = self.client.recv(1024).decode()
  print(sever_response)
 def pwd(self,*args):
  cmd_split = args[0].split()
  if len(cmd_split) == 1:
   request_info = {
    'action': 'pwd',
   }
   self.client.send(json.dumps(request_info).encode("utf-8"))
   server_response = self.client.recv(1024).decode()
   print(server_response)
  else:
   self.help()
 def get(self,*args):
  cmd_split = args[0].split()
  if len(cmd_split) > 1:
   filename = cmd_split[1]
   file_info = {
     "action": "get",
     "filename": filename,
     "overriding": 'True'
    }
   self.client.send(json.dumps(file_info).encode('utf-8'))
   server_response = self.client.recv(1024).decode() #服务器反馈文件是否存在
   self.client.send('0'.encode('utf-8'))
   if server_response == '0':
    file_size = int(self.client.recv(1024).decode())
    # print(file_size)
    self.client.send('0'.encode('utf-8')) #确认开始传输数据
    if os.path.isfile(filename):
     filename = filename+'.new'
    f = open(filename,'wb')
    receive_size = 0
    m = hashlib.md5()
    start_time = time.time()
    while receive_size < file_size:
     if file_size - receive_size > 1024: # 还需接收不止1次
      size = 1024
     else:
      size = file_size - receive_size
     data = self.client.recv(size)
     m.update(data)
     receive_size += len(data)
     data_percent=int((receive_size / file_size) * 100)
     f.write(data)
     progress = (&#39;\r已下载%sMB(%s%%)&#39; %(round(receive_size/102400,2),data_percent)).encode(&#39;utf-8&#39;)
     os.write(1,progress)
     sys.stdout.flush()
     time.sleep(0.0001)

    else:
     end_time = time.time()
     time_use = int(end_time - start_time)
     print(&#39;\n平均下载速度%s MB/s&#39;%(round(round(receive_size/102400,2)/time_use,2)))
     Md5_server = self.client.recv(1024).decode()
     Md5_client = m.hexdigest()
     print(&#39;文件校验中,请稍候...&#39;)
     time.sleep(0.3)
     if Md5_server == Md5_client:
      print(&#39;文件正常。&#39;)
     else:
      print(&#39;文件与服务器MD5值不符,请确认!&#39;)
   else:
    print(&#39;File not found!&#39;)
    client.interactive()
  else:
   self.help()
 def rm(self,*args):
  cmd_split = args[0].split()
  if len(cmd_split) > 1:
   filename = cmd_split[1]
   request_info = {
    &#39;action&#39;:&#39;rm&#39;,
    &#39;filename&#39;: filename,
    &#39;prompt&#39;:&#39;Y&#39;
   }
   self.client.send(json.dumps(request_info).encode("utf-8"))
   server_response = self.client.recv(10240).decode()
   request_code = {
    &#39;0&#39;:&#39;confirm to deleted&#39;,
    &#39;1&#39;:&#39;cancel to deleted&#39;
   }

   if server_response == &#39;0&#39;:
    confirm = input("请确认是否真的删除该文件:")
    if confirm == &#39;Y&#39; or confirm == &#39;y&#39;:
     self.client.send(&#39;0&#39;.encode("utf-8"))
     print(self.client.recv(1024).decode())
    else:
     self.client.send(&#39;1&#39;.encode("utf-8"))
     print(self.client.recv(1024).decode())
   else:
    print(&#39;File not found!&#39;)
    client.interactive()


  else:
   self.help()
 def cd(self,*args):
  cmd_split = args[0].split()
  if len(cmd_split) > 1:
   path = cmd_split[1]
  elif len(cmd_split) == 1:
   path = &#39;.&#39;
  request_info = {
   &#39;action&#39;:&#39;cd&#39;,
   &#39;path&#39;:path
  }
  self.client.send(json.dumps(request_info).encode("utf-8"))
  server_response = self.client.recv(10240).decode()
  print(server_response)
 def mkdir(self,*args):
  request_code = {
   &#39;0&#39;: &#39;Directory has been made!&#39;,
   &#39;1&#39;: &#39;Directory is aleady exist!&#39;
  }
  cmd_split = args[0].split()
  if len(cmd_split) > 1:
   dir_name = cmd_split[1]
   request_info = {
    &#39;action&#39;:&#39;mkdir&#39;,
    &#39;dir_name&#39;: dir_name
   }
   self.client.send(json.dumps(request_info).encode("utf-8"))
   server_response = self.client.recv(1024).decode()
   if server_response == &#39;0&#39;:
    print(&#39;Directory has been made!&#39;)
   else:
    print(&#39;Directory is aleady exist!&#39;)
  else:
   self.help()
 # def touch(self,*args):
def run():
 client = Ftp_client()
 # client.connect(&#39;10.1.2.3&#39;,6969)
 Addr = input("请输入服务器IP:").strip()
 Port = int(input("请输入端口号:").strip())
 client.connect(Addr,Port)
 while True:
  if client.auth() == &#39;0&#39;:
   print("Welcome.....")
   client.interactive()
   break
  else:
   print("用户名或密码错误!")
   continue
登入後複製

目錄結構

使用python3實作ftp服務功能實例(客戶端)

#

以上是使用python3實作ftp服務功能實例(客戶端)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

如何解決Linux終端中查看Python版本時遇到的權限問題? 如何解決Linux終端中查看Python版本時遇到的權限問題? Apr 01, 2025 pm 05:09 PM

Linux終端中查看Python版本時遇到權限問題的解決方法當你在Linux終端中嘗試查看Python的版本時,輸入python...

如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到? 如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到? Apr 02, 2025 am 07:15 AM

使用FiddlerEverywhere進行中間人讀取時如何避免被檢測到當你使用FiddlerEverywhere...

在Python中如何高效地將一個DataFrame的整列複製到另一個結構不同的DataFrame中? 在Python中如何高效地將一個DataFrame的整列複製到另一個結構不同的DataFrame中? Apr 01, 2025 pm 11:15 PM

在使用Python的pandas庫時,如何在兩個結構不同的DataFrame之間進行整列複製是一個常見的問題。假設我們有兩個Dat...

Uvicorn是如何在沒有serve_forever()的情況下持續監聽HTTP請求的? Uvicorn是如何在沒有serve_forever()的情況下持續監聽HTTP請求的? Apr 01, 2025 pm 10:51 PM

Uvicorn是如何持續監聽HTTP請求的? Uvicorn是一個基於ASGI的輕量級Web服務器,其核心功能之一便是監聽HTTP請求並進�...

如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎? 如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎? Apr 02, 2025 am 07:18 AM

如何在10小時內教計算機小白編程基礎?如果你只有10個小時來教計算機小白一些編程知識,你會選擇教些什麼�...

在Linux終端中使用python --version命令時如何解決權限問題? 在Linux終端中使用python --version命令時如何解決權限問題? Apr 02, 2025 am 06:36 AM

Linux終端中使用python...

如何繞過Investing.com的反爬蟲機制獲取新聞數據? 如何繞過Investing.com的反爬蟲機制獲取新聞數據? Apr 02, 2025 am 07:03 AM

攻克Investing.com的反爬蟲策略許多人嘗試爬取Investing.com(https://cn.investing.com/news/latest-news)的新聞數據時,常常�...

See all articles