Ask AI

Source code for dagster_fivetran.resources

import json
import logging
import os
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urljoin

import requests
from dagster import (
    AssetExecutionContext,
    AssetMaterialization,
    Definitions,
    Failure,
    InitResourceContext,
    MaterializeResult,
    MetadataValue,
    OpExecutionContext,
    __version__,
    _check as check,
    get_dagster_logger,
    resource,
)
from dagster._annotations import experimental
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import as_dict, record
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
from pydantic import Field, PrivateAttr
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException

from dagster_fivetran.translator import (
    DagsterFivetranTranslator,
    FivetranConnector,
    FivetranConnectorScheduleType,
    FivetranConnectorTableProps,
    FivetranDestination,
    FivetranMetadataSet,
    FivetranSchemaConfig,
    FivetranWorkspaceData,
)
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import (
    get_fivetran_connector_table_name,
    get_fivetran_connector_url,
    get_fivetran_logs_url,
    get_translator_from_fivetran_assets,
    metadata_for_table,
)

# Root of the Fivetran REST API and the API version segment appended to it.
FIVETRAN_API_BASE = "https://api.fivetran.com"
FIVETRAN_API_VERSION = "v1"
# Path segment under which connector endpoints live.
FIVETRAN_CONNECTOR_ENDPOINT = "connectors"
# Trailing-slash variants of the segments above, for use with ``urljoin``:
# without the trailing slash, ``urljoin`` would replace the last path segment
# instead of appending to it.
FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/"
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

# default polling interval (in seconds); used as the default ``poll_interval``
# of the sync polling methods defined in this module.
DEFAULT_POLL_INTERVAL = 10

# NOTE(review): not referenced in the portion of the module visible here;
# presumably the metadata key prefix used when reconstructing definitions — confirm.
FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata"


