3
O6b)                 @   sV  d Z ddlZddlZddlZddlZddlZddlmZmZ erJddl	Z
nddl
Z
ddlmZmZmZ ddlmZ ddlmZ ddlmZmZmZmZmZ ddlmZmZmZmZmZmZm Z  dd	l!m"Z" dd
l#m$Z% ddl&m'Z' ddl(m)Z) ddl*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0 ddl1m2Z2 dd Z3G dd de4Z5G dd de4Z6dd Z7dd Z8dS )z<Internal class to monitor a topology of one or more servers.    N)
itervaluesPY3)commonhelpersperiodic_executor)IsMaster)PoolOptions)updated_topology_description)_updated_topology_description_srv_pollingTopologyDescriptionSRV_POLLING_TOPOLOGIESTOPOLOGY_TYPE)ConnectionFailureConfigurationErrorNetworkTimeoutNotPrimaryErrorOperationFailureServerSelectionTimeoutError
WriteError)
SrvMonitor)time)Server)ServerDescription)any_server_selectorarbiter_server_selectorsecondary_server_selectorreadable_server_selectorwritable_server_selector	Selection)_ServerSessionPoolc             C   sN   |  }|sdS x:y|j  }W n tjk
r4   P Y qX |\}}||  qW dS )NFT)
get_nowaitQueueEmpty)Z	queue_refqeventfnargs r'   O/var/www/html/sandeepIITI/myenv/lib/python3.6/site-packages/pymongo/topology.pyprocess_events_queue:   s    r)   c               @   s`  e Zd ZdZdd Zdd ZdUddZd	d
 ZdVddZdWddZ	dXddZ
dYddZdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) ZdZd+d,Zd-d. Zd/d0 Zd1d2 Zd3d4 Zed5d6 Zd7d8 Zd9d: Zd;d< Z d=d> Z!d?d@ Z"dAdB Z#dCdD Z$dEdF Z%dGdH Z&dIdJ Z'dKdL Z(dMdN Z)dOdP Z*dQdR Z+dSdT Z,dS )[Topologyz*Monitor a topology of one or more servers.c                s  |j | _ |jj| _| jd k	}|o&| jj| _|o4| jj| _d | _d | _	| jsP| jr^t
j
dd| _| jr|| jj| jj| j ff || _t|j |j |jd d |}|| _| jrttji d d d | j}| jj| jj|| j| j ff x.|jD ]$}| jr| jj| jj|| j ff qW t|j | _d| _tj | _| jj| j| _ i | _!d | _"d | _#t$ | _%| jsf| jr fdd}t&j't(j)d|dd}t*j+| j|j, || _	|j-  d | _.| jj/d k	r| jj0 rt1| | j| _.d S )	Nd   )maxsizeFc                  s   t  S )N)r)   r'   )weakr'   r(   target   s    z!Topology.__init__.<locals>.targetg      ?Zpymongo_events_thread)intervalZmin_intervalr.   name)2_topology_idZ_pool_optionsevent_listeners
_listenersZenabled_for_server_publish_serverZenabled_for_topology_publish_tp_events_Topology__events_executorr!   putZpublish_topology_opened	_settingsr   Zget_topology_typeZget_server_descriptionsreplica_set_name_descriptionr   Unknown$publish_topology_description_changedZseedsZpublish_server_openedlistserver_descriptions_seed_addresses_opened	threadingLock_lockZcondition_class
_condition_servers_pid_max_cluster_timer   _session_poolr   ZPeriodicExecutorr   ZEVENTS_QUEUE_FREQUENCYweakrefrefcloseopen_srv_monitorZfqdnload_balancedr   )selftopology_settingsZpubZtopology_descriptionZ
initial_tdseedr.   executorr'   )r-   r(   __init__M   sj    



zTopology.__init__c             C   sh   | j dkrtj | _ n4tj | j krJtjd | j | jj  W dQ R X | j | j  W dQ R X dS )a  Start monitoring, or restart after a fork.

        No effect if called multiple times.

        .. warning:: Topology is shared among multiple threads and is protected
          by mutual exclusion. Using Topology from a process other than the one
          that initialized it will emit a warning and may result in deadlock. To
          prevent this from happening, MongoClient must be created after any
          forking.

        NzMongoClient opened before fork. Create MongoClient only after forking. See PyMongo's documentation for details: https://pymongo.readthedocs.io/en/stable/faq.html#is-pymongo-fork-safe)	rG   osgetpidwarningswarnrD   rI   reset_ensure_opened)rP   r'   r'   r(   rM      s    
