
Bitcoin Price Prediction Using MLOps


Don't know much about Bitcoin or its price fluctuations, but still want to make investment decisions and profit from them? This machine-learning model has your back: it can predict prices better than an astrologer. In this article, we will build an ML model for forecasting and predicting Bitcoin prices, using ZenML and MLflow. So let's start our journey and understand how anyone can use ML and MLOps tools to predict the future.

Learning Objectives

  • Learn to use APIs effectively to fetch real-time data.
  • Understand what ZenML is, why we use MLflow, and how to integrate it with ZenML.
  • Explore the deployment process of machine-learning models, from idea to production.
  • Discover how to create a user-friendly Streamlit app for interactive machine-learning model predictions.

This article was published as part of the Data Science Blogathon.

Table of Contents

  • Problem Statement
  • Project Implementation
    • Step 1: Accessing the API
    • Step 2: Using MongoDB
    • Step 3: Integrating MLflow with ZenML
    • Step 4: Data Ingestion
    • Step 5: Data Cleaning
    • Step 6: Feature Engineering
    • Step 7: Data Splitting
    • Step 8: Model Training
    • Step 9: Model Evaluation
    • Step 10: Model Deployment
    • Step 11: Building the Streamlit App
  • Conclusion
  • Frequently Asked Questions
Problem Statement

Bitcoin prices are highly volatile, and making predictions is next to impossible. In our project, we use MLOps best practices to build an LSTM model that forecasts Bitcoin prices and trends.

Before implementing the project, let's look at the project architecture.

Project Implementation

Let's start by accessing the API.

Why do we do this? You can get historical Bitcoin price data from various datasets, but with an API we can access live market data.

Step 1: Accessing the API

Register for API access:

Once you have signed up on the CCData API page, you can get a free API key from this page: https://developers.cryptocompare.com/documentation/data-api/index_cc_v1_historical_days

Extract the Bitcoin price data:

With the code below, you can fetch Bitcoin price data from the CCData API and convert it into a pandas DataFrame. Also, keep the API key in a .env file.
      import requests
      import pandas as pd
      from dotenv import load_dotenv
      import os
      
      # Load the .env file
      load_dotenv()
      
      def fetch_crypto_data(api_uri):
          response = requests.get(
              api_uri,
              params={
                  "market": "cadli",
                  "instrument": "BTC-USD",
                  "limit": 5000,
                  "aggregate": 1,
                  "fill": "true",
                  "apply_mapping": "true",
                  "response_format": "JSON"
              },
              headers={"Content-type": "application/json; charset=UTF-8"}
          )
      
          if response.status_code == 200:
              print('API Connection Successful! \nFetching the data...')
      
              data = response.json()
              data_list = data.get('Data', [])
      
              df = pd.DataFrame(data_list)
      
              df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')
      
              return df  # Return the DataFrame
          else:
              raise Exception(f"API Error: {response.status_code} - {response.text}")
Step 2: Using MongoDB

MongoDB is a NoSQL database known for its adaptability, scalability, and ability to store unstructured data in a JSON-like format. We first connect to the database.

The code below connects to MongoDB, retrieves the Bitcoin price data via the API, and updates the collection with all new entries after the latest recorded date.

      import os
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from data.management.api import fetch_crypto_data  # Import the API function
      import pandas as pd
      
      load_dotenv()
      
      MONGO_URI = os.getenv("MONGO_URI")
      API_URI = os.getenv("API_URI")
      
      client = MongoClient(MONGO_URI, tls=True)  # pymongo 4.x removed ssl_certfile/ssl_ca_certs; tls=True is sufficient here
      db = client['crypto_data']
      collection = db['historical_data']
      
      try:
          latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
          if latest_entry:
              last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
          else:
              last_date = '2011-03-27'  # Default start date if MongoDB is empty
      
          print(f"Fetching data starting from {last_date}...")
          new_data_df = fetch_crypto_data(API_URI)
      
          if latest_entry:
              new_data_df = new_data_df[new_data_df['DATE'] > last_date]
      
          if not new_data_df.empty:
              data_to_insert = new_data_df.to_dict(orient='records')
              result = collection.insert_many(data_to_insert)
              print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
          else:
              print("No new data to insert.")
      except Exception as e:
          print(f"An error occurred: {e}")
Introducing ZenML

ZenML is an open-source platform tailored for machine-learning operations, supporting the creation of flexible, production-ready pipelines. ZenML also integrates with multiple machine-learning tools, like MLflow and BentoML, to create seamless ML pipelines.

⚠️ If you are a Windows user, install WSL on your system; ZenML does not support Windows.

In this project, we implement a traditional pipeline using ZenML, and we integrate MLflow with ZenML for experiment tracking.

Prerequisites and Basic ZenML Commands

Python 3.12 or above: you can get it here: https://www.python.org/downloads/

Activate your virtual environment:

      #create a virtual environment
      python3 -m venv venv
      
      #Activate your virtual environmnent in your project folder
      source venv/bin/activate
ZenML commands:

All the core ZenML commands and their functions are listed below:

      #Install zenml
      pip install zenml
      
      #To Launch zenml server and dashboard locally
      pip install "zenml[server]"
      
      #To check the zenml Version:
      zenml version
      
      #To initiate a new repository
      zenml init
      
      #To run the dashboard locally:
      zenml login --local
      
      #To know the status of our zenml Pipelines
      zenml show
      
      #To shutdown the zenml server
      zenml clean
Step 3: Integrating MLflow with ZenML

We use MLflow for experiment tracking, to track our models, artifacts, metrics, and hyperparameter values. Here we register MLflow as both the experiment tracker and the model deployer:

      #Integrating mlflow with ZenML
      zenml integration install mlflow -y
      
      #Register the experiment tracker
      zenml experiment-tracker register mlflow_tracker --flavor=mlflow
      
      #Registering the model deployer
      zenml model-deployer register mlflow --flavor=mlflow
      
      #Registering the stack
      zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set
      
      #To view the stack list
      zenml stack --list
Project Structure

Here you can see the layout of the project. Let's now discuss it in detail.
      bitcoin_price_prediction_mlops/        # Project directory
      ├── data/                             
      │   └── management/                   
      │       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
      │       └── api.py                     # API-related utility functions
      │
      ├── pipelines/                         
      │   ├── deployment_pipeline.py         # Deployment pipeline
      │   └── training_pipeline.py           # Training pipeline
      │
      ├── saved_models/                      # Directory for storing trained models
      ├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
      │
      ├── src/                               # Source code
      │   ├── data_cleaning.py               # Data cleaning and preprocessing
      │   ├── data_ingestion.py              # Data ingestion 
      │   ├── data_splitter.py               # Data splitting 
      │   ├── feature_engineering.py         # Feature engineering 
      │   ├── model_evaluation.py            # Model evaluation
      │   └── model_training.py              # Model training
      │
      ├── steps/                             # ZenML steps
      │   ├── clean_data.py                  # ZenML step for cleaning data
      │   ├── data_splitter.py               # ZenML step for data splitting
      │   ├── dynamic_importer.py            # ZenML step for importing dynamic data
      │   ├── feature_engineering.py         # ZenML step for feature engineering
      │   ├── ingest_data.py                 # ZenML step for data ingestion
      │   ├── model_evaluation.py            # ZenML step for model evaluation
      │   ├── model_training.py              # ZenML step for training the model
      │   ├── prediction_service_loader.py   # ZenML step for loading prediction services
      │   ├── predictor.py                   # ZenML step for prediction
      │   └── utils.py                       # Utility functions for steps
      │
      ├── .env                               # Environment variables file
      ├── .gitignore                         # Git ignore file
      │
      ├── app.py                             # Streamlit user interface app
      │
      ├── README.md                          # Project documentation
      ├── requirements.txt                   # List of required packages
      ├── run_deployment.py                  # Code for running deployment and prediction pipeline
      ├── run_pipeline.py                    # Code for running training pipeline
      └── .zen/                              # ZenML directory (created automatically after ZenML initialization)
Step 4: Data Ingestion

We first ingest the data from the API into MongoDB and then convert it into a pandas DataFrame.

We add the @step decorator to the ingest_data() function to declare it as a step of our training pipeline. In the same way, we will write code for each step in the project architecture and create the pipeline.

      import os
      import logging
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from zenml import step
      import pandas as pd

      # Load the .env file
      load_dotenv()

      # Get MongoDB URI from environment variables
      MONGO_URI = os.getenv("MONGO_URI")

      def fetch_data_from_mongodb(collection_name:str, database_name:str):
          """
          Fetches data from MongoDB and converts it into a pandas DataFrame.

          collection_name: 
              Name of the MongoDB collection to fetch data.
          database_name: 
              Name of the MongoDB database.
          return: 
              A pandas DataFrame containing the data
          """
          # Connect to the MongoDB client
          client = MongoClient(MONGO_URI)
          db = client[database_name]  # Select the database
          collection = db[collection_name]  # Select the collection

          # Fetch all documents from the collection
          try:
              logging.info(f"Fetching data from MongoDB collection: {collection_name}...")
              data = list(collection.find())  # Convert cursor to a list of dictionaries

              if not data:
                  logging.info("No data found in the MongoDB collection.")

              # Convert the list of dictionaries into a pandas DataFrame
              df = pd.DataFrame(data)

              # Drop the MongoDB ObjectId field if it exists (optional)
              if '_id' in df.columns:
                  df = df.drop(columns=['_id'])

              logging.info("Data successfully fetched and converted to a DataFrame!")
              return df

          except Exception as e:
              logging.error(f"An error occurred while fetching data: {e}")
              raise e


      @step(enable_cache=False)
      def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame:

          logging.info("Started data ingestion process from MongoDB.")

          try:
              # Use the fetch_data_from_mongodb function to fetch data
              df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name)

              if df.empty:
                  logging.warning("No data was loaded. Check the collection name or the database content.")
              else:
                  logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.")

              return df

          except Exception as e:
              logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}")
              raise e

To see how I used the @step decorator, check out the GitHub link (steps folder) to browse the code for the other steps of the pipeline, i.e., data cleaning, feature engineering, data splitting, model training, and model evaluation.

Step 5: Data Cleaning

In this step, we create different strategies for cleaning the ingested data. We remove the unwanted columns and missing values from the data.

      import logging
      import pandas as pd

      class DataPreprocessor:
          def __init__(self, data: pd.DataFrame):

              self.data = data
              logging.info("DataPreprocessor initialized with data of shape: %s", data.shape)

          def clean_data(self) -> pd.DataFrame:
              """
              Performs data cleaning by removing unnecessary columns, dropping columns with missing values,
              and returning the cleaned DataFrame.

              Returns:
                  pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed.
              """
              logging.info("Starting data cleaning process.")

              # Drop unnecessary columns, including '_id' if it exists
              columns_to_drop = [
                  'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 
                  'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 
                  'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 
                  'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 
                  'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 
                  'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 
                  'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id'  # Adding '_id' to the list
              ]
              self.data = self.drop_columns(self.data, columns_to_drop)

              # Drop columns where the number of missing values is greater than 0
              logging.info("Dropping columns with missing values.")
              self.data = self.drop_columns_with_missing_values(self.data)

              logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape)
              return self.data

          def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame:
              """
              Drops specified columns from the DataFrame.

              Returns:
                  pd.DataFrame: The DataFrame with the specified columns removed.
              """
              logging.info("Dropping columns: %s", columns)
              return data.drop(columns=columns, errors='ignore')

          def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
              """
              Drops columns with any missing values from the DataFrame.

              Parameters:
                  data: pd.DataFrame
                      The DataFrame from which columns with missing values will be removed.

              Returns:
                  pd.DataFrame: The DataFrame with columns containing missing values removed.
              """
              missing_columns = data.columns[data.isnull().sum() > 0]
              if not missing_columns.empty:
                  logging.info("Columns with missing values: %s", missing_columns.tolist())
              else:
                  logging.info("No columns with missing values found.")
              return data.loc[:, data.isnull().sum() == 0]
Step 6: Feature Engineering

This step takes the cleaned data from the earlier data_cleaning step. We create new features such as the Simple Moving Average (SMA), the Exponential Moving Average (EMA), and lag and rolling statistics to capture trends, reduce noise, and make more reliable predictions from the time-series data. Additionally, we scale the features and the target variable using MinMax scaling, as sketched below.
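The exact feature-engineering code is in the GitHub repository; the following is only a minimal sketch of the idea. The CLOSE and DATE column names follow the CCData schema used above, while the 7-day windows, the one-step lag, and the scaler file names are illustrative assumptions, not the repository's exact choices.

      import os
      import joblib
      import pandas as pd
      from sklearn.preprocessing import MinMaxScaler
      from zenml import step

      @step
      def feature_engineering(df: pd.DataFrame) -> pd.DataFrame:
          """Adds SMA/EMA, lag, and rolling features, then MinMax-scales features and target."""
          df = df.sort_values("DATE").copy()

          # Trend features: simple and exponential moving averages of the closing price
          df["SMA_7"] = df["CLOSE"].rolling(window=7).mean()
          df["EMA_7"] = df["CLOSE"].ewm(span=7, adjust=False).mean()

          # Lag and rolling statistics capture short-term history and volatility
          df["CLOSE_LAG_1"] = df["CLOSE"].shift(1)
          df["ROLLING_STD_7"] = df["CLOSE"].rolling(window=7).std()

          df = df.dropna()  # moving windows and lags leave NaNs in the first rows

          # Scale features and target to [0, 1]; persist the scalers so predictions
          # can be inverted later (the saved_scalers/ directory appears in the layout above)
          feature_cols = ["SMA_7", "EMA_7", "CLOSE_LAG_1", "ROLLING_STD_7"]
          feature_scaler, target_scaler = MinMaxScaler(), MinMaxScaler()
          df[feature_cols] = feature_scaler.fit_transform(df[feature_cols])
          df["CLOSE"] = target_scaler.fit_transform(df[["CLOSE"]])

          os.makedirs("saved_scalers", exist_ok=True)
          joblib.dump(feature_scaler, "saved_scalers/feature_scaler.pkl")
          joblib.dump(target_scaler, "saved_scalers/target_scaler.pkl")
          return df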

Step 7: Data Splitting

Now we split the processed data into training and testing datasets in an 80:20 ratio, as sketched below.
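A chronological split (no shuffling, since this is time-series data) can be sketched as follows; the split_data name is an assumption matching the steps/data_splitter.py file in the layout above.

      from typing import Tuple

      import pandas as pd
      from zenml import step

      @step
      def split_data(df: pd.DataFrame, test_size: float = 0.2) -> Tuple[pd.DataFrame, pd.DataFrame]:
          """Splits the data 80:20 by time: the first 80% trains, the last 20% tests."""
          split_idx = int(len(df) * (1 - test_size))
          train_df = df.iloc[:split_idx]
          test_df = df.iloc[split_idx:]
          return train_df, test_df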
Step 8: Model Training

In this step, we train the LSTM model with early stopping to prevent overfitting, use MLflow's automatic logging to track our models and experiments, and save the trained model; a sketch of the idea follows.
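The full training step is in the repository; the sketch below only illustrates the core idea, assuming 60-step input windows over the scaled closing price. A stacked Keras LSTM is trained with early stopping, while mlflow.tensorflow.autolog() records parameters, metrics, and the model.

      import os
      import numpy as np
      import mlflow
      from tensorflow import keras

      def make_sequences(series: np.ndarray, window: int = 60):
          """Turns a 1-D scaled series into (samples, window, 1) inputs and next-step targets."""
          X, y = [], []
          for i in range(window, len(series)):
              X.append(series[i - window:i])
              y.append(series[i])
          return np.array(X)[..., np.newaxis], np.array(y)

      def train_lstm(train_series: np.ndarray) -> keras.Model:
          mlflow.tensorflow.autolog()  # automatic MLflow logging of params, metrics, and the model

          X_train, y_train = make_sequences(train_series)
          model = keras.Sequential([
              keras.layers.Input(shape=(X_train.shape[1], 1)),
              keras.layers.LSTM(50, return_sequences=True),
              keras.layers.LSTM(50),
              keras.layers.Dense(1),
          ])
          model.compile(optimizer="adam", loss="mse")

          # Early stopping halts training once the validation loss stops improving
          early_stop = keras.callbacks.EarlyStopping(
              monitor="val_loss", patience=10, restore_best_weights=True
          )
          model.fit(X_train, y_train, validation_split=0.1,
                    epochs=100, batch_size=32, callbacks=[early_stop])

          os.makedirs("saved_models", exist_ok=True)
          model.save("saved_models/lstm_model.keras")  # matches saved_models/ in the layout
          return model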

Step 9: Model Evaluation

This is a regression problem, so we use evaluation metrics such as Mean Squared Error (MSE), Root Mean Squared Error (RMSE), Mean Absolute Error (MAE), and R-squared; a sketch of such an evaluation helper follows.
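A minimal sketch using sklearn's metric functions, with RMSE derived from MSE:

      import numpy as np
      from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

      def evaluate_regression(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
          """Returns the four regression metrics used in this project."""
          mse = mean_squared_error(y_true, y_pred)
          return {
              "MSE": mse,
              "RMSE": float(np.sqrt(mse)),
              "MAE": mean_absolute_error(y_true, y_pred),
              "R2": r2_score(y_true, y_pred),
          }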
Now we will organize all the steps above into one pipeline. Let's create a new file, training_pipeline.py. Here, the @pipeline decorator is used to define the function ml_pipeline() as a pipeline in ZenML.
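A sketch of training_pipeline.py, assuming the step functions are importable from the steps/ folder shown in the project layout (the exact names in the repository may differ):

      # training_pipeline.py
      from zenml import pipeline

      from steps.ingest_data import ingest_data
      from steps.clean_data import clean_data
      from steps.feature_engineering import feature_engineering
      from steps.data_splitter import split_data
      from steps.model_training import train_model
      from steps.model_evaluation import evaluate_model

      @pipeline
      def ml_pipeline():
          """Chains all the steps above into one ZenML training pipeline."""
          raw_data = ingest_data()
          cleaned = clean_data(raw_data)
          features = feature_engineering(cleaned)
          train_df, test_df = split_data(features)
          model = train_model(train_df)
          evaluate_model(model, test_df)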

To view the dashboard for the training pipeline, we just run the run_pipeline.py script. Let's create a run_pipeline.py file; a minimal version is sketched below.
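A minimal run_pipeline.py only needs to invoke the pipeline; run it with python run_pipeline.py:

      # run_pipeline.py
      from pipelines.training_pipeline import ml_pipeline

      if __name__ == "__main__":
          ml_pipeline()  # executes on the active ZenML stack registered earlier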
We have now finished creating the pipeline. Run the run_pipeline.py script shown above to execute it and view the pipeline dashboard.

After running it, the script returns the tracking dashboard URL, which looks like this:


On the dashboard, the training pipeline looks like this:

Step 10: Model Deployment

So far, we have built the model and the pipelines. Now let's push the pipeline into production, where users can make predictions.

Continuous Deployment Pipeline

This pipeline is responsible for continuously deploying the trained model. It first trains the model from the training_pipeline.py file and then uses the MLflow model deployer to deploy the trained model; a sketch of the idea is given below.
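A sketch using ZenML's MLflow integration; the step imports are the same assumptions as in the training-pipeline sketch, and the full deployment_pipeline.py is in the repository:

      # deployment_pipeline.py
      from zenml import pipeline
      from zenml.integrations.mlflow.steps import mlflow_model_deployer_step

      from steps.ingest_data import ingest_data
      from steps.clean_data import clean_data
      from steps.feature_engineering import feature_engineering
      from steps.data_splitter import split_data
      from steps.model_training import train_model

      @pipeline
      def continuous_deployment_pipeline():
          """Retrains the model, then (re)deploys it as an MLflow prediction service."""
          raw_data = ingest_data()
          cleaned = clean_data(raw_data)
          features = feature_engineering(cleaned)
          train_df, _ = split_data(features)
          model = train_model(train_df)
          mlflow_model_deployer_step(model=model, workers=3, deploy_decision=True)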

Inference Pipeline

We use the inference pipeline to make predictions on new data with the deployed model. Let's see how we implemented this pipeline in our project.


Let's look at each of the functions called in the inference pipeline below:

dynamic_importer()

This function loads new data, performs the data processing, and returns the data.
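A sketch that reuses the fetch_data_from_mongodb and DataPreprocessor helpers shown earlier (their import paths here are assumptions):

      from zenml import step

      from steps.ingest_data import fetch_data_from_mongodb
      from src.data_cleaning import DataPreprocessor

      @step(enable_cache=False)
      def dynamic_importer() -> str:
          """Loads recent data, applies the training-time cleaning, and returns it as JSON."""
          df = fetch_data_from_mongodb(collection_name="historical_data",
                                       database_name="crypto_data")
          df = DataPreprocessor(df).clean_data()
          # Keep just the most recent rows, e.g. one 60-step LSTM input window
          return df.tail(60).to_json(orient="split")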

prediction_service_loader()

This function is decorated with @step. We load the deployment service with respect to the deployed model, based on the pipeline_name and the step_name, so that our deployed model can handle prediction queries for the new data.

The line existing_services = mlflow_model_deployer_component.find_model_server(...) searches for an available deployment service based on the given parameters, such as the pipeline name and the pipeline step name. If no service is available, it indicates that the deployment pipeline either has not run or ran into a problem, so it throws a RuntimeError. A sketch is given below.
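A sketch following ZenML's MLflow integration API:

      from zenml import step
      from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
          MLFlowModelDeployer,
      )
      from zenml.integrations.mlflow.services import MLFlowDeploymentService

      @step(enable_cache=False)
      def prediction_service_loader(pipeline_name: str, step_name: str) -> MLFlowDeploymentService:
          """Returns the MLflow prediction service started by the deployment pipeline."""
          mlflow_model_deployer_component = MLFlowModelDeployer.get_active_model_deployer()

          existing_services = mlflow_model_deployer_component.find_model_server(
              pipeline_name=pipeline_name,
              pipeline_step_name=step_name,
          )
          if not existing_services:
              raise RuntimeError(
                  f"No MLflow prediction service found for pipeline '{pipeline_name}', "
                  f"step '{step_name}'. Run the deployment pipeline first."
              )
          return existing_services[0]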

predictor()

This function takes in the MLflow-deployed model through the MLFlowDeploymentService, along with the new data. The data is processed further to match the model's expected format for real-time inference; a sketch follows.
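A sketch; the reshape assumes the 60-step, single-feature window format used in the earlier sketches:

      import io

      import numpy as np
      import pandas as pd
      from zenml import step
      from zenml.integrations.mlflow.services import MLFlowDeploymentService

      @step
      def predictor(service: MLFlowDeploymentService, input_data: str) -> np.ndarray:
          """Sends preprocessed data to the deployed MLflow model and returns its prediction."""
          service.start(timeout=10)  # no-op if the prediction server is already running
          df = pd.read_json(io.StringIO(input_data), orient="split")
          # Reshape to (batch, timesteps, features), the format the LSTM expects
          data = df.to_numpy().reshape(1, df.shape[0], df.shape[1])
          prediction = service.predict(data)
          return prediction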

To visualize the continuous deployment and inference pipelines, we need to run the run_deployment.py script, where the deployment and prediction configurations are defined. (Please check the run_deployment.py code in the GitHub repository given below.) Now, let's run the run_deployment.py file to see the dashboards of the continuous deployment pipeline and the inference pipeline.


Continuous Deployment Pipeline - Output


Inference Pipeline - Output

After running the run_deployment.py file, you can see the MLflow dashboard link, which looks like this. Now, copy the above MLflow UI link, paste it into your command line, and run it.

This is the MLflow dashboard, where you can see the evaluation metrics and the model parameters:


Step 11: Building the Streamlit App

Streamlit is an amazing open-source, Python-based framework used to create interactive UIs; with it, we can build web apps quickly without knowing any backend or frontend development. First, we need to install Streamlit on our system; a minimal sketch of such an app is given below.
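First install Streamlit and start the app:

      pip install streamlit
      streamlit run app.py

And a minimal sketch of such an app, assuming an MLflow prediction server is reachable at the URL below (the real app in the repository is more complete):

      # app.py
      import requests
      import streamlit as st

      PREDICTION_URL = "http://127.0.0.1:8000/invocations"  # assumed local MLflow endpoint

      st.title("Bitcoin Price Prediction")
      prices = st.text_input("Last 60 scaled closing prices, comma-separated")

      if st.button("Predict") and prices:
          window = [[float(p)] for p in prices.split(",")]  # shape (60, 1) for the LSTM
          response = requests.post(PREDICTION_URL, json={"instances": [window]}, timeout=10)
          st.write("Predicted next (scaled) close:", response.json())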


Again, you can find the full code for the Streamlit app on GitHub.


Here are the GitHub code and a video explanation of the project for your better understanding.

Conclusion

In this article, we successfully built an end-to-end, production-ready Bitcoin price prediction MLOps project. From fetching data through an API and preprocessing it, to model training, evaluation, and deployment, our project highlights the critical role of MLOps in connecting development with production. We are one step closer to predicting the future of Bitcoin prices in real time.

Key Takeaways

  • APIs enable seamless access to external data, such as Bitcoin price data from the CCData API, eliminating the need for a pre-existing dataset.
  • ZenML and MLflow are robust tools that facilitate the development, tracking, and deployment of machine-learning models in real-world applications.
  • We followed best practices by properly performing data ingestion, cleaning, feature engineering, model training, and evaluation.
  • Continuous deployment and inference pipelines are essential for keeping models effective and available in production environments.

Frequently Asked Questions

Q1. Is ZenML free to use? Yes, ZenML is a fully open-source MLOps framework that makes the transition from local development to production pipelines as easy as one line of code.

Q2. What is MLflow used for? MLflow makes machine-learning development easier by providing tools for tracking experiments, versioning models, and deploying them.

Q3. How do I debug the "server daemon is not running" error? This is a common error you may face in the project. Just run `zenml logout --local`, then `zenml clean`, and then run the pipeline again. It will be resolved.