class FivetranResource(ConfigurableResource):
    """This class exposes methods on top of the Fivetran REST API."""

    api_key: str = Field(description="The Fivetran API key to use for this resource.")
    api_secret: str = Field(description="The Fivetran API secret to use for this resource.")
    disable_schedule_on_trigger: bool = Field(
        default=True,
        description=(
            "Specifies if you would like any connector that is sync'd using this "
            "resource to be automatically taken off its Fivetran schedule."
        ),
    )
    request_max_retries: int = Field(
        default=3,
        description=(
            "The maximum number of times requests to the Fivetran API should be retried "
            "before failing."
        ),
    )
    request_retry_delay: float = Field(
        default=0.25,
        description="Time (in seconds) to wait between each request retry.",
    )

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        return True

    @property
    def _auth(self) -> HTTPBasicAuth:
        """HTTP basic auth credentials built from the configured API key and secret."""
        return HTTPBasicAuth(self.api_key, self.api_secret)

    @property
    @cached_method
    def _log(self) -> logging.Logger:
        return get_dagster_logger()

    @property
    def api_base_url(self) -> str:
        """Base URL of the versioned Fivetran API."""
        return urljoin(FIVETRAN_API_BASE, FIVETRAN_API_VERSION_PATH)

    @property
    def api_connector_url(self) -> str:
        """Base URL of the connector endpoints of the Fivetran API."""
        return urljoin(self.api_base_url, FIVETRAN_CONNECTOR_PATH)

    def make_connector_request(
        self, method: str, endpoint: str, data: Optional[str] = None
    ) -> Mapping[str, Any]:
        """Sends a request to an endpoint located under the connector path.

        Args:
            method (str): The http method to use for this request (e.g. "POST", "GET", "PATCH").
            endpoint (str): The endpoint, relative to the connector path.
            data (Optional[str]): JSON-formatted data string to be included in the request.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request
        """
        return self.make_request(method, urljoin(FIVETRAN_CONNECTOR_PATH, endpoint), data)

    def make_request(
        self, method: str, endpoint: str, data: Optional[str] = None
    ) -> Mapping[str, Any]:
        """Creates and sends a request to the desired Fivetran Connector API endpoint.

        A failed request is retried up to ``request_max_retries`` times, waiting
        ``request_retry_delay`` seconds between attempts.

        Args:
            method (str): The http method to use for this request (e.g. "POST", "GET", "PATCH").
            endpoint (str): The Fivetran API endpoint to send this request to.
            data (Optional[str]): JSON-formatted data string to be included in the request.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request

        Raises:
            Failure: If the request still fails after all retries. The last request
                error is attached as the cause of the failure.
        """
        url = urljoin(self.api_base_url, endpoint)
        headers = {
            "User-Agent": f"dagster-fivetran/{__version__}",
            "Content-Type": "application/json;version=2",
        }

        num_retries = 0
        # Kept outside of the ``except`` clause because the bound exception name is
        # cleared when the clause exits.
        last_error: Optional[RequestException] = None
        while True:
            try:
                response = requests.request(
                    method=method,
                    url=url,
                    headers=headers,
                    auth=self._auth,
                    data=data,
                    timeout=int(os.getenv("DAGSTER_FIVETRAN_CONNECTOR_REQUEST_TIMEOUT", "60")),
                )
                response.raise_for_status()
                resp_dict = response.json()
                # Unwrap the "data" envelope when the response has one.
                return resp_dict["data"] if "data" in resp_dict else resp_dict
            except RequestException as e:
                self._log.error("Request to Fivetran API failed: %s", e)
                last_error = e
                if num_retries == self.request_max_retries:
                    break
                num_retries += 1
                time.sleep(self.request_retry_delay)

        # Chain the last request error so the root cause is not lost.
        raise Failure(
            f"Max retries ({self.request_max_retries}) exceeded with url: {url}."
        ) from last_error

    def get_connector_details(self, connector_id: str) -> Mapping[str, Any]:
        """Gets details about a given connector from the Fivetran Connector API.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request
        """
        return self.make_connector_request(method="GET", endpoint=connector_id)

    def _assert_syncable_connector(self, connector_id: str):
        """Confirms that a given connector is eligible to sync. Will raise a Failure in the event that
        the connector is either paused or not fully setup.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
        """
        connector_details = self.get_connector_details(connector_id)
        if connector_details["paused"]:
            raise Failure(f"Connector '{connector_id}' cannot be synced as it is currently paused.")
        if connector_details["status"]["setup_state"] != "connected":
            raise Failure(f"Connector '{connector_id}' cannot be synced as it has not been setup")

    def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime, bool, str]:
        """Gets details about the status of the most recent Fivetran sync operation for a given
        connector.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.

        Returns:
            Tuple[datetime.datetime, bool, str]:
                Tuple representing the timestamp of the last completed sync, if it succeeded, and
                the currently reported sync status.
        """
        connector_details = self.get_connector_details(connector_id)

        # Stand-in timestamp for a missing "succeeded_at" / "failed_at" value, so the
        # two timestamps can always be compared.
        min_time_str = "0001-01-01 00:00:00+00"
        succeeded_at = parser.parse(connector_details["succeeded_at"] or min_time_str)
        failed_at = parser.parse(connector_details["failed_at"] or min_time_str)

        return (
            max(succeeded_at, failed_at),
            succeeded_at > failed_at,
            connector_details["status"]["sync_state"],
        )

    def update_connector(
        self, connector_id: str, properties: Optional[Mapping[str, Any]] = None
    ) -> Mapping[str, Any]:
        """Updates properties of a Fivetran Connector.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
            properties (Dict[str, Any]): The properties to be updated. For a comprehensive list of
                properties, see the [Fivetran docs](https://fivetran.com/docs/rest-api/connectors#modifyaconnector).

        Returns:
            Dict[str, Any]: Parsed json data representing the API response.
        """
        return self.make_connector_request(
            method="PATCH", endpoint=connector_id, data=json.dumps(properties)
        )

    def update_schedule_type(
        self, connector_id: str, schedule_type: Optional[str] = None
    ) -> Mapping[str, Any]:
        """Updates the schedule type property of the connector to either "auto" or "manual".

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
            schedule_type (Optional[str]): Either "auto" (to turn the schedule on) or "manual" (to
                turn it off).

        Returns:
            Dict[str, Any]: Parsed json data representing the API response.
        """
        if schedule_type not in ["auto", "manual"]:
            check.failed(f"schedule_type must be either 'auto' or 'manual': got '{schedule_type}'")
        return self.update_connector(connector_id, properties={"schedule_type": schedule_type})

    def get_connector_schema_config(self, connector_id: str) -> Mapping[str, Any]:
        """Fetches the schema config of a given connector from the Fivetran API.

        Args:
            connector_id (str): The Fivetran Connector ID.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request
        """
        return self.make_connector_request("GET", endpoint=f"{connector_id}/schemas")

    def start_sync(self, connector_id: str) -> Mapping[str, Any]:
        """Initiates a sync of a Fivetran connector.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.

        Returns:
            Dict[str, Any]: Parsed json data representing the connector details API response after
                the sync is started.
        """
        if self.disable_schedule_on_trigger:
            self._log.info("Disabling Fivetran sync schedule.")
            self.update_schedule_type(connector_id, "manual")
        self._assert_syncable_connector(connector_id)
        self.make_connector_request(method="POST", endpoint=f"{connector_id}/force")
        connector_details = self.get_connector_details(connector_id)
        self._log.info(
            f"Sync initialized for connector_id={connector_id}. View this sync in the Fivetran UI: "
            + get_fivetran_connector_url(connector_details)
        )
        return connector_details

    def start_resync(
        self, connector_id: str, resync_parameters: Optional[Mapping[str, Sequence[str]]] = None
    ) -> Mapping[str, Any]:
        """Initiates a historical sync of all data for multiple schema tables within a Fivetran connector.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
            resync_parameters (Optional[Dict[str, List[str]]]): Optional resync parameters to send to the Fivetran API.
                An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_7

        Returns:
            Dict[str, Any]: Parsed json data representing the connector details API response after
                the resync is started.
        """
        if self.disable_schedule_on_trigger:
            self._log.info("Disabling Fivetran sync schedule.")
            self.update_schedule_type(connector_id, "manual")
        self._assert_syncable_connector(connector_id)
        # The table-level resync endpoint is used only when parameters are given.
        self.make_connector_request(
            method="POST",
            endpoint=(
                f"{connector_id}/schemas/tables/resync"
                if resync_parameters is not None
                else f"{connector_id}/resync"
            ),
            data=json.dumps(resync_parameters) if resync_parameters is not None else None,
        )
        connector_details = self.get_connector_details(connector_id)
        self._log.info(
            f"Sync initialized for connector_id={connector_id}. View this resync in the Fivetran"
            " UI: " + get_fivetran_connector_url(connector_details)
        )
        return connector_details

    def poll_sync(
        self,
        connector_id: str,
        initial_last_sync_completion: datetime,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        poll_timeout: Optional[float] = None,
    ) -> Mapping[str, Any]:
        """Given a Fivetran connector and the timestamp at which the previous sync completed, poll
        until the next sync completes.

        The previous sync completion time is necessary because the only way to tell when a sync
        completes is when this value changes.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
            initial_last_sync_completion (datetime.datetime): The timestamp of the last completed sync
                (successful or otherwise) for this connector, prior to running this method.
            poll_interval (float): The time (in seconds) that will be waited between successive polls.
            poll_timeout (float): The maximum time that will waited before this operation is timed
                out. By default, this will never time out.

        Returns:
            Dict[str, Any]: Parsed json data representing the API response.

        Raises:
            Failure: If the poll times out, or if the sync that completed did not succeed.
        """
        poll_start = datetime.now()
        while True:
            (
                curr_last_sync_completion,
                curr_last_sync_succeeded,
                curr_sync_state,
            ) = self.get_connector_sync_status(connector_id)
            self._log.info(f"Polled '{connector_id}'. Status: [{curr_sync_state}]")

            # A newer completion timestamp means a sync finished since polling began.
            if curr_last_sync_completion > initial_last_sync_completion:
                break

            if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout):
                raise Failure(
                    f"Sync for connector '{connector_id}' timed out after "
                    f"{datetime.now() - poll_start}."
                )

            # Sleep for the configured time interval before polling again.
            time.sleep(poll_interval)

        connector_details = self.get_connector_details(connector_id)
        if not curr_last_sync_succeeded:
            raise Failure(
                f"Sync for connector '{connector_id}' failed!",
                metadata={
                    "connector_details": MetadataValue.json(connector_details),
                    "log_url": MetadataValue.url(get_fivetran_logs_url(connector_details)),
                },
            )
        return connector_details

    def sync_and_poll(
        self,
        connector_id: str,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        poll_timeout: Optional[float] = None,
    ) -> FivetranOutput:
        """Initializes a sync operation for the given connector, and polls until it completes.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
            poll_interval (float): The time (in seconds) that will be waited between successive polls.
            poll_timeout (float): The maximum time that will waited before this operation is timed
                out. By default, this will never time out.

        Returns:
            :py:class:`~FivetranOutput`:
                Object containing details about the connector and the tables it updates
        """
        schema_config = self.get_connector_schema_config(connector_id)
        # Captured before the sync starts so that its completion can be detected.
        init_last_sync_timestamp, _, _ = self.get_connector_sync_status(connector_id)
        self.start_sync(connector_id)
        final_details = self.poll_sync(
            connector_id,
            init_last_sync_timestamp,
            poll_interval=poll_interval,
            poll_timeout=poll_timeout,
        )
        return FivetranOutput(connector_details=final_details, schema_config=schema_config)

    def resync_and_poll(
        self,
        connector_id: str,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        poll_timeout: Optional[float] = None,
        resync_parameters: Optional[Mapping[str, Sequence[str]]] = None,
    ) -> FivetranOutput:
        """Initializes a historical resync operation for the given connector, and polls until it completes.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
            resync_parameters (Dict[str, List[str]]): The payload to send to the Fivetran API.
                This should be a dictionary with schema names as the keys and a list of tables
                to resync as the values.
            poll_interval (float): The time (in seconds) that will be waited between successive polls.
            poll_timeout (float): The maximum time that will waited before this operation is timed
                out. By default, this will never time out.

        Returns:
            :py:class:`~FivetranOutput`:
                Object containing details about the connector and the tables it updates
        """
        schema_config = self.get_connector_schema_config(connector_id)
        # Captured before the resync starts so that its completion can be detected.
        init_last_sync_timestamp, _, _ = self.get_connector_sync_status(connector_id)
        self.start_resync(connector_id, resync_parameters)
        final_details = self.poll_sync(
            connector_id,
            init_last_sync_timestamp,
            poll_interval=poll_interval,
            poll_timeout=poll_timeout,
        )
        return FivetranOutput(connector_details=final_details, schema_config=schema_config)

    def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
        """Fetches details about a given destination from the Fivetran API."""
        return self.make_request("GET", f"destinations/{destination_id}")
