kopia lustrzana https://github.com/bugout-dev/moonstream
				
				
				
			now, logs querries to node is done with batches
							rodzic
							
								
									36233447c5
								
							
						
					
					
						commit
						e294cfa926
					
				| 
						 | 
				
			
			@ -220,13 +220,13 @@ def get_nft_transfers(
 | 
			
		|||
 | 
			
		||||
    if to_block is not None:
 | 
			
		||||
        filter_params["toBlock"] = to_block
 | 
			
		||||
 | 
			
		||||
    if contract_address is not None:
 | 
			
		||||
        filter_params["address"] = w3.toChecksumAddress(contract_address)
 | 
			
		||||
 | 
			
		||||
    logs = w3.eth.get_logs(filter_params)
 | 
			
		||||
 | 
			
		||||
    nft_transfers: List[NFTTransfer] = []
 | 
			
		||||
    for log in tqdm(logs, desc="Crawling NFT transfers from Ethereum node"):
 | 
			
		||||
    for log in tqdm(logs, desc=f"Processing logs for blocks {from_block}-{to_block}"):
 | 
			
		||||
        nft_transfer = decode_nft_transfer_data(w3, log)
 | 
			
		||||
        if nft_transfer is not None:
 | 
			
		||||
            kwargs = {
 | 
			
		||||
| 
						 | 
				
			
			@ -385,8 +385,8 @@ def add_labels(
 | 
			
		|||
    db_session: Session,
 | 
			
		||||
    from_block: Optional[int] = None,
 | 
			
		||||
    to_block: Optional[int] = None,
 | 
			
		||||
    address: Optional[str] = None,
 | 
			
		||||
    batch_size: int = 100,
 | 
			
		||||
    contract_address: Optional[str] = None,
 | 
			
		||||
    batch_size: int = 50,
 | 
			
		||||
) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    Crawls blocks between from_block and to_block checking for NFT mints and transfers.
 | 
			
		||||
| 
						 | 
				
			
			@ -435,17 +435,21 @@ def add_labels(
 | 
			
		|||
    assert batch_size > 0, f"Batch size must be positive (received {batch_size})"
 | 
			
		||||
 | 
			
		||||
    start, end = get_block_bounds(w3, from_block, to_block)
 | 
			
		||||
    transfers = get_nft_transfers(w3, start, end, address)
 | 
			
		||||
 | 
			
		||||
    batch_start = 0
 | 
			
		||||
    batch_end = batch_size
 | 
			
		||||
    batch_start = start
 | 
			
		||||
    batch_end = start + batch_size - 1
 | 
			
		||||
 | 
			
		||||
    address_ids: Dict[str, int] = {}
 | 
			
		||||
 | 
			
		||||
    pbar = tqdm(total=len(transfers))
 | 
			
		||||
    pbar.set_description("Processing NFT transfer")
 | 
			
		||||
    while batch_start < batch_end:
 | 
			
		||||
        job = transfers[batch_start:batch_end]
 | 
			
		||||
    pbar = tqdm(total=(end - start + 1))
 | 
			
		||||
    pbar.set_description("Processing blocks")
 | 
			
		||||
    while batch_start <= batch_end:
 | 
			
		||||
        job = get_nft_transfers(
 | 
			
		||||
            w3,
 | 
			
		||||
            from_block=batch_start,
 | 
			
		||||
            to_block=batch_end,
 | 
			
		||||
            contract_address=contract_address,
 | 
			
		||||
        )
 | 
			
		||||
        contract_addresses = {transfer.contract_address for transfer in job}
 | 
			
		||||
        updated_address_ids = ensure_addresses(db_session, contract_addresses)
 | 
			
		||||
        for address, address_id in updated_address_ids.items():
 | 
			
		||||
| 
						 | 
				
			
			@ -473,9 +477,9 @@ def add_labels(
 | 
			
		|||
        label_transfers(db_session, job, updated_address_ids)
 | 
			
		||||
 | 
			
		||||
        # Update batch at end of iteration
 | 
			
		||||
        pbar.update(batch_end - batch_start)
 | 
			
		||||
        batch_start = batch_end
 | 
			
		||||
        batch_end = min(batch_end + batch_size, len(transfers))
 | 
			
		||||
        pbar.update(batch_end - batch_start + 1)
 | 
			
		||||
        batch_start = batch_end + 1
 | 
			
		||||
        batch_end = min(batch_end + batch_size, end)
 | 
			
		||||
    pbar.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Ładowanie…
	
		Reference in New Issue