Source code for async_v20.client

import asyncio
import logging
import os
import ujson as json
from functools import partial
from time import time

import aiohttp
from yarl import URL

from .definitions.types import AcceptDatetimeFormat
from .definitions.types import AccountID
from .definitions.types import ArrayTransaction
from .endpoints.annotations import Authorization, SinceTransactionID, LastTransactionID
from .exceptions import InitializationFailure, ResponseTimeout, CloseAllTradesFailure
from .interface import *
from .interface.helpers import too_many_passed_transactions

logger = logging.getLogger(__name__)


async def sleep(s=0.0):
    await asyncio.sleep(s)


__version__ = '8.0.0b1'


[docs]class OandaClient(AccountInterface, InstrumentInterface, OrderInterface, PositionInterface, PricingInterface, TradeInterface, TransactionInterface, UserInterface, HealthInterface): """ Create an API context for v20 access Args: token: User generated token from the online account configuration page account_id: The account id the client will connect to. If None will default to the first account number returned by :meth:`~async_v20.OandaClient.list_accounts` format_order_requests: True=Format all OrderRequests in the context of the orders instrument. False=Do not format OrderRequests, raise :class:`~async_v20.exceptions.InvalidOrderRequest` for values outside of allowed range. max_transaction_history: Maximum past transactions to store rest_host: The hostname of the v20 REST server rest_port: The port of the v20 REST server stream_host: The hostname of the v20 REST server stream_port: The port of the v20 REST server rest_scheme: The scheme of the connection to rest server. stream_scheme: The scheme of the connection to the stream server. health_host: The hostname of the health API server health_port: The port of the health server health_scheme: The scheme of the connection for the health server. datetime_format: The format to request when dealing with times rest_timeout: The timeout to use when making a polling request with the v20 REST server stream_timeout: Period to wait for an new json object during streaming max_requests_per_second: Maximum HTTP requests sent per second max_simultaneous_connections: Maximum concurrent HTTP requests debug: Set to True to log debug messages. """ headers = {'Connection': 'keep-alive', 'OANDA-Agent': 'async_v20_' + __version__} default_parameters = {} initialized = False initializing = False _initialization_step = None # The first step to be called during initialization initialization_sleep = 0.5 # Time to poll initialized when waiting for initialization _account = None instruments = None transactions = ArrayTransaction() session = None # http session will be created during initialization _rest_timeout = None # seconds @property def max_requests_per_second(self): return self._max_requests_per_second @max_requests_per_second.setter def max_requests_per_second(self, value): # Limit maximum concurrent connections self._max_requests_per_second = {True: value, False: 1}[value > 0] self._min_time_between_requests = 1 / self.max_requests_per_second @property def max_simultaneous_connections(self): return self._max_simultaneous_connections @max_simultaneous_connections.setter def max_simultaneous_connections(self, value): # Limit concurrent connections self._max_simultaneous_connections = {True: value, False: 0}[value >= 0] @property def datetime_format(self): return self._datetime_format def __init__(self, token=None, account_id=None, format_order_requests=False, max_transaction_history=100, rest_host='api-fxpractice.oanda.com', rest_port=443, rest_scheme='https', stream_host='stream-fxpractice.oanda.com', stream_port=None, stream_scheme='https', health_host='api-status.oanda.com', health_port=80, health_scheme='http', datetime_format='UNIX', rest_timeout=10, stream_timeout=60, max_requests_per_second=99, max_simultaneous_connections=10, debug=False): self.version = __version__ if token is None: token = os.environ['OANDA_TOKEN'] self.account_id = account_id self.format_order_requests = format_order_requests self.max_transaction_history = max_transaction_history # V20 REST API URL rest_host = partial(URL.build, host=rest_host, port=rest_port, scheme=rest_scheme) # v20 STREAM API URL stream_host = partial(URL.build, host=stream_host, port=stream_port, scheme=stream_scheme) # V20 API health URL health_host = partial(URL.build, host=health_host, port=health_port, scheme=health_scheme) self._hosts = {'REST': rest_host, 'STREAM': stream_host, 'HEALTH': health_host} # The timeout to use when making a polling request with the # v20 REST server self.rest_timeout = rest_timeout # The timeout to use when waiting for the next object when wait for a stream response self.stream_timeout = stream_timeout self.max_requests_per_second = max_requests_per_second self.max_simultaneous_connections = max_simultaneous_connections self._datetime_format = datetime_format # This is the default parameter dictionary. OandaClient Methods that require certain parameters # that are not explicitly passed will try to find it in this dict self.default_parameters.update( {Authorization: 'Bearer {}'.format(token), AcceptDatetimeFormat: datetime_format} ) self.debug = debug
[docs] async def account(self): """Get updated account Returns: :class:`~async_v20.Account` """ logger.info('account()') if too_many_passed_transactions(self): await self.get_account_details() else: await self.account_changes() return self._account
[docs] async def close_all_trades(self): """Close all open trades Returns: :class:`tuple` (:class:`bool`, [:class`~async_v20.interface.response.Response`, ...]) """ # Procedure is as follows: # - get all open trades # - attempt to close all open trades # - get all open trades again and check there there are None # - return close trade responses logger.info('close_all_trades()') response = await self.list_open_trades() if response: close_trade_responses = await asyncio.gather(*[self.close_trade(trade.id) for trade in response.trades]) else: msg = f'Could not get open trades. ' \ f'Server returned status {response.status}' logger.error(msg) raise CloseAllTradesFailure(msg) # After closing all trades check that all trades have indeed been closed response = await self.list_open_trades() if response and len(response.trades) == 0: pass else: msg = f'Unable to confirm all trades have been closed! ' \ f'Server returned status {response.status}' logger.error(msg) raise CloseAllTradesFailure(msg) return close_trade_responses
async def _request_limiter(self): """Wait for a minimum time interval before creating new request""" try: self._next_request_time += self._min_time_between_requests except AttributeError: self._next_request_time = time() return if self._next_request_time - time() > 0: wait_time = self._next_request_time - time() if self.debug: logger.debug('Request waiting for %s seconds', wait_time) await sleep(wait_time) return async def __aenter__(self): await self.initialize() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() def __enter__(self): logger.warning('<with> used rather than <async with>') return self def __exit__(self, exc_type, exc_val, exc_tb): pass async def close(self): try: await self.session.close() except AttributeError: # In case the client was never initialized pass async def initialize_session(self): # Create http session this client will use to sent all requests logger.info('Initializing session') conn = aiohttp.TCPConnector(limit=self.max_simultaneous_connections) self.session = aiohttp.ClientSession( json_serialize=json.dumps, headers=self.headers, connector=conn, read_timeout=0 # async_v20 will handle timeouts to allow dynamic changing of timeout. # after client initialization ) async def initialize(self, initialization_method=False): """Initialize client instance Args: initialization_step: -- Used internally to allow requests to bypass initialization. Returns: True when complete """ if self.initialized or self._initialization_step == initialization_method: # Do not initialize or wait for initialization to complete. # If it did, due to circular logic, initialization would never # complete. pass elif self.initializing: # Wait for current initialization to complete before # continuing with request. while not self.initialized: await sleep(self.initialization_sleep) else: # If it gets this far. An initialization if required. msg = '' # msg is used to create a useful Error msg if Initialization fails try: logger.info('Initializing client') self.initializing = True # immediately set initializing to make sure # Upcoming requests wait for this initialization to complete. # V1 REST API is deprecated # self._initialization_step = self.list_services.__name__ # response = await self.list_services() # if response: # for service in response.services: # if service.current_event.status.name != 'Up': # logger.warning(f'{service.name} {service.current_event.message}') # else: # logging.warning('Server did not return available services') # print(response.json()) # Get the first account listed in in accounts. # If another is desired the account must be configured # manually when instantiating the client if self.account_id: # Allow manual assignment of AccountID self.default_parameters.update({AccountID: self.account_id}) self.account_id = self.account_id else: # Get the corresponding AccountID for the provided token self._initialization_step = self.list_accounts.__name__ response = await self.list_accounts() if response: # Checks is the response status was the expected status as # defined by OANDA spec. self.default_parameters.update({AccountID: response['accounts'][0].id}) else: self.initializing = False msg = f'Server did not return AccountID during initialization' logger.error(msg) raise InitializationFailure(msg) # Get Account snapshot and last transaction id # last transaction is automatically updated when the # response is parsed self._initialization_step = self.get_account_details.__name__ response = await self.get_account_details() if response: self._account = response['account'] else: self.initializing = False msg = f'Server did not return Account Details during initialization.' logger.error(msg) raise InitializationFailure(msg) self._initialization_step = self.account_instruments.__name__ response = await self.account_instruments() if response: self.instruments = response['instruments'] else: self.initializing = False msg = f'Server did not return Account Instruments during initialization' logger.error(msg) raise InitializationFailure(msg) # On initialization the SinceTransactionID needs updated to reflect LastTransactionID self.default_parameters.update({SinceTransactionID: self.default_parameters[LastTransactionID]}) self.initializing = False self.initialized = True except ResponseTimeout: self.initializing = False self.initialized = False msg = f'Initialization step {self._initialization_step} ' \ f'took longer than {self.rest_timeout} seconds' logger.exception(msg) raise InitializationFailure(msg) except InitializationFailure: self.initializing = False self.initialized = False logging.exception(msg) raise InitializationFailure(msg) # Always return True when initialization has complete return True