@dagster_maintained_resource
@resource(config_schema=FivetranResource.to_config_schema())
def fivetran_resource(context: InitResourceContext) -> FivetranResource:
    """This resource allows users to programmatically interface with the Fivetran REST API to launch
    syncs and monitor their progress. This currently implements only a subset of the functionality
    exposed by the API.

    For a complete set of documentation on the Fivetran REST API, including expected response JSON
    schemas, see the `Fivetran API Docs <https://fivetran.com/docs/rest-api/connectors>`_.

    To configure this resource, we recommend using the `configured
    <https://docs.dagster.io/concepts/configuration/configured>`_ method.

    **Examples:**

    .. code-block:: python

        from dagster import job
        from dagster_fivetran import fivetran_resource

        my_fivetran_resource = fivetran_resource.configured(
            {
                "api_key": {"env": "FIVETRAN_API_KEY"},
                "api_secret": {"env": "FIVETRAN_API_SECRET"},
            }
        )

        @job(resource_defs={"fivetran":my_fivetran_resource})
        def my_fivetran_job():
            ...

    """
    # Build the pythonic resource from the config carried by the init context.
    configured_resource = FivetranResource.from_resource_context(context)
    return configured_resource
# ------------------
# Reworked resources
# ------------------


@experimental
class FivetranClient:
    """This class exposes methods on top of the Fivetran REST API."""

    def __init__(
        self,
        api_key: str,
        api_secret: str,
        request_max_retries: int,
        request_retry_delay: float,
        disable_schedule_on_trigger: bool,
    ):
        self.api_key = api_key
        self.api_secret = api_secret
        self.request_max_retries = request_max_retries
        self.request_retry_delay = request_retry_delay
        self.disable_schedule_on_trigger = disable_schedule_on_trigger

    @property
    def _auth(self) -> HTTPBasicAuth:
        """HTTP basic auth credentials built from the API key and secret."""
        return HTTPBasicAuth(self.api_key, self.api_secret)

    @property
    @cached_method
    def _log(self) -> logging.Logger:
        return get_dagster_logger()

    @property
    def api_base_url(self) -> str:
        """Base URL of the versioned Fivetran API."""
        return f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}"

    @property
    def api_connector_url(self) -> str:
        """Base URL of the connector endpoints of the Fivetran API."""
        return f"{self.api_base_url}/{FIVETRAN_CONNECTOR_ENDPOINT}"

    def _make_connector_request(
        self, method: str, endpoint: str, data: Optional[str] = None
    ) -> Mapping[str, Any]:
        """Sends a request to an endpoint located under the connector path.

        Args:
            method (str): The http method to use for this request (e.g. "POST", "GET", "PATCH").
            endpoint (str): The endpoint, relative to the connector path.
            data (Optional[str]): JSON-formatted data string to be included in the request.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request.
        """
        return self._make_request(method, f"{FIVETRAN_CONNECTOR_ENDPOINT}/{endpoint}", data)

    def _make_request(
        self, method: str, endpoint: str, data: Optional[str] = None
    ) -> Mapping[str, Any]:
        """Creates and sends a request to the desired Fivetran API endpoint.

        A failed request is retried up to ``request_max_retries`` times, waiting
        ``request_retry_delay`` seconds between attempts.

        Args:
            method (str): The http method to use for this request (e.g. "POST", "GET", "PATCH").
            endpoint (str): The Fivetran API endpoint to send this request to.
            data (Optional[str]): JSON-formatted data string to be included in the request.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request.

        Raises:
            Failure: If the request still fails after all retries. The last request
                error is attached as the cause of the failure.
        """
        url = f"{self.api_base_url}/{endpoint}"
        headers = {
            "User-Agent": f"dagster-fivetran/{__version__}",
            "Content-Type": "application/json;version=2",
        }

        num_retries = 0
        # Kept outside of the ``except`` clause because the bound exception name is
        # cleared when the clause exits.
        last_error: Optional[RequestException] = None
        while True:
            try:
                response = requests.request(
                    method=method,
                    url=url,
                    headers=headers,
                    auth=self._auth,
                    data=data,
                    timeout=int(os.getenv("DAGSTER_FIVETRAN_API_REQUEST_TIMEOUT", "60")),
                )
                response.raise_for_status()
                resp_dict = response.json()
                # Unwrap the "data" envelope when the response has one.
                return resp_dict["data"] if "data" in resp_dict else resp_dict
            except RequestException as e:
                self._log.error("Request to Fivetran API failed: %s", e)
                last_error = e
                if num_retries == self.request_max_retries:
                    break
                num_retries += 1
                time.sleep(self.request_retry_delay)

        # Chain the last request error so the root cause is not lost.
        raise Failure(
            f"Max retries ({self.request_max_retries}) exceeded with url: {url}."
        ) from last_error

    def get_connector_details(self, connector_id: str) -> Mapping[str, Any]:
        """Gets details about a given connector from the Fivetran API.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request.
        """
        return self._make_connector_request(method="GET", endpoint=connector_id)

    def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]:
        """Fetches all connectors for a given group from the Fivetran API.

        Args:
            group_id (str): The Fivetran Group ID.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request.
        """
        return self._make_request("GET", f"groups/{group_id}/connectors")

    def get_schema_config_for_connector(self, connector_id: str) -> Mapping[str, Any]:
        """Fetches the connector schema config for a given connector from the Fivetran API.

        Args:
            connector_id (str): The Fivetran Connector ID.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request.
        """
        return self._make_request("GET", f"connectors/{connector_id}/schemas")

    def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
        """Fetches details about a given destination from the Fivetran API.

        Args:
            destination_id (str): The Fivetran Destination ID.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request.
        """
        return self._make_request("GET", f"destinations/{destination_id}")

    def get_groups(self) -> Mapping[str, Any]:
        """Fetches all groups from the Fivetran API.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request.
        """
        return self._make_request("GET", "groups")

    def update_schedule_type_for_connector(
        self, connector_id: str, schedule_type: str
    ) -> Mapping[str, Any]:
        """Updates the schedule type property of the connector to either "auto" or "manual".

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
            schedule_type (str): Either "auto" (to turn the schedule on) or "manual" (to
                turn it off).

        Returns:
            Dict[str, Any]: Parsed json data representing the API response.
        """
        schedule_types = {s for s in FivetranConnectorScheduleType}
        if schedule_type not in schedule_types:
            check.failed(
                f"The schedule_type for connector {connector_id} must be in {schedule_types}: "
                f"got '{schedule_type}'"
            )
        return self._make_connector_request(
            method="PATCH", endpoint=connector_id, data=json.dumps({"schedule_type": schedule_type})
        )

    def get_columns_config_for_table(
        self, connector_id: str, schema_name: str, table_name: str
    ) -> Mapping[str, Any]:
        """Fetches the source table columns config for a given table from the Fivetran API.

        Args:
            connector_id (str): The Fivetran Connector ID.
            schema_name (str): The Fivetran Schema name.
            table_name (str): The Fivetran Table name.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request.
        """
        return self._make_connector_request(
            method="GET",
            endpoint=f"{connector_id}/schemas/{schema_name}/tables/{table_name}/columns",
        )

    def start_sync(self, connector_id: str) -> None:
        """Initiates a sync of a Fivetran connector.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.

        """
        request_fn = partial(
            self._make_connector_request, method="POST", endpoint=f"{connector_id}/force"
        )
        self._start_sync(request_fn=request_fn, connector_id=connector_id)

    def start_resync(
        self, connector_id: str, resync_parameters: Optional[Mapping[str, Sequence[str]]] = None
    ) -> None:
        """Initiates a historical sync of all data for multiple schema tables within a Fivetran connector.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
            resync_parameters (Optional[Dict[str, List[str]]]): Optional resync parameters to send to the Fivetran API.
                An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_7
        """
        # The table-level resync endpoint is used only when parameters are given.
        request_fn = partial(
            self._make_connector_request,
            method="POST",
            endpoint=(
                f"{connector_id}/schemas/tables/resync"
                if resync_parameters is not None
                else f"{connector_id}/resync"
            ),
            data=json.dumps(resync_parameters) if resync_parameters is not None else None,
        )
        self._start_sync(request_fn=request_fn, connector_id=connector_id)

    def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: str) -> None:
        """Validates the connector, optionally disables its schedule, then issues the request.

        Args:
            request_fn (Callable[[], Mapping[str, Any]]): Zero-argument callable that sends the
                request which starts the sync or resync.
            connector_id (str): The Fivetran Connector ID.
        """
        connector = FivetranConnector.from_connector_details(
            connector_details=self.get_connector_details(connector_id)
        )
        connector.validate_syncable()
        if self.disable_schedule_on_trigger:
            self._log.info(f"Disabling Fivetran sync schedule for connector {connector_id}.")
            self.update_schedule_type_for_connector(connector_id, "manual")
        request_fn()
        self._log.info(
            f"Sync initialized for connector {connector_id}. View this sync in the Fivetran"
            " UI: " + connector.url
        )

    def poll_sync(
        self,
        connector_id: str,
        previous_sync_completed_at: datetime,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        poll_timeout: Optional[float] = None,
    ) -> Mapping[str, Any]:
        """Given a Fivetran connector and the timestamp at which the previous sync completed, poll
        until the next sync completes.

        The previous sync completion time is necessary because the only way to tell when a sync
        completes is when this value changes.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
            previous_sync_completed_at (datetime.datetime): The datetime of the previous completed sync
                (successful or otherwise) for this connector, prior to running this method.
            poll_interval (float): The time (in seconds) that will be waited between successive polls.
            poll_timeout (float): The maximum time that will wait before this operation is timed
                out. By default, this will never time out.

        Returns:
            Dict[str, Any]: Parsed json data representing the API response.

        Raises:
            Failure: If the poll times out, or if the sync that completed did not succeed.
        """
        poll_start = datetime.now()
        while True:
            connector_details = self.get_connector_details(connector_id)
            connector = FivetranConnector.from_connector_details(
                connector_details=connector_details
            )
            self._log.info(f"Polled '{connector_id}'. Status: [{connector.sync_state}]")

            # A newer completion timestamp means a sync finished since polling began.
            if connector.last_sync_completed_at > previous_sync_completed_at:
                break

            if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout):
                raise Failure(
                    f"Sync for connector '{connector_id}' timed out after "
                    f"{datetime.now() - poll_start}."
                )

            # Sleep for the configured time interval before polling again.
            time.sleep(poll_interval)

        if not connector.is_last_sync_successful:
            raise Failure(
                f"Sync for connector '{connector_id}' failed!",
                metadata={
                    "connector_details": MetadataValue.json(connector_details),
                    # Use the logs URL helper, as FivetranResource.poll_sync does for
                    # this metadata key, rather than the connector page URL.
                    "log_url": MetadataValue.url(get_fivetran_logs_url(connector_details)),
                },
            )
        return connector_details

    def sync_and_poll(
        self,
        connector_id: str,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        poll_timeout: Optional[float] = None,
    ) -> FivetranOutput:
        """Initializes a sync operation for the given connector, and polls until it completes.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
            poll_interval (float): The time (in seconds) that will be waited between successive polls.
            poll_timeout (float): The maximum time that will wait before this operation is timed
                out. By default, this will never time out.

        Returns:
            :py:class:`~FivetranOutput`:
                Object containing details about the connector and the tables it updates
        """
        return self._sync_and_poll(
            sync_fn=self.start_sync,
            connector_id=connector_id,
            poll_interval=poll_interval,
            poll_timeout=poll_timeout,
        )

    def resync_and_poll(
        self,
        connector_id: str,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        poll_timeout: Optional[float] = None,
        resync_parameters: Optional[Mapping[str, Sequence[str]]] = None,
    ) -> FivetranOutput:
        """Initializes a historical resync operation for the given connector, and polls until it completes.

        Args:
            connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
                "Setup" tab of a given connector in the Fivetran UI.
            resync_parameters (Dict[str, List[str]]): The payload to send to the Fivetran API.
                This should be a dictionary with schema names as the keys and a list of tables
                to resync as the values.
            poll_interval (float): The time (in seconds) that will be waited between successive polls.
            poll_timeout (float): The maximum time that will wait before this operation is timed
                out. By default, this will never time out.

        Returns:
            :py:class:`~FivetranOutput`:
                Object containing details about the connector and the tables it updates
        """
        return self._sync_and_poll(
            sync_fn=partial(self.start_resync, resync_parameters=resync_parameters),
            connector_id=connector_id,
            poll_interval=poll_interval,
            poll_timeout=poll_timeout,
        )

    def _sync_and_poll(
        self,
        sync_fn: Callable,
        connector_id: str,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        poll_timeout: Optional[float] = None,
    ) -> FivetranOutput:
        """Starts a sync through ``sync_fn`` and polls until it completes.

        Args:
            sync_fn (Callable): Callable invoked with ``connector_id`` as a keyword argument
                to start the sync or resync.
            connector_id (str): The Fivetran Connector ID.
            poll_interval (float): The time (in seconds) that will be waited between successive polls.
            poll_timeout (float): The maximum time that will wait before this operation is timed
                out. By default, this will never time out.

        Returns:
            :py:class:`~FivetranOutput`:
                Object containing details about the connector and the tables it updates
        """
        schema_config_details = self.get_schema_config_for_connector(connector_id)
        # Captured before the sync starts so that its completion can be detected.
        connector = FivetranConnector.from_connector_details(
            connector_details=self.get_connector_details(connector_id)
        )
        sync_fn(connector_id=connector_id)
        final_details = self.poll_sync(
            connector_id=connector_id,
            previous_sync_completed_at=connector.last_sync_completed_at,
            poll_interval=poll_interval,
            poll_timeout=poll_timeout,
        )
        return FivetranOutput(connector_details=final_details, schema_config=schema_config_details)
