无法找到函数‘mse’
13
0
我正在制作一个 steam 市场分析器,试图作为一个业余项目来寻找利润。但是,我遇到了一个无法修复的问题。回溯(最近一次调用):文件 \'c:\U...
我正在制作一个 steam 市场分析器,试图作为一个业余项目来寻找利润。但是,我遇到了这个无法解决的问题。
Traceback (most recent call last):
File "c:\Users\richa\Desktop\config\steam_market_analyzer.py", line 275, in <module>
request_count = search_market(max_price, min_price, max_count, webhook_url, threshold)
File "c:\Users\richa\Desktop\config\steam_market_analyzer.py", line 242, in search_market
model, scaler_X, scaler_y = build_and_train_model(X, y)
File "c:\Users\richa\Desktop\config\steam_market_analyzer.py", line 183, in build_and_train_model
model = load_or_initialize_model((time_steps, features))
File "c:\Users\richa\Desktop\config\steam_market_analyzer.py", line 155, in load_or_initialize_model
model = load_model(MODEL_PATH, custom_objects={'CustomMeanSquaredError': CustomMeanSquaredError})
File "C:\Users\richa\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\src\saving\saving_api.py", line 194, in load_model
return legacy_h5_format.load_model_from_hdf5(
File "C:\Users\richa\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\src\legacy\saving\legacy_h5_format.py", line 155, in load_model_from_hdf5
**saving_utils.compile_args_from_training_config(
File "C:\Users\richa\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\src\legacy\saving\saving_utils.py", line 143, in compile_args_from_training_config
loss = _deserialize_nested_config(losses.deserialize, loss_config)
File "C:\Users\richa\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\src\legacy\saving\saving_utils.py", line 202, in _deserialize_nested_config
return deserialize_fn(config)
File "C:\Users\richa\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\src\losses\__init__.py", line 149, in deserialize
return serialization_lib.deserialize_keras_object(
File "C:\Users\richa\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\src\saving\serialization_lib.py", line 575, in deserialize_keras_object
return deserialize_keras_object(
File "C:\Users\richa\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\src\saving\serialization_lib.py", line 678, in deserialize_keras_object
return _retrieve_class_or_fn(
File "C:\Users\richa\AppData\Local\Programs\Python\Python310\lib\site-packages\keras\src\saving\serialization_lib.py", line 812, in _retrieve_class_or_fn
raise TypeError(
TypeError: Could not locate function 'mse'. Make sure custom classes are decorated with `@keras.saving.register_keras_serializable()`. Full object config: {'module': 'keras.metrics', 'class_name': 'function', 'config': 'mse', 'registered_name': 'mse'}
我当前的代码是这样的:
import requests
import time
import datetime
from bs4 import BeautifulSoup
import json
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adam
import keras
import numpy as np
from scipy.stats import linregress
import tensorflow as tf
@tf.keras.utils.register_keras_serializable(package='Custom')
class CustomMeanSquaredError(tf.keras.losses.Loss):
def __init__(self, name='custom_mean_squared_error'):
super().__init__(name=name)
def call(self, y_true, y_pred):
return tf.reduce_mean(tf.square(y_true - y_pred))
eligible_items = []
MODEL_PATH = 'market_predictor_model.h5'
SCALER_PATH = 'scaler_X.npy'
def send_discord_alert(webhook_url, message):
message = {
"content": message
}
requests.post(webhook_url, json=message)
def save_model(model):
model.save(MODEL_PATH, save_format='h5')
def rate_limited_request(url, request_count, request_time_list, max_requests=10, cooldown=60):
current_time = time.time()
request_time_list.append(current_time)
request_time_list = [t for t in request_time_list if current_time - t < cooldown]
if len(request_time_list) >= max_requests:
sleep_time = cooldown - (current_time - request_time_list[0])
print(f"Rate limit reached at {request_count} requests, sleeping for {sleep_time:.2f} seconds.")
time.sleep(sleep_time)
request_time_list = []
response = requests.get(url)
request_count += 1
if response.status_code == 429:
print(f"Rate limit hit at {request_count} requests, sleeping for 5 minutes.")
time.sleep(300)
return rate_limited_request(url, request_count, request_time_list)
return response, request_count, request_time_list
def get_price_history(listing_url, request_count, request_time_list):
response, request_count, request_time_list = rate_limited_request(listing_url, request_count, request_time_list)
soup = BeautifulSoup(response.text, 'html.parser')
script = soup.find('script', string=lambda x: x and 'line1' in x)
if not script:
print(f"Price history data not found on {listing_url}")
return [], request_count, request_time_list
try:
json_data = script.string.split('var line1=')[1].split(';')[0]
price_history = json.loads(json_data)
price_data = []
for entry in price_history:
try:
date_str = entry[0]
price_str = entry[1]
timestamp = datetime.datetime.strptime(date_str, "%b %d %Y %H: +0")
if isinstance(price_str, str):
price = float(price_str.replace('$', '').replace(',', ''))
else:
price = float(price_str)
price_data.append((timestamp, price))
except ValueError as e:
print(f"Skipping invalid entry in price history: {entry} - Error: {e}")
continue
return price_data, request_count, request_time_list
except Exception as e:
print(f"Error parsing price history data from {listing_url} - Error: {e}")
return [], request_count, request_time_list
def calculate_moving_average(prices, window_size=7):
moving_averages = []
for i in range(len(prices) - window_size + 1):
window = prices[i:i + window_size]
moving_averages.append(sum(window) / window_size)
return moving_averages
def check_for_profitability(price_history, current_price, threshold=0.85):
steam_fee_percentage = 0.15
prices = [price for _, price in price_history]
moving_averages = calculate_moving_average(prices, window_size=7)
if not moving_averages:
return False
avg_price = moving_averages[-1]
effective_price = current_price * (1 - steam_fee_percentage)
return effective_price < avg_price * threshold
def analyze_trend(price_history):
if len(price_history) < 2:
return 0, 0
dates = [date.timestamp() for date, _ in price_history]
prices = [price for _, price in price_history]
slope, intercept, r_value, p_value, std_err = linregress(dates, prices)
return slope, r_value ** 2
def prepare_data_for_model(price_history, time_steps=10):
prices = np.array([price for _, price in price_history])
if len(prices) <= time_steps:
return np.array([]), np.array([])
X, y = [], []
for i in range(len(prices) - time_steps):
X.append(prices[i:i + time_steps])
y.append(prices[i + time_steps])
X = np.array(X)
y = np.array(y)
X = X.reshape((X.shape[0], X.shape[1], 1))
return X, y
def load_or_initialize_model(input_shape):
try:
model = load_model(MODEL_PATH, custom_objects={'CustomMeanSquaredError': CustomMeanSquaredError})
print("Model loaded successfully.")
model.compile(optimizer=Adam(), loss=CustomMeanSquaredError())
except FileNotFoundError:
print("No existing model found. Initializing a new model.")
model = Sequential()
model.add(LSTM(50, input_shape=input_shape))
model.add(Dense(1))
model.compile(optimizer=Adam(), loss=CustomMeanSquaredError())
return model
def build_and_train_model(X, y, n_epochs=10):
samples, time_steps, features = X.shape
scaler_X = MinMaxScaler(feature_range=(0, 1))
X = X.reshape(-1, features)
X = scaler_X.fit_transform(X)
X = X.reshape(samples, time_steps, features)
scaler_y = MinMaxScaler(feature_range=(0, 1))
y = scaler_y.fit_transform(y.reshape(-1, 1)).flatten()
model = load_or_initialize_model((time_steps, features))
model.fit(X, y, epochs=n_epochs, verbose=0)
return model, scaler_X, scaler_y
def predict_future_price(model, scaler_X, scaler_y, last_prices, time_steps=10):
last_prices = np.array(last_prices).reshape(1, time_steps, 1)
last_prices = scaler_X.transform(last_prices.flatten().reshape(-1, 1)).reshape(1, time_steps, 1)
predicted_price = model.predict(last_prices)[0][0]
return scaler_y.inverse_transform(np.array([predicted_price]).reshape(-1, 1))[0][0]
def search_market(max_price, min_price, max_count, webhook_url, threshold=0.85):
start = 0
count = 100
request_count = 0
request_time_list = []
while start < max_count:
url = f"https://steamcommunity.com/market/search/render?norender=1&start={start}&count={count}&appid=730"
response, request_count, request_time_list = rate_limited_request(url, request_count, request_time_list)
if response.status_code == 200:
data = response.json()
items = data.get("results", [])
if not items:
break
for item in items:
name = item.get("name", "")
if any(keyword in name.lower() for keyword in ["sticker", "graffiti", "spray", "case", "key", "master agent"]):
continue
price_str = item.get("sell_price_text", "0").replace("$", "").replace(",", "")
try:
price = float(price_str)
except ValueError:
print(f"Invalid price format: {price_str}")
continue
if min_price <= price <= max_price:
print(f"Item: {name}, Price: {price}")
item_url = f"https://steamcommunity.com/market/listings/730/{name.replace(' ', '%20')}"
price_history, request_count, request_time_list = get_price_history(item_url, request_count, request_time_list)
if not price_history:
continue
if check_for_profitability(price_history, price, threshold):
X, y = prepare_data_for_model(price_history)
print(f"X shape: {X.shape}, y shape: {y.shape}")
if X.size > 0 and y.size > 0:
model, scaler_X, scaler_y = build_and_train_model(X, y)
last_prices = [price for _, price in price_history[-10:]]
predicted_price = predict_future_price(model, scaler_X, scaler_y, last_prices)
if predicted_price > price:
slope, confidence = analyze_trend(price_history)
profit_margin = predicted_price - price
eligible_items.append(name)
send_discord_alert(
webhook_url,
f"Profitable item found: {name} at ${price:.2f}\n"
f"Predicted future price: ${predicted_price:.2f}\n"
f"Profit margin: ${profit_margin:.2f}\n"
f"Confidence: {confidence:.2%}\n"
f"Trend slope: {slope:.2f}"
)
else:
print(f"Failed to fetch data, status code: {response.status_code}")
break
start += count
return request_count
webhook_url = "URL"
max_price = 200.00
min_price = 0.10
max_count = 10000
threshold = 0.85
request_count = search_market(max_price, min_price, max_count, webhook_url, threshold)
send_discord_alert(webhook_url, f"Search complete. Number of profitable items found: {len(eligible_items)}")
我尝试制作自定义 MSE,但没有结果。不知道我是不是太笨了。过了一会儿,我甚至求助于 ChatGPT,但它只是重复说了同样的话,一点帮助都没有。
收藏的用户(0)
X
正在加载信息~