zTopology.openNc                sH   |dkr j j}n|} j"  j|||} fdd|D S Q R X dS )aL  Return a list of Servers matching selector, or time out.

        :Parameters:
          - `selector`: function that takes a list of Servers and returns
            a subset of them.
          - `server_selection_timeout` (optional): maximum seconds to wait.
            If not provided, the default value common.SERVER_SELECTION_TIMEOUT
            is used.
          - `address`: optional server address to select.

        Calls self.open() if needed.

        Raises exc:`ServerSelectionTimeoutError` after
        `server_selection_timeout` if no matching servers are found.
        Nc                s   g | ]} j |jqS r'   )get_server_by_addressaddress).0sd)rP   r'   r(   
<listcomp>   s   z+Topology.select_servers.<locals>.<listcomp>)r9   server_selection_timeoutrD   _select_servers_loop)rP   selectorr`   r\   Zserver_timeoutr?   r'   )rP   r(   select_servers   s    


zTopology.select_serversc             C   s   t  }|| }| jj||| jjd}xv|s|dks:||krTtd| j||| jf | j  | j	  | j
jtj | jj  t  }| jj||| jjd}q&W | jj  |S )z7select_servers() guts. Hold the lock when calling this.)Zcustom_selectorr   z*%s, Timeout: %ss, Topology Description: %r)_timer;   Zapply_selectorr9   Zserver_selectorr   _error_messagedescriptionrZ   _request_check_allrE   waitr   ZMIN_HEARTBEAT_INTERVALZcheck_compatible)rP   rb   timeoutr\   nowend_timer?   r'   r'   r(   ra      s&    

zTopology._select_servers_loopc             C   s   t j| j|||S )zALike select_servers, but choose a random server if several match.)randomchoicerc   )rP   rb   r`   r\   r'   r'   r(   select_server   s    
zTopology.select_serverc             C   s   | j t||S )a  Return a Server for "address", reconnecting if necessary.

        If the server's type is not known, request an immediate check of all
        servers. Time out after "server_selection_timeout" if the server
        cannot be reached.

        :Parameters:
          - `address`: A (host, port) pair.
          - `server_selection_timeout` (optional): maximum seconds to wait.
            If not provided, the default value
            common.SERVER_SELECTION_TIMEOUT is used.

        Calls self.open() if needed.

        Raises exc:`ServerSelectionTimeoutError` after
        `server_selection_timeout` if no matching servers are found.
        )rn   r   )rP   r\   r`   r'   r'   r(   select_server_by_address   s    z!Topology.select_server_by_addressFc             C   s  | j }|j|j }t||r dS | js,| jo2||k}| jr`| r`| jj| jj	|||j| j
ff t| j || _ | j  | j|j | jr| r| jj| jj|| j | j
ff | jr|jtjkr| j jtkr| jj  |r| jj|j}|r|jj  | jj  dS )ziProcess a new ServerDescription on an opened topology.

        Hold the lock when calling this.
        N)r;   Z_server_descriptionsr\   _is_stale_server_descriptionr4   r5   r6   r8   r3   Z"publish_server_description_changedr1   r	   _update_servers_receive_cluster_time_no_lockcluster_timer=   rN   topology_typer   r<   r   rL   rF   getpoolrY   rE   
notify_all)rP   server_description
reset_pooltd_oldZsd_oldZsuppress_eventserverr'   r'   r(   _process_change  s8    


zTopology._process_changec          
   C   s6   | j & | jr(| jj|jr(| j|| W dQ R X dS )z=Process a new ServerDescription after a hello call completes.N)rD   rA   r;   
has_serverr\   r|   )rP   rx   ry   r'   r'   r(   	on_change>  s    	zTopology.on_changec             C   sD   | j }t| j || _ | j  | jr@| jj| jj|| j | jff dS )z_Process a new seedlist on an opened topology.
        Hold the lock when calling this.
        N)	r;   r
   rq   r5   r6   r8   r3   r=   r1   )rP   seedlistrz   r'   r'   r(   _process_srv_updateN  s    zTopology._process_srv_updatec          	   C   s&   | j  | jr| j| W dQ R X dS )z?Process a new list of nodes obtained from scanning SRV records.N)rD   rA   r   )rP   r   r'   r'   r(   on_srv_update]  s    zTopology.on_srv_updatec             C   s   | j j|S )aJ  Get a Server or None.

        Returns the current version of the server immediately, even if it's
        Unknown or absent from the topology. Only use this in unittests.
        In driver code, use select_server_by_address, since then you're
        assured a recent view of the server's type and wire protocol version.
        )rF   ru   )rP   r\   r'   r'   r(   r[   d  s    zTopology.get_server_by_addressc             C   s
   || j kS )N)rF   )rP   r\   r'   r'   r(   r}   n  s    zTopology.has_serverc          	   C   s:   | j * | jj}|tjkrdS t| j d jS Q R X dS )z!Return primary's address or None.Nr   )rD   r;   rt   r   ReplicaSetWithPrimaryr   _new_selectionr\   )rP   rt   r'   r'   r(   get_primaryq  s
    
zTopology.get_primaryc             C   sJ   | j : | jj}|tjtjfkr&t S tdd || j D S Q R X dS )z+Return set of replica set member addresses.c             S   s   g | ]
}|j qS r'   )r\   )r]   r^   r'   r'   r(   r_     s    z5Topology._get_replica_set_members.<locals>.<listcomp>N)rD   r;   rt   r   r   ReplicaSetNoPrimarysetr   )rP   rb   rt   r'   r'   r(   _get_replica_set_members{  s    
z!Topology._get_replica_set_membersc             C   s
   | j tS )z"Return set of secondary addresses.)r   r   )rP   r'   r'   r(   get_secondaries  s    zTopology.get_secondariesc             C   s
   | j tS )z Return set of arbiter addresses.)r   r   )rP   r'   r'   r(   get_arbiters  s    zTopology.get_arbitersc             C   s   | j S )z1Return a document, the highest seen $clusterTime.)rH   )rP   r'   r'   r(   max_cluster_time  s    zTopology.max_cluster_timec             C   s(   |r$| j  s|d | j d kr$|| _ d S )NZclusterTime)rH   )rP   rs   r'   r'   r(   rr     s
    z&Topology._receive_cluster_time_no_lockc          	   C   s    | j  | j| W d Q R X d S )N)rD   rr   )rP   rs   r'   r'   r(   receive_cluster_time  s    zTopology.receive_cluster_time   c          	   C   s*   | j  | j  | jj| W dQ R X dS )z=Wake all monitors, wait for at least one to check its server.N)rD   rg   rE   rh   )rP   Z	wait_timer'   r'   r(   request_check_all  s    zTopology.request_check_allc             C   sR   t |d|d}| j2 | jj|}|rD| jt||dd |j  W dQ R X dS )z@Clear our pool for a server, mark it Unknown, and check it soon.i{'  )codeerrmsg)errorTN)r   rD   rF   ru   r|   r   request_check)rP   r\   	error_msgr   r{   r'   r'   r(   handle_getlasterror  s    zTopology.handle_getlasterrorc             C   s   | j jtjkr| j jS | j jS )z~Return a list of all data-bearing servers.

        This includes any server that might be selected for an operation.
        )r;   rt   r   Singleknown_serversreadable_servers)rP   r'   r'   r(   data_bearing_servers  s    zTopology.data_bearing_serversc             C   sn   g }| j : x2| j D ]&}| j|j }|j||jjj f qW W d Q R X x|D ]\}}|jj|| qPW d S )N)	rD   r   rF   r\   appendrv   genZget_overallZremove_stale_sockets)rP   Zall_credentialsserversr^   r{   Z
generationr'   r'   r(   update_pool  s    zTopology.update_poolc             C   s   | j v x| jj D ]}|j  qW | jj | _x0| jj j D ]\}}|| jkr@|| j| _q@W | j	rr| j	j  d| _
W dQ R X | jr| jj| jj| jff | js| jr| jj  dS )z?Clear pools and terminate monitors. Topology reopens on demand.FN)rD   rF   valuesrL   r;   rY   r?   itemsrf   rN   rA   r5   r6   r8   r3   Zpublish_topology_closedr1   r4   r7   )rP   r{   r\   r^   r'   r'   r(   rL     s    

zTopology.closec             C   s   | j S )N)r;   )rP   r'   r'   r(   rf     s    zTopology.descriptionc          	   C   s   | j  | jj S Q R X dS )z"Pop all session ids from the pool.N)rD   rI   pop_all)rP   r'   r'   r(   pop_all_sessions  s    zTopology.pop_all_sessionsc             C   sp   | j j}|dkrl| j jtjkr:| j jsT| jt| jj	d n| j j
sT| jt| jj	d | j j}|dkrltd|S )zAInternal check for session support on non-load balanced clusters.Nz5Sessions are not supported by this MongoDB deployment)r;   logical_session_timeout_minutesrt   r   r   Zhas_known_serversra   r   r9   r`   r   r   r   )rP   session_timeoutr'   r'   r(   _check_session_support  s$    zTopology._check_session_supportc          	   C   s8   | j ( | jjs| j }ntd}| jj|S Q R X dS )z>Start or resume a server session, or raise ConfigurationError.infN)rD   r9   rO   r   floatrI   get_server_session)rP   r   r'   r'   r(   r     s
    
zTopology.get_server_sessionc          
   C   s:   |r*| j  | jj|| jj W d Q R X n| jj| d S )N)rD   rI   return_server_sessionr;   r   Zreturn_server_session_no_lock)rP   Zserver_sessionlockr'   r'   r(   r     s    zTopology.return_server_sessionc             C   s   t j| jS )zmA Selection object, initially including all known servers.

        Hold the lock when calling this.
        )r   Zfrom_topology_descriptionr;   )rP   r'   r'   r(   r     s    zTopology._new_selectionc             C   s   | j srd| _ | j  | js | jr*| jj  | jrF| jjt	krF| jj  | j
jrr| jt| jd td| jdd xt| jD ]}|j  q~W dS )z[Start monitors, or restart after a fork.

        Hold the lock when calling this.
        Tr         )okZ	serviceIdZmaxWireVersionN)rA   rq   r5   r4   r7   rM   rN   rf   rt   r   r9   rO   r|   r   r@   r   r1   r   rF   )rP   r{   r'   r'   r(   rZ     s    

zTopology._ensure_openedc             C   sp   | j j|}|d krdS |jj|j|jr.dS |jj}|j}d }|rft	|drft
|jtrf|jjd}t||S )NTdetailsZtopologyVersion)rF   ru   _poolZstale_generationsock_generation
service_idrf   topology_versionr   hasattr
isinstancer   dict _is_stale_error_topology_version)rP   r\   err_ctxr{   Zcur_tvr   error_tvr'   r'   r(   _is_stale_error<  s    zTopology._is_stale_errorc       	      C   s,  | j ||rd S | j| }|j}t|}|j}t|trB|jrBd S t|trPd S t|t	r|j
jdd}|tjk}| jjs| jt||d |s|jdkr|j| |j  nzt|tr| jjs| jt||d |j| |jj  n@t|tr(|jtjkr(| jjs| jt||d |j| d S )Nr   r   )r      )r   rF   r   typer   
issubclassr   completed_handshaker   r   r   ru   r   Z_SHUTDOWN_CODESr9   rO   r|   r   max_wire_versionrY   r   r   _monitorZcancel_checkr   r   Z_NOT_MASTER_CODES)	rP   r\   r   r{   r   exc_typer   Zerr_codeZis_shutting_downr'   r'   r(   _handle_errorQ  s<    



	





zTopology._handle_errorc          
   C   s"   | j  | j|| W dQ R X dS )zHandle an application error.

        May reset the server to Unknown, clear the pool, and request an
        immediate check depending on the error and the context.
        N)rD   r   )rP   r\   r   r'   r'   r(   handle_error  s    zTopology.handle_errorc             C   s    x| j j D ]}|j  qW dS )z3Wake all monitors. Hold the lock when calling this.N)rF   r   r   )rP   r{   r'   r'   r(   rg     s    zTopology._request_check_allc          	   C   s  x| j j j D ]\}}|| jkr| jj|| | j|| jd}d}| jrTtj	| j
}t|| j||| j| j|d}|| j|< |j  q| j| jj}|| j| _||jkr| j| jj|j qW x:t| jj D ](\}}| j j|s|j  | jj| qW dS )zrSync our Servers from TopologyDescription.server_descriptions.

        Hold the lock while calling this.
        )rx   Ztopologyrv   rQ   N)rx   rv   monitorZtopology_idZ	listenersevents)r;   r?   r   rF   r9   Zmonitor_class_create_pool_for_monitorr4   rJ   rK   r6   r   _create_pool_for_serverr1   r3   rM   rf   is_writablerv   Zupdate_is_writabler>   r}   rL   pop)rP   r\   r^   r   r-   r{   Zwas_writabler'   r'   r(   rq     s8    




zTopology._update_serversc             C   s   | j j|| j jS )N)r9   
pool_classpool_options)rP   r\   r'   r'   r(   r     s    z Topology._create_pool_for_serverc          
   C   sB   | j j}t|j|j|j|j|j|j|j|j	d}| j j