@experimental
class FivetranWorkspace(ConfigurableResource):
    """This class represents a Fivetran workspace and provides utilities
    to interact with Fivetran APIs.
    """

    account_id: str = Field(description="The Fivetran account ID.")
    api_key: str = Field(description="The Fivetran API key to use for this resource.")
    api_secret: str = Field(description="The Fivetran API secret to use for this resource.")
    request_max_retries: int = Field(
        default=3,
        description=(
            "The maximum number of times requests to the Fivetran API should be retried "
            "before failing."
        ),
    )
    request_retry_delay: float = Field(
        default=0.25,
        description="Time (in seconds) to wait between each request retry.",
    )
    disable_schedule_on_trigger: bool = Field(
        default=True,
        description=(
            "Whether to disable the schedule of a connector when it is synchronized using this resource. "
            "Defaults to True."
        ),
    )

    _client: FivetranClient = PrivateAttr(default=None)

    @cached_method
    def get_client(self) -> FivetranClient:
        """Builds a :py:class:`FivetranClient` from the configuration of this resource."""
        return FivetranClient(
            api_key=self.api_key,
            api_secret=self.api_secret,
            request_max_retries=self.request_max_retries,
            request_retry_delay=self.request_retry_delay,
            disable_schedule_on_trigger=self.disable_schedule_on_trigger,
        )

    def fetch_fivetran_workspace_data(
        self,
    ) -> FivetranWorkspaceData:
        """Retrieves all Fivetran content from the workspace and returns it as a FivetranWorkspaceData object.

        For every group, the destination is fetched using the group ID as the destination
        ID, followed by the connectors of the group. Connectors that are not connected are
        skipped; for every other connector, the schema config is fetched as well.

        Returns:
            FivetranWorkspaceData: A snapshot of the Fivetran workspace's content.
        """
        connectors_by_id = {}
        destinations_by_id = {}
        schema_configs_by_connector_id = {}

        client = self.get_client()
        groups = client.get_groups()["items"]

        for group in groups:
            group_id = group["id"]

            destination_details = client.get_destination_details(destination_id=group_id)
            destination = FivetranDestination.from_destination_details(
                destination_details=destination_details
            )
            destinations_by_id[destination.id] = destination

            connectors_details = client.get_connectors_for_group(group_id=group_id)["items"]
            for connector_details in connectors_details:
                connector = FivetranConnector.from_connector_details(
                    connector_details=connector_details,
                )

                # Connectors that are not connected are left out of the snapshot.
                if not connector.is_connected:
                    continue

                connectors_by_id[connector.id] = connector

                schema_config_details = client.get_schema_config_for_connector(
                    connector_id=connector.id
                )
                schema_config = FivetranSchemaConfig.from_schema_config_details(
                    schema_config_details=schema_config_details
                )
                schema_configs_by_connector_id[connector.id] = schema_config

        return FivetranWorkspaceData(
            connectors_by_id=connectors_by_id,
            destinations_by_id=destinations_by_id,
            schema_configs_by_connector_id=schema_configs_by_connector_id,
        )

    @cached_method
    def load_asset_specs(
        self,
        dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
    ) -> Sequence[AssetSpec]:
        """Returns a list of AssetSpecs representing the Fivetran content in the workspace.

        Args:
            dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
                to convert Fivetran content into :py:class:`dagster.AssetSpec`.
                Defaults to :py:class:`DagsterFivetranTranslator`.

        Returns:
            List[AssetSpec]: The set of assets representing the Fivetran content in the workspace.

        Examples:
            Loading the asset specs for a given Fivetran workspace:

            .. code-block:: python

                from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

                import dagster as dg

                fivetran_workspace = FivetranWorkspace(
                    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
                    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
                    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
                )

                fivetran_specs = fivetran_workspace.load_asset_specs()
                defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace})
        """
        return load_fivetran_asset_specs(
            workspace=self,
            dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(),
        )

    def _generate_materialization(
        self,
        fivetran_output: FivetranOutput,
        dagster_fivetran_translator: DagsterFivetranTranslator,
    ):
        """Yields an AssetMaterialization for each enabled table of each enabled schema.

        Args:
            fivetran_output (FivetranOutput): The connector details and schema config
                resulting from a sync.
            dagster_fivetran_translator (DagsterFivetranTranslator): The translator used to
                derive the asset key of each table.

        Yields:
            AssetMaterialization: One materialization per enabled table.
        """
        connector = FivetranConnector.from_connector_details(
            connector_details=fivetran_output.connector_details
        )
        schema_config = FivetranSchemaConfig.from_schema_config_details(
            schema_config_details=fivetran_output.schema_config
        )

        for schema_source_name, schema in schema_config.schemas.items():
            if not schema.enabled:
                continue

            for table_source_name, table in schema.tables.items():
                if not table.enabled:
                    continue

                asset_key = dagster_fivetran_translator.get_asset_spec(
                    props=FivetranConnectorTableProps(
                        table=get_fivetran_connector_table_name(
                            schema_name=schema.name_in_destination,
                            table_name=table.name_in_destination,
                        ),
                        connector_id=connector.id,
                        name=connector.name,
                        connector_url=connector.url,
                        schema_config=schema_config,
                        database=None,
                        service=None,
                    )
                ).key

                yield AssetMaterialization(
                    asset_key=asset_key,
                    description=(
                        f"Table generated via Fivetran sync: {schema.name_in_destination}.{table.name_in_destination}"
                    ),
                    metadata={
                        **metadata_for_table(
                            as_dict(table),
                            get_fivetran_connector_url(fivetran_output.connector_details),
                            include_column_info=True,
                            database=None,
                            schema=schema.name_in_destination,
                            table=table.name_in_destination,
                        ),
                        "schema_source_name": schema_source_name,
                        "table_source_name": table_source_name,
                    },
                )

    def sync_and_poll(
        self, context: Union[OpExecutionContext, AssetExecutionContext]
    ) -> Iterator[Union[AssetMaterialization, MaterializeResult]]:
        """Executes a sync and poll process to materialize Fivetran assets.

        The connector to sync is identified by the connector ID found in the metadata
        of the first spec of ``context.assets_def``.

        Args:
            context (Union[OpExecutionContext, AssetExecutionContext]): The execution context
                from within `@fivetran_assets`.

        Returns:
            Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult
                or AssetMaterialization.
        """
        assets_def = context.assets_def
        dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def)

        connector_id = next(
            check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
            for spec in assets_def.specs
        )

        client = self.get_client()
        fivetran_output = client.sync_and_poll(
            connector_id=connector_id,
        )

        materialized_asset_keys = set()
        for materialization in self._generate_materialization(
            fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator
        ):
            # Scan through all tables actually created, if it was expected then emit a MaterializeResult.
            # Otherwise, emit a runtime AssetMaterialization.
            if materialization.asset_key in context.selected_asset_keys:
                yield MaterializeResult(
                    asset_key=materialization.asset_key, metadata=materialization.metadata
                )
                materialized_asset_keys.add(materialization.asset_key)
            else:
                context.log.warning(
                    f"An unexpected asset was materialized: {materialization.asset_key}. "
                    f"Yielding a materialization event."
                )
                yield materialization

        # Report selected assets for which the sync produced no materialization.
        unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
        if unmaterialized_asset_keys:
            context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")
