Source code for qtpylib.workflow

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# QTPyLib: Quantitative Trading Python Library
# https://github.com/ranaroussi/qtpylib
#
# Copyright 2016-2018 Ran Aroussi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import time
import sys

import numpy as np
import pandas as pd

import pymysql
from pymysql.constants.CLIENT import MULTI_STATEMENTS

from ezibpy import ezIBpy
from ezibpy.utils import contract_expiry_from_symbol

from qtpylib import tools
from qtpylib.blotter import (
    load_blotter_args, get_symbol_id,
    mysql_insert_tick, mysql_insert_bar
)

_IB_HISTORY_DOWNLOADED = False

# =============================================
# check min, python version
if sys.version_info < (3, 4):
    raise SystemError("QTPyLib requires Python version >= 3.4")

# =============================================
tools.createLogger(__name__)  # .setLevel(logging.DEBUG)
# =============================================


def ibCallback(caller, msg, **kwargs):
    global _IB_HISTORY_DOWNLOADED
    if caller == "handleHistoricalData":
        if kwargs["completed"]:
            _IB_HISTORY_DOWNLOADED = True
        # print(kwargs)


[docs]def get_data_ib(instrument, start, resolution="1 min", blotter=None, output_path=None): """ Downloads historical data from Interactive Brokers :Parameters: instrument : mixed IB contract tuple / string (same as that given to strategy) start : str Backtest start date (YYYY-MM-DD [HH:MM:SS[.MS]) :Optional: resolution : str 1/5/15/30 secs, 1/2/3/5/15/30 min (default 1min), 1 hour, 1 day blotter : str Store MySQL server used by this Blotter (default is "auto detect") output_path : str Path to where the resulting CSV should be saved (optional) :Returns: data : pd.DataFrame Pandas DataFrame in a QTPyLib-compatible format and timezone """ global _IB_HISTORY_DOWNLOADED _IB_HISTORY_DOWNLOADED = False # load blotter settings blotter_args = load_blotter_args( blotter, logger=logging.getLogger(__name__)) # create contract string (no need for connection) ibConn = ezIBpy() ibConn.ibCallback = ibCallback if not ibConn.connected: ibConn.connect(clientId=997, port=int(blotter_args['ibport']), host=str(blotter_args['ibserver'])) # generate a valid ib tuple instrument = tools.create_ib_tuple(instrument) contract_string = ibConn.contractString(instrument) contract = ibConn.createContract(instrument) ibConn.requestHistoricalData(contracts=[contract], data="TRADES", resolution=resolution, lookback=tools.ib_duration_str(start), rth=False) while not _IB_HISTORY_DOWNLOADED: time.sleep(1) ibConn.disconnect() data = ibConn.historicalData[contract_string] data['datetime'] = data.index return prepare_data(instrument, data, output_path=output_path)
# ============================================= # data preparation methods # ============================================= _BARS_COLSMAP = { 'open': 'open', 'high': 'high', 'low': 'low', 'close': 'close', 'volume': 'volume', 'opt_price': 'opt_price', 'opt_underlying': 'opt_underlying', 'opt_dividend': 'opt_dividend', 'opt_volume': 'opt_volume', 'opt_iv': 'opt_iv', 'opt_oi': 'opt_oi', 'opt_delta': 'opt_delta', 'opt_gamma': 'opt_gamma', 'opt_vega': 'opt_vega', 'opt_theta': 'opt_theta' } _TICKS_COLSMAP = { 'bid': 'bid', 'bidsize': 'bidsize', 'ask': 'ask', 'asksize': 'asksize', 'last': 'last', 'lastsize': 'lastsize', 'opt_price': 'opt_price', 'opt_underlying': 'opt_underlying', 'opt_dividend': 'opt_dividend', 'opt_volume': 'opt_volume', 'opt_iv': 'opt_iv', 'opt_oi': 'opt_oi', 'opt_delta': 'opt_delta', 'opt_gamma': 'opt_gamma', 'opt_vega': 'opt_vega', 'opt_theta': 'opt_theta' } # --------------------------------------------- def validate_columns(df, kind="BAR", raise_errors=True): global _TICKS_COLSMAP, _BARS_COLSMAP # validate columns if "asset_class" not in df.columns: if raise_errors: raise ValueError('Column asset_class not found') return False is_option = "OPT" in list(df['asset_class'].unique()) colsmap = _TICKS_COLSMAP if kind == "TICK" else _BARS_COLSMAP for el in colsmap: col = colsmap[el] if col not in df.columns: if "opt_" in col and is_option: if raise_errors: raise ValueError('Column %s not found' % el) return False elif "opt_" not in col and not is_option: if raise_errors: raise ValueError('Column %s not found' % el) return False return True # ---------------------------------------------
[docs]def prepare_data(instrument, data, output_path=None, index=None, colsmap=None, kind="BAR", resample="1T"): """ Converts given DataFrame to a QTPyLib-compatible format and timezone :Parameters: instrument : mixed IB contract tuple / string (same as that given to strategy) data : pd.DataFrame Pandas DataDrame with that instrument's market data output_path : str Path to where the resulting CSV should be saved (optional) index : pd.Series Pandas Series that will be used for df's index (optioanl) colsmap : dict Dict for mapping df's columns to those used by QTPyLib (default assumes same naming convention as QTPyLib's) kind : str Is this ``BAR`` or ``TICK`` data resample : str Pandas resolution (defaults to 1min/1T) :Returns: data : pd.DataFrame Pandas DataFrame in a QTPyLib-compatible format and timezone """ global _TICKS_COLSMAP, _BARS_COLSMAP # work on copy df = data.copy() # ezibpy's csv? if set(df.columns) == set([ 'datetime', 'C', 'H', 'L', 'O', 'OI', 'V', 'WAP']): df.rename(columns={ 'datetime': 'datetime', 'O': 'open', 'H': 'high', 'L': 'low', 'C': 'close', 'OI': 'volume', }, inplace=True) df.index = pd.to_datetime(df['datetime']) df.index = df.index.tz_localize(tools.get_timezone()).tz_convert("UTC") index = None # lower case columns df.columns = map(str.lower, df.columns) # set index if index is None: index = df.index # set defaults columns if not isinstance(colsmap, dict): colsmap = {} _colsmap = _TICKS_COLSMAP if kind == "TICK" else _BARS_COLSMAP for el in _colsmap: if el not in colsmap: colsmap[el] = _colsmap[el] # generate a valid ib tuple instrument = tools.create_ib_tuple(instrument) # create contract string (no need for connection) ibConn = ezIBpy() contract_string = ibConn.contractString(instrument) asset_class = tools.gen_asset_class(contract_string) symbol_group = tools.gen_symbol_group(contract_string) # add symbol data df.loc[:, 'symbol'] = contract_string df.loc[:, 'symbol_group'] = symbol_group df.loc[:, 'asset_class'] = asset_class # validate columns valid_cols = validate_columns(df, kind) if not valid_cols: raise ValueError('Invalid Column list') # rename columns to map df.rename(columns=colsmap, inplace=True) # force option columns on options if asset_class == "OPT": df = tools.force_options_columns(df) # remove all other columns known_cols = list(colsmap.values()) + \ ['symbol', 'symbol_group', 'asset_class', 'expiry'] for col in df.columns: if col not in known_cols: df.drop(col, axis=1, inplace=True) # set UTC index df.index = pd.to_datetime(index) df = tools.set_timezone(df, "UTC") df.index.rename("datetime", inplace=True) # resample if resample and kind == "BAR": df = tools.resample(df, resolution=resample, tz="UTC") # add expiry df.loc[:, 'expiry'] = np.nan if asset_class in ("FUT", "OPT", "FOP"): df.loc[:, 'expiry'] = contract_expiry_from_symbol(contract_string) # save csv if output_path is not None: output_path = output_path[ :-1] if output_path.endswith('/') else output_path df.to_csv("%s/%s.%s.csv" % (output_path, contract_string, kind)) # return df return df
# ---------------------------------------------
[docs]def store_data(df, blotter=None, kind="BAR"): """ Store QTPyLib-compatible csv files in Blotter's MySQL. TWS/GW data are required for determining futures/options expiration :Parameters: df : dict Tick/Bar data :Optional: blotter : str Store MySQL server used by this Blotter (default is "auto detect") kind : str Is this ``BAR`` or ``TICK`` data """ # validate columns valid_cols = validate_columns(df, kind) if not valid_cols: raise ValueError('Invalid Column list') # load blotter settings blotter_args = load_blotter_args( blotter, logger=logging.getLogger(__name__)) # blotter not running if blotter_args is None: raise Exception("Cannot connect to running Blotter.") # cannot continue if blotter_args['dbskip']: raise Exception("Cannot continue. Blotter running with --dbskip") # connect to mysql using blotter's settings dbconn = pymysql.connect( client_flag=MULTI_STATEMENTS, host=str(blotter_args['dbhost']), port=int(blotter_args['dbport']), user=str(blotter_args['dbuser']), passwd=str(blotter_args['dbpass']), db=str(blotter_args['dbname']), autocommit=True ) dbcurr = dbconn.cursor() # loop through symbols and save in db for symbol in list(df['symbol'].unique()): data = df[df['symbol'] == symbol] symbol_id = get_symbol_id(symbol, dbconn, dbcurr) # prepare columns for insert data.loc[:, 'timestamp'] = data.index.strftime('%Y-%m-%d %H:%M:%S') data.loc[:, 'symbol_id'] = symbol_id # insert row by row to handle greeks data = data.to_dict(orient="records") if kind == "BAR": for _, row in enumerate(data): mysql_insert_bar(row, symbol_id, dbcurr) else: for _, row in enumerate(data): mysql_insert_tick(row, symbol_id, dbcurr) try: dbconn.commit() except Exception as e: return False return True
# ============================================= # data analyze methods # ============================================= def analyze_portfolio(file): """ analyze portfolio (TBD) """ pass