||ddS )N)connect_timeoutsocket_timeoutssl_contextssl_match_hostnamer2   appnamedriver
server_apiF)Z	handshake)r9   r   r   r   r   r   r2   r   r   r   r   )rP   r\   optionsZmonitor_pool_optionsr'   r'   r(   r     s    

z!Topology._create_pool_for_monitorc                s$  | j jtjtjfk}|rd}n| j jtjkr2d}nd}| j jrf|tkrX|rNdS d| S nd||f S nt| j j	 }t| j j	 j
 }|s|rd|| jjf S d| S |d	 j t fd
d|dd D }|r dkrd| S |ot|j| j rd| S t S djdd |D S dS )zeFormat an error message if server selection fails.

        Hold the lock when calling this.
        zreplica set membersZmongosesr   zNo primary available for writeszNo %s available for writeszNo %s match selector "%s"z)No %s available for replica set name "%s"zNo %s availabler   c             3   s   | ]}|j  kV  qd S )N)r   )r]   r{   )r   r'   r(   	<genexpr>  s    z*Topology._error_message.<locals>.<genexpr>r   NzNo %s found yetz\Could not reach any servers in %s. Replica set is configured with internal hostnames or IPs?,c             s   s   | ]}|j rt|j V  qd S )N)r   str)r]   r{   r'   r'   r(   r     s    )r;   rt   r   r   r   ZShardedr   r   r>   r?   r   r9   r:   r   allr   intersectionr@   r   join)rP   rb   Zis_replica_setZserver_plural	addressesr   Zsamer'   )r   r(   re     s@    


zTopology._error_messagec             C   s"   d}| j sd}d| jj|| jf S )N zCLOSED z	<%s %s%r>)rA   	__class____name__r;   )rP   msgr'   r'   r(   __repr__  s    zTopology.__repr__)NN)NN)N)F)F)r   )-r   
__module____qualname____doc__rT   rM   rc   ra   rn   ro   r|   r~   r   r   r[   r}   r   r   r   r   r   rr   r   r   r   r   r   rL   propertyrf   r   r   r   r   r   rZ   r   r   r   rg   rq   r   r   re   r   r'   r'   r'   r(   r*   K   sV   G 
! 


/




	
9	)9r*   c               @   s   e Zd ZdZdd ZdS )_ErrorContextz.An error with context for SDAM error handling.c             C   s"   || _ || _|| _|| _|| _d S )N)r   r   r   r   r   )rP   r   r   r   r   r   r'   r'   r(   rT     s
    z_ErrorContext.__init__N)r   r   r   r   rT   r'   r'   r'   r(   r     s   r   c             C   s8   | dks|dkrdS | d |d kr(dS | d |d kS )z9Return True if the error's topologyVersion is <= current.NF	processIdcounterr'   )
current_tvr   r'   r'   r(   r   #  s
    r   c             C   sF   | j |j  }}|dks|dkr"dS |d |d kr6dS |d |d kS )z4Return True if the new topologyVersion is < current.NFr   r   )r   )Z
current_sdZnew_sdr   Znew_tvr'   r'   r(   rp   ,  s    rp   )9r   rU   rl   rB   rW   rJ   Zbson.py3compatr   r   queuer!   Zpymongor   r   r   Zpymongo.ismasterr   Zpymongo.poolr   Zpymongo.topology_descriptionr	   r
   r   r   r   Zpymongo.errorsr   r   r   r   r   r   r   Zpymongo.monitorr   Zpymongo.monotonicr   rd   Zpymongo.serverr   Zpymongo.server_descriptionr   Zpymongo.server_selectorsr   r   r   r   r   r   Zpymongo.client_sessionr   r)   objectr*   r   r   rp   r'   r'   r'   r(   <module>   s<   
$      R	