@experimental
def load_fivetran_asset_specs(
    workspace: FivetranWorkspace,
    dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Sequence[AssetSpec]:
    """Returns a list of AssetSpecs representing the Fivetran content in the workspace.

    Args:
        workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
        dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
            to convert Fivetran content into :py:class:`dagster.AssetSpec`.
            Defaults to :py:class:`DagsterFivetranTranslator`.

    Returns:
        List[AssetSpec]: The set of assets representing the Fivetran content in the workspace.

    Examples:
        Loading the asset specs for a given Fivetran workspace:

        .. code-block:: python

            from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

            import dagster as dg

            fivetran_workspace = FivetranWorkspace(
                account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
                api_key=dg.EnvVar("FIVETRAN_API_KEY"),
                api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
            )

            fivetran_specs = load_fivetran_asset_specs(fivetran_workspace)
            defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace})
    """
    # Everything that touches the workspace happens inside the context manager, so the
    # loader only ever sees the initialized instance yielded by it.
    with workspace.process_config_and_initialize_cm() as initialized_workspace:
        loader = FivetranWorkspaceDefsLoader(
            workspace=initialized_workspace,
            # Fall back to the default translator when the caller did not supply one.
            translator=dagster_fivetran_translator or DagsterFivetranTranslator(),
        )
        # `check.is_list` validates that the built definitions expose a list of AssetSpec.
        return check.is_list(loader.build_defs().assets, AssetSpec)
@record
class FivetranWorkspaceDefsLoader(StateBackedDefinitionsLoader[FivetranWorkspaceData]):
    """Definitions loader whose state is the content fetched from a Fivetran workspace.

    The state type is :py:class:`FivetranWorkspaceData`: it is what ``fetch_state``
    returns and what ``defs_from_state`` consumes.
    """

    # Workspace used to fetch the Fivetran content.
    workspace: FivetranWorkspace
    # Translator used to turn each connector table's props into an AssetSpec.
    translator: DagsterFivetranTranslator

    @property
    def defs_key(self) -> str:
        """Key for this loader: the Fivetran metadata prefix joined with the workspace account ID."""
        return f"{FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX}/{self.workspace.account_id}"

    def fetch_state(self) -> FivetranWorkspaceData:
        """Fetch the workspace data by delegating to the workspace."""
        return self.workspace.fetch_fivetran_workspace_data()

    def defs_from_state(self, state: FivetranWorkspaceData) -> Definitions:
        """Build Definitions holding one AssetSpec per connector table props entry in ``state``.

        Args:
            state (FivetranWorkspaceData): The workspace data to translate.

        Returns:
            Definitions: Definitions whose assets are the translated AssetSpecs.
        """
        all_asset_specs = [
            self.translator.get_asset_spec(props)
            for props in state.to_fivetran_connector_table_props_data()
        ]
        return Definitions(assets=all_asset_specs)