import datetime
from typing import Optional
from BinanceWatch.storage.DataBase import DataBase, SQLConditionEnum
from BinanceWatch.storage import tables
from BinanceWatch.utils.time_utils import datetime_to_millistamp
[docs]class BinanceDataBase(DataBase):
"""
Handles the recording of the binance account in a local database
"""
[docs] def __init__(self, name: str = 'binance_db'):
"""
Initialise a binance database instance
:param name: name of the database
:type name: str
"""
super().__init__(name)
[docs] def add_universal_transfer(self, transfer_id: int, transfer_type: str, transfer_time: int, asset: str,
amount: float, auto_commit: bool = True):
"""
Add a universal transfer to the database
:param transfer_id: id of the transfer
:type transfer_id: int
:param transfer_type: enum of the transfer type (ex: 'MAIN_MARGIN')
:type transfer_type: str
:param transfer_time: millistamp of the operation
:type transfer_time: int
:param asset: asset that got transferred
:type asset: str
:param amount: amount transferred
:type amount: float
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
table = tables.UNIVERSAL_TRANSFER_TABLE
row = (transfer_id, transfer_type, transfer_time, asset, amount)
self.add_row(table, row, auto_commit=auto_commit)
[docs] def get_universal_transfers(self, transfer_type: Optional[str] = None, asset: Optional[str] = None,
start_time: Optional[int] = None, end_time: Optional[int] = None):
"""
Return universal transfers stored in the database. Transfer type, Asset type and time filters can be used
:param transfer_type: enum of the transfer type (ex: 'MAIN_MARGIN')
:type transfer_type: Optional[str]
:param asset: fetch only interests in this asset
:type asset: Optional[str]
:param start_time: fetch only interests after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only interests before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
[
(1206491332, # transfer id
'MAIN_MARGIN', # transfer type
1589121841000, # time
'BNB', # asset
10.594112), # amount
]
"""
table = tables.UNIVERSAL_TRANSFER_TABLE
conditions_list = []
if transfer_type is not None:
conditions_list.append((table.trfType,
SQLConditionEnum.equal,
transfer_type))
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if start_time is not None:
conditions_list.append((table.trfTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.trfTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def get_last_universal_transfer_time(self, transfer_type: str) -> int:
"""
Return the latest time when a universal transfer was made
If None, return the millistamp corresponding to 2017/01/01
:param transfer_type: enum of the transfer type (ex: 'MAIN_MARGIN')
:type transfer_type: str
:return: millistamp
:rtype: int
"""
table = tables.UNIVERSAL_TRANSFER_TABLE
conditions_list = [(table.trfType,
SQLConditionEnum.equal,
transfer_type)]
selection = f"MAX({table.trfTime})"
result = self.get_conditions_rows(table,
selection=selection,
conditions_list=conditions_list)
default = datetime_to_millistamp(datetime.datetime(2017, 1, 1, tzinfo=datetime.timezone.utc))
try:
result = result[0][0]
except IndexError:
return default
if result is None:
return default
return result
[docs] def add_isolated_transfer(self, transfer_id: int, transfer_type: str, transfer_time: int, isolated_symbol: str,
asset: str, amount: float, auto_commit: bool = True):
"""
Add a universal transfer to the database
:param transfer_id: id of the transfer
:type transfer_id: int
:param transfer_type: enum of the transfer type (ex: 'MAIN_MARGIN')
:type transfer_type: str
:param transfer_time: millistamp of the operation
:type transfer_time: int
:param isolated_symbol: isolated symbol that received or sent the transfer
:type isolated_symbol: str
:param asset: asset that got transferred
:type asset: str
:param amount: amount transferred
:type amount: float
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
table = tables.ISOLATED_MARGIN_TRANSFER_TABLE
row = (transfer_id, transfer_type, transfer_time, isolated_symbol, asset, amount)
self.add_row(table, row, auto_commit=auto_commit)
[docs] def get_isolated_transfers(self, isolated_symbol: Optional[str] = None, start_time: Optional[int] = None,
end_time: Optional[int] = None):
"""
Return isolated transfers stored in the database. isolated_symbol and time filters can be used
:param isolated_symbol: for isolated margin, provided the trading symbol otherwise it will be counted a cross
margin data
:type isolated_symbol: Optional[str]
:param start_time: fetch only transfers after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only transfers before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
[
(1206491332, # transfer id
'IN', # transfer type (IN or OUT)
1589121841000, # time
'BTCBUSD', # isolated symbol
'BTC', # asset
10.594112), # amount
]
"""
table = tables.ISOLATED_MARGIN_TRANSFER_TABLE
conditions_list = []
if isolated_symbol is not None:
conditions_list.append((table.symbol,
SQLConditionEnum.equal,
isolated_symbol))
if start_time is not None:
conditions_list.append((table.trfTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.trfTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def get_last_isolated_transfer_time(self, isolated_symbol: str) -> int:
"""
Return the latest time when a isolated margin transfer was made
If None, return the millistamp corresponding to 2017/01/01
:param isolated_symbol: isolated symbol that received or sent the transfers
:type isolated_symbol: str
:return: millistamp
:rtype: int
"""
table = tables.ISOLATED_MARGIN_TRANSFER_TABLE
conditions_list = [(table.symbol,
SQLConditionEnum.equal,
isolated_symbol)]
selection = f"MAX({table.trfTime})"
result = self.get_conditions_rows(table,
selection=selection,
conditions_list=conditions_list)
default = datetime_to_millistamp(datetime.datetime(2017, 1, 1, tzinfo=datetime.timezone.utc))
try:
result = result[0][0]
except IndexError:
return default
if result is None:
return default
return result
[docs] def add_margin_interest(self, interest_time: int, asset: str, interest: float, interest_type: str,
isolated_symbol: Optional[str] = None, auto_commit: bool = True):
"""
Add a margin interest to the database
:param interest_time: millistamp of the operation
:type interest_time: int
:param asset: asset that got repaid
:type asset: str
:param interest: amount of interest accrued
:type interest: float
:param interest_type: one of (PERIODIC, ON_BORROW, PERIODIC_CONVERTED, ON_BORROW_CONVERTED)
:type interest_type: str
:param isolated_symbol: for isolated margin, provided the trading symbol otherwise it will be counted a cross
margin data
:type isolated_symbol: Optional[str]
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
if isolated_symbol is None:
table = tables.CROSS_MARGIN_INTEREST_TABLE
row = (interest_time, asset, interest, interest_type)
else:
table = tables.ISOLATED_MARGIN_INTEREST_TABLE
row = (interest_time, isolated_symbol, asset, interest, interest_type)
self.add_row(table, row, auto_commit=auto_commit)
[docs] def get_margin_interests(self, margin_type: str, asset: Optional[str] = None, isolated_symbol: Optional[str] = None,
start_time: Optional[int] = None, end_time: Optional[int] = None):
"""
Return margin interests stored in the database. Asset type and time filters can be used
:param margin_type: either 'cross' or 'isolated'
:type margin_type:
:param asset: fetch only interests in this asset
:type asset: Optional[str]
:param isolated_symbol: only for isolated margin, provide the trading symbol (otherwise cross data are returned)
:type isolated_symbol: Optional[str]
:param start_time: fetch only interests after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only interests before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
# cross margin
[
1559415215400, # time
'BNB', # asset
0.51561, # interest
'PERIODIC_CONVERTED'), # interest type
]
# isolated margin
[
1559415215400, # time
'BTCBUSD', # symbol
'BUSD', # asset
0.51561, # interest
'PERIODIC'), # interest type
]
"""
conditions_list = []
if margin_type == 'cross':
table = tables.CROSS_MARGIN_INTEREST_TABLE
elif margin_type == 'isolated':
table = tables.ISOLATED_MARGIN_INTEREST_TABLE
if isolated_symbol is not None:
conditions_list.append((table.isolated_symbol,
SQLConditionEnum.equal,
isolated_symbol))
else:
raise ValueError(f"margin type should be 'cross' or 'isolated' but {margin_type} was received")
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if start_time is not None:
conditions_list.append((table.interestTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.interestTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def get_last_margin_interest_time(self, asset: Optional[str] = None, isolated_symbol: Optional[str] = None) -> int:
"""
Return the latest time when a margin interest was accured on a defined asset or on all assets
If None, return the millistamp corresponding to 2017/01/01
:param asset: name of the asset charged as interest
:type asset: Optional[str]
:param isolated_symbol: only for isolated margin, provide the trading symbol (otherwise cross data are returned)
:type isolated_symbol: Optional[str]
:return: millistamp
:rtype: int
"""
conditions_list = []
if isolated_symbol is None:
table = tables.CROSS_MARGIN_INTEREST_TABLE
else:
table = tables.ISOLATED_MARGIN_INTEREST_TABLE
conditions_list.append((table.symbol,
SQLConditionEnum.equal,
isolated_symbol))
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
selection = f"MAX({table.interestTime})"
result = self.get_conditions_rows(table,
selection=selection,
conditions_list=conditions_list)
default = datetime_to_millistamp(datetime.datetime(2017, 1, 1, tzinfo=datetime.timezone.utc))
try:
result = result[0][0]
except IndexError:
return default
if result is None:
return default
return result
[docs] def add_repay(self, tx_id: int, repay_time: int, asset: str, principal: float,
interest: float, isolated_symbol: Optional[str] = None, auto_commit: bool = True):
"""
Add a repay to the database
:param tx_id: binance id for the transaction (uniqueness?)
:type tx_id: int
:param repay_time: millitstamp of the operation
:type repay_time: int
:param asset: asset that got repaid
:type asset: str
:param principal: principal amount repaid for the loan
:type principal: float
:param interest: amount of interest repaid for the loan
:type interest: float
:param isolated_symbol: for isolated margin, provided the trading symbol otherwise it will be counted a cross
margin data
:type isolated_symbol: Optional[str]
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
if isolated_symbol is None:
table = tables.CROSS_MARGIN_REPAY_TABLE
row = (tx_id, repay_time, asset, principal, interest)
else:
table = tables.ISOLATED_MARGIN_REPAY_TABLE
row = (tx_id, repay_time, isolated_symbol, asset, principal, interest)
self.add_row(table, row, auto_commit=auto_commit)
[docs] def get_repays(self, margin_type: str, asset: Optional[str] = None, isolated_symbol: Optional[str] = None,
start_time: Optional[int] = None, end_time: Optional[int] = None):
"""
Return repays stored in the database. Asset type and time filters can be used
:param margin_type: either 'cross' or 'isolated'
:type margin_type: str
:param asset: fetch only repays of this asset
:type asset: Optional[str]
:param isolated_symbol: only for isolated margin, provide the trading symbol
:type isolated_symbol: Optional[str]
:param start_time: fetch only repays after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only repays before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
# cross margin
[
(8289451654, # transaction id
1559415215400, # time
'USDT', # asset
145.5491462, # principal
0.51561), # interest
]
# isolated margin
[
(8289451654, # transaction id
1559415215400, # time
'BTCUSDT', # isolated symbol
'USDT', # asset
145.5491462, # principal
0.51561), # interest
]
"""
conditions_list = []
if margin_type == 'cross':
table = tables.CROSS_MARGIN_REPAY_TABLE
elif margin_type == 'isolated':
table = tables.ISOLATED_MARGIN_REPAY_TABLE
if isolated_symbol is not None:
conditions_list.append((table.isolated_symbol,
SQLConditionEnum.equal,
isolated_symbol))
else:
raise ValueError(f"margin type should be 'cross' or 'isolated' but {margin_type} was received")
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if start_time is not None:
conditions_list.append((table.repayTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.repayTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def get_last_repay_time(self, asset: str, isolated_symbol: Optional[str] = None) -> int:
"""
Return the latest time when a repay was made on a defined asset
If None, return the millistamp corresponding to 2017/01/01
:param asset: name of the asset repaid
:type asset: str
:param isolated_symbol: only for isolated margin, provide the trading symbol (otherwise cross data are returned)
:type isolated_symbol: Optional[str]
:return: millistamp
:rtype: int
"""
conditions_list = []
if isolated_symbol is None:
table = tables.CROSS_MARGIN_REPAY_TABLE
else:
table = tables.ISOLATED_MARGIN_REPAY_TABLE
conditions_list.append((table.symbol,
SQLConditionEnum.equal,
isolated_symbol))
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
selection = f"MAX({table.repayTime})"
result = self.get_conditions_rows(table,
selection=selection,
conditions_list=conditions_list)
default = datetime_to_millistamp(datetime.datetime(2017, 1, 1, tzinfo=datetime.timezone.utc))
try:
result = result[0][0]
except IndexError:
return default
if result is None:
return default
return result
[docs] def add_loan(self, tx_id: int, loan_time: int, asset: str, principal: float,
isolated_symbol: Optional[str] = None, auto_commit: bool = True):
"""
Add a loan to the database
:param tx_id: binance id for the transaction (uniqueness?)
:type tx_id: int
:param loan_time: millitstamp of the operation
:type loan_time: int
:param asset: asset that got loaned
:type asset: str
:param principal: amount of loaned asset
:type principal: float
:param isolated_symbol: for isolated margin, provided the trading symbol otherwise it will be counted a cross
margin data
:type isolated_symbol: Optional[str]
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
if isolated_symbol is None:
table = tables.CROSS_MARGIN_LOAN_TABLE
row = (tx_id, loan_time, asset, principal)
else:
row = (tx_id, loan_time, isolated_symbol, asset, principal)
table = tables.ISOLATED_MARGIN_LOAN_TABLE
self.add_row(table, row, auto_commit=auto_commit)
[docs] def get_loans(self, margin_type: str, asset: Optional[str] = None, isolated_symbol: Optional[str] = None,
start_time: Optional[int] = None, end_time: Optional[int] = None):
"""
Return loans stored in the database. Asset type and time filters can be used
:param margin_type: either 'cross' or 'isolated'
:type margin_type:
:param asset: fetch only loans of this asset
:type asset: Optional[str]
:param isolated_symbol: only for isolated margin, provide the trading symbol
:type isolated_symbol: Optional[str]
:param start_time: fetch only loans after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only loans before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
# cross margin
[
(8289451654, # transaction id
1559415215400, # time
'USDT', # asset
145.5491462), # amount
]
# isolated margin
[
(8289451654, # transaction id
1559415215400, # time
'BTCUSDT', # symbol
'USDT', # asset
145.5491462), # amount
]
"""
conditions_list = []
if margin_type == 'cross':
table = tables.CROSS_MARGIN_LOAN_TABLE
elif margin_type == 'isolated':
table = tables.ISOLATED_MARGIN_LOAN_TABLE
if isolated_symbol is not None:
conditions_list.append((table.isolated_symbol,
SQLConditionEnum.equal,
isolated_symbol))
else:
raise ValueError(f"margin type should be 'cross' or 'isolated' but {margin_type} was received")
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if start_time is not None:
conditions_list.append((table.loanTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.loanTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def get_last_loan_time(self, asset: str, isolated_symbol: Optional[str] = None) -> int:
"""
Return the latest time when an loan was made on a defined asset
If None, return the millistamp corresponding to 2017/01/01
:param asset: name of the asset loaned
:type asset: str
:param isolated_symbol: only for isolated margin, provide the trading symbol (otherwise cross data are returned)
:type isolated_symbol: Optional[str]
:return: millistamp
:rtype: int
"""
conditions_list = []
if isolated_symbol is None:
table = tables.CROSS_MARGIN_LOAN_TABLE
else:
table = tables.ISOLATED_MARGIN_LOAN_TABLE
conditions_list.append((table.symbol,
SQLConditionEnum.equal,
isolated_symbol))
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
selection = f"MAX({table.loanTime})"
result = self.get_conditions_rows(table,
selection=selection,
conditions_list=conditions_list)
default = datetime_to_millistamp(datetime.datetime(2017, 1, 1, tzinfo=datetime.timezone.utc))
try:
result = result[0][0]
except IndexError:
return default
if result is None:
return default
return result
[docs] def add_lending_redemption(self, redemption_time: int, lending_type: str, asset: str, amount: float,
auto_commit: bool = True):
"""
Add a lending redemption to the database
:param redemption_time: millitstamp of the operation
:type redemption_time: int
:param lending_type: either 'DAILY', 'ACTIVITY' or 'CUSTOMIZED_FIXED'
:type lending_type: str
:param asset: asset lent
:type asset: str
:param amount: amount of asset redeemed
:type amount: float
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
row = (redemption_time, lending_type, asset, amount)
self.add_row(tables.LENDING_REDEMPTION_TABLE, row, auto_commit=auto_commit)
[docs] def get_lending_redemptions(self, lending_type: Optional[str] = None, asset: Optional[str] = None,
start_time: Optional[int] = None, end_time: Optional[int] = None):
"""
Return lending redemptions stored in the database. Asset type and time filters can be used
:param lending_type: fetch only redemptions from this lending type
:type lending_type: Optional[str]
:param asset: fetch only redemptions from this asset
:type asset: Optional[str]
:param start_time: fetch only redemptions after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only redemptions before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
[
1612841562000, # time
'DAILY', # lending type
'LTC', # asset
1.89151684), # amount
]
"""
conditions_list = []
table = tables.LENDING_REDEMPTION_TABLE
if lending_type is not None:
conditions_list.append((table.lendingType,
SQLConditionEnum.equal,
lending_type))
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if start_time is not None:
conditions_list.append((table.redemptionTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.redemptionTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def get_last_lending_redemption_time(self, lending_type: Optional[str] = None) -> int:
"""
Return the latest time when an lending redemption was made.
If None, return the millistamp corresponding to 2017/01/01
:param lending_type: type of lending
:type lending_type: str
:return: millistamp
:rtype: int
"""
conditions_list = []
table = tables.LENDING_REDEMPTION_TABLE
if lending_type is not None:
conditions_list.append((table.lendingType,
SQLConditionEnum.equal,
lending_type))
selection = f"MAX({table.redemptionTime})"
result = self.get_conditions_rows(table,
selection=selection,
conditions_list=conditions_list)
default = datetime_to_millistamp(datetime.datetime(2017, 1, 1, tzinfo=datetime.timezone.utc))
try:
result = result[0][0]
except IndexError:
return default
if result is None:
return default
return result
[docs] def add_lending_purchase(self, purchase_id: int, purchase_time: int, lending_type: str, asset: str, amount: float,
auto_commit: bool = True):
"""
Add a lending purchase to the database
:param purchase_id: id of the purchase
:type purchase_id: int
:param purchase_time: millitstamp of the operation
:type purchase_time: int
:param lending_type: either 'DAILY', 'ACTIVITY' or 'CUSTOMIZED_FIXED'
:type lending_type: str
:param asset: asset lent
:type asset: str
:param amount: amount of asset lent
:type amount: float
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
row = (purchase_id, purchase_time, lending_type, asset, amount)
self.add_row(tables.LENDING_PURCHASE_TABLE, row, auto_commit=auto_commit)
[docs] def get_lending_purchases(self, lending_type: Optional[str] = None, asset: Optional[str] = None,
start_time: Optional[int] = None, end_time: Optional[int] = None):
"""
Return lending purchases stored in the database. Asset type and time filters can be used
:param lending_type: fetch only purchases from this lending type
:type lending_type: Optional[str]
:param asset: fetch only purchases from this asset
:type asset: Optional[str]
:param start_time: fetch only purchases after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only purchases before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
[
(58516828, # purchase id
1612841562000, # time
'DAILY', # lending type
'LTC', # asset
1.89151684), # amount
]
"""
conditions_list = []
table = tables.LENDING_PURCHASE_TABLE
if lending_type is not None:
conditions_list.append((table.lendingType,
SQLConditionEnum.equal,
lending_type))
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if start_time is not None:
conditions_list.append((table.purchaseTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.purchaseTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def get_last_lending_purchase_time(self, lending_type: Optional[str] = None) -> int:
"""
Return the latest time when an lending purchase was made.
If None, return the millistamp corresponding to 2017/01/01
:param lending_type: type of lending
:type lending_type: str
:return: millistamp
:rtype: int
"""
conditions_list = []
table = tables.LENDING_PURCHASE_TABLE
if lending_type is not None:
conditions_list.append((table.lendingType,
SQLConditionEnum.equal,
lending_type))
selection = f"MAX({table.purchaseTime})"
result = self.get_conditions_rows(table,
selection=selection,
conditions_list=conditions_list)
default = datetime_to_millistamp(datetime.datetime(2017, 1, 1, tzinfo=datetime.timezone.utc))
try:
result = result[0][0]
except IndexError:
return default
if result is None:
return default
return result
[docs] def add_lending_interest(self, time: int, lending_type: str, asset: str, amount: float,
auto_commit: bool = True):
"""
Add an lending interest to the database
:param time: millitstamp of the operation
:type time: int
:param lending_type: either 'DAILY', 'ACTIVITY' or 'CUSTOMIZED_FIXED'
:type lending_type: str
:param asset: asset that was received
:type asset: str
:param amount: amount of asset received
:type amount: float
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
row = (time, lending_type, asset, amount)
self.add_row(tables.LENDING_INTEREST_TABLE, row, auto_commit=auto_commit)
[docs] def get_lending_interests(self, lending_type: Optional[str] = None, asset: Optional[str] = None,
start_time: Optional[int] = None, end_time: Optional[int] = None):
"""
Return lending interests stored in the database. Asset type and time filters can be used
:param lending_type: fetch only interests from this lending type
:type lending_type: Optional[str]
:param asset: fetch only interests from this asset
:type asset: Optional[str]
:param start_time: fetch only interests after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only interests before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
[
(1619846515000, # time
'DAILY', # lending type
'DOT', # asset
0.00490156) # amount
]
"""
conditions_list = []
table = tables.LENDING_INTEREST_TABLE
if lending_type is not None:
conditions_list.append((table.lendingType,
SQLConditionEnum.equal,
lending_type))
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if start_time is not None:
conditions_list.append((table.interestTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.interestTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def get_last_lending_interest_time(self, lending_type: Optional[str] = None) -> int:
"""
Return the latest time when an interest was received.
If None, return the millistamp corresponding to 2017/01/01
:param lending_type: type of lending
:type lending_type: str
:return: millistamp
:rtype: int
"""
conditions_list = []
table = tables.LENDING_INTEREST_TABLE
if lending_type is not None:
conditions_list.append((table.lendingType,
SQLConditionEnum.equal,
lending_type))
selection = f"MAX({table.interestTime})"
result = self.get_conditions_rows(table,
selection=selection,
conditions_list=conditions_list)
default = datetime_to_millistamp(datetime.datetime(2017, 1, 1, tzinfo=datetime.timezone.utc))
try:
result = result[0][0]
except IndexError:
return default
if result is None:
return default
return result
[docs] def add_spot_dust(self, tran_id: str, time: int, asset: str, asset_amount: float, bnb_amount: float, bnb_fee: float,
auto_commit: bool = True):
"""
Add dust operation to the database
:param tran_id: id of the transaction (non unique)
:type tran_id: str
:param time: millitstamp of the operation
:type time: int
:param asset: asset that got converted to BNB
:type asset: str
:param asset_amount: amount of asset that got converted
:type asset_amount: float
:param bnb_amount: amount received from the conversion
:type bnb_amount: float
:param bnb_fee: fee amount in BNB
:type bnb_fee: float
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
row = (tran_id, time, asset, asset_amount, bnb_amount, bnb_fee)
self.add_row(tables.SPOT_DUST_TABLE, row, auto_commit=auto_commit)
[docs] def get_spot_dusts(self, asset: Optional[str] = None, start_time: Optional[int] = None,
end_time: Optional[int] = None):
"""
Return dusts stored in the database. Asset type and time filters can be used
:param asset: fetch only dusts from this asset
:type asset: Optional[str]
:param start_time: fetch only dusts after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only dusts before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
[
(82156485284, # transaction id
1605489113400, # time
'TRX', # asset
102.78415879, # asset amount
0.09084498, # bnb amount
0.00171514), # bnb fee
]
"""
conditions_list = []
table = tables.SPOT_DUST_TABLE
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if start_time is not None:
conditions_list.append((table.dustTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.dustTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def add_dividend(self, div_id: int, div_time: int, asset: str, amount: float, auto_commit: bool = True):
"""
Add a dividend to the database
:param div_id: dividend id
:type div_id: int
:param div_time: millistamp of dividend reception
:type div_time: int
:param asset: name of the dividend unit
:type asset: str
:param amount: amount of asset distributed
:type amount: float
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
row = (div_id, div_time, asset, amount)
self.add_row(tables.SPOT_DIVIDEND_TABLE, row, auto_commit=auto_commit)
[docs] def get_spot_dividends(self, asset: Optional[str] = None, start_time: Optional[int] = None,
end_time: Optional[int] = None):
"""
Return dividends stored in the database. Asset type and time filters can be used
:param asset: fetch only dividends of this asset
:type asset: Optional[str]
:param start_time: fetch only dividends after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only dividends before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
[
(8945138941, # dividend id
1594513589000, # time
'TRX', # asset
0.18745654), # amount
]
"""
conditions_list = []
table = tables.SPOT_DIVIDEND_TABLE
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if start_time is not None:
conditions_list.append((table.divTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.divTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def get_last_spot_dividend_time(self) -> int:
"""
Fetch the latest time a dividend has been distributed on the spot account. If None is found,
return the millistamp corresponding to 2017/1/1
:return:
"""
table = tables.SPOT_DIVIDEND_TABLE
selection = f"MAX({table.divTime})"
result = self.get_conditions_rows(table,
selection=selection)
default = datetime_to_millistamp(datetime.datetime(2017, 1, 1, tzinfo=datetime.timezone.utc))
try:
result = result[0][0]
except IndexError:
return default
if result is None:
return default
return result
[docs] def add_withdraw(self, withdraw_id: str, tx_id: str, apply_time: int, asset: str, amount: float, fee: float,
auto_commit: bool = True):
"""
Add a withdraw to the database
:param withdraw_id: binance if of the withdraw
:type withdraw_id: str
:param tx_id: transaction id
:type tx_id: str
:param apply_time: millistamp when the withdraw was requested
:type apply_time: int
:param asset: name of the token
:type asset: str
:param amount: amount of token withdrawn
:type amount: float
:param fee: amount of the asset paid for the withdraw
:type fee: float
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
row = (withdraw_id, tx_id, apply_time, asset, amount, fee)
self.add_row(tables.SPOT_WITHDRAW_TABLE, row, auto_commit=auto_commit)
[docs] def get_spot_withdraws(self, asset: Optional[str] = None, start_time: Optional[int] = None,
end_time: Optional[int] = None):
"""
Return withdraws stored in the database. Asset type and time filters can be used
:param asset: fetch only withdraws of this asset
:type asset: Optional[str]
:param start_time: fetch only withdraws after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only withdraws before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
[
('84984dcqq5z11gyjfa', # withdraw id
'aazd8949vredqs56dz', # transaction id
1599138389000, # withdraw time
'XTZ', # asset
57.0194, # amount
0.5), # fee
]
"""
conditions_list = []
table = tables.SPOT_WITHDRAW_TABLE
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if start_time is not None:
conditions_list.append((table.applyTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.applyTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def get_last_spot_withdraw_time(self) -> int:
"""
Fetch the latest time a withdraw has been made on the spot account. If None is found, return the millistamp
corresponding to 2017/1/1
:return:
"""
table = tables.SPOT_WITHDRAW_TABLE
selection = f"MAX({table.applyTime})"
result = self.get_conditions_rows(table,
selection=selection)
default = datetime_to_millistamp(datetime.datetime(2017, 1, 1, tzinfo=datetime.timezone.utc))
try:
result = result[0][0]
except IndexError:
return default
if result is None:
return default
return result
[docs] def add_deposit(self, tx_id: str, insert_time: int, amount: float, asset: str, auto_commit=True):
"""
Add a deposit to the database
:param tx_id: transaction id
:type tx_id: str
:param insert_time: millistamp when the deposit arrived on binance
:type insert_time: int
:param amount: amount of token deposited
:type amount: float
:param asset: name of the token
:type asset: str
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
row = (tx_id, insert_time, asset, amount)
self.add_row(tables.SPOT_DEPOSIT_TABLE, row, auto_commit)
[docs] def get_spot_deposits(self, asset: Optional[str] = None, start_time: Optional[int] = None,
end_time: Optional[int] = None):
"""
Return deposits stored in the database. Asset type and time filters can be used
:param asset: fetch only deposits of this asset
:type asset: Optional[str]
:param start_time: fetch only deposits after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only deposits before this millistamp
:type end_time: Optional[int]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
.. code-block:: python
[
('azdf5e6a1d5z', # transaction id
1589479004000, # deposit time
'LTC', # asset
14.25), # amount
]
"""
conditions_list = []
table = tables.SPOT_DEPOSIT_TABLE
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if start_time is not None:
conditions_list.append((table.insertTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.insertTime,
SQLConditionEnum.lower,
end_time))
return self.get_conditions_rows(table, conditions_list=conditions_list)
[docs] def get_last_spot_deposit_time(self) -> int:
"""
Fetch the latest time a deposit has been made on the spot account. If None is found, return the millistamp
corresponding to 2017/1/1
:return: last deposit millistamp
:rtype: int
"""
table = tables.SPOT_DEPOSIT_TABLE
selection = f"MAX({table.insertTime})"
result = self.get_conditions_rows(table,
selection=selection)
default = datetime_to_millistamp(datetime.datetime(2017, 1, 1, tzinfo=datetime.timezone.utc))
try:
result = result[0][0]
except IndexError:
return default
if result is None:
return default
return result
[docs] def add_trade(self, trade_type: str, trade_id: int, trade_time: int, asset: str, ref_asset: str, qty: float,
price: float, fee: float, fee_asset: str, is_buyer: bool, symbol: Optional[str] = None,
auto_commit: bool = True):
"""
Add a trade to the database
:param trade_type: type trade executed
:type trade_type: string, must be one of {'spot', 'cross_margin', 'isolated_margin'}
:param trade_id: id of the trade (binance id, unique per trading pair)
:type trade_id: int
:param trade_time: millistamp of the trade
:type trade_time: int
:param asset: name of the asset in the trading pair (ex 'BTC' for 'BTCUSDT')
:type asset: string
:param ref_asset: name of the reference asset in the trading pair (ex 'USDT' for 'BTCUSDT')
:type ref_asset: string
:param qty: quantity of asset exchanged
:type qty: float
:param price: price of the asset regarding the ref_asset
:type price: float
:param fee: amount kept by the exchange
:type fee: float
:param fee_asset: token unit for the fee
:type fee_asset: str
:param is_buyer: if the trade is a buy or a sell
:type is_buyer: bool
:param symbol: trading symbol, mandatory if thr trade_type is isolated margin
:type symbol: Optional[str]
:param auto_commit: if the database should commit the change made, default True
:type auto_commit: bool
:return: None
:rtype: None
"""
row = (trade_id, trade_time, asset, ref_asset, qty, price, fee, fee_asset, int(is_buyer))
if trade_type == 'spot':
table = tables.SPOT_TRADE_TABLE
elif trade_type == 'cross_margin':
table = tables.CROSS_MARGIN_TRADE_TABLE
elif trade_type == 'isolated_margin':
table = tables.ISOLATED_MARGIN_TRADE_TABLE
if symbol is None:
raise ValueError("trade_type was isolated margin but symbol was not provided")
row = (trade_id, trade_time, symbol, asset, ref_asset, qty, price, fee, fee_asset, int(is_buyer))
else:
msg = f"trade type should be one of ('spot', 'cross_margin', 'isolated_margin') but {trade_type} was" \
f" received"
raise ValueError(msg)
self.add_row(table, row, auto_commit)
[docs] def get_trades(self, trade_type: str, start_time: Optional[int] = None, end_time: Optional[int] = None,
asset: Optional[str] = None, ref_asset: Optional[str] = None):
"""
Return trades stored in the database. asset type, ref_asset type and time filters can be used
:param trade_type: type trade executed
:type trade_type: string, must be one of ('spot', 'cross_margin', 'isolated_margin')
:param start_time: fetch only trades after this millistamp
:type start_time: Optional[int]
:param end_time: fetch only trades before this millistamp
:type end_time: Optional[int]
:param asset: fetch only trades with this asset
:type asset: Optional[str]
:param ref_asset: fetch only trades with this ref_asset
:type ref_asset: Optional[str]
:return: The raw rows selected as saved in the database
:rtype: List[Tuple]
Return for spot and cross margin:
.. code-block:: python
[
(384518832, # trade_id
1582892988052, # trade time
'BTC', # asset
'USDT', # ref asset
0.0015, # asset quantity
9011.2, # asset price to ref asset
0.01425, # fee
'USDT', # fee asset
0), # is_buyer
]
Return for isolated margin:
.. code-block:: python
[
(384518832, # trade_id
1582892988052, # trade time
'BTCUSDT', # symbol
'BTC', # asset
'USDT', # ref asset
0.0015, # asset quantity
9011.2, # asset price to ref asset
0.01425, # fee
'USDT', # fee asset
0), # is_buyer
]
"""
if trade_type == 'spot':
table = tables.SPOT_TRADE_TABLE
elif trade_type == 'cross_margin':
table = tables.CROSS_MARGIN_TRADE_TABLE
elif trade_type == 'isolated_margin':
table = tables.ISOLATED_MARGIN_TRADE_TABLE
else:
msg = f"trade type should be one of ('spot', 'cross_margin', 'isolated_margin') but {trade_type} was" \
f" received"
raise ValueError(msg)
conditions_list = []
if start_time is not None:
conditions_list.append((table.tdTime,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.tdTime,
SQLConditionEnum.lower,
end_time))
if asset is not None:
conditions_list.append((table.asset,
SQLConditionEnum.equal,
asset))
if ref_asset is not None:
conditions_list.append((table.refAsset,
SQLConditionEnum.equal,
ref_asset))
return self.get_conditions_rows(table, conditions_list=conditions_list, order_list=[table.tdTime])
[docs] def get_max_trade_id(self, asset: str, ref_asset: str, trade_type: str) -> int:
"""
Return the latest trade id for a trading pair. If none is found, return -1
:param asset: name of the asset in the trading pair (ex 'BTC' for 'BTCUSDT')
:type asset: string
:param ref_asset: name of the reference asset in the trading pair (ex 'USDT' for 'BTCUSDT')
:type ref_asset: string
:param trade_type: type trade executed
:type trade_type: string, must be one of {'spot', 'cross_margin'}
:return: latest trade id
:rtype: int
"""
if trade_type == 'spot':
table = tables.SPOT_TRADE_TABLE
elif trade_type == 'cross_margin':
table = tables.CROSS_MARGIN_TRADE_TABLE
elif trade_type == 'isolated_margin':
table = tables.ISOLATED_MARGIN_TRADE_TABLE
else:
msg = f"trade type should be one of ('spot', 'cross_margin', 'isolated_margin') but {trade_type} was" \
f" received"
raise ValueError(msg)
selection = f"MAX({table.tradeId})"
conditions_list = [
(table.asset,
SQLConditionEnum.equal,
asset),
(table.refAsset,
SQLConditionEnum.equal,
ref_asset)
]
result = self.get_conditions_rows(table, selection=selection, conditions_list=conditions_list)
try:
result = result[0][0]
except IndexError:
return -1
if result is None:
return -1
return result