Developing trading bots is fun and you can get hooked pretty easily — at least I did. Especially if you are interested in the financial markets, stocks, crypto, trading and all finance related stuff like that.. 😅
I’ve been developing trading bots for some time and form my experience and what I’ve seen, I can safely say that there are people making good money with these bots. 💰
To be honest, I am not even sure if one needs that much knowledge to start running bots and making money. What one needs is
Enough rambling, lets get into the strategy for which we’ll be developing a trading bot 👇
price
, stop_loss
and take_profit
For the 1st point, we will use the Exponential Moving Average (EMA) & for the 2nd point we’ll use the Bollinger Bands 👇
To detect a trend, we will get a subset of candles, which are moving below or above the moving average.
Uptrend
📈 = LONG positionsDowntrend
📉 = SHORT positionsThe amount of candles we will be looking at for these trends depends on the backtests we later do. Through them we can find the best amount of candles that provide the most stable results!
So, what’s left is to define our ENTRY/EXIT positions. For that we will be using the Upper & Lower Bollinger Bands 👇
SHORT
position!After the position is set, we still need to setSTOP_LOSS
/ TAKE_PROFIT
prices. To do so, we use the following formulas 👇
STOP_LOSS
→ N (Coefficient Param) * ATR (Average Through Range)TAKE_PROFIT
→ TPSL Ratio (Take-Profit:Stop-Loss Ratio - Coefficient) * STOP_LOSS
Both coefficients are what we’ll be optimizing in order to get the most optimal strategy and best return! 💸 We will be running a optimization algorithm to find out which are the best numbers for these coefs. 🤖
We’ve defined the strategy. Now we need to code it up. Backstest & Optimize It (Again via code)! 👨💻
import yfinance as yf
from datetime import datetime
symbol = "EURUSD=X"
start_date = "2023-01-01"
end_date = datetime.now().strftime('%Y-%m-%d') # Today's date in 'YYYY-MM-DD' format
interval = "1h"
data = yf.download(symbol, start=start_date, end=end_date, interval=interval)
data.to_csv('EURUSD_1H_01.01.2023-Today.csv')
import pandas as pd
import pandas_ta as ta
# Load data from a CSV file
df = pd.read_csv("EURUSD_1H_01.01.2023-Today.csv")
df['Datetime'] = pd.to_datetime(df['Datetime'], utc=True)
df.set_index('Datetime', inplace=True)
# Remove any rows where the High and Low are the same (no price change)
df = df[df['High'] != df['Low']]
df["EMA"]=ta.ema(df.Close, length=30)
df['RSI']=ta.rsi(df.Close, length=10)
my_bbands = ta.bbands(df.Close, length=15, std=1.5)
df['ATR']=ta.atr(df.High, df.Low, df.Close, length=7)
df=df.join(my_bbands)
df # Check out the output 👇
Congrats! 🎉We’ve successfully downloaded and transformed 8141 rows of data into a pandas
DataFrame!
def ema_signal(df, current_candle, backcandles):
df_slice = df.reset_index().copy()
df_slice = df_slice.loc[current_candle-backcandles:current_candle, ["Open", "Close", "EMA"]]
dnt = 0 if (df_slice[["Open", "Close"]].max(axis=1) >= df_slice["EMA"]).any() else 1
upt = 0 if (df_slice[["Open", "Close"]].min(axis=1) <= df_slice["EMA"]).any() else 1
if upt==1 and dnt==1:
return 3
elif upt==1:
return 2
elif dnt==1:
return 1
else:
return 0
from tqdm import tqdm
tqdm.pandas()
df.reset_index(inplace=True)
It basically defines the EMA and then whether we have a given amount of consecutive candles, and based on that output 3
(in case we have both trends — shouldn’t be happening ever — data is sus 🕵️), 2
(for an uptrend 📈), 1
(for a downtrend 📉) or 0
in all other scenarios.
from pandas import DataFrame
def total_signal(df: DataFrame, current_candle, backcandles):
if (ema_signal(df, current_candle, backcandles)==2
and df.Close[current_candle]<=df['BBL_15_1.5'][current_candle]
#and df.RSI[current_candle]<60
):
return 2
if (ema_signal(df, current_candle, backcandles)==1
and df.Close[current_candle]>=df['BBU_15_1.5'][current_candle]
#and df.RSI[current_candle]>40
):
return 1
return 0
# To ensure teh DataFrame wasn't modified or being sliced before the progress apply
df = df.copy()
df['TotalSignal'] = df.progress_apply(lambda row: total_signal(df, row.name, 7), axis=1)
current_candle
& back_candles
in order to test each candleema_signal
func we defined earlier to check is we are in an uptrend or downtrendSHORT
Signal), 2 (For LONG
Signal) or 0 (No Signal), depending on the trend!After running this piece, you will get a progress bar like so 👇
Then you can add this line & check the generated signals for your first 10 candles 😲
df[df.TotalSignal != 0].head(10)
6th Step: Verify Your Signals Visually By Plotting The Results
import plotly.graph_objects as go
st=100
dfpl = df[st:st+350]
#dfpl.reset_index(inplace=True)
fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
open=dfpl['Open'],
high=dfpl['High'],
low=dfpl['Low'],
close=dfpl['Close']),
go.Scatter(x=dfpl.index, y=dfpl['BBL_15_1.5'],
line=dict(color='green', width=1),
name="BBL"),
go.Scatter(x=dfpl.index, y=dfpl['BBU_15_1.5'],
line=dict(color='green', width=1),
name="BBU"),
go.Scatter(x=dfpl.index, y=dfpl['EMA'],
line=dict(color='black', width=1),
name="EMA") ])
fig.show()
And you’ll get this graph, you can play with 📈
Firstly, create function that will invoke the TradeSignal
from the DataFrame we’ve created:
def TOTAL_SIGNAL():
return df.TotalSignal
Then create the Backtest, where the slcoef
& TPSLRatio
👇 are the coefficients we spoke about when defining the strategy above! You can modify them, re-runthe backtest and see which number gives the best results!
from backtesting import Strategy
from backtesting import Backtest
class MyStrat(Strategy):
mysize = 0.99
slcoef = 1.2 #1.3
TPSLRatio = 2 # 1.8
def init(self):
super().init()
self.signal1 = self.I(TOTAL_SIGNAL)
def next(self):
super().next()
slatr = self.slcoef*self.data.ATR[-1]
TPSLRatio = self.TPSLRatio
if len(self.trades)>0:
if self.trades[-1].is_long and self.data.RSI[-1]>=90:
self.trades[-1].close()
elif self.trades[-1].is_short and self.data.RSI[-1]<=10:
self.trades[-1].close()
if self.signal1==2 and len(self.trades)==0:
sl1 = self.data.Close[-1] - slatr
tp1 = self.data.Close[-1] + slatr*TPSLRatio
self.buy(sl=sl1, tp=tp1, size=self.mysize)
elif self.signal1==1 and len(self.trades)==0:
sl1 = self.data.Close[-1] + slatr
tp1 = self.data.Close[-1] - slatr*TPSLRatio
self.sell(sl=sl1, tp=tp1, size=self.mysize)
bt = Backtest(df, MyStrat, cash=250, margin=1/30, commission=0.00)
stats, heatmap = bt.optimize(slcoef=[i/10 for i in range(10, 21)],
TPSLRatio=[i/10 for i in range(10, 21)],
maximize='Return [%]', max_tries=300,
random_state=0,
return_heatmap=True)
# To see the generated statistics
stats
# To check the strategy results
stats["_strategy"]
# This will run your backtest
# & give you a plot, where each trade was placed in time
bt.run()
bt.plot()
You will see this backtest progress bar once you run this part of the script:
And after that’s finished, you will get a view of your Backtest stats
🚀
Yo can play around, scroll though the different market times, view at what time a trade was placed and when it exited. It’s honestly amazing to have such tools at our disposal!
As you can see on the bottom-right corner, the final performance of this strategy is 59% with max drawdown of 50% and peak of 105%!
Having such big of a drawdown isn’t good to be honest, but for the sake of simplicity of this example it does the job. Also we’ve ran the backtests on a limited set of data, if we were to run on data from 20–30 years back, the results will be different - better IMO! 🚀
Back to those coefficients we spoke about.. We can plot a heatmap, on which we can spot the most heated places in order to see which combination of slcoef
& TPSLRatio
will provide the most optimal results.
And based on that input, optimize our strategy! 🙏
import seaborn as sns
import matplotlib.pyplot as plt
# Convert multiindex series to dataframe
heatmap_df = heatmap.unstack()
plt.figure(figsize=(10, 8))
sns.heatmap(heatmap_df, annot=True, cmap='viridis', fmt='.0f')
plt.show()