Last active
December 11, 2020 08:14
-
-
Save Vyryn/44c40ff7b3c925d334f920dba00a5bc8 to your computer and use it in GitHub Desktop.
Meros cryptocurrency RPC socket interpretation in python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # By Vyryn, licensed under GNU GPL. | |
| # Some basic Meros testnet RPC communication to make it a little more convinient to monitor network status and | |
| # test transaction success timing. A few basic async functions and what amounts to terrible examples of their use. | |
| import json | |
| import asyncio | |
| URL = "10.0.0.109" | |
| PORT = 5133 | |
| loop = asyncio.get_event_loop() | |
| async def read_socket_response(reader: asyncio.StreamReader) -> str: | |
| resp = '' | |
| brak = 0 | |
| while True: | |
| resp += str(await reader.read(1), "UTF-8") | |
| if resp[-1] == resp[0]: | |
| brak += 1 | |
| elif resp[0] == '{' and resp[-1] == '}': | |
| brak -= 1 | |
| elif resp[0] == '[' and resp[-1] == ']': | |
| brak -= 1 | |
| if not brak: | |
| break | |
| return resp | |
| async def rpc(method, params=[]): | |
| payload = str(json.dumps({"jsonrpc": "2.0", "id": 0, "params": params, "method": method})) | |
| print('Connecting...') | |
| reader, writer = await asyncio.open_connection(host=URL, port=PORT, loop=loop) | |
| print('Connected.') | |
| writer.write(bytes(payload, "UTF-8")) | |
| print('Sent.') | |
| print(await read_socket_response(reader)) | |
| loop.run_until_complete(rpc('merit_getUnlockedMerit')) | |
| loop.run_until_complete(rpc('transactions_getBalance', | |
| ['mr1qz7959tux0jepjj4ksqc5lzny2mnwyfvsufdvwkgkvzrrcr9u2r050csy4z'])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment